How to Build a Social Media AI Agent

Build a social media AI agent with CrewAI using three specialized agents: a Trend Researcher, Content Creator, and Scheduler. Includes Twitter/X API tools, LinkedIn integration, content calendar management, and A/B post variation generation. Full Python code.


Social media marketing teams spend hours every week monitoring trending topics, writing platform-specific copy, scheduling posts at optimal times, and analyzing what performed. A multi-agent system can take over this work continuously — discovering trends as they emerge, generating post variations tuned to each platform's format, and placing posts on a content calendar for human review before publishing.

In this tutorial you will build a three-agent social media crew using CrewAI: a Trend Researcher agent that monitors topics and competitor activity, a Content Creator agent that produces platform-specific post variations with A/B testing built in, and a Scheduler agent that manages the content calendar and posts through the Twitter/X and LinkedIn APIs. You will get full working Python code, API integration patterns, and a production deployment guide.

Prerequisites#

Install the required packages:

pip install crewai crewai-tools langchain-openai \
    tweepy tavily-python requests pydantic \
    python-dotenv schedule pytz

You also need:

  • Python 3.10 or later
  • An OpenAI API key
  • A Tavily API key (used by the research tools for trend search)
  • Twitter/X API credentials (Bearer Token + OAuth 1.0a for posting)
  • Optional: LinkedIn API credentials (see the LinkedIn API Setup section)

Create a .env file with your credentials:
# .env
OPENAI_API_KEY=sk-...
TAVILY_API_KEY=tvly-...

# Twitter/X API v2
TWITTER_BEARER_TOKEN=...
TWITTER_API_KEY=...
TWITTER_API_SECRET=...
TWITTER_ACCESS_TOKEN=...
TWITTER_ACCESS_TOKEN_SECRET=...

# LinkedIn (optional)
LINKEDIN_ACCESS_TOKEN=...
LINKEDIN_PERSON_URN=urn:li:person:...

# Your brand configuration
BRAND_NAME=YourBrand
BRAND_HANDLE=@yourbrand
BRAND_VOICE=professional yet approachable, uses data-driven insights

Architecture Overview#

Three CrewAI agents collaborate through a sequential task pipeline, with each agent's output feeding directly into the next agent's input.

┌──────────────────────────────────────────────────────────┐
│                   CrewAI Orchestrator                    │
└──────────────────────────────────────────────────────────┘
           │               │                │
           ▼               ▼                ▼
┌──────────────┐  ┌─────────────────┐  ┌──────────────────┐
│   AGENT 1    │  │    AGENT 2      │  │     AGENT 3      │
│   Trend      │→ │   Content       │→ │    Scheduler     │
│  Researcher  │  │   Creator       │  │                  │
└──────────────┘  └─────────────────┘  └──────────────────┘
 • Tavily search   • 3 platform        • Posts to Twitter/X
 • Competitor        variations per      via Tweepy
   monitoring        topic             • LinkedIn API calls
 • Trend scoring   • 2 A/B variants    • Content calendar
 • Hashtag           per post            management
   discovery       • CTA optimization  • Optimal time slots
                   • Character limits  • CSV export

Agent outputs are structured JSON, making it easy to inspect results at each stage and add human review gates before the Scheduler posts anything.

Step 1: Define Content Data Models#

# models.py
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum
from datetime import datetime

class Platform(str, Enum):
    TWITTER = "twitter"
    LINKEDIN = "linkedin"
    INSTAGRAM = "instagram"

class PostVariant(BaseModel):
    variant_id: str          # "A" or "B"
    text: str
    hashtags: List[str] = Field(default_factory=list)
    character_count: int = 0
    estimated_engagement_score: float = 0.0

class PlatformPost(BaseModel):
    platform: Platform
    topic: str
    variants: List[PostVariant]
    best_posting_times: List[str]  # e.g. ["09:00", "17:30"]
    target_audience: str

class TrendReport(BaseModel):
    trending_topics: List[dict]
    competitor_topics: List[dict]
    recommended_topics: List[str]
    hashtag_suggestions: List[str]
    trend_window: str  # e.g. "last 24 hours"

class ContentCalendarEntry(BaseModel):
    post_id: str
    platform: Platform
    scheduled_time: str  # ISO format
    text: str
    hashtags: List[str]
    status: str = "scheduled"  # scheduled / posted / failed
    posted_url: Optional[str] = None

Step 2: Build the Agent Tools#

Tool 1: Trend Research Tools#

# tools/research_tools.py
from crewai_tools import tool
from tavily import TavilyClient
import os

tavily = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY", ""))

@tool("search_trending_topics")
def search_trending_topics(query: str) -> str:
    """
    Search for trending topics, news, and discussions related to a query.
    Returns a formatted list of trending topics with context and source URLs.
    Use this to discover what's being discussed in your industry right now.
    """
    results = tavily.search(
        query=query,
        search_depth="basic",
        max_results=8,
        topic="news",
    )
    formatted = []
    for r in results.get("results", []):
        formatted.append(
            f"TOPIC: {r.get('title', '')}\n"
            f"CONTEXT: {r.get('content', '')[:300]}\n"
            f"SOURCE: {r.get('url', '')}\n"
        )
    return "\n---\n".join(formatted) if formatted else "No trending topics found."

@tool("monitor_competitor_content")
def monitor_competitor_content(competitor_handles: str) -> str:
    """
    Search for recent content from competitor brands or handles.
    Input: comma-separated list of competitor names or Twitter handles.
    Returns recent posts/articles from those competitors.
    """
    handles = [h.strip().lstrip("@") for h in competitor_handles.split(",")]
    all_results = []
    for handle in handles[:3]:  # Limit to 3 competitors per call
        results = tavily.search(
            query=f"{handle} site:twitter.com OR site:linkedin.com",
            search_depth="basic",
            max_results=3,
        )
        for r in results.get("results", []):
            all_results.append(
                f"COMPETITOR: {handle}\n"
                f"CONTENT: {r.get('title', '')} — {r.get('content', '')[:200]}\n"
                f"URL: {r.get('url', '')}"
            )
    return "\n---\n".join(all_results) if all_results else "No competitor content found."

@tool("get_trending_hashtags")
def get_trending_hashtags(topic: str, platform: str = "twitter") -> str:
    """
    Find relevant trending hashtags for a topic on a given platform.
    Returns a list of hashtags ranked by estimated reach and relevance.
    """
    results = tavily.search(
        query=f"trending hashtags {topic} {platform} 2026",
        search_depth="basic",
        max_results=5,
    )
    # Extract hashtag patterns from results
    import re
    hashtags = set()
    for r in results.get("results", []):
        found = re.findall(r'#\w+', r.get("content", "") + r.get("title", ""))
        hashtags.update(found[:5])

    if not hashtags:
        # Generate relevant hashtags from topic words as fallback
        words = topic.split()
        hashtags = {f"#{w.capitalize()}" for w in words if len(w) > 3}
        hashtags.add(f"#{topic.replace(' ', '')}")

    return f"Trending hashtags for '{topic}' on {platform}:\n" + "\n".join(sorted(hashtags)[:15])

Tool 2: Content Creation Tools#

# tools/content_tools.py
from crewai_tools import tool
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
import json
import os

llm = ChatOpenAI(model="gpt-4o", temperature=0.8)

PLATFORM_CONSTRAINTS = {
    "twitter": {"max_chars": 280, "style": "concise, punchy, uses threads for long content"},
    "linkedin": {"max_chars": 3000, "style": "professional, data-driven, includes personal insight"},
    "instagram": {"max_chars": 2200, "style": "visual-first, storytelling, lifestyle-focused"},
}

@tool("generate_post_variations")
def generate_post_variations(
    topic: str,
    platform: str,
    brand_voice: str,
    hashtags: str,
) -> str:
    """
    Generate two A/B post variations for a topic on a specific platform.
    Input: topic, platform (twitter/linkedin/instagram), brand voice description,
    comma-separated hashtags.
    Returns JSON with two variants including text, character count, and CTA.
    """
    constraints = PLATFORM_CONSTRAINTS.get(platform, PLATFORM_CONSTRAINTS["twitter"])

    prompt = ChatPromptTemplate.from_messages([
        ("system", (
            f"You are a social media copywriter expert in {platform} content. "
            f"Brand voice: {brand_voice}. "
            f"Platform constraints: max {constraints['max_chars']} characters. "
            f"Style: {constraints['style']}. "
            "Generate TWO distinct A/B test variants. "
            "Output valid JSON with keys: variant_a, variant_b. "
            "Each variant has: text (string), cta (call to action), "
            "angle (the hook/angle used)."
        )),
        ("human", (
            f"Topic: {topic}\n"
            f"Hashtags to use: {hashtags}\n"
            f"Write two variants for {platform}. "
            f"Keep under {constraints['max_chars']} characters each."
        )),
    ])

    chain = prompt | llm
    result = chain.invoke({})

    try:
        content = result.content.strip()
        if content.startswith("```"):
            content = "\n".join(content.split("\n")[1:-1])
        variants = json.loads(content)
    except json.JSONDecodeError:
        variants = {
            "variant_a": {"text": result.content[:constraints["max_chars"]], "cta": "", "angle": "direct"},
            "variant_b": {"text": result.content[:constraints["max_chars"]], "cta": "", "angle": "question"},
        }

    # Add character counts
    for key in ["variant_a", "variant_b"]:
        variants[key]["character_count"] = len(variants[key].get("text", ""))
        variants[key]["platform"] = platform

    return json.dumps(variants, indent=2)

@tool("optimize_posting_time")
def optimize_posting_time(platform: str, target_audience: str) -> str:
    """
    Return optimal posting times for a platform based on target audience.
    Input: platform name, target audience description.
    Returns recommended posting times in UTC with rationale.
    """
    time_data = {
        "twitter": {
            "b2b": ["08:00 UTC", "13:00 UTC", "17:00 UTC"],
            "b2c": ["09:00 UTC", "12:00 UTC", "20:00 UTC"],
            "default": ["09:00 UTC", "15:00 UTC", "18:00 UTC"],
        },
        "linkedin": {
            "b2b": ["08:00 UTC", "12:00 UTC", "17:00 UTC"],
            "default": ["07:30 UTC", "12:30 UTC"],
        },
        "instagram": {
            "b2c": ["11:00 UTC", "13:00 UTC", "19:00 UTC"],
            "default": ["10:00 UTC", "14:00 UTC", "19:00 UTC"],
        },
    }

    audience_type = "b2b" if "professional" in target_audience.lower() or "business" in target_audience.lower() else "b2c"
    platform_times = time_data.get(platform, time_data["twitter"])
    times = platform_times.get(audience_type, platform_times.get("default", ["09:00 UTC"]))

    return (
        f"Optimal posting times for {platform} targeting {audience_type} audience:\n"
        + "\n".join(f"  • {t}" for t in times)
        + f"\nBest day: Tuesday–Thursday for {platform}."
    )

@tool("create_content_calendar_entry")
def create_content_calendar_entry(
    platform: str,
    post_text: str,
    hashtags: str,
    scheduled_time: str,
) -> str:
    """
    Add a post to the content calendar CSV file for review and scheduling.
    Input: platform, post text, comma-separated hashtags, scheduled time (ISO format).
    Returns the calendar entry ID and confirmation.
    """
    import csv
    import uuid
    from datetime import datetime

    calendar_file = "content_calendar.csv"
    entry_id = str(uuid.uuid4())[:8].upper()

    fieldnames = ["id", "platform", "scheduled_time", "text", "hashtags",
                  "status", "posted_url", "created_at"]
    file_exists = os.path.exists(calendar_file)

    with open(calendar_file, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        writer.writerow({
            "id": entry_id,
            "platform": platform,
            "scheduled_time": scheduled_time,
            "text": post_text,
            "hashtags": hashtags,
            "status": "scheduled",
            "posted_url": "",
            "created_at": datetime.utcnow().isoformat(),
        })

    return (
        f"Calendar entry created:\n"
        f"  ID: {entry_id}\n"
        f"  Platform: {platform}\n"
        f"  Scheduled: {scheduled_time}\n"
        f"  Text preview: {post_text[:80]}...\n"
        f"  Status: scheduled (pending human review)"
    )

Tool 3: Scheduler and Posting Tools#

# tools/scheduler_tools.py
import os
import json
import tweepy
import requests
from crewai_tools import tool
from dotenv import load_dotenv

load_dotenv()

def get_twitter_client():
    return tweepy.Client(
        bearer_token=os.getenv("TWITTER_BEARER_TOKEN"),
        consumer_key=os.getenv("TWITTER_API_KEY"),
        consumer_secret=os.getenv("TWITTER_API_SECRET"),
        access_token=os.getenv("TWITTER_ACCESS_TOKEN"),
        access_token_secret=os.getenv("TWITTER_ACCESS_TOKEN_SECRET"),
    )

@tool("post_to_twitter")
def post_to_twitter(text: str, in_reply_to_tweet_id: str = None) -> str:
    """
    Post a tweet to Twitter/X using the authenticated account.
    For threads, pass in_reply_to_tweet_id to chain tweets.
    Returns the tweet URL on success.
    Set DRY_RUN=true in environment to simulate without posting.
    """
    if os.getenv("DRY_RUN", "false").lower() == "true":
        return f"DRY RUN — Would post to Twitter: {text[:100]}..."

    try:
        client = get_twitter_client()
        kwargs = {"text": text}
        if in_reply_to_tweet_id:
            kwargs["in_reply_to_tweet_id"] = in_reply_to_tweet_id

        response = client.create_tweet(**kwargs)
        tweet_id = response.data["id"]
        handle = os.getenv("BRAND_HANDLE", "yourbrand").lstrip("@")
        return f"Posted successfully: https://twitter.com/{handle}/status/{tweet_id}"
    except tweepy.TweepyException as e:
        return f"Twitter posting failed: {str(e)}"

@tool("post_to_linkedin")
def post_to_linkedin(text: str) -> str:
    """
    Post to LinkedIn using the LinkedIn API v2 with the authenticated account.
    Returns the post URL on success or an error message.
    Set DRY_RUN=true in environment to simulate without posting.
    """
    if os.getenv("DRY_RUN", "false").lower() == "true":
        return f"DRY RUN — Would post to LinkedIn: {text[:100]}..."

    access_token = os.getenv("LINKEDIN_ACCESS_TOKEN")
    person_urn = os.getenv("LINKEDIN_PERSON_URN")

    if not access_token or not person_urn:
        return "LinkedIn credentials not configured. Set LINKEDIN_ACCESS_TOKEN and LINKEDIN_PERSON_URN."

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "X-Restli-Protocol-Version": "2.0.0",
    }

    payload = {
        "author": person_urn,
        "lifecycleState": "PUBLISHED",
        "specificContent": {
            "com.linkedin.ugc.ShareContent": {
                "shareCommentary": {"text": text},
                "shareMediaCategory": "NONE",
            }
        },
        "visibility": {
            "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
        },
    }

    try:
        response = requests.post(
            "https://api.linkedin.com/v2/ugcPosts",
            headers=headers,
            json=payload,
            timeout=15,
        )
        response.raise_for_status()
        post_id = response.headers.get("x-restli-id", "unknown")
        return f"LinkedIn post published successfully. Post ID: {post_id}"
    except requests.RequestException as e:
        return f"LinkedIn posting failed: {str(e)}"

@tool("update_calendar_status")
def update_calendar_status(entry_id: str, status: str, posted_url: str = "") -> str:
    """
    Update the status of a content calendar entry after posting.
    Input: entry_id, status (posted/failed), posted_url (optional).
    """
    import csv

    calendar_file = "content_calendar.csv"
    updated = False

    try:
        with open(calendar_file, "r", newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
    except FileNotFoundError:
        return "Calendar file not found."

    for row in rows:
        row.setdefault("posted_url", "")  # older calendars may lack this column
        if row["id"] == entry_id:
            row["status"] = status
            if posted_url:
                row["posted_url"] = posted_url
            updated = True

    if updated:
        with open(calendar_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)
        return f"Entry {entry_id} updated to status: {status}"
    else:
        return f"Entry {entry_id} not found in calendar."

Step 3: Define the Three CrewAI Agents#

# agents.py
from crewai import Agent
from langchain_openai import ChatOpenAI
from tools.research_tools import (
    search_trending_topics,
    monitor_competitor_content,
    get_trending_hashtags,
)
from tools.content_tools import (
    generate_post_variations,
    optimize_posting_time,
    create_content_calendar_entry,
)
from tools.scheduler_tools import (
    post_to_twitter,
    post_to_linkedin,
    update_calendar_status,
)
import os

llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

trend_researcher = Agent(
    role="Social Media Trend Researcher",
    goal=(
        "Monitor trending topics, competitor activity, and emerging discussions "
        "in the target industry. Identify the 3 best topics for content this week "
        "with supporting hashtag research."
    ),
    backstory=(
        "You are a senior social media strategist who excels at identifying "
        "what conversations are gaining momentum before they peak. You analyze "
        "competitor content to find gaps your brand can fill, and you always "
        "back your topic recommendations with evidence from real trending data."
    ),
    tools=[
        search_trending_topics,
        monitor_competitor_content,
        get_trending_hashtags,
    ],
    llm=llm,
    verbose=True,
    max_iter=10,
)

content_creator = Agent(
    role="Social Media Content Creator",
    goal=(
        "Take the trending topics from the Trend Researcher and generate two A/B "
        "post variants for each platform (Twitter/X and LinkedIn). Every post must "
        "match platform character limits, include the recommended hashtags, and "
        "have a clear call-to-action."
    ),
    backstory=(
        "You are an award-winning social media copywriter who has managed content "
        "for B2B SaaS brands with 500K+ followers. You understand that Twitter "
        "needs punchy hooks under 280 characters while LinkedIn rewards "
        "professional storytelling. You always write two versions of every post "
        "so the team can A/B test engagement."
    ),
    tools=[
        generate_post_variations,
        optimize_posting_time,
    ],
    llm=llm,
    verbose=True,
    max_iter=15,
)

scheduler = Agent(
    role="Social Media Scheduler",
    goal=(
        "Take the approved post variations from the Content Creator and schedule "
        "them into the content calendar at optimal posting times. Post to Twitter/X "
        "and add LinkedIn posts to the calendar for human review. "
        "Always use DRY_RUN mode unless explicitly told to publish live."
    ),
    backstory=(
        "You are a meticulous social media operations specialist. You never post "
        "without checking the content calendar for conflicts, and you always "
        "schedule posts at data-driven optimal times for maximum reach. "
        "You maintain a complete audit trail of every post attempt."
    ),
    tools=[
        create_content_calendar_entry,
        post_to_twitter,
        post_to_linkedin,
        update_calendar_status,
    ],
    llm=llm,
    verbose=True,
    max_iter=15,
)

Step 4: Define the Tasks and Run the Crew#

# crew.py
from crewai import Crew, Task, Process
from agents import trend_researcher, content_creator, scheduler
import os

def create_research_task(topic_query: str, competitors: str) -> Task:
    return Task(
        description=(
            f"Research trending topics related to: '{topic_query}'. "
            f"Also monitor these competitors: {competitors}. "
            "Identify the TOP 3 topics with the highest engagement potential. "
            "For each topic provide: topic title, why it's trending, "
            "3-5 recommended hashtags, and competitor coverage gap."
        ),
        expected_output=(
            "A structured report with 3 recommended topics. Each topic includes: "
            "topic name, trending rationale, hashtag list, and content angle suggestion."
        ),
        agent=trend_researcher,
    )

def create_content_task(brand_voice: str, platforms: list) -> Task:
    return Task(
        description=(
            "Using the trending topics from the research report, generate post content. "
            f"For each of the 3 topics, create posts for these platforms: {', '.join(platforms)}. "
            "Each platform post needs TWO A/B variants. "
            f"Brand voice: {brand_voice}. "
            "Include: post text, hashtags, optimal posting times, and the hook/angle for each variant. "
            "Strictly respect character limits: Twitter 280, LinkedIn 3000."
        ),
        expected_output=(
            "A content plan with 3 topics × 2 platforms × 2 variants = 12 post variations. "
            "Each variation includes final text (within character limit), "
            "hashtags, posting time, and variant angle."
        ),
        agent=content_creator,
        context=[],  # Overwritten with the research task's output in run_social_media_crew
    )

def create_scheduling_task(dry_run: bool = True) -> Task:
    mode = "DRY RUN (simulate only, do not actually post)" if dry_run else "LIVE MODE (post to platforms)"
    return Task(
        description=(
            f"Schedule the content variations from the Content Creator. Mode: {mode}. "
            "For each post variation: "
            "1. Add it to the content calendar CSV with scheduled time. "
            "2. Post the 'variant_a' version to Twitter/X (or simulate in dry run). "
            "3. Add the LinkedIn posts to the calendar with status 'pending_review'. "
            "4. Update calendar entry status after each action. "
            "Report a final summary of all calendar entries created."
        ),
        expected_output=(
            "A scheduling summary listing every post added to the calendar, "
            "posting times, platforms, and status (posted/scheduled/pending_review). "
            "Include the content_calendar.csv entry IDs for each post."
        ),
        agent=scheduler,
    )

def run_social_media_crew(
    topic_query: str,
    competitors: str = "HubSpot, Salesforce, Marketo",
    brand_voice: str = None,
    platforms: list = None,
    dry_run: bool = True,
) -> str:
    """
    Run the full social media crew pipeline.
    Returns a text summary of all content created and scheduled.
    """
    if brand_voice is None:
        brand_voice = os.getenv(
            "BRAND_VOICE",
            "professional yet approachable, data-driven, uses concrete examples"
        )
    if platforms is None:
        platforms = ["twitter", "linkedin"]

    os.environ["DRY_RUN"] = "true" if dry_run else "false"

    research_task = create_research_task(topic_query, competitors)
    content_task = create_content_task(brand_voice, platforms)
    content_task.context = [research_task]

    scheduling_task = create_scheduling_task(dry_run)
    scheduling_task.context = [research_task, content_task]

    crew = Crew(
        agents=[trend_researcher, content_creator, scheduler],
        tasks=[research_task, content_task, scheduling_task],
        process=Process.sequential,
        verbose=True,
    )

    result = crew.kickoff()
    return str(result)

Step 5: Content Calendar Management#

The scheduler writes to a local CSV content calendar. Here is a utility to read and display it:

# calendar_manager.py
import csv
from datetime import datetime, timedelta
from typing import List, Optional

def get_scheduled_posts(
    status_filter: Optional[str] = None,
    platform_filter: Optional[str] = None,
    days_ahead: int = 7,
) -> List[dict]:
    """Return scheduled posts from the content calendar."""
    calendar_file = "content_calendar.csv"
    cutoff = datetime.utcnow() + timedelta(days=days_ahead)

    try:
        with open(calendar_file, "r", newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
    except FileNotFoundError:
        return []

    filtered = []
    for row in rows:
        if status_filter and row.get("status") != status_filter:
            continue
        if platform_filter and row.get("platform") != platform_filter:
            continue
        try:
            scheduled = datetime.fromisoformat(row.get("scheduled_time", ""))
            if scheduled <= cutoff:
                filtered.append(row)
        except ValueError:
            filtered.append(row)  # Include entries with invalid dates

    return sorted(filtered, key=lambda r: r.get("scheduled_time", ""))

def print_calendar_summary():
    """Print a formatted weekly content calendar."""
    posts = get_scheduled_posts(days_ahead=7)
    if not posts:
        print("Content calendar is empty.")
        return

    print(f"\n{'='*60}")
    print(f"CONTENT CALENDAR — Next 7 Days ({len(posts)} posts)")
    print(f"{'='*60}")

    current_date = None
    for post in posts:
        try:
            dt = datetime.fromisoformat(post["scheduled_time"])
            date_str = dt.strftime("%A, %B %d")
        except ValueError:
            date_str = post["scheduled_time"]

        if date_str != current_date:
            print(f"\n{date_str}")
            current_date = date_str

        status_icon = {"scheduled": "🕐", "posted": "✅", "failed": "❌",
                       "pending_review": "👍"}.get(post.get("status", ""), "•")
        print(
            f"  {status_icon} [{post.get('platform', '').upper()}] "
            f"{post.get('scheduled_time', '')[11:16]} UTC — "
            f"{post.get('text', '')[:60]}..."
        )

Step 6: Automated Weekly Runner#

# weekly_runner.py
import schedule
import time
from crew import run_social_media_crew
from calendar_manager import print_calendar_summary

def weekly_content_run():
    """Run the content pipeline every Monday morning."""
    print("Starting weekly social media content generation...")
    result = run_social_media_crew(
        topic_query="AI agents enterprise software automation 2026",
        competitors="HubSpot, Salesforce, Notion",
        platforms=["twitter", "linkedin"],
        dry_run=False,  # Set to True for testing
    )
    print(result)
    print_calendar_summary()

# Schedule to run every Monday at 7 AM UTC
schedule.every().monday.at("07:00").do(weekly_content_run)

if __name__ == "__main__":
    print("Social media scheduler started. Running first batch now...")
    weekly_content_run()  # Run immediately on start

    print("Waiting for next scheduled run (every Monday 07:00 UTC)...")
    while True:
        schedule.run_pending()
        time.sleep(60)

Testing the Crew#

# tests/test_crew.py
import pytest
import os

def test_generate_post_variations_twitter():
    from tools.content_tools import generate_post_variations
    import json

    result = generate_post_variations(
        topic="AI agents transforming B2B sales",
        platform="twitter",
        brand_voice="professional, data-driven",
        hashtags="#AIAgents,#B2BSales,#SalesAutomation",
    )
    variants = json.loads(result)
    assert "variant_a" in variants
    assert "variant_b" in variants
    assert variants["variant_a"]["character_count"] <= 280
    assert variants["variant_b"]["character_count"] <= 280

def test_generate_post_variations_linkedin():
    from tools.content_tools import generate_post_variations
    import json

    result = generate_post_variations(
        topic="How AI agents reduce sales cycle by 30%",
        platform="linkedin",
        brand_voice="thought leadership, includes personal insight",
        hashtags="#AI,#SalesEnablement,#FutureOfWork",
    )
    variants = json.loads(result)
    assert variants["variant_a"]["platform"] == "linkedin"

def test_content_calendar_entry_creation(tmp_path, monkeypatch):
    monkeypatch.chdir(tmp_path)
    from tools.content_tools import create_content_calendar_entry

    result = create_content_calendar_entry(
        platform="twitter",
        post_text="Test post content for unit testing",
        hashtags="#Test,#AI",
        scheduled_time="2026-02-26T09:00:00",
    )
    assert "Calendar entry created" in result
    assert "ID:" in result

    # Verify CSV was written
    import csv
    with open("content_calendar.csv", "r") as f:
        rows = list(csv.DictReader(f))
    assert len(rows) == 1
    assert rows[0]["platform"] == "twitter"

def test_dry_run_twitter_post(monkeypatch):
    monkeypatch.setenv("DRY_RUN", "true")
    from tools.scheduler_tools import post_to_twitter

    result = post_to_twitter(text="This is a test tweet for dry run mode.")
    assert "DRY RUN" in result

def test_optimize_posting_time():
    from tools.content_tools import optimize_posting_time
    result = optimize_posting_time("twitter", "B2B professional audience")
    assert "UTC" in result
    assert "twitter" in result.lower()

Run tests:

DRY_RUN=true pytest tests/ -v

LinkedIn API Setup#

The LinkedIn API requires additional configuration compared to Twitter. Here are the key steps:

  1. Create a LinkedIn Developer App at developer.linkedin.com
  2. Request the w_member_social permission scope for posting
  3. Complete the OAuth 2.0 authorization flow to get an access token
  4. Find your Person URN by calling GET https://api.linkedin.com/v2/userinfo with your access token
  5. Set LINKEDIN_ACCESS_TOKEN and LINKEDIN_PERSON_URN in your .env file

LinkedIn access tokens expire after 60 days. For production, implement the refresh token flow or use a social media management platform API (Buffer, Hootsuite) that handles token refresh automatically.
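If your app has been granted programmatic refresh tokens (a LinkedIn partner-program feature), the refresh call is a form-encoded POST to LinkedIn's token endpoint. A minimal sketch, assuming hypothetical LINKEDIN_CLIENT_ID, LINKEDIN_CLIENT_SECRET, and LINKEDIN_REFRESH_TOKEN environment variables that are not used elsewhere in this tutorial:

```python
# linkedin_refresh.py — OAuth 2.0 refresh flow (sketch)
import os

TOKEN_URL = "https://www.linkedin.com/oauth/v2/accessToken"

def build_refresh_payload(refresh_token: str, client_id: str, client_secret: str) -> dict:
    """Form-encoded body for the token refresh request."""
    return {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
        "client_secret": client_secret,
    }

def refresh_access_token() -> str:
    import requests  # imported lazily so the pure helper above stays dependency-free
    payload = build_refresh_payload(
        os.environ["LINKEDIN_REFRESH_TOKEN"],
        os.environ["LINKEDIN_CLIENT_ID"],
        os.environ["LINKEDIN_CLIENT_SECRET"],
    )
    resp = requests.post(TOKEN_URL, data=payload, timeout=15)
    resp.raise_for_status()
    return resp.json()["access_token"]
```

Persist the new access token back to your secret store so the posting tools pick it up on the next run.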

Production Considerations#

Human Review Gate: Never auto-publish without review in production. The scheduler places LinkedIn posts in pending_review status by default. Build a simple web interface (or Slack bot) that surfaces pending posts for approval before final publishing.
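As a starting point before building a web interface or Slack bot, the review gate can be a small CLI loop over the calendar CSV. A sketch; the decide callback is injectable so the same logic can later be driven by Slack button callbacks instead of terminal input:

```python
# review_gate.py — minimal human approval gate over content_calendar.csv (sketch)
import csv

def pending_posts(calendar_file="content_calendar.csv"):
    """Return calendar rows awaiting human review."""
    try:
        with open(calendar_file, newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
    except FileNotFoundError:
        return []
    return [r for r in rows if r.get("status") == "pending_review"]

def review(rows, decide=None):
    """Split rows into (approved_ids, rejected_ids) using a decision callback."""
    if decide is None:
        decide = lambda row: input(f"Approve {row['id']}? [y/N] ").strip().lower() == "y"
    approved, rejected = [], []
    for row in rows:
        (approved if decide(row) else rejected).append(row["id"])
    return approved, rejected
```

Feed the approved IDs to the update_calendar_status tool from Step 2 to flip them back to scheduled before the next posting run.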

Rate Limits: Twitter/X API v2 Free tier allows 1,500 tweets/month for posting. The Basic tier ($100/month) allows 10,000 tweets/month. The post_to_twitter tool returns an error message rather than raising when limits are hit, so check the returned string and implement a backoff queue before retrying.
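A minimal backoff wrapper might look like the following. Because post_to_twitter returns error strings instead of raising, the sketch matches on the message text; the exact wording of tweepy's rate-limit error is an assumption, so adjust the check to what you observe in practice:

```python
# backoff_queue.py — retry a posting call with exponential backoff (sketch)
import random
import time

def post_with_backoff(post_fn, text, max_retries=4, base_delay=60.0):
    """Call post_fn(text); retry with exponential backoff on rate-limit errors."""
    for attempt in range(max_retries):
        result = post_fn(text)
        # "429" / "Too Many Requests" matching assumes tweepy's error wording
        if "429" not in result and "Too Many Requests" not in result:
            return result
        # exponential backoff with jitter; zero when base_delay is zero (tests)
        delay = base_delay * (2 ** attempt) * (1 + random.random())
        time.sleep(delay)
    return f"Gave up after {max_retries} rate-limited attempts."
```

Usage: `post_with_backoff(post_to_twitter, tweet_text)` in place of a bare call.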

Content Policy Compliance: Both platforms run automated spam detection. Avoid posting identical content across accounts, posting at inhuman speeds (leave at least 10 minutes between posts), and hashtag spam (5–8 hashtags maximum on Twitter, 3–5 on LinkedIn).

A/B Test Tracking: The current implementation generates two variants but posts only variant A. To properly A/B test, alternate which variant posts on different days and track engagement via the Twitter/X API GET /2/tweets/:id endpoint for like/reply/retweet counts. Store results in your CSV calendar to build a performance dataset.
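A sketch of the tracking side: fetch public_metrics with tweepy's Client.get_tweet, then score each variant. The weighting below is an arbitrary assumption to illustrate the idea, not a platform formula:

```python
# ab_tracking.py — pull engagement numbers for a posted tweet and score variants (sketch)
def fetch_tweet_metrics(tweet_id: str) -> dict:
    """Return public_metrics for one tweet (requires network + credentials)."""
    from tools.scheduler_tools import get_twitter_client  # from Tool 3 above
    client = get_twitter_client()
    resp = client.get_tweet(tweet_id, tweet_fields=["public_metrics"])
    return resp.data.public_metrics  # like_count, reply_count, retweet_count, quote_count

def engagement_score(metrics: dict) -> float:
    """Weighted score: replies, retweets, and quotes signal more than likes.
    The weights are illustrative assumptions — tune to your own goals."""
    return (
        metrics.get("like_count", 0)
        + 2 * metrics.get("reply_count", 0)
        + 3 * metrics.get("retweet_count", 0)
        + 3 * metrics.get("quote_count", 0)
    )
```

Compare engagement_score for variant A and variant B over several posts before declaring a winner.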

Cost Optimization: The crew uses GPT-4o for all three agents. For high-volume content generation (100+ posts/week), switch content_creator and scheduler to GPT-4o-mini to reduce costs by 15–20x with minimal quality difference for structured tasks.

Running the Agent#

# main.py
from crew import run_social_media_crew
from calendar_manager import print_calendar_summary

# Run in dry-run mode first to preview content
result = run_social_media_crew(
    topic_query="AI agents for enterprise software buyers 2026",
    competitors="Salesforce, HubSpot, Monday.com",
    brand_voice="insightful, conversational, backs claims with data",
    platforms=["twitter", "linkedin"],
    dry_run=True,  # Change to False to post live
)

print(result)
print_calendar_summary()

Frequently Asked Questions#

Can I add Instagram posting support? Instagram Graph API requires a Meta Business account and only allows posting to Instagram Business or Creator accounts. Add a post_to_instagram tool using the Graph API endpoint POST /{ig-user-id}/media followed by POST /{ig-user-id}/media_publish. The process has two steps: first create a media container, then publish it.
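A hedged sketch of that two-step flow with plain requests; the Graph API version in the URL, the IG_USER_ID/access-token parameters, and the http injection parameter (there only to make the function testable without network access) are assumptions:

```python
# instagram_tool.py — container-then-publish flow for Instagram Business accounts (sketch)
GRAPH = "https://graph.facebook.com/v21.0"  # API version is an assumption; pin your own

def post_to_instagram(ig_user_id, access_token, image_url, caption, http=None):
    if http is None:
        import requests  # lazy import; pass a stub `http` object in tests
        http = requests
    # Step 1: create a media container
    r = http.post(
        f"{GRAPH}/{ig_user_id}/media",
        data={"image_url": image_url, "caption": caption, "access_token": access_token},
        timeout=15,
    )
    r.raise_for_status()
    container_id = r.json()["id"]
    # Step 2: publish the container
    r = http.post(
        f"{GRAPH}/{ig_user_id}/media_publish",
        data={"creation_id": container_id, "access_token": access_token},
        timeout=15,
    )
    r.raise_for_status()
    return r.json()["id"]  # the published media ID
```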

How do I handle platform-specific media attachments? Extend the PlatformPost model with an image_url field and update the posting tools to include media uploads. For Twitter, use tweepy.API.media_upload with OAuth 1.0a. For LinkedIn, use the LinkedIn Assets API to upload images before creating the post.

What if the Trend Researcher finds no relevant topics? The search_trending_topics tool returns a "No trending topics found" message if Tavily returns empty results. The Content Creator agent is instructed to fall back to evergreen topics related to the brand's core themes when research returns insufficient results. Add a fallback_topics list to your configuration.
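A small helper for that fallback behavior might look like this; the FALLBACK_TOPICS entries are placeholders for your brand's evergreen themes:

```python
# fallback_topics.py — top up researched topics from an evergreen list (sketch)
FALLBACK_TOPICS = [  # hypothetical examples; replace with your brand's core themes
    "Customer success stories with measurable ROI",
    "Behind-the-scenes: how our team uses AI day to day",
    "Common automation mistakes and how to avoid them",
]

def select_topics(researched, fallback=FALLBACK_TOPICS, n=3):
    """Prefer researched topics; pad with evergreen ones up to n."""
    topics = [t for t in researched if t and t.strip()]
    for t in fallback:
        if len(topics) >= n:
            break
        if t not in topics:
            topics.append(t)
    return topics[:n]
```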

How do I prevent the crew from re-posting content it already created? Before the Scheduler adds a calendar entry, it should check the CSV for posts with similar text using fuzzy matching. Add a check_duplicate_content tool that reads existing calendar entries and flags posts with more than 80% text similarity.
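A minimal version of such a check using only the standard library; SequenceMatcher's ratio maps roughly onto the 80% similarity threshold mentioned above, though shingle- or embedding-based similarity scales better for large calendars:

```python
# duplicate_check.py — fuzzy duplicate detection for calendar entries (sketch)
import difflib

def is_duplicate(new_text, existing_texts, threshold=0.8):
    """True if new_text is at least `threshold` similar to any existing post."""
    new_norm = new_text.lower().strip()
    for old in existing_texts:
        ratio = difflib.SequenceMatcher(None, new_norm, old.lower().strip()).ratio()
        if ratio >= threshold:
            return True
    return False
```

Call it with the `text` column of existing calendar rows before creating a new entry.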

Can I run this on a serverless platform like AWS Lambda? CrewAI and LangChain work on Lambda but require packaging dependencies into a Lambda layer due to size constraints. The schedule library does not work on Lambda — replace it with an EventBridge scheduled rule that triggers the Lambda function. Use DynamoDB instead of a local CSV for the content calendar.
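A sketch of the Lambda entry point under those assumptions; the event payload shape is whatever you configure on the EventBridge rule, and the runner parameter exists only to make the handler testable without importing the heavy CrewAI dependencies:

```python
# lambda_handler.py — EventBridge-triggered entry point (sketch)
def handler(event, context, runner=None):
    """EventBridge passes `event`; `runner` is injectable for testing."""
    if runner is None:
        from crew import run_social_media_crew  # heavy deps live in a Lambda layer
        runner = run_social_media_crew
    result = runner(
        topic_query=event.get("topic_query", "AI agents enterprise software"),
        dry_run=event.get("dry_run", True),
    )
    return {"statusCode": 200, "body": str(result)[:4000]}
```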

Next Steps#