Advanced Tracing Patterns

Problem: You need sophisticated tracing patterns for complex scenarios: context propagation across service boundaries, conditional tracing, dynamic sampling, trace correlation, and distributed system tracing.

Solution: Implement advanced patterns that go beyond basic span creation and enrichment for production-grade observability.

Note

Prerequisites

Before using these patterns, ensure you’re familiar with basic span creation and span enrichment from the introductory tracing guides.

Context Propagation

When to Use: Trace requests across multiple services, async operations, or thread boundaries.

Cross-Service Tracing

from honeyhive import HoneyHiveTracer, trace
from opentelemetry import trace as otel_trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import requests

# One tracer per process; spans emitted below are grouped under the
# "distributed-system" project in HoneyHive.
tracer = HoneyHiveTracer.init(project="distributed-system")
# W3C Trace Context propagator used to inject trace headers into outbound requests.
propagator = TraceContextTextMapPropagator()

@trace(tracer=tracer)
def call_downstream_service(user_id: str) -> dict:
    """Call a downstream service, propagating the current trace context.

    Args:
        user_id: Identifier forwarded to the user-service.

    Returns:
        The downstream service's JSON response as a dict.
    """
    from honeyhive import enrich_span

    # Inject the active span's trace context into HTTP headers so the
    # downstream service can continue the same trace. (inject() reads the
    # current context itself — no need to fetch the span explicitly.)
    carrier = {}
    propagator.inject(carrier)

    enrich_span({
        "service.downstream": "user-service",
        "service.user_id": user_id
    })

    # Trace context travels inside the request headers.
    response = requests.post(
        "https://user-service/api/process",
        json={"user_id": user_id},
        headers=carrier  # Trace context propagated
    )

    enrich_span({"service.response_code": response.status_code})

    return response.json()

Async Context Propagation

import asyncio
from honeyhive import trace
from opentelemetry.context import attach, detach, get_current

@trace(tracer=tracer)
async def async_workflow(query: str) -> str:
    """Async workflow with context propagation."""
    from honeyhive import enrich_span

    enrich_span({"workflow.type": "async", "workflow.query": query})

    # Context is automatically propagated to async tasks
    results = await asyncio.gather(
        async_task_1(query),
        async_task_2(query)
    )

    enrich_span({"workflow.tasks_completed": len(results)})
    return " ".join(results)

@trace(tracer=tracer)
async def async_task_1(query: str) -> str:
    """Async task with inherited trace context."""
    from honeyhive import enrich_span
    enrich_span({"task.name": "task_1"})

    await asyncio.sleep(0.1)  # Simulate async work
    return "Result 1"

@trace(tracer=tracer)
async def async_task_2(query: str) -> str:
    """Async task with inherited trace context."""
    from honeyhive import enrich_span
    enrich_span({"task.name": "task_2"})

    await asyncio.sleep(0.1)  # Simulate async work
    return "Result 2"

Conditional Tracing

When to Use: Apply tracing selectively based on runtime conditions.

Sampling-Based Tracing

import random
from honeyhive import HoneyHiveTracer

# Dedicated tracer for the sampling examples below.
tracer = HoneyHiveTracer.init(project="sampled-tracing")

def conditional_trace(sample_rate: float = 0.1):
    """Decorator factory: trace only a random fraction of calls.

    Args:
        sample_rate: Probability in [0, 1] that any given call is traced.

    Returns:
        A decorator that traces roughly ``sample_rate`` of calls and runs
        the rest untraced.
    """
    def decorator(func):
        from functools import wraps

        @wraps(func)  # preserve the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            # Sample: trace only sample_rate fraction of requests.
            if random.random() < sample_rate:
                from honeyhive import trace
                # Decorate per call so untraced calls pay no tracing cost.
                return trace(tracer=tracer)(func)(*args, **kwargs)
            # Fast path: execute without tracing.
            return func(*args, **kwargs)

        return wrapper
    return decorator

