Set the GOOGLE_API_KEY environment variable. Refer to Google's official guide for more details on configuring environment variables for Vertex AI if needed.
export GOOGLE_API_KEY=[your_key_here]
Use the register function to connect your application to Arize AX.
from arize.otel import registertracer_provider =register( space_id ="your-space-id", # in app space settings page api_key ="your-api-key", # in app space settings page project_name ="your-project-name", # name this to whatever you would like)# Import the automatic instrumentor from OpenInferencefrom openinference.instrumentation.google_adk import GoogleADKInstrumentor# Finish automatic instrumentationGoogleADKInstrumentor().instrument(tracer_provider=tracer_provider)
Observe
Now that you have tracing setup, all Google ADK SDK requests will be streamed to Arize AX for observability and evaluation.
import nest_asyncio
nest_asyncio.apply()
from google.adk.agents import Agent
from google.adk.runners import InMemoryRunner
from google.genai import types
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
if city.lower() == "new york":
return {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
return {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
agent = Agent(
name="test_agent",
model="gemini-2.0-flash-exp",
description="Agent to answer questions using tools.",
instruction="You must use the available tools to find an answer.",
tools=[get_weather]
)
app_name = "test_instrumentation"
user_id = "test_user"
session_id = "test_session"
runner = InMemoryRunner(agent=agent, app_name=app_name)
session_service = runner.session_service
await session_service.create_session(
app_name=app_name,
user_id=user_id,
session_id=session_id
)
async for event in runner.run_async(
user_id=user_id,
session_id=session_id,
new_message=types.Content(role="user", parts=[
types.Part(text="What is the weather in New York?")]
)
):
if event.is_final_response():
print(event.content.parts[0].text.strip())
This instrumentation will support tool calling soon. Refer to this page for the status.