Evaluating Agentic RAG using Arize & Couchbase
This tutorial is adapted from the LangGraph Agentic RAG notebook.
This guide shows you how to create a Retrieval-Augmented Generation (RAG) agent using the Couchbase vector store and evaluate its performance with Arize. Agentic RAG combines RAG with the decision-making abilities of agents.
Retrieval Agents are useful when we want to make decisions about whether to retrieve from an index. To implement a Retrieval Agent, we simply need to give an LLM access to a retriever tool.
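To make that concrete, here is a minimal sketch of the idea, not the code used in this tutorial (which fetches its retriever tool from Agent Catalog instead): we wrap a retriever as a tool and bind it to the model so the LLM can decide when to retrieve. The tool name and description below are illustrative placeholders.
# Minimal sketch (illustrative only): give an LLM access to a retriever tool.
from langchain.tools.retriever import create_retriever_tool
from langchain_openai import ChatOpenAI

# `vector_store` stands in for any LangChain vector store; we build a Couchbase one below.
retriever = vector_store.as_retriever()
retriever_tool = create_retriever_tool(
    retriever,
    name="retrieve_blog_posts",  # placeholder name
    description="Search and return information from Lilian Weng's blog posts.",
)

# The model can now emit a tool call to retrieve documents, or answer directly.
llm_with_tools = ChatOpenAI(model="gpt-4o").bind_tools([retriever_tool])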
We'll go through the following steps:
Create an Agentic RAG QA chatbot with OpenAI, LangGraph, Couchbase, and Agent Catalog
Trace the agent's function calls including retrieval and LLM calls using Arize
Create a dataset to benchmark performance
Evaluate performance using LLM as a judge
Experiment with different chunk sizes, chunk overlaps, and numbers of retrieved documents (k) to see how these affect the performance of the Agentic RAG
Compare these experiments in Arize
Notebook Setup
First, let's install the required packages and set our API keys:
%pip install -qU langchain-openai langchain-community langchain langgraph langgraph-prebuilt openai langchain-couchbase agentc langchain-huggingface langchain-core
%pip install -qq "arize-phoenix[evals]" arize-otel openinference-instrumentation-openai openinference-instrumentation-langchain
Set API Keys
To follow along with this tutorial, you'll need to sign up for Arize and get your Space ID and API key. You can see the guide here. You will also need an OpenAI API key.
import os
from getpass import getpass
# Enter here or set as environment variables
SPACE_ID = globals().get("SPACE_ID") or getpass(
"🔑 Enter your Arize Space ID: "
)
API_KEY = globals().get("API_KEY") or getpass("🔑 Enter your Arize API Key: ")
OPENAI_API_KEY = globals().get("OPENAI_API_KEY") or getpass(
"🔑 Enter your OpenAI API key: "
)
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
Set up Arize Tracing
from arize.otel import register
# Setup tracer provider via our convenience function
tracer_provider = register(
space_id = SPACE_ID,
api_key = API_KEY,
project_name = "langgraph-agentic-rag", # name this to whatever you would like
)
# Import the automatic instrumentor from OpenInference
from openinference.instrumentation.langchain import LangChainInstrumentor
# Finish automatic instrumentation
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
Setup Couchbase
You'll need to set up your Couchbase cluster by doing the following:
Create an account at Couchbase Cloud
Create a free cluster with the Data, Index, and Search services enabled*
Create cluster access credentials
Allow access to the cluster from your local machine
Create a bucket to store your documents
*The Search Service will be used to perform semantic search later when we use Agent Catalog.
Initialize Couchbase cluster
Once you've set up your cluster, you can connect to it using LangChain's langchain-couchbase package.
Collect the following information from your cluster:
Connection string
Username
Password
Bucket name
Scope name
Collection name
Before this step, you must also create a vector search index. You can do this from the Couchbase UI by clicking on the "Search" tab. Make sure the index, bucket, scope, and collection names match the ones we define below.
See the Couchbase guide for creating a vector search index: https://docs.couchbase.com/cloud/vector-search/create-vector-search-index-ui.html
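For reference, a vector search index definition for this setup looks roughly like the following. This is a hedged sketch only: the field names "embedding" and "text" assume the langchain-couchbase defaults, the dims value of 384 assumes the all-MiniLM-L12-v2 embedding model used below, and the exact keys may vary by Couchbase version, so treat the linked documentation and the UI wizard as the source of truth.
# Rough, illustrative sketch of a vector search index definition (create the
# equivalent JSON via the Couchbase UI's index editor; names below are placeholders).
vector_index_definition = {
    "type": "fulltext-index",
    "name": "vector_search_index",     # must match SEARCH_INDEX_NAME below
    "sourceName": "your-bucket-name",  # must match BUCKET_NAME below
    "params": {
        "doc_config": {"mode": "scope.collection.type_field", "type_field": "type"},
        "mapping": {
            "default_mapping": {"enabled": False},
            "types": {
                "your-scope.your-collection": {  # "<SCOPE_NAME>.<COLLECTION_NAME>"
                    "enabled": True,
                    "properties": {
                        # Vector field written by langchain-couchbase (default name "embedding")
                        "embedding": {
                            "enabled": True,
                            "fields": [
                                {
                                    "name": "embedding",
                                    "type": "vector",
                                    "dims": 384,  # all-MiniLM-L12-v2 output size
                                    "similarity": "dot_product",
                                    "index": True,
                                }
                            ],
                        },
                        # Text field holding the chunk content (default name "text")
                        "text": {
                            "enabled": True,
                            "fields": [{"name": "text", "type": "text", "index": True, "store": True}],
                        },
                    },
                }
            },
        },
    },
}
With the index in place, we can connect to the cluster and initialize the vector store.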
from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from langchain_couchbase.vectorstores import CouchbaseVectorStore
from langchain_huggingface import HuggingFaceEmbeddings
# Cluster settings
CB_CONN_STRING = getpass("Enter the connection string for the Couchbase cluster: ")
CB_USERNAME = getpass("Enter the username for the Couchbase cluster: ")
CB_PASSWORD = getpass("Enter the password for the Couchbase cluster: ")
BUCKET_NAME = ""  # enter your bucket name
SCOPE_NAME = ""  # enter your scope name
COLLECTION_NAME = ""  # enter your collection name
SEARCH_INDEX_NAME = ""  # enter your search index name
# Connect to the Couchbase cluster
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
options = ClusterOptions(auth)
options.apply_profile("wan_development")
cluster = Cluster(CB_CONN_STRING, options)
cluster.wait_until_ready(timedelta(seconds=5))
# Initialize the vector store
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L12-v2")
vector_store = CouchbaseVectorStore(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
embedding=embeddings,
index_name=SEARCH_INDEX_NAME,
)
Since we will test multiple runs, we create a convenience function that resets the vector store with different chunk sizes and overlaps. Document content is sourced from three blog posts by Lilian Weng.
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Define the reset_vector_store function so we can run experiments with different chunk sizes
def reset_vector_store(vector_store, chunk_size=1024, chunk_overlap=20):
try:
results = vector_store.similarity_search(
k=1000,
query="", # Use an empty query or a specific one if needed
search_options={
"query": {"field": "metadata.source", "match": "lilian_weng_blog"}
},
)
if results:
deleted_ids = []
for result in results:
deleted_ids.append(result.id)
vector_store.delete(ids=deleted_ids)
# Load documents from a URL or file
urls = [
"https://lilianweng.github.io/posts/2024-07-07-hallucination/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
# Use RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", " ", ""], # Hierarchical separators
)
doc_splits = text_splitter.split_documents(docs_list)
# Adding metadata to documents
for i, doc in enumerate(doc_splits):
doc.metadata["source"] = "lilian_weng_blog"
try:
vector_store.add_documents(doc_splits)
except ValueError as e:
print(f"Failed to insert documents: {e}")
return vector_store
except ValueError as e:
print(f"Search failed with error: {e}")
# Reset the vector store
reset_vector_store(vector_store)
Retriever Tool
Create tools and prompts with Agent Catalog
Fetch our retriever tool from the Agent Catalog using the agentc provider. In the future, when more tools (and/or prompts) are required and the application grows more complex, Agent Catalog SDK and CLI can be used to automatically fetch the tools based on the use case (semantic search) or by name.
For instructions on how this tool was created and more capabilities of Agent Catalog, please refer to the documentation here.
import agentc.langchain
import agentc
from langchain_core.tools import tool
import os
# For retrieval from the local catalog
provider = agentc.Provider(
decorator=lambda t: tool(t.func)
)
# In case the tools were published to the Couchbase cluster beforehand
# provider = agentc.Provider(
# decorator=lambda t: tool(t.func),
# secrets={"CB_USERNAME": CB_USERNAME,
# "CB_PASSWORD": CB_PASSWORD,
# "CB_CONN_STRING": CB_CONN_STRING})
# This is the tool that will be used to retrieve documents from the vector store
retriever_tool = provider.get_item(name="retriever_tool", item_type="tool")
tools = retriever_tool
print(retriever_tool)
Create Agent
Agent State
We will define a graph of agents so that all the agents involved can communicate with each other. Agents communicate through a state object that is passed to each node and modified with that node's output. Our state will be a list of messages, and each node in our graph will append to it.
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
# The add_messages function defines how an update should be processed
# Default is to replace. add_messages says "append"
messages: Annotated[Sequence[BaseMessage], add_messages]
Define the Nodes and Edges
We can lay out an agentic RAG graph like this:
The state is a set of messages
Each node will update (append to) state
Conditional edges decide which node to visit next
from typing import Annotated, Literal, Sequence, TypedDict
from langchain import hub
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import tools_condition
### Edges
def grade_documents(state) -> Literal["generate", "rewrite"]:
"""
Determines whether the retrieved documents are relevant to the question.
Args:
state (messages): The current state
Returns:
str: A decision for whether the documents are relevant or not
"""
print("---CHECK RELEVANCE---")
# Data model
class grade(BaseModel):
"""Binary score for relevance check."""
binary_score: str = Field(description="Relevance score 'yes' or 'no'")
# LLM
model = ChatOpenAI(temperature=0, model="gpt-4o", streaming=True)
# LLM with tool and validation
llm_with_tool = model.with_structured_output(grade)
# Fetch a prompt called "grade_documents" from the Agent Catalog
grade_documents_prompt = PromptTemplate(
template=provider.get_item(name="grade_documents", item_type="prompt").prompt.render(),
input_variables=["context", "question"],
)
print(grade_documents_prompt)
# Chain
chain = grade_documents_prompt | llm_with_tool
messages = state["messages"]
last_message = messages[-1]
question = messages[0].content
docs = last_message.content
scored_result = chain.invoke({"question": question, "context": docs})
score = scored_result.binary_score
if score == "yes":
print("---DECISION: DOCS RELEVANT---")
return "generate"
else:
print("---DECISION: DOCS NOT RELEVANT---")
print(score)
return "rewrite"
### Nodes
def agent(state):
"""
Invokes the agent model to generate a response based on the current state. Given
the question, it will decide to retrieve using the retriever tool, or simply end.
Args:
state (messages): The current state
Returns:
dict: The updated state with the agent response appended to messages
"""
print("---CALL AGENT---")
messages = state["messages"]
model = ChatOpenAI(temperature=0, streaming=True, model="gpt-4-turbo")
model = model.bind_tools(tools)
response = model.invoke(messages)
# We return a list, because this will get added to the existing list
return {"messages": [response]}
def rewrite(state):
"""
Transform the query to produce a better question.
Args:
state (messages): The current state
Returns:
dict: The updated state with re-phrased question
"""
print("---TRANSFORM QUERY---")
messages = state["messages"]
question = messages[0].content
msg = [
HumanMessage(
content=f""" \n
Look at the input and try to reason about the underlying semantic intent / meaning. \n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Formulate an improved question: """,
)
]
# Grader
model = ChatOpenAI(temperature=0, model="gpt-4-0125-preview", streaming=True)
response = model.invoke(msg)
return {"messages": [response]}
def generate(state):
"""
Generate answer
Args:
state (messages): The current state
Returns:
dict: The updated state with the generated answer appended to messages
"""
print("---GENERATE---")
messages = state["messages"]
question = messages[0].content
last_message = messages[-1]
docs = last_message.content
# Prompt
prompt = hub.pull("rlm/rag-prompt")
# LLM
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0, streaming=True)
# Post-processing
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# Chain
rag_chain = prompt | llm | StrOutputParser()
# Run
response = rag_chain.invoke({"context": docs, "question": question})
return {"messages": [response]}
print("*" * 20 + "Prompt[rlm/rag-prompt]" + "*" * 20)
prompt = hub.pull("rlm/rag-prompt") # Show what the prompt looks like
prompt.pretty_print()
Define Graph
Start with an agent, call_model
The agent makes a decision about whether to call a function
If so, then action is taken to call the tool (retriever)
Then call the agent again with the tool output added to messages (state)
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import ToolNode
# Define a new graph
workflow = StateGraph(AgentState)
# Define the nodes we will cycle between
workflow.add_node("agent", agent) # agent
retrieve = ToolNode(retriever_tool)
workflow.add_node("retrieve", retrieve) # retrieval
workflow.add_node("rewrite", rewrite) # Re-writing the question
workflow.add_node(
"generate", generate
) # Generating a response after we know the documents are relevant
# Call agent node to decide to retrieve or not
workflow.add_edge(START, "agent")
# Decide whether to retrieve
workflow.add_conditional_edges(
"agent",
# Assess agent decision
tools_condition,
{
# Translate the condition outputs to nodes in our graph
"tools": "retrieve",
END: END,
},
)
# Edges taken after the `action` node is called.
workflow.add_conditional_edges(
"retrieve",
# Assess agent decision
grade_documents,
)
workflow.add_edge("generate", END)
workflow.add_edge("rewrite", "agent")
# Compile
graph = workflow.compile()
Visualize the graph
from IPython.display import Image, display
try:
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
Run the graph
import pprint
inputs = {
"messages": [
("user", "What does Lilian Weng say about the types of adversarial attacks on LLMs?"),
]
}
for output in graph.stream(inputs):
for key, value in output.items():
pprint.pprint(f"Output from node '{key}':")
pprint.pprint(value, indent=2, width=80, depth=None)
View the trace in the Arize UI
Once you've run a single query, you can see the trace in the Arize UI, including each step taken by the retriever, the embedding model, and the LLM.
Click through the queries to better understand how the agent is performing. Arize can be used to understand and troubleshoot your RAG app by surfacing:
Application latency
Token usage
Runtime exceptions
Retrieved documents
Embeddings
LLM parameters
Prompt templates
Tool descriptions
LLM function calls
And more!

Generate a synthetic dataset of questions
We will run our Agent against the dataset of questions we generate, and then evaluate the results.
import pandas as pd
from langchain import hub
from langchain_openai import ChatOpenAI
# Define a template for generating questions
GEN_TEMPLATE = """
You are an assistant that generates Q&A questions about the content below.
The questions should involve the content, specific facts and figures,
names, and elements of the story. Do not ask any questions where the answer is
not in the content.
Respond with one question per line. Do not include any numbering at the beginning of each line. Do not include any category headings.
Generate 10 questions. Be sure there are no duplicate questions.
[START CONTENT]
{content}
[END CONTENT]
"""
# Load the content you want to generate questions about
content = """
Lilian Weng discusses various aspects of adversarial attacks on LLMs and prompt engineering techniques. Make sure to use Lilian Weng's name in the questions.
"""
# Format the template with the content
formatted_template = GEN_TEMPLATE.format(content=content)
# Initialize the language model
model = ChatOpenAI(model="gpt-4o", max_tokens=1300)
# Generate questions using the language model
response = model.invoke(formatted_template)
# Extract the content from the response object
questions_content = response.content # Directly access the content attribute
# Split the response into individual questions
questions = questions_content.strip().split("\n")
# Create a dataframe to store the questions
questions_df = pd.DataFrame(questions, columns=["input"])
# Display the first few questions
questions_df.head()
Run our Agent against the list of generated questions
def run_rag(questions_df, k_value=2):
# Note: as written, k_value is not used inside this function; the number of
# documents retrieved is determined by the retriever tool's own configuration.
response_df = questions_df.copy(deep=True)
for index, row in response_df.iterrows():
inputs = {
"messages": [
("user", f"{row['input']}"),
]
}
for output in graph.stream(inputs):
for key, value in output.items():
if key == "generate":
response_df.loc[index, "output"] = value["messages"][-1]
if key == "retrieve":
response_df.loc[index, "reference"] = value["messages"][-1].content
text_columns = ["input", "output", "reference"]
response_df[text_columns] = response_df[text_columns].apply(
lambda x: x.astype(str)
)
return response_df
response_df = run_rag(questions_df, k_value=1)
# Let's inspect the results
response_df
Evaluating your Agentic RAG using LLM as a Judge
Now that we have run a set of test cases, we can create evaluators to measure performance of our run. This way, we don't have to manually inspect every single trace to see if the LLM is doing the right thing. First, we'll define the prompts for the evaluators.
There are two evaluators we will use for this example.
Retrieval Relevance: This evaluator checks if the reference text selected by the retriever is relevant to the question.
QA Correctness: This evaluator checks if the answer correctly answers the question based on the reference text provided.
(For more information on these and other prebuilt evaluators see here.)
We will be creating an LLM as a judge using prebuilt prompt templates, taking the inputs, outputs, and retrieved documents we collected in our dataframe, and then giving them labels using the llm_classify function from phoenix.evals. This function uses LLMs to evaluate your LLM calls and gives them labels and explanations. You can read more detail here.
from phoenix.evals import (
RAG_RELEVANCY_PROMPT_RAILS_MAP,
RAG_RELEVANCY_PROMPT_TEMPLATE,
QA_PROMPT_RAILS_MAP,
QA_PROMPT_TEMPLATE,
OpenAIModel,
llm_classify
)
# The rails are used to constrain the output to specific values based on the template
RELEVANCE_RAILS = list(RAG_RELEVANCY_PROMPT_RAILS_MAP.values())
QA_RAILS = list(QA_PROMPT_RAILS_MAP.values())
relevance_eval_df = llm_classify(
dataframe=response_df,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=OpenAIModel(model="gpt-4o"),
rails=RELEVANCE_RAILS,
provide_explanation=True,
include_prompt=True,
concurrency=4,
)
correctness_eval_df = llm_classify(
dataframe=response_df,
template=QA_PROMPT_TEMPLATE,
model=OpenAIModel(model="gpt-4o"),
rails=QA_RAILS,
provide_explanation=True,
include_prompt=True,
concurrency=4,
)
Let's inspect the results of our evaluation!
relevance_eval_df
correctness_eval_df
Experiment with different k-values and chunk sizes
Re-run the experiments with different k-values and chunk sizes, then log the results to Arize to see how performance changes.
First, let's wrap our evaluators in a helper function.
def run_evaluators(rag_df):
relevance_eval_df = llm_classify(
dataframe=rag_df,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=OpenAIModel(model="gpt-4o"),
rails=RELEVANCE_RAILS,
provide_explanation=True,
concurrency=4,
)
rag_df["relevance"] = relevance_eval_df["label"]
rag_df["relevance_explanation"] = relevance_eval_df["explanation"]
correctness_eval_df = llm_classify(
dataframe=rag_df,
template=QA_PROMPT_TEMPLATE,
model=OpenAIModel(model="gpt-4o"),
rails=QA_RAILS,
provide_explanation=True,
concurrency=4,
)
rag_df["correctness"] = correctness_eval_df["label"]
rag_df["correctness_explanation"] = correctness_eval_df["explanation"]
return rag_df
Let's log these results to Arize and see how they compare.
First we'll create a dataset to store our questions.
from arize.experimental.datasets import ArizeDatasetsClient
from uuid import uuid1
from arize.experimental.datasets.experiments.types import (
ExperimentTaskResultColumnNames,
EvaluationResultColumnNames,
)
from arize.experimental.datasets.utils.constants import GENERATIVE
import pandas as pd
# Set up the arize client
arize_client = ArizeDatasetsClient(api_key=API_KEY)
dataset = None
dataset_name = "rag-experiments-" + str(uuid1())[:3]
dataset_id = arize_client.create_dataset(
space_id=SPACE_ID,
dataset_name=dataset_name,
dataset_type=GENERATIVE,
data=questions_df,
)
dataset = arize_client.get_dataset(space_id=SPACE_ID, dataset_id=dataset_id)
print(dataset)
Next, we'll define which columns of our dataframe will be mapped to outputs and which will be mapped to evaluation labels and explanations.
# Define column mappings for task
task_cols = ExperimentTaskResultColumnNames(
example_id="example_id", result="output"
)
# Define column mappings for evaluator
relevance_evaluator_cols = EvaluationResultColumnNames(
label="relevance",
explanation="relevance_explanation",
)
correctness_evaluator_cols = EvaluationResultColumnNames(
label="correctness",
explanation="correctness_explanation",
)
def log_experiment_to_arize(experiment_df, experiment_name):
experiment_df["example_id"] = dataset["id"]
return arize_client.log_experiment(
space_id=SPACE_ID,
experiment_name=experiment_name + "-" + str(uuid1())[:2],
experiment_df=experiment_df,
task_columns=task_cols,
evaluator_columns={
"correctness": correctness_evaluator_cols,
"relevance": relevance_evaluator_cols,
},
dataset_name=dataset_name,
)
Now let's run it for each of our experiments.
# Run Experiments for different k-values
reset_vector_store(vector_store, chunk_size=1024, chunk_overlap=20)
k_2_chunk_1024_overlap_20 = run_rag(questions_df, k_value=2)
k_4_chunk_1024_overlap_20 = run_rag(questions_df, k_value=4)
# Run evaluators
k_2_chunk_1024_overlap_20 = run_evaluators(k_2_chunk_1024_overlap_20)
k_4_chunk_1024_overlap_20 = run_evaluators(k_4_chunk_1024_overlap_20)
# Log experiments to Arize
log_experiment_to_arize(k_2_chunk_1024_overlap_20, "k_2_chunk_1024_overlap_20")
log_experiment_to_arize(k_4_chunk_1024_overlap_20, "k_4_chunk_1024_overlap_20")
# Run experiments for different chunk sizes
reset_vector_store(vector_store, chunk_size=200, chunk_overlap=20)
k_2_chunk_200_overlap_20 = run_rag(questions_df, k_value=2)
reset_vector_store(vector_store, chunk_size=500, chunk_overlap=20)
k_2_chunk_500_overlap_20 = run_rag(questions_df, k_value=2)
# Run evaluators
k_2_chunk_200_overlap_20 = run_evaluators(k_2_chunk_200_overlap_20)
k_2_chunk_500_overlap_20 = run_evaluators(k_2_chunk_500_overlap_20)
# Log experiments to Arize
log_experiment_to_arize(k_2_chunk_200_overlap_20, "k_2_chunk_200_overlap_20")
log_experiment_to_arize(k_2_chunk_500_overlap_20, "k_2_chunk_500_overlap_20")
You can compare the experiment results in the Arize UI and see how each RAG method performs.
