Arize + Mosaic AI Agent Framework

This notebook is adapted from Databricks's "Mosaic AI Agent Framework: Author and deploy a tool-calling LangGraph agent"

In this notebook you learn to:

  • Author a tool-calling LangGraph agent wrapped with ChatAgent and Arize auto-instrumentation for tracing

  • This agent has the capability to generate and execute python code in a stateless sandboxed environment

  • Log and deploy the agent

  • Evaluate the agent's python code using Arize LLM as a Judge evaluation

  • Invoke the agent and view traces and evaluation results in the Arize platform

  • Set up evaluation custom metrics and view them in monitors and dashboards in Arize

To learn more about authoring an agent using Mosaic AI Agent Framework, see Databricks documentation (AWS | Azure).

Prerequisites

Install Dependencies

%pip install -U -qqqq mlflow databricks-langchain databricks-agents uv langgraph==0.3.4  arize-otel openinference-instrumentation-langchain
dbutils.library.restartPython()

Access Arize Space and API Keys from Databricks Secrets and set them as Environment Variables

Create a Arize API key and Space ID for the items below. Set up Arize credentials using Databricks Secrets for secure access of keys.

# Reading the secure keys from secrets
ARIZE_API_KEY = dbutils.secrets.get(scope="ryoung", key="ARIZE_API_KEY")
ARIZE_SPACE_ID = dbutils.secrets.get(scope="ryoung", key="ARIZE_SPACE_ID")

# setting as environment variables to be used by the chain
import os
os.environ["ARIZE_API_KEY"] = ARIZE_API_KEY
os.environ["ARIZE_SPACE_ID"] = ARIZE_SPACE_ID

Create a local configuration file to store project settings:

Create a file named "chain_config.yaml" with variables below. It should reside in the same folder as the notebook. These variables will be accessed from the agent code. Replace the example values with your own values:

ARIZE_PROJECT_NAME="databricks-langgraph-tool-calling-agent" LLM_ENDPOINT_NAME="databricks-claude-3-7-sonnet"

Define the agent in code

Define the agent code in a single cell below. This lets you easily write the agent code to a local Python file, using the %%writefile magic command, for subsequent logging and deployment.

Tracing auto-instrumentation

Opentelemetry based auto-instrumentation for Langgraph exports traces to Arize.

Agent tools

This agent code adds the built-in Unity Catalog function system.ai.python_exec to the agent. The agent code also includes commented-out sample code for adding a vector search index to perform unstructured data retrieval.

system.ai.python_exec - Executes Python code in a stateless sandboxed environment and returns its stdout. The runtime cannot access files or read previous executions' output. All operations must be self-contained, using only standard Python libraries. Calls to other tools are prohibited.

For more examples of tools to add to your agent, see Databricks documentation (AWS | Azure)

Wrap the LangGraph agent using the ChatAgent interface

For compatibility with Databricks AI features, the LangGraphChatAgent class implements the ChatAgent interface to wrap the LangGraph agent. This example uses the provided convenience APIs ChatAgentState and ChatAgentToolNode for ease of use.

Databricks recommends using ChatAgent as it simplifies authoring multi-turn conversational agents using an open source standard. See MLflow's ChatAgent documentation.

%%writefile agent.py
from typing import Any, Generator, Optional, Sequence, Union

import mlflow
from databricks_langchain import (
    ChatDatabricks,
    UCFunctionToolkit,
    VectorSearchRetrieverTool,
)
from langchain_core.language_models import LanguageModelLike
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool
from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt.tool_node import ToolNode
from mlflow.langchain.chat_agent_langgraph import ChatAgentState, ChatAgentToolNode
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)
import os

import logging
logging.getLogger("openinference.instrumentation.langchain._tracer").setLevel(logging.CRITICAL)

############################################
# Arize Tracing Setup
############################################
#register tracer provider to send traces to Arize
from arize.otel import register

model_config = mlflow.models.ModelConfig(development_config="chain_config.yaml")

tracer_provider = register(
    space_id = os.getenv("ARIZE_SPACE_ID"),
    api_key = os.getenv("ARIZE_API_KEY"),
    project_name = model_config.get("ARIZE_PROJECT_NAME"),
    #log_to_console=True
)
# 1 line auto instrumentation
from openinference.instrumentation.langchain import LangChainInstrumentor
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)


############################################
# Define your LLM endpoint and system prompt
############################################
# TODO: Replace with your model serving endpoint
LLM_ENDPOINT_NAME = model_config.get("LLM_ENDPOINT_NAME") 
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)

# TODO: Update with your system prompt
system_prompt = "You are a helpful assistant. Take the user's request and where applicable, use the appropriate tool if necessary to accomplish the task. If tools are not necessary, response directly to the user's request."