@conditional_trace(sample_rate=0.1)  # trace roughly 1 in 10 calls
def high_volume_operation(data: dict) -> dict:
    """High-volume operation; traced only for sampled calls."""
    result = {"processed": True}
    result.update(data)
    return result

User-Based Tracing

def trace_for_users(user_ids: set):
    """Decorator factory: trace calls only for the given user IDs.

    The wrapped function must take the user ID as its first positional
    argument; calls for users outside ``user_ids`` run untraced.
    """
    def decorator(func):
        from functools import wraps

        @wraps(func)  # preserve the wrapped function's name/docstring
        def wrapper(user_id: str, *args, **kwargs):
            if user_id in user_ids:
                from honeyhive import trace, enrich_span

                @trace(tracer=tracer)
                def traced_func(user_id, *args, **kwargs):
                    enrich_span({"user.id": user_id, "user.traced": True})
                    return func(user_id, *args, **kwargs)

                return traced_func(user_id, *args, **kwargs)
            # Not an opted-in user: execute without tracing.
            return func(user_id, *args, **kwargs)

        return wrapper
    return decorator

# Only calls from these beta users produce traces.
BETA_USERS = {"user_123", "user_456"}

@trace_for_users(BETA_USERS)
def beta_feature(user_id: str, data: dict) -> dict:
    """Feature whose calls are traced only for beta users."""
    merged = {"feature": "beta", "user": user_id}
    merged.update(data)
    return merged

Dynamic Sampling

When to Use: Adjust trace sampling based on runtime metrics or system load.

Adaptive Sampling

import time
from collections import deque

class AdaptiveSampler:
    """Adapt the trace sampling rate to the observed request volume."""

    def __init__(self, base_rate: float = 0.1, window_size: int = 100):
        self.base_rate = base_rate
        self.window_size = window_size
        # Rolling window of recent request timestamps, capped at window_size.
        self.request_times = deque(maxlen=window_size)

    def should_sample(self) -> bool:
        """Return True if the current request should be traced."""
        now = time.time()
        self.request_times.append(now)

        # With fewer than two samples we cannot estimate load yet,
        # so always trace.
        if len(self.request_times) < 2:
            return True

        # Estimate requests per second over the rolling window.
        elapsed = now - self.request_times[0]
        rps = len(self.request_times) / elapsed if elapsed > 0 else 0

        # Back off the sampling rate as load grows.
        if rps > 100:
            effective_rate = self.base_rate / 10
        elif rps > 50:
            effective_rate = self.base_rate / 2
        else:
            effective_rate = self.base_rate

        return random.random() < effective_rate

# Module-level sampler shared by every adaptively traced endpoint.
sampler = AdaptiveSampler(base_rate=0.1)

def adaptive_trace(func):
    """Decorator that traces a call only when the adaptive sampler says so."""
    from functools import wraps

    @wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        if sampler.should_sample():
            from honeyhive import trace
            return trace(tracer=tracer)(func)(*args, **kwargs)
        # Under high load most calls take this untraced fast path.
        return func(*args, **kwargs)

    return wrapper

@adaptive_trace
def high_traffic_endpoint(request_data: dict) -> dict:
    """Endpoint whose tracing is throttled by the adaptive sampler."""
    return dict(status="processed")

Trace Correlation

When to Use: Link related traces across different operations or sessions.

Request ID Correlation

import uuid
from contextvars import ContextVar

# Context variable carrying the current request's correlation ID.
# The default is None, so the annotation must admit None as a value
# (the original ContextVar[str] contradicted default=None).
request_id_var: "ContextVar[str | None]" = ContextVar('request_id', default=None)

