The following frameworks have built-in support for agent metadata through their auto-instrumentors:
LangGraph
Automatically tracks agent nodes and graph transitions
Uses native metadata.langgraph_node and metadata.langgraph_step
Handles agent metadata and state transitions
No additional implementation needed
AutoGen
Automatically tracks metadata.agent_name and metadata.next_agent
Handles agent handoffs through _handoffs tracking
No additional implementation needed
CrewAI
Automatically tracks agent roles and task relationships
Includes agent metadata in spans
No additional implementation needed
OpenAI Agents
Handles agent metadata via OpenInferenceTracingProcessor
Tracks handoffs between agents
No additional implementation needed
Agno
Tracks agent names and team relationships
Includes agent metadata in spans
No additional implementation needed
When Custom Implementation Is Needed
Custom agent metadata tracking is required when:
Using frameworks without built-in support:
Vanilla OpenAI / Anthropic calls
Custom agent implementations
LangChain without agent components
Other unsupported frameworks
Using hybrid instrumentation:
Mixing auto-instrumented frameworks with custom code
Building custom agents that interact with instrumented frameworks
Required Metadata Attributes
To enable agent visualization in Arize, include one of the following attribute sets:
Agents
metadata.agent_name: Identifies the agent or component (required for all spans)
metadata.next_agent: Indicates the next agent(s). Supported formats:
Single agent: "next_agent_name"
Multiple agents: "agent1,agent2,agent3" (comma-delimited, no spaces)
Nodes
metadata.node_name: Identifies a higher-level component of system work
metadata.next_node: Indicates the next node(s). Supported formats:
Single node: "next_node_name"
Multiple nodes: "node1,node2,node3" (comma-delimited, no spaces)
Example: Multiple Transitions
class ParallelAgentOrchestrator:
def execute(self, input_data: str):
with self.tracer.start_as_current_span(
f"{self.name}.execute",
attributes={
"metadata.agent_name": self.name,
"metadata.next_agent": "research_agent,translation_agent,summary_agent"
}
) as span:
results = self.execute_parallel_agents(input_data)
return results
class ForkingAgentNode:
def _determine_next_nodes(self, result: AgentResponse) -> str:
if result.requires_review:
return "review_node,notification_node"
elif result.is_complete:
return "completion_node,analytics_node"
return "default_node"
def execute(self, input_data: str):
with self.tracer.start_as_current_span(
f"{self.name}.execute",
attributes={"metadata.node_name": self.name}
) as span:
result = self.process(input_data)
next_nodes = self._determine_next_nodes(result)
span.set_attribute("metadata.next_node", next_nodes)
return result
Best Practices for Multiple Transitions
1. Format
Use comma-delimited strings without spaces: "agent1,agent2,agent3"
Follow consistent naming conventions
Order doesn't affect visualization but can aid readability
def set_next_agents(span: Span, next_agents: List[str]):
valid_agents = [a for a in next_agents if a]
if valid_agents:
span.set_attribute("metadata.next_agent", ",".join(valid_agents))
2. Validation
def validate_next_agents(agents: List[str]) -> bool:
if not agents:
return False
if len(agents) > MAX_PARALLEL_AGENTS:
raise ValueError(f"Too many parallel agents: {len(agents)}")
return all(agent in AgentNames.__members__ for agent in agents)
def set_validated_next_agents(span: Span, next_agents: List[str]):
if validate_next_agents(next_agents):
span.set_attribute("metadata.next_agent", ",".join(next_agents))
3. State Management
class ParallelStateManager:
def __init__(self):
self.parallel_states: Dict[str, Dict[str, Any]] = {}
def register_parallel_transition(self, parent_id: str, next_agents: List[str]):
self.parallel_states[parent_id] = {
"agents": next_agents,
"completed": set(),
"failed": set()
}
def mark_completed(self, parent_id: str, agent_name: str):
if parent_id in self.parallel_states:
self.parallel_states[parent_id]["completed"].add(agent_name)
def all_completed(self, parent_id: str) -> bool:
state = self.parallel_states.get(parent_id)
return state and len(state["completed"]) == len(state["agents"])
4. Error Handling
try:
next_agents = ["agent1", "agent2", "agent3"]
span.set_attribute("metadata.next_agent", ",".join(next_agents))
results = await asyncio.gather(
*[self.execute_agent(agent) for agent in next_agents],
return_exceptions=True
)
failed_agents = [
agent for agent, result in zip(next_agents, results)
if isinstance(result, Exception)
]
if failed_agents:
span.set_attribute("agent.failed_transitions", ",".join(failed_agents))
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
State Management Patterns
1. Response-Based State
@dataclass
class AgentResponse:
content: str
metadata: Dict[str, Any]
next_agent: Optional[str] = None
class BaseAgent:
def execute(self, input_data: str) -> AgentResponse:
with self.tracer.start_as_current_span(
f"{self.name}.execute",
attributes={"metadata.agent_name": self.name}
) as span:
result = self._run_agent_logic(input_data)
if result.next_agent:
span.set_attribute("metadata.next_agent", result.next_agent)
return result
2. Orchestrator-Based State
class AgentOrchestrator:
def __init__(self):
self.agents = {}
self.current_agent = None
self.tracer = trace.get_tracer(__name__)
def register_agent(self, name: str, agent: Any):
self.agents[name] = agent
def execute(self, start_agent: str, input_data: str):
self.current_agent = start_agent
result = None
while self.current_agent:
agent = self.agents[self.current_agent]
with self.tracer.start_as_current_span(
f"{self.current_agent}.execute",
attributes={"metadata.agent_name": self.current_agent}
) as span:
result = agent.execute(input_data)
next_agent = result.next_agent
if next_agent:
span.set_attribute("metadata.next_agent", next_agent)
self.current_agent = next_agent
input_data = result.content
return result
3. Graph-Based State
class AgentNode:
def __init__(self, name: str, agent: Any, next_nodes: List[str] = None):
self.name = name
self.agent = agent
self.next_nodes = next_nodes or []
self.tracer = trace.get_tracer(__name__)
def execute(self, input_data: str) -> AgentResponse:
with self.tracer.start_as_current_span(
f"{self.name}.execute",
attributes={"metadata.node_name": self.name}
) as span:
result = self.agent.execute(input_data)
next_node = self._determine_next_node(result)
if next_node:
span.set_attribute("metadata.next_node", next_node)
return result
class AgentGraph:
def __init__(self):
self.nodes: Dict[str, AgentNode] = {}
def add_node(self, name: str, agent: Any, next_nodes: List[str] = None):
self.nodes[name] = AgentNode(name, agent, next_nodes)
Best Practices for State Management
1. Consistent Naming
from enum import Enum
class AgentNames(Enum):
SUPERVISOR = "supervisor_agent"
RESEARCH = "research_agent"
WRITING = "writing_agent"
REVIEW = "review_agent"
span.set_attribute("metadata.agent_name", AgentNames.SUPERVISOR.value)
2. State Validation
def set_next_agent(span: Span, next_agent: Optional[str]):
if next_agent and next_agent in AgentNames.__members__:
span.set_attribute("metadata.next_agent", next_agent)
elif next_agent:
raise ValueError(f"Invalid next agent: {next_agent}")
3. Error Handling
try:
result = agent.execute(input_data)
if result.next_agent:
span.set_attribute("metadata.next_agent", result.next_agent)
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
error_handler = self.get_error_handler()
if error_handler:
span.set_attribute("metadata.next_agent", error_handler)
4. Testing State Transitions
def test_agent_transitions():
orchestrator = AgentOrchestrator()
result = orchestrator.execute("start_agent", "test input")
spans = get_spans() # Fetch spans from your tracing framework
for span in spans:
agent_name = span.attributes.get("metadata.agent_name")
next_agent = span.attributes.get("metadata.next_agent")
assert agent_name in AgentNames.__members__
if next_agent:
assert next_agent in AgentNames.__members__