###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################
tools = []

# You can use UDFs in Unity Catalog as agent tools
# Below, we add the `system.ai.python_exec` UDF, which provides
# a python code interpreter tool to our agent
# You can also add local LangChain python tools. See https://python.langchain.com/docs/concepts/tools

# TODO: Add additional tools
uc_tool_names = ["system.ai.python_exec"]
uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names)
tools.extend(uc_toolkit.tools)

# Use Databricks vector search indexes as tools
# See https://docs.databricks.com/en/generative-ai/agent-framework/unstructured-retrieval-tools.html
# for details

# TODO: Add vector search indexes
# vector_search_tools = [
#         VectorSearchRetrieverTool(
#         index_name="",
#         # filters="..."
#     )
# ]
# tools.extend(vector_search_tools)

#####################
## Define agent logic
#####################


def create_tool_calling_agent(
    model: LanguageModelLike,
    tools: Union[ToolNode, Sequence[BaseTool]],
    system_prompt: Optional[str] = None,
) -> CompiledGraph:
    model = model.bind_tools(tools)

    # Define the function that determines which node to go to
    def should_continue(state: ChatAgentState):
        messages = state["messages"]
        last_message = messages[-1]
        # If there are function calls, continue. else, end
        if last_message.get("tool_calls"):
            return "continue"
        else:
            return "end"

    if system_prompt:
        preprocessor = RunnableLambda(
            lambda state: [{"role": "system", "content": system_prompt}]
            + state["messages"]
        )
    else:
        preprocessor = RunnableLambda(lambda state: state["messages"])
    model_runnable = preprocessor | model

    def call_model(
        state: ChatAgentState,
        config: RunnableConfig,
    ):
        response = model_runnable.invoke(state, config)

        return {"messages": [response]}

    workflow = StateGraph(ChatAgentState)

    workflow.add_node("agent", RunnableLambda(call_model))
    workflow.add_node("tools", ChatAgentToolNode(tools))

    workflow.set_entry_point("agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "continue": "tools",
            "end": END,
        },
    )
    workflow.add_edge("tools", "agent")

    return workflow.compile()


class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {"messages": self._convert_messages_to_dict(messages)}

        messages = []
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                messages.extend(
                    ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {"messages": self._convert_messages_to_dict(messages)}
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
                )


# Create the agent object, and specify it as the agent object to use when
# loading the agent back for inference via mlflow.models.set_model()

agent = create_tool_calling_agent(llm, tools, system_prompt)
AGENT = LangGraphChatAgent(agent)
mlflow.models.set_model(AGENT)

Restart Python and reset environment variables

dbutils.library.restartPython()
# Reading the secure keys from secrets
ARIZE_API_KEY = dbutils.secrets.get(scope="ryoung", key="ARIZE_API_KEY")
ARIZE_SPACE_ID = dbutils.secrets.get(scope="ryoung", key="ARIZE_SPACE_ID")

# setting as environment variables to be used by the chain
import os
os.environ["ARIZE_API_KEY"] = ARIZE_API_KEY
os.environ["ARIZE_SPACE_ID"] = ARIZE_SPACE_ID

Log the agent as an MLflow model

Log the agent as code from the agent.py file. See MLflow - Models from Code.

Enable automatic authentication for Databricks resources

For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling mlflow.pyfunc.log_model().

  • TODO: If your Unity Catalog tool queries a [vector search index](docs link) or leverages [external functions](docs link), you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs (AWS | Azure).

import mlflow
from agent import tools, LLM_ENDPOINT_NAME
from databricks_langchain import VectorSearchRetrieverTool
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint
from unitycatalog.ai.langchain.toolkit import UnityCatalogTool
from pkg_resources import get_distribution

model_config = mlflow.models.ModelConfig(development_config="chain_config.yaml")

resources = [DatabricksServingEndpoint(endpoint_name=model_config.get("LLM_ENDPOINT_NAME"))]
for tool in tools:
    if isinstance(tool, VectorSearchRetrieverTool):
        resources.extend(tool.resources)
    elif isinstance(tool, UnityCatalogTool):
        resources.append(DatabricksFunction(function_name=tool.uc_function_name))


with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="agent",
        python_model="agent.py",
        model_config="chain_config.yaml",
        extra_pip_requirements= [
            f"databricks-connect=={get_distribution('databricks-connect').version}",
            "arize-otel", "openinference.instrumentation.langchain"
            ],
        resources=resources,
    )

Pre-deployment agent validation

Before registering and deploying the agent, perform pre-deployment checks using the mlflow.models.predict() API. See Databricks documentation (AWS | Azure).

mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"messages": [{"role": "user", "content": "Hello!"}]},
    env_manager="uv",
)