def with_request_id(func):
    """Decorator that correlates all spans under one request ID.

    The outermost decorated call generates a request ID; nested decorated
    calls reuse it via the ContextVar, so every span in the request shares
    the same "request.id" attribute.
    """
    from functools import wraps

    @wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        # Reuse the caller's request ID if one is already in context,
        # otherwise start a new one.
        request_id = request_id_var.get() or str(uuid.uuid4())
        token = request_id_var.set(request_id)

        from honeyhive import trace, enrich_span

        @trace(tracer=tracer)
        def traced_func(*args, **kwargs):
            enrich_span({"request.id": request_id})
            return func(*args, **kwargs)

        try:
            return traced_func(*args, **kwargs)
        finally:
            # Restore the previous context so a stale request ID does not
            # leak into the next, unrelated request (the original never
            # reset the ContextVar).
            request_id_var.reset(token)

    return wrapper

@with_request_id
def handle_request(data: dict) -> dict:
    """Handle a request; child steps inherit the same request ID."""
    # Both steps run while the decorator's request ID is in context.
    process_step_1(data)
    process_step_2(data)
    return dict(status="complete")

@with_request_id
def process_step_1(data: dict):
    """First processing step; reuses the parent's request ID."""
    return None

@with_request_id
def process_step_2(data: dict):
    """Second processing step; reuses the parent's request ID."""
    return None

Session Correlation

from honeyhive.models import EventType

class SessionTracker:
    """Correlate multiple traced operations under one session ID."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        # Per-session counter, incremented once per traced call.
        self.operation_count = 0

    def trace_operation(self, operation_name: str):
        """Return a decorator that traces a function with session context.

        Args:
            operation_name: Logical name recorded on the span.
        """
        def decorator(func):
            from functools import wraps

            @wraps(func)  # preserve the wrapped function's name/docstring
            def wrapper(*args, **kwargs):
                self.operation_count += 1

                from honeyhive import trace, enrich_span

                @trace(tracer=tracer, event_type=EventType.chain)
                def traced_func(*args, **kwargs):
                    enrich_span({
                        "session.id": self.session_id,
                        "session.operation": operation_name,
                        "session.operation_number": self.operation_count
                    })
                    return func(*args, **kwargs)

                return traced_func(*args, **kwargs)

            return wrapper
        return decorator

# Usage: one tracker instance per user session.
session = SessionTracker("session_abc123")

@session.trace_operation("login")
def user_login(username: str):
    """Login operation recorded against the session."""
    return dict(logged_in=True)

@session.trace_operation("fetch_data")
def fetch_user_data(user_id: str):
    """Data fetch recorded against the session."""
    return dict(data="...")

Error Recovery Patterns

When to Use: Implement retry logic with comprehensive tracing.

Traced Retry Pattern

import time
from functools import wraps

def traced_retry(max_attempts: int = 3, backoff: float = 1.0):
    """Retry decorator that records every attempt on the active span.

    Args:
        max_attempts: Total number of attempts before giving up.
        backoff: Base delay in seconds; doubles after each failed attempt.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            from honeyhive import trace, enrich_span

            @trace(tracer=tracer)
            def retry_wrapper(*args, **kwargs):
                enrich_span({
                    "retry.max_attempts": max_attempts,
                    "retry.backoff": backoff
                })

                attempt = 0
                while attempt < max_attempts:
                    attempt += 1
                    enrich_span({f"retry.attempt_{attempt}": "started"})
                    try:
                        outcome = func(*args, **kwargs)
                    except Exception as exc:
                        enrich_span({
                            f"retry.attempt_{attempt}_failed": str(exc),
                            f"retry.attempt_{attempt}_error_type": type(exc).__name__
                        })

                        # Out of attempts: surface the last failure.
                        if attempt == max_attempts:
                            enrich_span({"retry.all_failed": True})
                            raise

                        # Exponential backoff: backoff * 2^(attempt-1).
                        delay = backoff * (2 ** (attempt - 1))
                        enrich_span({f"retry.attempt_{attempt}_backoff_s": delay})
                        time.sleep(delay)
                    else:
                        enrich_span({
                            "retry.succeeded_at_attempt": attempt,
                            "retry.total_attempts": attempt
                        })
                        return outcome

                return None  # unreachable: the loop always returns or raises

            return retry_wrapper(*args, **kwargs)

        return wrapper
    return decorator

