Experiments Architecture

Note

This document explains how experiments work in HoneyHive, including the execution flow, component relationships, and evaluation lifecycle.

What are Experiments?

Experiments in HoneyHive are systematic evaluations of LLM applications that help you:

  • Test changes to prompts, models, or application logic

  • Measure quality with automated evaluators

  • Compare performance across different versions

  • Track improvements over time

Unlike simple tracing (which captures what happened), experiments evaluate how well it happened.

Key Distinction:

Tracing:
✓ Captured 1000 requests
✓ Average latency: 2.3s
✓ Token usage: 450K tokens

Experiments:
✓ Accuracy: 87% (improved from 82%)
✓ User satisfaction: 4.2/5
✓ Cost per quality response: $0.03 (down from $0.05)
✓ Which prompt works better? (A vs B)

How Experiments Work

The Experiment Lifecycle

An experiment follows a clear execution path:

1. Setup Phase
   └─→ Load dataset (code-defined or HoneyHive-managed)
   └─→ Initialize tracer for each datapoint
   └─→ Prepare evaluators

2. Execution Phase (for each datapoint)
   └─→ Create isolated tracer instance
   └─→ Call evaluation function with datapoint
   └─→ Capture traces automatically
   └─→ Collect function outputs

3. Evaluation Phase (for each datapoint)
   └─→ Run evaluators on outputs
   └─→ Compute metrics
   └─→ Send results to backend

4. Aggregation Phase (backend)
   └─→ Aggregate metrics across all datapoints
   └─→ Generate run statistics
   └─→ Enable comparison with other runs

Visual Flow:

        %%{init: {'theme':'base', 'themeVariables': {'primaryColor': '#4F81BD', 'primaryTextColor': '#ffffff', 'primaryBorderColor': '#ffffff', 'lineColor': '#ffffff', 'mainBkg': 'transparent', 'secondBkg': 'transparent', 'tertiaryColor': 'transparent', 'clusterBkg': 'transparent', 'clusterBorder': '#ffffff', 'edgeLabelBackground': 'transparent', 'background': 'transparent'}, 'flowchart': {'linkColor': '#ffffff', 'linkWidth': 2}}}%%
graph TB
    subgraph "1. Setup"
        DS[Dataset<br/>inputs + ground_truth]
        FUNC[Evaluation Function<br/>Your LLM logic]
        EVALS[Evaluators<br/>Quality checks]
    end

    subgraph "2. Per-Datapoint Execution"
        TRACER[Isolated Tracer<br/>Multi-instance]
        EXEC[Execute Function<br/>datapoint → outputs]
        TRACE[Capture Traces<br/>spans + metrics]
    end

    subgraph "3. Per-Datapoint Evaluation"
        RUN_EVAL[Run Evaluators<br/>outputs + ground_truth]
        METRICS[Compute Metrics<br/>scores + metadata]
    end

    subgraph "4. Backend Aggregation"
        SEND[Send to Backend<br/>HoneyHive API]
        AGG[Aggregate Results<br/>across datapoints]
        STORE[Store Run Results<br/>with metrics]
    end

    DS --> EXEC
    FUNC --> EXEC
    TRACER --> EXEC
    EXEC --> TRACE
    TRACE --> RUN_EVAL
    EVALS --> RUN_EVAL
    RUN_EVAL --> METRICS
    METRICS --> SEND
    SEND --> AGG
    AGG --> STORE

    style DS fill:#1b5e20,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style FUNC fill:#1b5e20,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style EVALS fill:#1b5e20,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style TRACER fill:#01579b,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style EXEC fill:#01579b,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style TRACE fill:#01579b,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style RUN_EVAL fill:#e65100,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style METRICS fill:#e65100,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style SEND fill:#4a148c,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style AGG fill:#4a148c,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style STORE fill:#4a148c,stroke:#ffffff,stroke-width:2px,color:#ffffff
    

Component Relationships

The Four Key Components:

  1. Dataset: Test cases with inputs and expected outputs

  2. Evaluation Function: Your LLM application logic

  3. Evaluators: Automated quality assessment functions

  4. Tracer: Captures execution details (multi-instance)

