Advanced Tracing (OTEL) Examples

Manual Context Propagation, Custom Decorators, Custom Exports and Sampling

This documentation provides advanced use cases and examples, including manual context propagation, custom decorators, custom sampling/filtering, and more. These scenarios address real-world needs such as asynchronous execution, multi-service flows and specialized exporters or decorators for observability platforms like Arize.

1. Manual Context Propagation

Context Propagation in OpenTelemetry ensures that the current tracing context (i.e., the currently active span and its metadata) is available whenever you switch threads, tasks, or processes. This is particularly relevant if your code spans across asynchronous tasks or crosses microservice boundaries.

In typical usage, OTEL instrumentation libraries handle context propagation automatically. However, there are cases where you need to do it manually, especially in asynchronous workflows or custom instrumentation.

Propagation with Async Functions

When dealing with Python async/await code, you can pass the context manually if automatic instrumentation doesn't handle it or if you have custom logic. The steps are:

  1. Extract the current context (e.g., from an incoming HTTP request).

  2. Create a new span as a child of that context.

  3. Pass or embed the context into the async function so it can be reused.

import asyncio
from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current

tracer = trace.get_tracer(__name__)

async def async_func(ctx):
    token = attach(ctx)
    try:
        current_span = trace.get_current_span()
        current_span.set_attribute("input.value", "User Input")
        await asyncio.sleep(1)  # Simulate async work
    finally:
        detach(token)

def sync_func():
    with tracer.start_as_current_span("sync_span") as span:
        # Capture the current context 
        context = get_current()
        # Run the async function, passing the context
        asyncio.run(async_func(context))

if __name__ == "__main__":
    sync_func()

Propagation Across Different Microservices

When making HTTP or gRPC calls to another microservice, we typically propagate the current tracing context through HTTP headers. If you're using the built-in instrumentation (like opentelemetry-instrumentation-requests or opentelemetry-instrumentation-httpx), it's handled automatically. For a custom approach, you do the following:

  1. Inject the current span context into HTTP headers before sending the request.

  2. On the receiving microservice, extract the context from the incoming headers.

Example: Service A sends a request to Service B.

Service A:

import requests
from opentelemetry import trace
from opentelemetry.propagate import inject

tracer = trace.get_tracer(__name__)

def make_request_to_service_b():
    # Start a new span for this operation
    with tracer.start_as_current_span("llm_service_a") as span:
        # Prepare headers
        headers = {}
        # Inject the current context using the globally configured propagator
        inject(headers)

        # Make the request with the injected headers
        response = requests.get("http://service-b:5000/endpoint", headers=headers)
        return response.text

Service B:

from flask import Flask, request
from opentelemetry import trace
from opentelemetry.propagate import extract

app = Flask(__name__)
tracer = trace.get_tracer(__name__)

@app.route("/endpoint")
def endpoint():
    # Extract the context from the incoming request headers.
    # request.headers is passed directly because its lookups are case-insensitive.
    context = extract(request.headers)

    # Create a new span as child
    with tracer.start_as_current_span("service_b_processing", context=context) as span:
        span.add_event("Received request in service B")
        # ... do some processing ...
        return "Hello from Service B"
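With OpenTelemetry's default propagators (W3C Trace Context and Baggage), the injected headers include a `traceparent` entry. The following standard-library sketch shows the layout of that header so it is easier to recognize in request logs. The helpers `format_traceparent` and `parse_traceparent` are hypothetical names written for this illustration and are not part of the OpenTelemetry API.

```python
import re

# version - trace-id (32 hex) - parent span-id (16 hex) - flags (2 hex)
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def format_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Render a version-00 traceparent header value."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(header: str):
    """Return the header's fields as a dict, or None if it is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    return {
        "version": version,
        "trace_id": int(trace_id, 16),
        "span_id": int(span_id, 16),
        "sampled": bool(int(flags, 16) & 0x01),
    }
```

In real services, prefer the propagator API over hand-built headers; this sketch is only meant to help when inspecting traffic between Service A and Service B.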

Propagation with Concurrent Threads

When you submit tasks to a ThreadPoolExecutor or any other concurrency mechanism, each task runs in a separate thread. If you rely on a tracer's current context (which stores the active span or baggage), it won't automatically follow your tasks to those worker threads. By manually capturing the context in the main thread and then attaching it in each worker thread, you preserve the association between the tasks and the original trace context.

Example

Below is a detailed, annotated example to show how you can:

  1. Capture the current context before submitting tasks to the executor.

  2. Attach that context within each worker thread (using attach).

  3. Run your task logic (e.g., processing questions).

  4. Detach the context when the task is complete (using detach).

import concurrent.futures
from typing import Callable

from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current

tracer = trace.get_tracer(__name__)

def func1():
    """
    Some example work done in a thread.
    """
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input")
    return "func1 result"

def func2():
    """
    Another example function that sets an attribute on the current span.
    """
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input2")
    return "func2 result"
    
    
def wrapped_func(func: Callable):
    """
    Demonstrates how to capture the current context in the main thread,
    and attach/detach it within each worker thread. 
    Wraps the original function to attach/detach the current context
    so the worker thread has the correct span context.
    """
    # Capture the context from the current thread
    main_context = get_current()
    def wrapper():
        token = attach(main_context)  # Attach context to this thread
        try:
            return func()
        finally:
            detach(token)              # Detach after finishing
    return wrapper
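
def run_in_parallel():
    # Create a list of functions to be executed in parallel
    funcs = [func1, func2, func1, func2]

    with tracer.start_as_current_span("parallel_work"):
        # Wrap in the main thread so that get_current() captures this span's context
        wrapped = [wrapped_func(f) for f in funcs]
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Each worker only calls the already-wrapped function
            results = list(executor.map(lambda f: f(), wrapped))

    return results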

2. Creating Custom Decorators

Decorators are a convenient way to instrument functions and methods across your codebase without having to insert tracing calls repeatedly. A custom decorator can:

  • Start a new span before the function call.

  • Add attributes/events with function arguments (inputs).

  • Return the function's result (outputs) and annotate or log it in the span.

  • End the span.

Example Decorator Implementation:

import functools

from opentelemetry import trace

def trace_function(span_kind=None, additional_attributes=None):
    tracer = trace.get_tracer(__name__)

    def decorator(func):
        @functools.wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(func.__name__) as span:
                if span_kind:
                    span.set_attribute("openinference.span.kind", span_kind)

                # Note: only positional arguments are captured here
                span.set_attribute("input.value", str(args))

                if additional_attributes:
                    for key, value in additional_attributes.items():
                        span.set_attribute(key, value)

                result = func(*args, **kwargs)
                span.set_attribute("output.value", str(result))
                return result
        return wrapper
    return decorator

# Example Implementation
@trace_function(span_kind="LLM", additional_attributes={"llm.model_name": "gpt-4o"})
def process_text(text):
    return text.upper()

3. Filtering Spans Based on Specific Attributes

In large-scale applications, you may not need to record every single span. Instead, you might want to selectively sample:

  • Spans of a particular service or component.

  • Spans that meet certain business criteria (e.g., user.id in a specific subset).

  • Only error or slow spans.

By creating a custom sampler, you can dynamically control which spans get recorded/exported based on their attributes or names. This approach helps control telemetry volume and cost, while ensuring you capture the traces most relevant for debugging or analysis.

Custom Sampling Basics

Sampler Interface

In OTEL Python, you create a custom sampler by subclassing the Sampler interface from opentelemetry.sdk.trace.sampling. You then implement:

  1. should_sample(...)

    • Decides whether the span is recorded (Sampled) or dropped (NotSampled).

    • You can look at the attributes, span name, span kind, parent context, etc.

  2. get_description()

    • Returns a short, human-readable description of the sampler. The Sampler base class declares this method abstract, so a subclass must implement it.

Sampling Result

When implementing should_sample, you must return a SamplingResult, which indicates:

  • Sampling Decision: Decision.RECORD_AND_SAMPLE, Decision.RECORD_ONLY, or Decision.DROP.

  • Attributes: You can optionally modify or add attributes in the returned SamplingResult (e.g., a reason for sampling).

Example:

Let's create an example sampling mechanism where spans with a specific user ID are dropped.

from typing import Optional

from opentelemetry.context import Context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision
from opentelemetry import trace

# Placeholder value: replace with the user ID whose spans you want to drop
USER_ID_TO_DROP = "user-to-drop"

class UserBasedSampler(Sampler):
    """
    A custom sampler that drops any span having a `user.id` attribute matching
    a specified user ID. Otherwise, spans are recorded and sampled.
    """
    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind=None,
        attributes=None,
        links=None,
        trace_state=None,
    ) -> SamplingResult:
        # Only attributes passed at span creation are visible here; they may be None
        user_id = (attributes or {}).get("user.id")
        if user_id == USER_ID_TO_DROP:
            # If this user matches the one we want to drop, do not sample
            return SamplingResult(
                decision=Decision.DROP,
                attributes={"sampler.reason": f"Dropping span for user.id={user_id}"}
            )
        # Otherwise, record and sample, passing the span's attributes through
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE,
            attributes=attributes,
        )

    def get_description(self) -> str:
        # Required by the Sampler interface
        return "UserBasedSampler"

You then pass your custom sampler into your tracer provider.

tracer_provider = TracerProvider(sampler=UserBasedSampler())

4. Inheriting Context Attributes in Manual Spans

When using OpenTelemetry, context attributes are not automatically attached to spans created manually using start_as_current_span. This differs from auto-instrumentation (e.g., _llm_context), where attributes from the context are explicitly merged into the span during its creation.

To ensure context attributes are available in manual spans, you must explicitly retrieve and set them:

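# Solution: Custom Span Creation Helper

from contextlib import contextmanager

from openinference.instrumentation import get_attributes_from_context

@contextmanager
def create_span_with_context(tracer, name, **kwargs):
    with tracer.start_as_current_span(name, **kwargs) as span:
        # Copy attributes stored in the context (session ID, user info, etc.) onto the span
        context_attributes = dict(get_attributes_from_context())
        span.set_attributes(context_attributes)
        # Yield instead of return so the span stays open for the caller's block
        yield span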

This pattern ensures that all relevant context (like session IDs, user info, etc.) is inherited by manually created spans, preserving traceability and observability.

Example usage:

from openinference.instrumentation import using_session

with using_session("my-session-id"):
    with create_span_with_context(tracer, "my-manual-span") as span:
        ...  # your code here

5. Health Check Pattern for Tracer Initialization

This pattern validates endpoint connectivity before initializing the tracer provider and falls back gracefully when the tracing service is unavailable. It prevents retry loops and ensures your application continues to function even when tracing services are down.

Pattern Implementation

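import httpx
import logging
from opentelemetry.trace import NoOpTracerProvider, TracerProvider
from arize.otel import register

# `settings` is assumed to be your application's configuration object
# (endpoint, API key, space ID, project name).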

def create_tracer_provider() -> TracerProvider:
    """
    Create tracer provider with connectivity validation.
    Falls back to NoOp tracer if endpoint is unreachable.
    """
    try:
        # Quick connectivity check
        with httpx.Client(timeout=3.0) as client:
            response = client.get(
                settings.arize_endpoint,
                headers={
                    "Authorization": settings.api_key,
                    "Grpc-Metadata-space_id": settings.space_id,
                }
            )
            response.raise_for_status()
        
        # Initialize if healthy
        logging.info("Tracing endpoint healthy - initializing Arize tracer")
        return register(
            space_id=settings.space_id,
            api_key=settings.api_key,
            project_name=settings.project_name,
            endpoint=settings.endpoint,
        )
        
    except Exception as e:
        logging.warning(f"Tracing unavailable: {e}. Using NoOp tracer.")
        return NoOpTracerProvider()

Usage Example

# Initialize tracer with health check
tracer_provider = create_tracer_provider()
tracer = tracer_provider.get_tracer(__name__)

# Use normally - works with both real and NoOp tracers
with tracer.start_as_current_span("my_operation") as span:
    span.set_attribute("user.id", "123")
    # ... your application logic

Benefits

  • Fail Fast: Quick 3-second validation prevents long waits

  • Graceful Degradation: Application continues when tracing is down

  • Resource Efficient: Prevents retry loops and reduces memory usage

  • Operational Safety: No impact on application performance during outages
