CrewAI
Use Phoenix to trace and evaluate different CrewAI agent patterns
CrewAI is an open-source framework for building and orchestrating collaborative AI agents that act like a team of specialized virtual employees. It lets users define roles, goals, and workflows for each agent, allowing the agents to work together autonomously on complex tasks with minimal setup.
Core Concepts of CrewAI
Agents
Agents are autonomous, role-driven entities designed to perform specific functions—like a Researcher, Writer, or Support Rep. They can be richly customized with goals, backstories, verbosity settings, delegation permissions, and access to tools. This flexibility makes agents expressive and task-aware, helping model real-world team dynamics.
Tasks
Tasks are the atomic units of work in CrewAI. Each task includes a description, expected output, responsible agent, and optional tools. Tasks can be executed solo or collaboratively, and they serve as the bridge between high-level goals and actionable steps.
Tools
Tools give agents capabilities beyond language generation, such as browsing the web, fetching documents, or performing calculations. Tools can be native or developer-defined by subclassing the BaseTool class, and each must have a clear name and description so agents can invoke them appropriately.
Processes
CrewAI supports multiple orchestration strategies:
Sequential: Tasks run in a fixed order—simple and predictable.
Hierarchical: A manager agent or LLM delegates tasks dynamically, enabling top-down workflows.
Consensual (planned): Future support for democratic, collaborative task routing.
Each process type shapes how coordination and delegation unfold within a crew.
Crews
A crew is a collection of agents and tasks governed by a defined process. It represents a fully operational unit with an execution strategy, internal collaboration logic, and control settings for verbosity and output formatting. Think of it as the operating system for multi-agent workflows.
Pipelines
Pipelines chain multiple crews together, enabling multi-phase workflows where the output of one crew becomes the input to the next. This allows developers to modularize complex applications into reusable, composable segments of logic.
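The pipeline API has varied across CrewAI versions, so a version-agnostic way to express this pattern is to chain crews manually, feeding each crew's output into the next crew's inputs. The helper below is a minimal sketch (the `run_pipeline` name and the `previous_output` placeholder are illustrative), assuming downstream task descriptions interpolate `{previous_output}`:

```python
def run_pipeline(crews, initial_inputs):
    """Run crews in sequence, feeding each crew's output to the next.

    Each crew is any object with a kickoff(inputs=...) method, e.g. a
    crewai.Crew whose task descriptions reference {previous_output}.
    """
    data = dict(initial_inputs)
    result = None
    for crew in crews:
        result = crew.kickoff(inputs=data)
        # Expose the raw output to the next crew as a template variable.
        data["previous_output"] = str(result)
    return result


# Hypothetical usage, with a research crew feeding a writing crew:
# final_report = run_pipeline([research_crew, writing_crew], {"topic": "open-source LLMs"})
```

Because each stage only sees a plain dict of inputs, individual crews stay reusable and can be tested in isolation.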
Planning
With planning enabled, CrewAI generates a task-by-task strategy before execution using an AgentPlanner. This enriches each task with context and sequencing logic, improving coordination—especially in multi-step or loosely defined workflows.
Design Considerations and Limitations
Agent Roles
Explicit role configuration gives flexibility, but poor design can cause overlap or miscommunication
State Management
Stateless by default. Developers must implement external state or context passing for continuity across tasks
Task Planning
Supports sequential and branching workflows, but branching logic must be defined manually; the optional planning feature only enriches task context before execution
Tool Usage
Agents support tools via config. No automatic selection; all tool-to-agent mappings are manual
Termination Logic
No auto-termination handling. Developers must define explicit conditions to break recursive or looping behavior
Memory
No built-in memory layer. Integration with vector stores or databases must be handled externally
Agent Design Patterns
Prompt Chaining
Prompt chaining decomposes a complex task into a sequence of smaller steps, where each LLM call operates on the output of the previous one. This workflow introduces the ability to add programmatic checks (such as “gates”) between steps, validating intermediate outputs before continuing. The result is higher control, accuracy, and debuggability—at the cost of increased latency.
CrewAI makes it straightforward to build prompt chaining workflows using a sequential process. Each step is modeled as a Task, assigned to a specialized Agent, and executed in order using Process.sequential. You can insert validation logic between tasks or configure agents to flag issues before passing outputs forward.
Notebook: Research-to-Content Prompt Chaining Workflow
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <p style=\"text-align:center\">\n",
" <img alt=\"phoenix logo\" src=\"https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg\" width=\"1000\"/>\n",
" <br>\n",
" <br>\n",
" <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
" |\n",
" <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
" |\n",
" <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
" </p>\n",
"</center>\n",
"<h1 align=\"center\">Tracing CrewAI with Arize Phoenix - Prompt Chaining Workflow</h1>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5-gPdVmIndw9"
},
"source": [
"# Set up Keys and Dependencies"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: For this colab you'll need:\n",
"\n",
"* OpenAI API key (https://openai.com/)\n",
"* Serper API key (https://serper.dev/)\n",
"* Phoenix API key (https://app.phoenix.arize.com/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"\n",
"# Prompt the user for their API keys if they haven't been set\n",
"openai_key = os.getenv(\"OPENAI_API_KEY\", \"OPENAI_API_KEY\")\n",
"serper_key = os.getenv(\"SERPER_API_KEY\", \"SERPER_API_KEY\")\n",
"\n",
"if openai_key == \"OPENAI_API_KEY\":\n",
" openai_key = getpass.getpass(\"Please enter your OPENAI_API_KEY: \")\n",
"\n",
"if serper_key == \"SERPER_API_KEY\":\n",
" serper_key = getpass.getpass(\"Please enter your SERPER_API_KEY: \")\n",
"\n",
"# Set the environment variables with the provided keys\n",
"os.environ[\"OPENAI_API_KEY\"] = openai_key\n",
"os.environ[\"SERPER_API_KEY\"] = serper_key\n",
"\n",
"if \"PHOENIX_API_KEY\" not in os.environ:\n",
" os.environ[\"PHOENIX_API_KEY\"] = getpass.getpass(\"Enter your Phoenix API key: \")\n",
"\n",
"os.environ[\"PHOENIX_CLIENT_HEADERS\"] = f\"api_key={os.environ['PHOENIX_API_KEY']}\"\n",
"os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = \"https://app.phoenix.arize.com/\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "r9X87mdGnpbc"
},
"source": [
"## Configure Tracing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.otel import register\n",
"\n",
"tracer_provider = register(\n",
" project_name=\"crewai-agents\", endpoint=\"https://app.phoenix.arize.com/v1/traces\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vYT-EU56ni94"
},
"source": [
"# Instrument CrewAI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from openinference.instrumentation.crewai import CrewAIInstrumentor\n",
"\n",
"CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Agents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from crewai import Agent, Crew, Task\n",
"from crewai.process import Process\n",
"\n",
"research_analyst = Agent(\n",
" role=\"Senior Research Analyst\",\n",
" goal=\"Research cutting-edge AI topics and summarize the top 3 trends.\",\n",
" backstory=\"Expert in AI research and trend analysis.\",\n",
" verbose=True,\n",
")\n",
"\n",
"content_strategist = Agent(\n",
" role=\"Tech Content Strategist\",\n",
" goal=\"Create a structured article outline from the research.\",\n",
" backstory=\"Technical storyteller who crafts engaging outlines.\",\n",
" verbose=True,\n",
")\n",
"\n",
"content_reviewer = Agent(\n",
" role=\"Content Reviewer\",\n",
" goal=\"Validate outline for clarity, tone, and completeness.\",\n",
" backstory=\"Editorial expert with a focus on technical accuracy.\",\n",
" verbose=True,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Tasks"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"research_task = Task(\n",
" description=\"Summarize the top 3 trends in open-source LLM development.\",\n",
" agent=research_analyst,\n",
" expected_output=\"Bullet points of top 3 trends with brief explanations.\",\n",
")\n",
"\n",
"outline_task = Task(\n",
" description=\"Generate an article outline for CTOs based on the research.\",\n",
" agent=content_strategist,\n",
" expected_output=\"Outline with title, sections, and key points.\",\n",
")\n",
"\n",
"review_task = Task(\n",
" description=\"Review the outline for quality and alignment.\",\n",
" agent=content_reviewer,\n",
" expected_output=\"Reviewed outline with suggestions or approval.\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Crew"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"crew = Crew(\n",
" agents=[research_analyst, content_strategist, content_reviewer],\n",
" tasks=[research_task, outline_task, review_task],\n",
" process=Process.sequential,\n",
" verbose=True,\n",
" full_output=True,\n",
")\n",
"\n",
"result = crew.kickoff()\n",
"print(result)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fH0uVMgxpLql"
},
"source": [
"### Check your Phoenix project to view the traces and spans from your runs."
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Routing
Routing is a pattern designed to classify incoming requests and dispatch them to the single most appropriate specialist agent or workflow, ensuring each input is handled by a focused, expert-driven routine.
In CrewAI, you implement routing by defining a Router Agent that inspects each input, emits a category label, and then dynamically delegates to downstream agents (or crews) tailored for that category—each equipped with its own tools and prompts. This separation of concerns delivers more accurate, maintainable pipelines.
Notebook: Research-Content Routing Workflow
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <p style=\"text-align:center\">\n",
" <img alt=\"phoenix logo\" src=\"https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg\" width=\"1000\"/>\n",
" <br>\n",
" <br>\n",
" <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
" |\n",
" <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
" |\n",
" <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
" </p>\n",
"</center>\n",
"<h1 align=\"center\">Tracing CrewAI with Arize Phoenix - Routing Workflow</h1>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5-gPdVmIndw9"
},
"source": [
"## Set up Keys and Dependencies"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: For this colab you'll need:\n",
"\n",
"* OpenAI API key (https://openai.com/)\n",
"* Serper API key (https://serper.dev/)\n",
"* Phoenix API key (https://app.phoenix.arize.com/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"\n",
"# Prompt the user for their API keys if they haven't been set\n",
"openai_key = os.getenv(\"OPENAI_API_KEY\", \"OPENAI_API_KEY\")\n",
"serper_key = os.getenv(\"SERPER_API_KEY\", \"SERPER_API_KEY\")\n",
"\n",
"if openai_key == \"OPENAI_API_KEY\":\n",
" openai_key = getpass.getpass(\"Please enter your OPENAI_API_KEY: \")\n",
"\n",
"if serper_key == \"SERPER_API_KEY\":\n",
" serper_key = getpass.getpass(\"Please enter your SERPER_API_KEY: \")\n",
"\n",
"# Set the environment variables with the provided keys\n",
"os.environ[\"OPENAI_API_KEY\"] = openai_key\n",
"os.environ[\"SERPER_API_KEY\"] = serper_key\n",
"\n",
"if \"PHOENIX_API_KEY\" not in os.environ:\n",
" os.environ[\"PHOENIX_API_KEY\"] = getpass.getpass(\"Enter your Phoenix API key: \")\n",
"\n",
"os.environ[\"PHOENIX_CLIENT_HEADERS\"] = f\"api_key={os.environ['PHOENIX_API_KEY']}\"\n",
"os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = \"https://app.phoenix.arize.com/\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "r9X87mdGnpbc"
},
"source": [
"## Configure Tracing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.otel import register\n",
"\n",
"tracer_provider = register(\n",
" project_name=\"crewai-agents\", endpoint=\"https://app.phoenix.arize.com/v1/traces\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vYT-EU56ni94"
},
"source": [
"# Instrument CrewAI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from openinference.instrumentation.crewai import CrewAIInstrumentor\n",
"\n",
"CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Working Agents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import nest_asyncio\n",
"from crewai import Agent, Crew, Process, Task\n",
"from crewai.flow import Flow, listen, router, start\n",
"from pydantic import BaseModel\n",
"\n",
"research_analyst = Agent(\n",
" role=\"Senior Research Analyst\",\n",
" goal=\"Gather and summarize data on the requested topic.\",\n",
" backstory=\"Expert in tech market trends.\",\n",
" allow_delegation=False,\n",
")\n",
"\n",
"content_strategist = Agent(\n",
" role=\"Tech Content Strategist\",\n",
" goal=\"Craft an article outline based on provided research.\",\n",
" backstory=\"Storyteller who turns data into narratives.\",\n",
" allow_delegation=False,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"From here, there are two ways to do this: through a routing Agent, or through the ```@router()``` decorator in Flows, which lets you define conditional routing logic based on the output of a method."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Option 1: Define your logic for Router Agent to classify the query & run corresponding Agent"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"routerAgent = Agent(\n",
" role=\"Router\",\n",
" goal=\"Classify each query as either 'research' or 'content outline'.\",\n",
" backstory=\"Triage bot for content workflows.\",\n",
" verbose=False,\n",
")\n",
"\n",
"\n",
"def route(user_input: str, router):\n",
" router_task = Task(\n",
" description=user_input, agent=router, expected_output=\"One word: 'research' or 'content'\"\n",
" )\n",
" router_classify = Crew(\n",
" agents=[router], tasks=[router_task], process=Process.sequential, verbose=False\n",
" )\n",
" router_results = router_classify.kickoff()\n",
" return router_results\n",
"\n",
"\n",
"def type_of_task(router_results):\n",
" if isinstance(router_results, list):\n",
" result = router_results[0]\n",
" result_text = result.text if hasattr(result, \"text\") else str(result)\n",
" else:\n",
" result_text = (\n",
" router_results.text if hasattr(router_results, \"text\") else str(router_results)\n",
" )\n",
" task_type = result_text.strip().lower()\n",
"\n",
" return task_type\n",
"\n",
"\n",
"def working_agent(task_type, user_input: str):\n",
" if \"research\" in task_type:\n",
" agent = research_analyst\n",
" label = \"Research Analyst\"\n",
" else:\n",
" agent = content_strategist\n",
" label = \"Content Strategist\"\n",
"\n",
" work_task = Task(description=user_input, agent=agent, expected_output=\"Agent response\")\n",
" worker_crew = Crew(agents=[agent], tasks=[work_task], process=Process.sequential, verbose=True)\n",
" work_results = worker_crew.kickoff()\n",
" if isinstance(work_results, list):\n",
" output = work_results[0].text if hasattr(work_results[0], \"text\") else str(work_results[0])\n",
" else:\n",
" output = work_results.text if hasattr(work_results, \"text\") else str(work_results)\n",
"\n",
" print(f\"\\n=== Routed to {label} ({task_type}) ===\\n{output}\\n\")\n",
" return output"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Example Runs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# --- Example Runs ---------------------------------------------------------\n",
"for query in [\n",
" \"Please research the latest AI safety papers.\",\n",
" \"Outline an article on AI safety trends.\",\n",
"]:\n",
" router_output = route(query, routerAgent)\n",
" task_output = type_of_task(router_output)\n",
" working_agent(task_output, query)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Option 2: Define your logic for ```@router()``` Decorator to define routing logic"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"nest_asyncio.apply()\n",
"\n",
"\n",
"# Define Flow State\n",
"class RoutingState(BaseModel):\n",
" query: str = \"\"\n",
" route: str = \"\"\n",
"\n",
"\n",
"# Define Structured Flow\n",
"class RoutingFlow(Flow[RoutingState]):\n",
" def __init__(self, query: str):\n",
" super().__init__(state=RoutingState(query=query))\n",
"\n",
" @start()\n",
" def handle_query(self):\n",
"        print(f\"Incoming Query: {self.state.query}\")\n",
"\n",
" @router(handle_query)\n",
" def decide_route(self):\n",
" if \"research\" in self.state.query.lower():\n",
" self.state.route = \"research\"\n",
" return \"research\"\n",
" else:\n",
" self.state.route = \"outline\"\n",
" return \"outline\"\n",
"\n",
" @listen(\"research\")\n",
" def run_research(self):\n",
" task = Task(\n",
" description=self.state.query,\n",
" expected_output=\"Summary of findings on AI safety\",\n",
" agent=research_analyst,\n",
" )\n",
" crew = Crew(\n",
" agents=[research_analyst], tasks=[task], process=Process.sequential, verbose=True\n",
" )\n",
" crew.kickoff()\n",
"\n",
" @listen(\"outline\")\n",
" def run_content_strategy(self):\n",
" task = Task(\n",
" description=self.state.query,\n",
" expected_output=\"An article outline about the given topic\",\n",
" agent=content_strategist,\n",
" )\n",
" crew = Crew(\n",
" agents=[content_strategist], tasks=[task], process=Process.sequential, verbose=True\n",
" )\n",
" crew.kickoff()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Example Runs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"queries = [\n",
" \"Please research the latest AI safety papers.\",\n",
" \"Outline an article on AI safety trends.\",\n",
"]\n",
"\n",
"for query in queries:\n",
" flow = RoutingFlow(query=query)\n",
" flow.kickoff()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fH0uVMgxpLql"
},
"source": [
"### Check your Phoenix project to view the traces and spans from your runs."
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Parallelization
Parallelization is a powerful agent workflow where multiple tasks are executed simultaneously, enabling faster and more scalable LLM pipelines. This pattern is particularly effective when tasks are independent and don’t depend on each other’s outputs.
While CrewAI's sequential process does not run tasks in parallel threads by default, it provides a clean and intuitive structure for defining parallel logic through multiple agents and independent tasks, whose results can then be gathered and synthesized by a downstream agent.
Notebook: Parallel Research Agent
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <p style=\"text-align:center\">\n",
" <img alt=\"phoenix logo\" src=\"https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg\" width=\"1000\"/>\n",
" <br>\n",
" <br>\n",
" <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
" |\n",
" <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
" |\n",
" <a href=\"https://join.slack.com/t/arize-ai/shared_invite/zt-1px8dcmlf-fmThhDFD_V_48oU7ALan4Q\">Community</a>\n",
" </p>\n",
"</center>\n",
"<h1 align=\"center\">Tracing CrewAI with Arize Phoenix - Parallelization Workflow</h1>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5-gPdVmIndw9"
},
"source": [
"# Set up Keys and Dependencies"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: For this colab you'll need:\n",
"\n",
"* OpenAI API key (https://openai.com/)\n",
"* Serper API key (https://serper.dev/)\n",
"* Phoenix API key (https://app.phoenix.arize.com/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"\n",
"# Prompt the user for their API keys if they haven't been set\n",
"openai_key = os.getenv(\"OPENAI_API_KEY\", \"OPENAI_API_KEY\")\n",
"serper_key = os.getenv(\"SERPER_API_KEY\", \"SERPER_API_KEY\")\n",
"\n",
"if openai_key == \"OPENAI_API_KEY\":\n",
" openai_key = getpass.getpass(\"Please enter your OPENAI_API_KEY: \")\n",
"\n",
"if serper_key == \"SERPER_API_KEY\":\n",
" serper_key = getpass.getpass(\"Please enter your SERPER_API_KEY: \")\n",
"\n",
"# Set the environment variables with the provided keys\n",
"os.environ[\"OPENAI_API_KEY\"] = openai_key\n",
"os.environ[\"SERPER_API_KEY\"] = serper_key\n",
"\n",
"if \"PHOENIX_API_KEY\" not in os.environ:\n",
" os.environ[\"PHOENIX_API_KEY\"] = getpass.getpass(\"Enter your Phoenix API key: \")\n",
"\n",
"os.environ[\"PHOENIX_CLIENT_HEADERS\"] = f\"api_key={os.environ['PHOENIX_API_KEY']}\"\n",
"os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = \"https://app.phoenix.arize.com/\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "r9X87mdGnpbc"
},
"source": [
"## Configure Tracing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.otel import register\n",
"\n",
"tracer_provider = register(\n",
" project_name=\"crewai-agents\", endpoint=\"https://app.phoenix.arize.com/v1/traces\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vYT-EU56ni94"
},
"source": [
"# Instrument CrewAI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from openinference.instrumentation.crewai import CrewAIInstrumentor\n",
"\n",
"CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Agents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from crewai import Agent, Crew, Task\n",
"from crewai.process import Process\n",
"\n",
"researcher_1 = Agent(\n",
" role=\"LLM Researcher A\",\n",
" goal=\"Research trend #1 in AI and summarize it clearly.\",\n",
" backstory=\"Specializes in model safety and governance.\",\n",
" verbose=True,\n",
")\n",
"\n",
"researcher_2 = Agent(\n",
" role=\"LLM Researcher B\",\n",
" goal=\"Research trend #2 in AI and summarize it clearly.\",\n",
" backstory=\"Expert in multimodal and frontier models.\",\n",
" verbose=True,\n",
")\n",
"\n",
"researcher_3 = Agent(\n",
" role=\"LLM Researcher C\",\n",
" goal=\"Research trend #3 in AI and summarize it clearly.\",\n",
" backstory=\"Focused on AI policy and alignment.\",\n",
" verbose=True,\n",
")\n",
"\n",
"aggregator = Agent(\n",
" role=\"Aggregator\",\n",
" goal=\"Combine and synthesize all research into a single summary report.\",\n",
" backstory=\"Information architect skilled at summarizing multiple sources.\",\n",
" verbose=True,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Tasks"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define parallel research tasks\n",
"task1 = Task(\n",
" description=\"Summarize a major trend in AI safety and model alignment.\",\n",
" expected_output=\"Concise summary of trend #1\",\n",
" agent=researcher_1,\n",
")\n",
"\n",
"task2 = Task(\n",
" description=\"Summarize a key innovation in multimodal or frontier AI systems.\",\n",
" expected_output=\"Concise summary of trend #2\",\n",
" agent=researcher_2,\n",
")\n",
"\n",
"task3 = Task(\n",
" description=\"Summarize a current topic in AI policy, regulation, or social impact.\",\n",
" expected_output=\"Concise summary of trend #3\",\n",
" agent=researcher_3,\n",
")\n",
"\n",
"# Aggregation task\n",
"aggregation_task = Task(\n",
" description=\"Combine the three AI trend summaries into a cohesive single report.\",\n",
" expected_output=\"A synthesized report capturing all three trends.\",\n",
" agent=aggregator,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Crew"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"crew = Crew(\n",
" agents=[researcher_1, researcher_2, researcher_3, aggregator],\n",
" tasks=[task1, task2, task3, aggregation_task],\n",
" process=Process.sequential,\n",
" verbose=True,\n",
")\n",
"\n",
"result = crew.kickoff()\n",
"print(result)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fH0uVMgxpLql"
},
"source": [
"### Check your Phoenix project to view the traces and spans from your runs."
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Orchestrator-Workers
The Orchestrator-Workers workflow centers around a primary agent—the orchestrator—that dynamically decomposes a complex task into smaller, more manageable subtasks. Rather than relying on a fixed structure or pre-defined subtasks, the orchestrator decides what needs to be done based on the input itself. It then delegates each piece to the most relevant worker agent, often specialized in a particular domain like research, content synthesis, or evaluation.
CrewAI supports this pattern using the Process.hierarchical setup, where the orchestrator (as the manager agent) generates follow-up task specifications at runtime. This enables dynamic delegation and coordination without requiring the workflow to be rigidly structured up front. It's especially useful for use cases like multi-step research, document generation, or problem-solving workflows where the best structure only emerges after understanding the initial query.
Notebook: Research & Writing Delegation Agents
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <p style=\"text-align:center\">\n",
" <img alt=\"phoenix logo\" src=\"https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg\" width=\"1000\"/>\n",
" <br>\n",
" <br>\n",
" <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
" |\n",
" <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
" |\n",
" <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
" </p>\n",
"</center>\n",
"<h1 align=\"center\">Tracing CrewAI with Arize Phoenix - Orchestrator-Workers Workflow</h1>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5-gPdVmIndw9"
},
"source": [
"# Set up Keys and Dependencies"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: For this colab you'll need:\n",
"\n",
"* OpenAI API key (https://openai.com/)\n",
"* Serper API key (https://serper.dev/)\n",
"* Phoenix API key (https://app.phoenix.arize.com/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"\n",
"# Prompt the user for their API keys if they haven't been set\n",
"openai_key = os.getenv(\"OPENAI_API_KEY\", \"OPENAI_API_KEY\")\n",
"serper_key = os.getenv(\"SERPER_API_KEY\", \"SERPER_API_KEY\")\n",
"\n",
"if openai_key == \"OPENAI_API_KEY\":\n",
" openai_key = getpass.getpass(\"Please enter your OPENAI_API_KEY: \")\n",
"\n",
"if serper_key == \"SERPER_API_KEY\":\n",
" serper_key = getpass.getpass(\"Please enter your SERPER_API_KEY: \")\n",
"\n",
"# Set the environment variables with the provided keys\n",
"os.environ[\"OPENAI_API_KEY\"] = openai_key\n",
"os.environ[\"SERPER_API_KEY\"] = serper_key\n",
"\n",
"if \"PHOENIX_API_KEY\" not in os.environ:\n",
" os.environ[\"PHOENIX_API_KEY\"] = getpass.getpass(\"Enter your Phoenix API key: \")\n",
"\n",
"os.environ[\"PHOENIX_CLIENT_HEADERS\"] = f\"api_key={os.environ['PHOENIX_API_KEY']}\"\n",
"os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = \"https://app.phoenix.arize.com/\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "r9X87mdGnpbc"
},
"source": [
"## Configure Tracing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.otel import register\n",
"\n",
"tracer_provider = register(\n",
" project_name=\"crewai-agents\", endpoint=\"https://app.phoenix.arize.com/v1/traces\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vYT-EU56ni94"
},
"source": [
"# Instrument CrewAI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from openinference.instrumentation.crewai import CrewAIInstrumentor\n",
"\n",
"CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Agents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from crewai import Agent, Crew, Task\n",
"\n",
"# Define worker agents\n",
"trend_researcher = Agent(\n",
" role=\"AI Trend Researcher\",\n",
" goal=\"Analyze current advancements in AI\",\n",
" backstory=\"Expert in tracking and analyzing new trends in artificial intelligence.\",\n",
" verbose=True,\n",
")\n",
"\n",
"policy_analyst = Agent(\n",
" role=\"AI Policy Analyst\",\n",
" goal=\"Examine the implications of AI regulations and governance\",\n",
" backstory=\"Tracks AI policy developments across governments and organizations.\",\n",
" verbose=True,\n",
")\n",
"\n",
"risk_specialist = Agent(\n",
" role=\"AI Risk Specialist\",\n",
" goal=\"Identify potential risks in frontier AI development\",\n",
" backstory=\"Focuses on safety, alignment, and misuse risks related to advanced AI.\",\n",
" verbose=True,\n",
")\n",
"\n",
"synthesizer = Agent(\n",
" role=\"Synthesis Writer\",\n",
" goal=\"Summarize all findings into a final cohesive report\",\n",
" backstory=\"Expert at compiling research insights into executive-level narratives.\",\n",
" verbose=True,\n",
")\n",
"\n",
"orchestrator = Agent(\n",
" role=\"Orchestrator\",\n",
" goal=(\n",
" \"Your job is to delegate research and writing tasks to the correct coworker using the 'Delegate work to coworker' tool.\\n\"\n",
" \"For each task you assign, you MUST call the tool with the following JSON input:\\n\\n\"\n",
" \"{\\n\"\n",
" ' \"task\": \"Short summary of the task to do (plain string)\",\\n'\n",
" ' \"context\": \"Why this task is important or part of the report (plain string)\",\\n'\n",
" ' \"coworker\": \"One of: AI Trend Researcher, AI Policy Analyst, AI Risk Specialist, Synthesis Writer\"\\n'\n",
" \"}\\n\\n\"\n",
" \"IMPORTANT:\\n\"\n",
" \"- Do NOT format 'task' or 'context' as dictionaries.\\n\"\n",
" \"- Do NOT include types or nested descriptions.\\n\"\n",
" \"- Only use plain strings for both.\\n\"\n",
" \"- Call the tool multiple times, one per coworker.\"\n",
" ),\n",
" backstory=\"You are responsible for assigning each part of an AI report to the right specialist.\",\n",
" verbose=True,\n",
" allow_delegation=True,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Tasks"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the initial task only for the orchestrator\n",
"initial_task = Task(\n",
" description=\"Create an AI trends report. It should include recent innovations, policy updates, and safety risks. Then synthesize it into a unified summary.\",\n",
" expected_output=\"Assign subtasks via the DelegateWorkTool and return a final report.\",\n",
" agent=orchestrator,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Crew"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"crew = Crew(\n",
" agents=[trend_researcher, policy_analyst, risk_specialist, synthesizer],\n",
" tasks=[initial_task],\n",
" manager_agent=orchestrator,\n",
" verbose=True,\n",
")\n",
"# Run the full workflow\n",
"result = crew.kickoff()\n",
"print(result)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fH0uVMgxpLql"
},
"source": [
"### Check your Phoenix project to view the traces and spans from your runs."
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}