How They Interact:

from honeyhive.experiments import evaluate, evaluator

# 1. Dataset: What to test
dataset = [
    {
        "inputs": {"question": "What is AI?"},
        "ground_truth": {"answer": "Artificial Intelligence..."}
    }
]

# 2. Evaluation Function: What to run
def my_llm_app(datapoint):
    inputs = datapoint.get("inputs", {})
    # Your LLM logic here
    return {"answer": call_llm(inputs["question"])}

# 3. Evaluator: How to score
@evaluator
def accuracy_check(outputs, inputs, ground_truth):
    return {
        "score": 1.0 if outputs["answer"] == ground_truth["answer"] else 0.0
    }

# 4. Run experiment (tracer created automatically)
result = evaluate(
    function=my_llm_app,
    dataset=dataset,
    evaluators=[accuracy_check],
    api_key="key",
    project="project"
)

Multi-Instance Architecture

Each datapoint gets its own isolated tracer instance:

Datapoint 1 → Tracer Instance 1 → Session ID: session_abc_1
Datapoint 2 → Tracer Instance 2 → Session ID: session_abc_2
Datapoint 3 → Tracer Instance 3 → Session ID: session_abc_3

Why This Matters:

  • Isolation: No cross-contamination between test cases

  • Parallel execution: Can process multiple datapoints simultaneously

  • Clear attribution: Each session maps to exactly one datapoint

  • Session enrichment: Can add metadata per datapoint

Example:

def my_function(datapoint, tracer):  # tracer auto-injected
    inputs = datapoint.get("inputs", {})

    # Each datapoint has isolated tracer
    tracer.enrich_session(
        metadata={"test_case_id": inputs.get("id")}
    )

    result = call_llm(inputs["query"])
    return {"answer": result}

# Each execution gets its own tracer instance
# Datapoint 1: tracer_1 → traces stored under session_1
# Datapoint 2: tracer_2 → traces stored under session_2

Data Flow Through the System

Input Data Structure

Dataset Format:

[
    {
        "inputs": {
            # Parameters passed to your function
            "question": "What is machine learning?",
            "context": "ML is a subset of AI",
            "model": "gpt-4"
        },
        "ground_truth": {
            # Expected outputs for evaluation
            "answer": "Machine learning is...",
            "category": "AI/ML",
            "confidence": "high"
        }
    },
    # ... more datapoints
]

Function Signature (v1.0+):

from typing import Any, Dict

def evaluation_function(datapoint: Dict[str, Any]) -> Dict[str, Any]:
    """Your function receives the complete datapoint."""
    inputs = datapoint.get("inputs", {})
    ground_truth = datapoint.get("ground_truth", {})

    # Process inputs
    result = your_logic(inputs)

    # Return outputs
    return {"answer": result}

Execution Data Flow

Step-by-Step Data Transformation:

1. Dataset Entry:
   {
       "inputs": {"query": "What is 2+2?"},
       "ground_truth": {"answer": "4"}
   }

2. Function Receives Datapoint:
   datapoint = {
       "inputs": {"query": "What is 2+2?"},
       "ground_truth": {"answer": "4"}
   }

3. Function Returns Outputs:
   outputs = {"answer": "4", "confidence": "high"}

4. Evaluator Receives:
   - outputs: {"answer": "4", "confidence": "high"}
   - inputs: {"query": "What is 2+2?"}
   - ground_truth: {"answer": "4"}

5. Evaluator Returns Metrics:
   {
       "exact_match": 1.0,
       "confidence_check": 1.0
   }

6. Backend Aggregates:
   Run Results:
   - exact_match: avg(1.0, 0.8, 1.0, ...) = 0.93
   - confidence_check: avg(1.0, 1.0, 0.5, ...) = 0.85

Evaluation Metadata

The system automatically tracks:

# Per-datapoint metadata (automatically added)
{
    "run_id": "run_abc123",
    "dataset_id": "dataset_xyz789",
    "datapoint_id": "EXT-datapoint-1",
    "session_id": "session_unique_id",
    "execution_time_ms": 1234,
    "tracer_instance_id": "tracer_1"
}