@traced_retry(max_attempts=3, backoff=1.0)
def unreliable_api_call(endpoint: str) -> dict:
    """Fetch an endpoint with retries; each attempt is traced."""
    # Simulate an unreliable network call.
    response = requests.get(endpoint)
    return response.json()

Performance Monitoring

When to Use: Track detailed performance metrics within traces.

Resource Usage Tracing

import psutil
import os

def trace_with_resources(func):
    """Decorator that records CPU/memory usage around a traced call.

    Captures process CPU percent and RSS before and after execution, plus
    wall time, and attaches them to the span as "resources.*" attributes.
    """
    from functools import wraps

    @wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        from honeyhive import trace, enrich_span

        @trace(tracer=tracer)
        def traced_func(*args, **kwargs):
            process = psutil.Process(os.getpid())

            # Snapshot before execution.
            cpu_before = process.cpu_percent()
            mem_before = process.memory_info().rss / 1024 / 1024  # MB

            enrich_span({
                "resources.cpu_before_%": cpu_before,
                "resources.memory_before_mb": mem_before
            })

            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            duration = time.perf_counter() - start_time

            # Snapshot after execution.
            cpu_after = process.cpu_percent()
            mem_after = process.memory_info().rss / 1024 / 1024

            enrich_span({
                "resources.duration_ms": duration * 1000,
                "resources.cpu_after_%": cpu_after,
                "resources.memory_after_mb": mem_after,
                "resources.memory_delta_mb": mem_after - mem_before
            })

            return result

        return traced_func(*args, **kwargs)

    return wrapper

@trace_with_resources
def memory_intensive_operation(data_size: int):
    """Allocate a large list to exercise the resource monitor."""
    buffer = [0] * (data_size * 1000000)  # deliberately memory-heavy
    return len(buffer)

Best Practices

1. Choose Appropriate Patterns

  • High-volume systems: Use adaptive sampling

  • Distributed systems: Implement context propagation

  • Debug scenarios: Use user-based or conditional tracing

  • Performance-critical: Use resource usage tracing

2. Combine Patterns

@adaptive_trace  # Sampling
@with_request_id  # Correlation
@traced_retry(max_attempts=3)  # Error handling
def complex_operation(data: dict) -> dict:
    """Operation with multiple advanced patterns."""
    # NOTE(review): process_data is not defined in this document —
    # presumably an application-level helper; confirm before running.
    return process_data(data)

3. Monitor Sampling Effectiveness

# Track sampling statistics
from collections import defaultdict

sampling_stats = defaultdict(int)

def track_sampling(func):
    """Decorator that counts sampled vs. total calls and traces sampled ones."""
    from functools import wraps

    @wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        sampled = sampler.should_sample()
        sampling_stats['total'] += 1

        if sampled:
            sampling_stats['sampled'] += 1
            from honeyhive import trace
            # Fix: the original referenced an undefined `traced_func` and
            # inverted the condition — trace the call only when sampled.
            return trace(tracer=tracer)(func)(*args, **kwargs)

        return func(*args, **kwargs)

    return wrapper

# Periodically log stats (guard against division by zero before any
# requests have been counted — the original raised ZeroDivisionError).
total_requests = sampling_stats['total']
sample_rate = sampling_stats['sampled'] / total_requests if total_requests else 0.0
print(f"Current sample rate: {sample_rate:.2%}")

Next Steps

Key Takeaway: Advanced tracing patterns enable sophisticated observability for complex, distributed, and high-scale LLM applications. Use context propagation for distributed systems, conditional tracing for high-volume services, and correlation patterns for debugging multi-step workflows. ✨