Register the model to Unity Catalog

Before you deploy the agent, you must register the agent to Unity Catalog.

  • TODO Update the catalog, schema, and model_name below to register the MLflow model to Unity Catalog.

mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = "prasad_kona_isv"
schema = "demo"
model_name = "langgraph-tool-calling-agent"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

Deploy the agent

from databricks import agents
agents.deploy(
                UC_MODEL_NAME, 
                uc_registered_model_info.version, 
                tags = {"endpointSource": "docs"},
                scale_to_zero_enabled=True,
                environment_vars={
                    "ARIZE_API_KEY": "{{secrets/<configration profile>/ARIZE_API_KEY}}",
                    "ARIZE_SPACE_ID": "{{secrets/<configuration profile >/ARIZE_SPACE_ID}}",
                }
              )

Configure Online Evaluations in Arize AX

Follow instructions here to setup up online evaluations in Arize AX.

Arize's Online Evaluations automatically run LLM-as-a-Judge based evaluations directly on the traces collected in the Arize platform from our Agent runs. This provides continuous quality monitoring without manual intervention. This approach scales to thousands of interactions, enabling data-driven improvements to your agent's performance. These evaluations are for assessing code generation quality that the agent produces, specifically:

  • Code Correctness: Does the generated code solve the user's problem accurately?

  • Code Readability: Is the code clean, well-structured, and maintainable?

References:

  • LLM-as-a-Judge evaluation best practices: (Arize docs

  • Agent evaluation best practices: (Arize Docs

  • Automate running evaluations on your Traces and Spans: (Docs

Call the Agent

There are several methods we can use to call our newly deployed agent in Databricks.

  • REST API Calls: You can invoke your deployed agent through HTTP POST requests to the model serving endpoint. This method provides programmatic access, allowing you to integrate the agent into applications or automated workflows by sending JSON payloads with your input data and receiving structured responses.

  • Model Serving UI: Databricks provides a built-in web interface where you can directly test your deployed agent. Simply navigate to the serving endpoint in the Databricks workspace, use the "Test" tab to input sample data, and see real-time responses without writing any code.

  • Databricks AI Playground: This interactive environment lets you experiment with your agent in a conversational interface. You can test different prompts, observe the agent's behavior, and refine your interactions before implementing them in production scenarios.

# Example REST API Call via Curl 

# #1 - Basic question (no code generation)
curl \
-u token:$DATABRICKS_TOKEN \
-X POST \
-H "Content-Type: application/json" \
-d '{"prompt": "What is a lakehouse?", "max_tokens": 64}' \
https://<workspace_host>.databricks.com/serving-endpoints/<your-agents-serving-endpoint-name>/invocations

# #2 - Math question (code generation)
curl \
-u token:$DATABRICKS_TOKEN \
-X POST \
-H "Content-Type: application/json" \
-d '{"prompt": "What is 5*5 in python?", "max_tokens": 64}' \
https://<workspace_host>.databricks.com/serving-endpoints/<your-agents-serving-endpoint-name>/invocations
# Example calling the agent using openai sdk

from openai import OpenAI
import os

# In a Databricks notebook you can use this:
DATABRICKS_HOSTNAME = dbutils.notebook.entry_point.getDbutils().notebook().getContext().browserHostName().get()
DATABRICKS_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
serving_endpoint_name = "<your-agents-serving-endpoint-name>"

client = OpenAI(
   api_key=DATABRICKS_TOKEN,
   base_url=f"https://{DATABRICKS_HOSTNAME}/serving-endpoints"
)
chat_completion = client.chat.completions.create(
   messages=[
       {
           "role": "system",
           "content": "You are an AI assistant"
       },
       {
           "role": "user",
           "content": "Tell me about Large Language Models in one sentence"
       }
   ],
   model=serving_endpoint_name,
   max_tokens=256
)

print(chat_completion.choices[0].message.content) if chat_completion and chat_completion.choices else print(chat_completion)

View traces and evaluation results in Arize

As you run your agent, traces are automatically sent to Arize. In the Arize platform, you can see agent execution details, tool invocations, latency breakdown by component, token usage and costs, errors and metadata captured for each span and function call. Additionally, evaluation labels are captured for every trace based on the code correctness and code readability evals we setup earlier.

Monitoring, alerting and KPI dashboards in Arize AX

Turn any trace attribute and evaluation label into custom metrics. Build KPI driven dashboards and monitors that proactively alert you when any degradation in performance or quality of your agent occurs.

Next steps

After your agent is deployed, you can chat with it in AI playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See Databricks documentation (AWS | Azure).

Resources

Databricks Resources

Arize Resources

Last updated

Was this helpful?