This metadata propagates through:

  • Span attributes (via OpenTelemetry baggage)

  • Session metadata

  • Backend storage

  • Results API

Experiments vs Traces

Understanding the Relationship

Experiments use tracing but add evaluation on top:

Tracing Alone:
├─ Captures execution details
├─ Stores spans and attributes
├─ Shows what happened
└─ No quality assessment

Experiments (Tracing + Evaluation):
├─ Everything tracing does, PLUS:
├─ Runs evaluators on outputs
├─ Computes quality metrics
├─ Enables comparison
└─ Drives improvement decisions

When to Use Each:

# Tracing only: Production monitoring
from honeyhive import HoneyHiveTracer

tracer = HoneyHiveTracer.init(api_key="key", project="project")

@trace(tracer=tracer)
def production_endpoint(user_query):
    # Just capture what happens in production
    return process_query(user_query)

# Experiments: Testing and improvement
from honeyhive.experiments import evaluate

result = evaluate(
    function=production_endpoint,
    dataset=test_dataset,  # Controlled test cases
    evaluators=[quality_evaluator],  # Automated scoring
    api_key="key",
    project="project"
)
# Use results to improve before deploying

Complementary Usage:

# 1. Develop with experiments
baseline_result = evaluate(function=v1, dataset=test_data)
improved_result = evaluate(function=v2, dataset=test_data)

# 2. Compare and choose best
if improved_result.metrics.accuracy > baseline_result.metrics.accuracy:
    deploy(v2)

# 3. Monitor in production with tracing
@trace(tracer=tracer)
def production_v2(query):
    return v2(query)

Evaluation Lifecycle

Phase 1: Initialization

# When evaluate() is called:

1. Load/validate dataset
   - If dataset_id provided: fetch from HoneyHive
   - If dataset list provided: generate EXT- ID
   - Validate structure (inputs, ground_truth)

2. Setup run metadata
   - Generate unique run_id
   - Create experiment name
   - Record timestamp

3. Initialize evaluators
   - Validate evaluator signatures
   - Prepare async/sync execution

4. Prepare execution plan
   - Determine parallelization (max_workers)
   - Setup tracer instances pool
   - Initialize progress tracking

Phase 2: Execution Loop

For each datapoint (potentially in parallel):

for datapoint in dataset:
    # 1. Create isolated tracer
    tracer = create_tracer_instance(
        api_key=api_key,
        project=project,
        session_name=f"{experiment_name}-{datapoint_id}"
    )

    # 2. Add evaluation metadata to baggage
    set_baggage({
        "honeyhive.run_id": run_id,
        "honeyhive.dataset_id": dataset_id,
        "honeyhive.datapoint_id": datapoint_id
    })

    # 3. Execute function
    try:
        if function_accepts_tracer(function):
            outputs = function(datapoint, tracer=tracer)
        else:
            outputs = function(datapoint)
    except Exception as e:
        outputs = {"error": str(e)}

    # 4. Run evaluators
    metrics = {}
    for evaluator in evaluators:
        result = evaluator(
            outputs=outputs,
            inputs=datapoint["inputs"],
            ground_truth=datapoint["ground_truth"]
        )
        metrics.update(result)

    # 5. Send to backend
    send_datapoint_result(
        run_id=run_id,
        datapoint_id=datapoint_id,
        session_id=tracer.session_id,
        outputs=outputs,
        metrics=metrics
    )

Phase 3: Backend Aggregation

Happens automatically on HoneyHive backend:

1. Collect Results:
   - Gather all datapoint results for run_id
   - Associate with session traces
   - Link metrics to datapoints

2. Compute Aggregates:
   For each metric (e.g., "accuracy"):
     - Calculate mean across all datapoints
     - Calculate median, min, max
     - Count improved/degraded cases
     - Generate distributions

3. Store Run Metadata:
   - Total datapoints processed
   - Success/failure counts
   - Execution time statistics
   - Cost analysis

