Create evaluation tasks that score spans in a project, either continuously or on demand, or that evaluate examples in a dataset using your LLM-as-judge evaluators.

Key Capabilities

  • Create project-based tasks that run continuously against live spans
  • Create dataset-based tasks that evaluate experiment results
  • Trigger on-demand task runs with custom data windows
  • Poll task runs until completion with configurable timeout
  • Cancel in-progress runs
  • List and filter task runs by status
Note: tasks operations are currently in ALPHA. A one-time warning is emitted on first use.

List Tasks

List tasks you have access to, with optional filtering by space, project, dataset, or type.
resp = client.tasks.list(
    space="your-space-name-or-id",  # optional
    limit=50,
)

for task in resp.tasks:
    print(task.id, task.name)
Filter by task type:
resp = client.tasks.list(
    space="your-space-name-or-id",
    task_type="template_evaluation",
)
Valid values for task_type are "template_evaluation" and "code_evaluation". For details on pagination, field introspection, and data conversion (to dict/JSON/DataFrame), see Response Objects.
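Each task in the response exposes at least id and name, as shown above. As a small sketch, a helper like the following (the function name is illustrative, not part of the SDK) can turn a list response into a name-to-ID lookup:

```python
def tasks_by_name(resp):
    """Build a {name: id} lookup from a tasks.list response."""
    return {task.name: task.id for task in resp.tasks}
```

This is convenient when later calls need an ID but your configuration stores task names.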

Get a Task

Retrieve a task by name or ID. When using a name, provide space to disambiguate.
task = client.tasks.get(
    task="your-task-name-or-id",
    space="your-space-name-or-id",  # required when using a name
)

print(task.id, task.name)

Create a Task

Create a new evaluation task. Tasks can target either a project (live spans) or a dataset (experiment results).

Project-Based Task

A project-based task continuously evaluates incoming spans. Set is_continuous=True to run the task on every new span, or False to run it only on demand.
from arize._generated.api_client.models import TasksCreateRequestEvaluatorsInner

task = client.tasks.create(
    name="Relevance Monitor",
    task_type="template_evaluation",
    project="your-project-name-or-id",
    evaluators=[
        TasksCreateRequestEvaluatorsInner(
            evaluator_id="your-evaluator-id",
        ),
    ],
    is_continuous=True,
    sampling_rate=0.1,  # Evaluate 10% of spans
)

print(task.id)

Dataset-Based Task

A dataset-based task evaluates examples from one or more experiments. At least one experiment_ids entry is required.
task = client.tasks.create(
    name="Experiment Evaluation",
    task_type="template_evaluation",
    dataset="your-dataset-name-or-id",
    experiment_ids=["experiment-id-1", "experiment-id-2"],
    evaluators=[
        TasksCreateRequestEvaluatorsInner(
            evaluator_id="your-evaluator-id",
        ),
    ],
    is_continuous=False,
)

print(task.id)

Column Mappings and Filters

Each evaluator in the task can have its own column mappings (to map template variables to span attribute names) and a per-evaluator query filter.
task = client.tasks.create(
    name="Custom Relevance",
    task_type="template_evaluation",
    project="your-project-name-or-id",
    evaluators=[
        TasksCreateRequestEvaluatorsInner(
            evaluator_id="your-evaluator-id",
            column_mappings={"user_query": "input.value"},
            query_filter="status_code = 'OK'",
        ),
    ],
    query_filter="latency_ms < 5000",  # Task-level filter (AND-ed with evaluator filter)
    is_continuous=True,
)
Parameter reference:
  • name (str): Task name. Must be unique within the space.
  • task_type (str): "template_evaluation" or "code_evaluation".
  • evaluators (list): List of evaluators to attach. At least one required.
  • project (str): Target project name or ID. Required when dataset is not provided.
  • dataset (str): Target dataset name or ID. Required when project is not provided.
  • space (str): Space name or ID used to disambiguate name-based resolution for project and dataset.
  • experiment_ids (list[str]): Required (at least one) when dataset is provided.
  • sampling_rate (float): Fraction of spans to evaluate (0–1). Project-based tasks only.
  • is_continuous (bool): True to run on every new span; False for on-demand only.
  • query_filter (str): Task-level SQL-style filter applied to all evaluators.
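The project/dataset constraints above can be checked client-side before calling create. A minimal sketch (the helper is illustrative; the server performs its own validation):

```python
def validate_create_args(project=None, dataset=None, experiment_ids=None):
    """Client-side sanity check for tasks.create targeting rules."""
    # A task targets either a project or a dataset, not both and not neither.
    if (project is None) == (dataset is None):
        raise ValueError("Provide exactly one of project or dataset")
    # Dataset-based tasks require at least one experiment ID.
    if dataset is not None and not experiment_ids:
        raise ValueError("experiment_ids (at least one) is required for dataset-based tasks")
```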

Task Runs

Trigger a Run

Trigger an on-demand run for a task. The run starts in "pending" status.
from datetime import datetime

run = client.tasks.trigger_run(
    task="your-task-name-or-id",
    data_start_time=datetime(2024, 1, 1),
    data_end_time=datetime(2024, 2, 1),
)

print(run.id, run.status)  # e.g. "run-abc123", "pending"
Parameters:
  • task (str, required): Task name or ID to trigger.
  • space (str, default None): Space name or ID used to disambiguate the task lookup. Recommended when resolving by name.
  • data_start_time (datetime, default None): Start of data window to evaluate.
  • data_end_time (datetime, default now): End of data window. Defaults to the current time.
  • max_spans (int, default 10,000): Maximum number of spans to process.
  • override_evaluations (bool, default False): Re-evaluate data that already has labels.
  • experiment_ids (list[str], default None): Experiment IDs to run against (dataset-based tasks only).
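The optional parameters can be combined. A sketch that triggers a run over the last 24 hours with a lower span cap and re-scoring enabled (the wrapper function and the specific values are illustrative):

```python
from datetime import datetime, timedelta

def trigger_last_24h(client, task, space=None):
    """Trigger an on-demand run over the last 24 hours of data."""
    end = datetime.now()
    return client.tasks.trigger_run(
        task=task,
        space=space,
        data_start_time=end - timedelta(hours=24),
        data_end_time=end,
        max_spans=1000,              # Cap processing at 1,000 spans
        override_evaluations=True,   # Re-score spans that already have labels
    )
```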

List Runs

List runs for a task with optional status filtering.
resp = client.tasks.list_runs(
    task="your-task-name-or-id",
    limit=20,
)

for run in resp.task_runs:
    print(run.id, run.status)
Filter to only completed runs:
resp = client.tasks.list_runs(
    task="your-task-name-or-id",
    status="completed",
)
Valid status values: "pending", "running", "completed", "failed", "cancelled".
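When summarizing many runs at once, grouping a list_runs response by these status values can be done locally. A small sketch (the helper name is illustrative, not part of the SDK):

```python
from collections import defaultdict

def runs_by_status(resp):
    """Group run IDs from a tasks.list_runs response by status."""
    grouped = defaultdict(list)
    for run in resp.task_runs:
        grouped[run.status].append(run.id)
    return dict(grouped)
```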

Get a Run

Retrieve a specific run by its ID.
run = client.tasks.get_run(run_id="your-run-id")

print(run.id, run.status)

Cancel a Run

Cancel a run that is currently "pending" or "running".
run = client.tasks.cancel_run(run_id="your-run-id")

print(run.status)  # "cancelled"
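Since only "pending" or "running" runs can be cancelled, a guard avoids calling cancel_run on runs that are already terminal. A minimal sketch (the helper is illustrative):

```python
CANCELLABLE_STATUSES = {"pending", "running"}

def cancel_if_active(client, run):
    """Cancel a run only if it is still pending or running."""
    if run.status in CANCELLABLE_STATUSES:
        return client.tasks.cancel_run(run_id=run.id)
    # Already in a terminal state; return it unchanged.
    return run
```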

Wait for a Run

Poll a run until it reaches a terminal state ("completed", "failed", or "cancelled").
run = client.tasks.wait_for_run(
    run_id="your-run-id",
    poll_interval=5,   # Check every 5 seconds (default)
    timeout=600,       # Give up after 10 minutes (default)
)

print(run.status)  # "completed", "failed", or "cancelled"
Raises TimeoutError if the run does not complete within timeout seconds.
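A common pattern is to catch the TimeoutError and cancel the run rather than leave it in flight. A sketch (the wrapper function is illustrative, not part of the SDK):

```python
def wait_or_cancel(client, run_id, timeout=600, poll_interval=5):
    """Wait for a run to finish; cancel it if the timeout is exceeded."""
    try:
        return client.tasks.wait_for_run(
            run_id=run_id,
            poll_interval=poll_interval,
            timeout=timeout,
        )
    except TimeoutError:
        # Run did not finish in time; cancel it instead of leaving it running.
        return client.tasks.cancel_run(run_id=run_id)
```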

End-to-End: Trigger and Wait

# Trigger an on-demand run
run = client.tasks.trigger_run(task="your-task-name-or-id")

# Block until the run finishes
run = client.tasks.wait_for_run(run_id=run.id)

if run.status == "completed":
    print("Task run completed successfully")
elif run.status == "failed":
    print("Task run failed")
Learn more: Online Evaluations Documentation