Implementing Agent Metadata for Arize

Frameworks with Built-In Support

The following frameworks have built-in support for agent metadata through their auto-instrumentors:

  1. LangGraph

    • Automatically tracks agent nodes and graph transitions

    • Uses native metadata.langgraph_node and metadata.langgraph_step

    • Handles agent metadata and state transitions

    • No additional implementation needed

  2. AutoGen

    • Automatically tracks metadata.agent_name and metadata.next_agent

    • Handles agent handoffs through _handoffs tracking

    • No additional implementation needed

  3. CrewAI

    • Automatically tracks agent roles and task relationships

    • Includes agent metadata in spans

    • No additional implementation needed

  4. OpenAI Agents

    • Handles agent metadata via OpenInferenceTracingProcessor

    • Tracks handoffs between agents

    • No additional implementation needed

  5. Agno

    • Tracks agent names and team relationships

    • Includes agent metadata in spans

    • No additional implementation needed

When Custom Implementation Is Needed

Custom agent metadata tracking is required when:

  1. Using frameworks without built-in support:

    • Vanilla OpenAI / Anthropic calls

    • Custom agent implementations

    • LangChain without agent components

    • Other unsupported frameworks

  2. Using hybrid instrumentation:

    • Mixing auto-instrumented frameworks with custom code

    • Building custom agents that interact with instrumented frameworks

Required Metadata Attributes

To enable agent visualization in Arize, include one of the following attribute sets:

Agents

  • metadata.agent_name: Identifies the agent or component (required for all spans)

  • metadata.next_agent: Indicates the next agent(s). Supported formats:

    • Single agent: "next_agent_name"

    • Multiple agents: "agent1,agent2,agent3" (comma-delimited, no spaces)

Nodes

  • metadata.node_name: Identifies the node, a higher-level unit of work in the system

  • metadata.next_node: Indicates the next node(s). Supported formats:

    • Single node: "next_node_name"

    • Multiple nodes: "node1,node2,node3" (comma-delimited, no spaces)
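
The attribute formats above can be produced with a small helper. The sketch below is an illustration only and is not part of any Arize SDK: join_targets and transition_attributes are hypothetical names, and the returned dictionary is assumed to be passed as the attributes argument when a span is started.

```python
from typing import Dict, Iterable, Optional


def join_targets(names: Iterable[str]) -> str:
    """Join names into a comma-delimited string with no spaces."""
    cleaned = [name.strip() for name in names if name and name.strip()]
    for name in cleaned:
        if "," in name:
            raise ValueError(f"Name must not contain a comma: {name!r}")
    return ",".join(cleaned)


def transition_attributes(
    name: str,
    next_names: Optional[Iterable[str]] = None,
    kind: str = "agent",
) -> Dict[str, str]:
    """Build the metadata attributes for one span.

    kind selects the attribute set: "agent" or "node".
    """
    if kind not in ("agent", "node"):
        raise ValueError(f"kind must be 'agent' or 'node', got {kind!r}")
    attributes = {f"metadata.{kind}_name": name}
    targets = join_targets(next_names or [])
    if targets:
        # Only set the transition attribute when there is a successor.
        attributes[f"metadata.next_{kind}"] = targets
    return attributes
```

Building both attribute sets through one function keeps the agent and node keys from being mixed on the same span by accident.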

Example: Multiple Transitions

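from opentelemetry import trace

class ParallelAgentOrchestrator:
    def __init__(self, name: str):
        self.name = name
        self.tracer = trace.get_tracer(__name__)

    def execute(self, input_data: str):
        # Declare every agent this orchestrator fans out to in one attribute.
        with self.tracer.start_as_current_span(
            f"{self.name}.execute",
            attributes={
                "metadata.agent_name": self.name,
                "metadata.next_agent": "research_agent,translation_agent,summary_agent",
            },
        ):
            # execute_parallel_agents is assumed to be defined elsewhere.
            return self.execute_parallel_agents(input_data)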

class ForkingAgentNode:
    # The response type is assumed to expose requires_review and is_complete flags.
    def _determine_next_nodes(self, result: "AgentResponse") -> str:
        if result.requires_review:
            return "review_node,notification_node"
        elif result.is_complete:
            return "completion_node,analytics_node"
        return "default_node"

    def execute(self, input_data: str):
        with self.tracer.start_as_current_span(
            f"{self.name}.execute",
            attributes={"metadata.node_name": self.name}
        ) as span:
            result = self.process(input_data)
            next_nodes = self._determine_next_nodes(result)
            span.set_attribute("metadata.next_node", next_nodes)
            return result

Best Practices for Multiple Transitions

1. Format

  • Use comma-delimited strings without spaces: "agent1,agent2,agent3"

  • Follow consistent naming conventions

  • Order does not affect the visualization, but a consistent order aids readability

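from typing import List
from opentelemetry.trace import Span

def set_next_agents(span: Span, next_agents: List[str]):
    # Drop empty names and trim whitespace so the value contains no spaces.
    valid_agents = [a.strip() for a in next_agents if a and a.strip()]
    if valid_agents:
        span.set_attribute("metadata.next_agent", ",".join(valid_agents))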

2. Validation

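MAX_PARALLEL_AGENTS = 10  # Illustrative limit; tune for your system

# AgentNames is the Enum shown under "Consistent Naming". Compare against its
# values, because __members__ is keyed by member names such as "RESEARCH".
VALID_AGENT_NAMES = {member.value for member in AgentNames}

def validate_next_agents(agents: List[str]) -> bool:
    if not agents:
        return False
    if len(agents) > MAX_PARALLEL_AGENTS:
        raise ValueError(f"Too many parallel agents: {len(agents)}")
    return all(agent in VALID_AGENT_NAMES for agent in agents)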

def set_validated_next_agents(span: Span, next_agents: List[str]):
    if validate_next_agents(next_agents):
        span.set_attribute("metadata.next_agent", ",".join(next_agents))

3. State Management

class ParallelStateManager:
    def __init__(self):
        self.parallel_states: Dict[str, Dict[str, Any]] = {}

    def register_parallel_transition(self, parent_id: str, next_agents: List[str]):
        self.parallel_states[parent_id] = {
            "agents": next_agents,
            "completed": set(),
            "failed": set()
        }

    def mark_completed(self, parent_id: str, agent_name: str):
        if parent_id in self.parallel_states:
            self.parallel_states[parent_id]["completed"].add(agent_name)

    def all_completed(self, parent_id: str) -> bool:
        state = self.parallel_states.get(parent_id)
        # Return a real bool; an unknown parent_id counts as not completed.
        return state is not None and len(state["completed"]) == len(state["agents"])

4. Error Handling

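import asyncio
from opentelemetry.trace import Status, StatusCode

# Runs inside an async method of the orchestrator, with span as the active span.
try: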
    next_agents = ["agent1", "agent2", "agent3"]
    span.set_attribute("metadata.next_agent", ",".join(next_agents))

    results = await asyncio.gather(
        *[self.execute_agent(agent) for agent in next_agents],
        return_exceptions=True
    )

    failed_agents = [
        agent for agent, result in zip(next_agents, results)
        if isinstance(result, Exception)
    ]
    if failed_agents:
        span.set_attribute("agent.failed_transitions", ",".join(failed_agents))

except Exception as e:
    span.set_status(Status(StatusCode.ERROR, str(e)))
    span.record_exception(e)
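
The fragment above depends on an enclosing async method and a live span. A self-contained variant of the same pattern is sketched below so it can be run as is. RecordingSpan and execute_agent are hypothetical stand-ins (not OpenTelemetry or Arize APIs), and the failure of "agent2" is simulated.

```python
import asyncio
from typing import Dict, List


class RecordingSpan:
    """Stand-in for a tracing span; it only stores attributes in a dict."""

    def __init__(self) -> None:
        self.attributes: Dict[str, str] = {}

    def set_attribute(self, key: str, value: str) -> None:
        self.attributes[key] = value


async def execute_agent(name: str) -> str:
    """Hypothetical agent call; "agent2" fails to simulate an error."""
    await asyncio.sleep(0)
    if name == "agent2":
        raise RuntimeError(f"{name} failed")
    return f"{name} ok"


async def run_parallel(span: RecordingSpan, next_agents: List[str]) -> List[str]:
    """Run all agents concurrently and record which transitions failed."""
    span.set_attribute("metadata.next_agent", ",".join(next_agents))
    results = await asyncio.gather(
        *(execute_agent(agent) for agent in next_agents),
        return_exceptions=True,
    )
    # gather preserves input order, so results line up with next_agents.
    failed = [
        agent
        for agent, result in zip(next_agents, results)
        if isinstance(result, Exception)
    ]
    if failed:
        span.set_attribute("agent.failed_transitions", ",".join(failed))
    # Return only the successful results.
    return [r for r in results if not isinstance(r, Exception)]


span = RecordingSpan()
successes = asyncio.run(run_parallel(span, ["agent1", "agent2", "agent3"]))
```

Because return_exceptions=True is set, one failing agent does not cancel the others, and the declared transitions stay on the span alongside the list of failures.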

State Management Patterns

1. Response-Based State

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class AgentResponse:
    content: str
    metadata: Dict[str, Any]
    next_agent: Optional[str] = None

class BaseAgent:
    def execute(self, input_data: str) -> AgentResponse:
        with self.tracer.start_as_current_span(
            f"{self.name}.execute",
            attributes={"metadata.agent_name": self.name}
        ) as span:
            result = self._run_agent_logic(input_data)
            if result.next_agent:
                span.set_attribute("metadata.next_agent", result.next_agent)
            return result

2. Orchestrator-Based State

from opentelemetry import trace

class AgentOrchestrator:
    def __init__(self):
        self.agents = {}
        self.current_agent = None
        self.tracer = trace.get_tracer(__name__)

    def register_agent(self, name: str, agent: Any):
        self.agents[name] = agent

    def execute(self, start_agent: str, input_data: str):
        self.current_agent = start_agent
        result = None

        while self.current_agent:
            agent = self.agents[self.current_agent]
            with self.tracer.start_as_current_span(
                f"{self.current_agent}.execute",
                attributes={"metadata.agent_name": self.current_agent}
            ) as span:
                result = agent.execute(input_data)
                next_agent = result.next_agent
                if next_agent:
                    span.set_attribute("metadata.next_agent", next_agent)
                self.current_agent = next_agent
                input_data = result.content

        return result

3. Graph-Based State

class AgentNode:
    def __init__(self, name: str, agent: Any, next_nodes: Optional[List[str]] = None):
        self.name = name
        self.agent = agent
        self.next_nodes = next_nodes or []
        self.tracer = trace.get_tracer(__name__)

    def _determine_next_node(self, result: AgentResponse) -> Optional[str]:
        # One possible policy: declare every configured successor.
        # Replace with routing logic based on result where needed.
        return ",".join(self.next_nodes) or None

    def execute(self, input_data: str) -> AgentResponse:
        with self.tracer.start_as_current_span(
            f"{self.name}.execute",
            attributes={"metadata.node_name": self.name}
        ) as span:
            result = self.agent.execute(input_data)
            next_node = self._determine_next_node(result)
            if next_node:
                span.set_attribute("metadata.next_node", next_node)
            return result

class AgentGraph:
    def __init__(self):
        self.nodes: Dict[str, AgentNode] = {}

    def add_node(self, name: str, agent: Any, next_nodes: List[str] = None):
        self.nodes[name] = AgentNode(name, agent, next_nodes)
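
Since the graph refers to successors by name, a mistyped entry in next_nodes would declare a transition to a node that never runs. The sketch below shows one way to catch this before execution. It works on a plain dictionary of node names rather than on the AgentGraph class, and find_dangling_edges is a hypothetical helper name.

```python
from typing import Dict, List


def find_dangling_edges(graph: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Return, per node, the successors that are not defined in the graph."""
    dangling: Dict[str, List[str]] = {}
    for name, next_nodes in graph.items():
        missing = [target for target in next_nodes if target not in graph]
        if missing:
            dangling[name] = missing
    return dangling


# Hypothetical graph: node name -> names of the nodes it may hand off to.
graph = {
    "research_node": ["review_node", "notification_node"],
    "review_node": ["completion_node"],
    "completion_node": [],
}
problems = find_dangling_edges(graph)
```

Here notification_node is referenced but never added, so it is reported under research_node.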

Best Practices for State Management

1. Consistent Naming

from enum import Enum

class AgentNames(Enum):
    SUPERVISOR = "supervisor_agent"
    RESEARCH = "research_agent"
    WRITING = "writing_agent"
    REVIEW = "review_agent"

span.set_attribute("metadata.agent_name", AgentNames.SUPERVISOR.value)
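
One detail matters when validating against an Enum like this: AgentNames.__members__ is keyed by member names (for example "SUPERVISOR"), while the strings written to span attributes are the member values (for example "supervisor_agent"). The sketch below, using a shortened copy of the enum and a hypothetical is_known_agent helper, shows a membership check based on values.

```python
from enum import Enum


class AgentNames(Enum):
    SUPERVISOR = "supervisor_agent"
    RESEARCH = "research_agent"


# __members__ maps member names (the identifiers) to members.
name_keys = list(AgentNames.__members__)

# The strings written to span attributes are the member values.
valid_values = {member.value for member in AgentNames}


def is_known_agent(value: str) -> bool:
    """True if value matches the value of some AgentNames member."""
    return value in valid_values
```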

2. State Validation

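def set_next_agent(span: Span, next_agent: Optional[str]):
    # Compare against enum values; __members__ is keyed by names such as "SUPERVISOR".
    valid_names = {member.value for member in AgentNames}
    if next_agent and next_agent in valid_names:
        span.set_attribute("metadata.next_agent", next_agent)
    elif next_agent:
        raise ValueError(f"Invalid next agent: {next_agent}")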

3. Error Handling

try:
    result = agent.execute(input_data)
    if result.next_agent:
        span.set_attribute("metadata.next_agent", result.next_agent)
except Exception as e:
    span.set_status(Status(StatusCode.ERROR, str(e)))
    span.record_exception(e)
    error_handler = self.get_error_handler()
    if error_handler:
        span.set_attribute("metadata.next_agent", error_handler)

4. Testing State Transitions

def test_agent_transitions():
    orchestrator = AgentOrchestrator()
    # Register your agents with orchestrator.register_agent(...) before executing.
    orchestrator.execute(AgentNames.SUPERVISOR.value, "test input")

    valid_names = {member.value for member in AgentNames}
    spans = get_spans()  # Fetch spans from your tracing framework
    for span in spans:
        agent_name = span.attributes.get("metadata.agent_name")
        next_agent = span.attributes.get("metadata.next_agent")
        assert agent_name in valid_names
        if next_agent:
            # next_agent may hold several comma-delimited names.
            assert all(name in valid_names for name in next_agent.split(","))
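
Transition checks can also be written against plain attribute dictionaries, which keeps the test independent of how spans are fetched. The sketch below is one way to do that. collect_edges and unknown_targets are hypothetical helpers, and the sample dictionaries are made up to resemble attributes read from exported spans.

```python
from typing import Dict, Iterable, List, Set, Tuple


def collect_edges(span_attributes: Iterable[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Turn agent metadata into (source, target) pairs, one per declared transition."""
    edges: List[Tuple[str, str]] = []
    for attributes in span_attributes:
        source = attributes.get("metadata.agent_name")
        targets = attributes.get("metadata.next_agent", "")
        if not source:
            continue
        # A single attribute may declare several comma-delimited targets.
        for target in targets.split(","):
            if target:
                edges.append((source, target))
    return edges


def unknown_targets(edges: Iterable[Tuple[str, str]], known: Set[str]) -> Set[str]:
    """Return transition targets that are not in the set of known agent names."""
    return {target for _, target in edges if target not in known}


# Hypothetical attribute dictionaries, as they might be read from exported spans.
spans = [
    {"metadata.agent_name": "supervisor_agent",
     "metadata.next_agent": "research_agent,writing_agent"},
    {"metadata.agent_name": "research_agent", "metadata.next_agent": "review_agent"},
    {"metadata.agent_name": "writing_agent"},
]
edges = collect_edges(spans)
known = {"supervisor_agent", "research_agent", "writing_agent", "review_agent"}
```

An empty result from unknown_targets(edges, known) means every declared transition points at a known agent.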

Common Pitfalls to Avoid

  1. Inconsistent Naming

    • Different names for the same agent

    • Lack of validation or convention

  2. Missing State Updates

    • Forgetting to set metadata.next_agent before the span ends

    • Ignoring error transitions

  3. Race Conditions

    • Parallel agents updating shared state unsafely

    • No synchronization logic

  4. Poor Error Recovery

    • No fallback agents or transition metadata

    • Incomplete state cleanup
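
For the race-condition and cleanup pitfalls, one option is to guard shared completion state with a lock and remove it once the parent span finishes. The following is a minimal sketch under the assumption that agents run in threads. LockedParallelState is a hypothetical class, not an Arize or OpenTelemetry API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Set


class LockedParallelState:
    """Tracks completion of parallel agents; a lock guards every update."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._expected: Dict[str, Set[str]] = {}
        self._completed: Dict[str, Set[str]] = {}

    def register(self, parent_id: str, agents: List[str]) -> None:
        with self._lock:
            self._expected[parent_id] = set(agents)
            self._completed[parent_id] = set()

    def mark_completed(self, parent_id: str, agent_name: str) -> None:
        with self._lock:
            if parent_id in self._completed:
                self._completed[parent_id].add(agent_name)

    def all_completed(self, parent_id: str) -> bool:
        with self._lock:
            expected = self._expected.get(parent_id)
            return expected is not None and self._completed[parent_id] == expected

    def cleanup(self, parent_id: str) -> None:
        # Remove finished state so the tracker does not grow without bound.
        with self._lock:
            self._expected.pop(parent_id, None)
            self._completed.pop(parent_id, None)


state = LockedParallelState()
agents = [f"agent{i}" for i in range(20)]
state.register("span-1", agents)
with ThreadPoolExecutor(max_workers=8) as pool:
    # Each worker reports completion of one agent concurrently.
    list(pool.map(lambda name: state.mark_completed("span-1", name), agents))
done = state.all_completed("span-1")
```

Comparing sets rather than counts also avoids treating a duplicate or unexpected completion as progress.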