4. Enable Comparison:
   - Index run for fast comparison
   - Link to dataset for reproducibility
   - Store evaluator configurations

Phase 4: Results Access

from honeyhive.experiments import get_run_result, compare_runs
from honeyhive import HoneyHive

client = HoneyHive(api_key="key")

# Access aggregated results
result = get_run_result(client, run_id="run_123")

print(f"Status: {result.status}")
print(f"Metrics: {result.metrics}")  # Aggregated metrics
print(f"Datapoints: {result.passed}/{result.total}")

# Compare with another run
comparison = compare_runs(
    client=client,
    new_run_id="run_456",
    old_run_id="run_123"
)

print(f"Improved metrics: {comparison.list_improved_metrics()}")
print(f"Degraded metrics: {comparison.list_degraded_metrics()}")

Backend Aggregation

Why Backend Aggregation?

Previous approach (client-side):

❌ Client calculates all metrics
❌ Must process full dataset to get results
❌ No incremental updates
❌ Comparison requires downloading all data
❌ Slow for large datasets

Current approach (backend-powered):

✅ Backend handles aggregation
✅ Results available as data arrives
✅ Incremental metrics updates
✅ Fast comparison (server-side)
✅ Scales to millions of datapoints

Aggregation Strategies

1. Metric Aggregation:

# For each metric across all datapoints:

{
    "metric_name": "accuracy",
    "values": [1.0, 0.8, 1.0, 0.9, 1.0],  # Individual scores

    # Aggregated statistics:
    "aggregate": {
        "mean": 0.94,
        "median": 1.0,
        "min": 0.8,
        "max": 1.0,
        "std_dev": 0.089
    },

    # Distribution:
    "distribution": {
        "0.0-0.2": 0,
        "0.2-0.4": 0,
        "0.4-0.6": 0,
        "0.6-0.8": 0,
        "0.8-1.0": 5
    }
}

2. Comparison Aggregation:

# When comparing two runs:

{
    "metric_name": "accuracy",
    "old_run": {
        "mean": 0.82,
        "datapoints": 100
    },
    "new_run": {
        "mean": 0.94,
        "datapoints": 100
    },

    # Comparison analysis:
    "comparison": {
        "delta": +0.12,  # Improvement
        "percent_change": +14.6,
        "common_datapoints": 100,
        "improved_count": 15,  # Specific datapoints that improved
        "degraded_count": 3,   # Specific datapoints that degraded
        "unchanged_count": 82
    }
}

3. Cost Aggregation:

# Automatic cost tracking:

{
    "total_tokens": 125000,
    "total_cost_usd": 3.75,

    "by_model": {
        "gpt-4": {
            "tokens": 50000,
            "cost": 3.00
        },
        "gpt-3.5-turbo": {
            "tokens": 75000,
            "cost": 0.75
        }
    },

    "cost_per_datapoint": 0.0375,
    "cost_per_success": 0.0395  # Only successful evaluations
}

Best Practices

1. Structure Experiments for Reproducibility

# ✅ Good: Clear, versioned experiment

EXPERIMENT_VERSION = "v2.1"
DATASET_ID = "qa-dataset-v1"  # Stable dataset reference

result = evaluate(
    function=my_function,
    dataset_id=DATASET_ID,  # Use managed dataset
    evaluators=[accuracy, quality, latency],
    name=f"experiment-{EXPERIMENT_VERSION}-{datetime.now().isoformat()}",
    api_key=api_key,
    project=project
)

# Save results
with open(f"results-{EXPERIMENT_VERSION}.json", "w") as f:
    json.dump(result.to_dict(), f)

2. Use Consistent Evaluators for Comparison

# ✅ Good: Same evaluators for all runs

evaluators = [accuracy_evaluator, quality_evaluator]

baseline = evaluate(
    function=v1_function,
    dataset=dataset,
    evaluators=evaluators,  # Same evaluators
    name="baseline-v1"
)

improved = evaluate(
    function=v2_function,
    dataset=dataset,  # Same dataset
    evaluators=evaluators,  # Same evaluators
    name="improved-v2"
)

# Now comparison is meaningful
comparison = compare_runs(client, improved.run_id, baseline.run_id)

3. Leverage Multi-Instance Architecture

# ✅ Good: Use tracer parameter when needed

def my_function(datapoint, tracer):
    """Function with tracer access for session enrichment."""
    inputs = datapoint.get("inputs", {})

    # Enrich session with experiment metadata
    tracer.enrich_session(
        metadata={
            "test_type": inputs.get("category"),
            "difficulty": inputs.get("difficulty")
        }
    )

    result = process(inputs)
    return result

# Tracer automatically provided by evaluate()
evaluate(function=my_function, dataset=dataset)

4. Start Simple, Add Complexity Gradually

# Phase 1: Basic experiment
result = evaluate(
    function=my_function,
    dataset=small_dataset  # Start small
)

# Phase 2: Add evaluators
result = evaluate(
    function=my_function,
    dataset=small_dataset,
    evaluators=[basic_evaluator]  # Add simple evaluator
)

# Phase 3: Scale up
result = evaluate(
    function=my_function,
    dataset=full_dataset,  # Full dataset
    evaluators=[eval1, eval2, eval3],  # Multiple evaluators
    max_workers=10  # Parallel processing
)

# Phase 4: Comparison workflow
comparison = compare_runs(client, new_run, old_run)

5. Monitor Experiment Costs

# Track costs across experiments

result = evaluate(
    function=my_function,
    dataset=dataset,
    evaluators=evaluators,
    verbose=True  # See progress and costs
)

# Access cost information
print(f"Total tokens: {result.total_tokens}")
print(f"Estimated cost: ${result.estimated_cost}")
print(f"Cost per datapoint: ${result.estimated_cost / len(dataset)}")

# Set cost budgets
if result.estimated_cost > 10.0:
    print("⚠️ Experiment exceeded budget!")

Common Patterns

A/B Testing Pattern

from honeyhive.experiments import evaluate, compare_runs
from honeyhive import HoneyHive

# Test two variants
variant_a = evaluate(
    function=prompt_variant_a,
    dataset=test_dataset,
    evaluators=evaluators,
    name="variant-a-test"
)

variant_b = evaluate(
    function=prompt_variant_b,
    dataset=test_dataset,  # Same dataset!
    evaluators=evaluators,  # Same evaluators!
    name="variant-b-test"
)

# Compare
client = HoneyHive(api_key=api_key)
comparison = compare_runs(client, variant_b.run_id, variant_a.run_id)

# Decide
if "accuracy" in comparison.list_improved_metrics():
    deploy(variant_b)
else:
    deploy(variant_a)

Progressive Improvement Pattern

# Iterative improvement workflow

def improve_iteratively():
    current_best = baseline_function
    current_best_score = 0

    for iteration in range(10):
        # Generate variant
        variant = generate_improvement(current_best)

        # Test variant
        result = evaluate(
            function=variant,
            dataset=test_dataset,
            evaluators=[accuracy_evaluator],
            name=f"iteration-{iteration}"
        )

        # Compare
        if result.metrics.accuracy > current_best_score:
            print(f"✅ Iteration {iteration}: Improved to {result.metrics.accuracy}")
            current_best = variant
            current_best_score = result.metrics.accuracy
        else:
            print(f"❌ Iteration {iteration}: No improvement")

    return current_best

Regression Testing Pattern

# Ensure changes don't break existing behavior

def regression_test(new_function):
    """Test new function against baseline."""

    # Run on regression test suite
    new_result = evaluate(
        function=new_function,
        dataset_id="regression-test-suite-v1",  # Stable test set
        evaluators=[accuracy, quality, safety],
        name="regression-check"
    )

    # Compare with baseline
    baseline_run_id = get_latest_baseline_run()
    comparison = compare_runs(
        client,
        new_run_id=new_result.run_id,
        old_run_id=baseline_run_id
    )

    # Check for regressions
    degraded = comparison.list_degraded_metrics()
    if degraded:
        raise ValueError(f"Regression detected in metrics: {degraded}")

    print("✅ No regressions detected")
    return new_result

See Also