Core Functions

Primary functions for running experiments and managing execution.

evaluate()

evaluate(function, dataset=None, dataset_id=None, evaluators=None, api_key=None, project=None, name=None, source=None, max_workers=1, aggregate_function='average', verbose=False)

Run an experiment by executing a function against a dataset and evaluating outputs.

This is the main entry point for the experiments framework. It handles:

  • Function execution with tracer integration

  • Evaluator orchestration (sync and async)

  • Session/event linking

  • Results aggregation via backend

Parameters:
  • function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to test. Should accept Dict[str, Any] (datapoint) and return Dict[str, Any] (outputs).

  • dataset (Optional[List[Dict[str, Any]]]) – List of test cases with inputs and optional ground_truth. Mutually exclusive with dataset_id.

  • dataset_id (Optional[str]) – ID of HoneyHive-managed dataset. Mutually exclusive with dataset.

  • evaluators (Optional[List[Callable]]) – List of evaluator functions decorated with @evaluator or @aevaluator.

  • api_key (Optional[str]) – HoneyHive API key. Falls back to HH_API_KEY environment variable.

  • project (Optional[str]) – HoneyHive project name. Falls back to HH_PROJECT environment variable.

  • name (Optional[str]) – Human-readable name for this experiment run.

  • source (Optional[str]) – Source identifier for this experiment (e.g., “ci-pipeline”, “local-dev”).

  • max_workers (int) – Maximum number of concurrent workers for parallel execution.

  • aggregate_function (str) – Aggregation method for metrics (“average”, “sum”, “min”, “max”).

  • verbose (bool) – Enable detailed logging.

Returns:

Experiment result summary with aggregated metrics.

Return type:

ExperimentResultSummary

Raises:

ValueError – If neither dataset nor dataset_id is provided, or if both are provided.
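The argument rules above can be summarized in a small standalone sketch. resolve_run_config is a hypothetical helper written for illustration and is not part of the HoneyHive SDK. It only mirrors the documented behavior: dataset and dataset_id are mutually exclusive, and api_key and project fall back to the HH_API_KEY and HH_PROJECT environment variables.

```python
import os
from typing import Any, Dict, List, Optional


def resolve_run_config(
    dataset: Optional[List[Dict[str, Any]]] = None,
    dataset_id: Optional[str] = None,
    api_key: Optional[str] = None,
    project: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the documented argument rules of evaluate()."""
    # Exactly one of dataset / dataset_id must be supplied.
    if dataset is None and dataset_id is None:
        raise ValueError("Provide either dataset or dataset_id.")
    if dataset is not None and dataset_id is not None:
        raise ValueError("dataset and dataset_id are mutually exclusive.")

    return {
        "dataset": dataset,
        "dataset_id": dataset_id,
        # Explicit arguments win; otherwise fall back to the environment.
        "api_key": api_key or os.environ.get("HH_API_KEY"),
        "project": project or os.environ.get("HH_PROJECT"),
    }
```

Checking arguments before any datapoint runs means a misconfigured call fails immediately rather than partway through an experiment.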

Basic Usage

from honeyhive.experiments import evaluate, evaluator

@evaluator
def accuracy_evaluator(outputs, inputs, ground_truth):
    return {"score": 1.0 if outputs == ground_truth else 0.0}

def my_llm_function(datapoint):
    inputs = datapoint["inputs"]
    # Your LLM logic here
    return {"answer": process(inputs["query"])}

result = evaluate(
    function=my_llm_function,
    dataset=[
        {"inputs": {"query": "Q1"}, "ground_truth": {"answer": "A1"}},
        {"inputs": {"query": "Q2"}, "ground_truth": {"answer": "A2"}},
    ],
    evaluators=[accuracy_evaluator],
    api_key="your-api-key",
    project="your-project",
    name="accuracy-test-v1"
)

print(f"Success: {result.success}")
print(f"Passed: {result.passed} / {result.passed + result.failed}")
print(f"Avg accuracy: {result.metrics.get_metric('accuracy_evaluator')}")

External Dataset (Client-Side Data)

# SDK auto-generates EXT- prefixed IDs
result = evaluate(
    function=my_function,
    dataset=[
        {"inputs": {"x": 1}, "ground_truth": {"y": 2}},
        {"inputs": {"x": 2}, "ground_truth": {"y": 4}},
    ],
    evaluators=[my_evaluator],
    api_key="key",
    project="project"
)
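The comment above says the SDK generates EXT- prefixed IDs for client-side data, but this page does not describe how. As a rough illustration only, the sketch below assumes the IDs are derived from a hash of the content. make_external_id, the hash choice, and the ID length are all hypothetical, and the SDK's actual scheme may differ.

```python
import hashlib
import json
from typing import Any, Dict, List


def make_external_id(payload: Any) -> str:
    """Hypothetical: derive a stable EXT- prefixed ID from JSON-serializable content."""
    # sort_keys makes the serialization independent of dict key order.
    canonical = json.dumps(payload, sort_keys=True, default=str)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"EXT-{digest[:24]}"


dataset: List[Dict[str, Any]] = [
    {"inputs": {"x": 1}, "ground_truth": {"y": 2}},
    {"inputs": {"x": 2}, "ground_truth": {"y": 4}},
]

# One ID for the dataset as a whole, one per datapoint.
dataset_id = make_external_id(dataset)
datapoint_ids = [make_external_id(dp) for dp in dataset]
```

Under this assumption, re-running the same client-side dataset yields the same IDs, which would let separate runs be compared datapoint by datapoint.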

Managed Dataset (HoneyHive-Stored)

# Use existing dataset by ID
result = evaluate(
    function=my_function,
    dataset_id="dataset-abc-123",  # Pre-created in HoneyHive
    evaluators=[my_evaluator],
    api_key="key",
    project="project"
)

Multiple Evaluators

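# Assumes aevaluator is importable from the same module as evaluator
from honeyhive.experiments import evaluate, evaluator, aevaluator

@evaluator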
def accuracy(outputs, inputs, ground_truth):
    return {"score": calculate_accuracy(outputs, ground_truth)}

@evaluator
def relevance(outputs, inputs, ground_truth):
    return {"score": calculate_relevance(outputs, inputs)}

@aevaluator
async def external_check(outputs, inputs, ground_truth):
    result = await external_api.validate(outputs)
    return {"score": result.score}

result = evaluate(
    function=my_function,
    dataset=test_data,
    evaluators=[accuracy, relevance, external_check],
    api_key="key",
    project="project",
    max_workers=4  # Parallel execution
)
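To make "sync and async" evaluator orchestration concrete, here is a minimal standalone sketch of one way to dispatch a mixed list of evaluators. run_evaluators is a hypothetical function, not the SDK's implementation, and the evaluators are plain functions without the @evaluator and @aevaluator decorators so that the example runs without HoneyHive installed.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List


def run_evaluators(
    evaluators: List[Callable[..., Any]],
    outputs: Dict[str, Any],
    inputs: Dict[str, Any],
    ground_truth: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical dispatcher: call each evaluator, awaiting coroutine functions."""
    results: Dict[str, Any] = {}
    for fn in evaluators:
        if inspect.iscoroutinefunction(fn):
            # Async evaluators are driven to completion on a fresh event loop.
            results[fn.__name__] = asyncio.run(fn(outputs, inputs, ground_truth))
        else:
            results[fn.__name__] = fn(outputs, inputs, ground_truth)
    return results


def exact_match(outputs, inputs, ground_truth):
    return {"score": 1.0 if outputs == ground_truth else 0.0}


async def slow_check(outputs, inputs, ground_truth):
    await asyncio.sleep(0)  # Stand-in for an external API call
    return {"score": 1.0 if "answer" in outputs else 0.0}


scores = run_evaluators(
    [exact_match, slow_check],
    outputs={"answer": "A1"},
    inputs={"query": "Q1"},
    ground_truth={"answer": "A1"},
)
```

Keying results by function name matches how the examples on this page look up metrics, for instance get_metric('accuracy_evaluator').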

Accessing Results

result = evaluate(...)

# Overall status
print(f"Run ID: {result.run_id}")
print(f"Status: {result.status}")
print(f"Success: {result.success}")

# Aggregated metrics
accuracy_score = result.metrics.get_metric("accuracy")
all_metrics = result.metrics.get_all_metrics()

# Individual datapoints
for datapoint in result.datapoints:
    print(f"Datapoint: {datapoint}")
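The aggregated metrics above depend on the aggregate_function argument. The page states that aggregation happens in the backend, so the sketch below is only a local illustration of the arithmetic behind the four documented options. AGGREGATORS and aggregate are hypothetical names.

```python
from statistics import mean
from typing import Callable, Dict, List

# The four aggregation methods named in the evaluate() parameter list.
AGGREGATORS: Dict[str, Callable[[List[float]], float]] = {
    "average": mean,
    "sum": sum,
    "min": min,
    "max": max,
}


def aggregate(scores: List[float], aggregate_function: str = "average") -> float:
    """Reduce per-datapoint scores to a single number."""
    try:
        reducer = AGGREGATORS[aggregate_function]
    except KeyError:
        raise ValueError(f"Unknown aggregate_function: {aggregate_function!r}") from None
    return float(reducer(scores))


per_datapoint_scores = [1.0, 0.0, 1.0, 1.0]
summary = {name: aggregate(per_datapoint_scores, name) for name in AGGREGATORS}
```

For a pass/fail evaluator that returns 0.0 or 1.0, "average" reads as a pass rate and "sum" as a pass count, while "min" shows whether any datapoint failed.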

run_experiment()

run_experiment(function, dataset, datapoint_ids, experiment_context, api_key, max_workers=1, verbose=False)

Low-level routine that executes a function against every datapoint in a dataset, with tracer integration.

Warning

This is a low-level API. Most users should use evaluate() instead, which provides a higher-level interface with evaluator support.

Parameters:
  • function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to execute for each datapoint.

  • dataset (List[Dict[str, Any]]) – List of datapoints to process.

  • datapoint_ids (List[str]) – List of datapoint IDs (must match dataset length).

  • experiment_context (ExperimentContext) – Context with run_id, dataset_id, project, source.

  • api_key (str) – HoneyHive API key.

  • max_workers (int) – Maximum concurrent workers.

  • verbose (bool) – Enable detailed logging.

Returns:

List of execution results with outputs, errors, and session IDs.

Return type:

List[Dict[str, Any]]

Usage Example

from honeyhive.experiments import run_experiment, ExperimentContext

context = ExperimentContext(
    run_id="run-123",
    dataset_id="dataset-456",
    project="my-project",
    source="test"
)

results = run_experiment(
    function=my_function,
    dataset=test_data,
    datapoint_ids=["dp-1", "dp-2", "dp-3"],
    experiment_context=context,
    api_key="key",
    max_workers=2
)

for result in results:
    print(f"Datapoint: {result['datapoint_id']}")
    print(f"Status: {result['status']}")
    print(f"Outputs: {result['outputs']}")
    if result['error']:
        print(f"Error: {result['error']}")
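The result shape used above (datapoint_id, status, outputs, error) can be illustrated with a standalone sketch of a per-datapoint runner. run_all is hypothetical and omits tracer integration entirely. The status values "success" and "failed" are assumptions, since this page does not list the real ones.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple


def run_all(
    function: Callable[[Dict[str, Any]], Dict[str, Any]],
    dataset: List[Dict[str, Any]],
    datapoint_ids: List[str],
    max_workers: int = 1,
) -> List[Dict[str, Any]]:
    """Hypothetical runner: one result dict per datapoint, errors captured."""
    if len(dataset) != len(datapoint_ids):
        raise ValueError("datapoint_ids must match dataset length")

    def run_one(pair: Tuple[str, Dict[str, Any]]) -> Dict[str, Any]:
        datapoint_id, datapoint = pair
        try:
            outputs = function(datapoint)
            return {"datapoint_id": datapoint_id, "status": "success",
                    "outputs": outputs, "error": None}
        except Exception as exc:
            # A failing datapoint is recorded and does not abort the run.
            return {"datapoint_id": datapoint_id, "status": "failed",
                    "outputs": None, "error": str(exc)}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in input order, whatever the completion order.
        return list(pool.map(run_one, zip(datapoint_ids, dataset)))


def flaky(datapoint: Dict[str, Any]) -> Dict[str, Any]:
    x = datapoint["inputs"]["x"]
    if x < 0:
        raise ValueError("negative input")
    return {"y": x * 2}


results = run_all(
    flaky,
    dataset=[{"inputs": {"x": 1}}, {"inputs": {"x": -1}}],
    datapoint_ids=["dp-1", "dp-2"],
    max_workers=2,
)
```

Catching exceptions inside the worker keeps one bad datapoint from discarding the outputs of the others.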

ExperimentContext

class ExperimentContext

Context object storing experiment metadata for tracer integration.

Parameters:
  • run_id (str) – Unique experiment run identifier.

  • dataset_id (str) – Dataset identifier (may be EXT- prefixed for external datasets).

  • project (str) – HoneyHive project name.

  • source (Optional[str]) – Optional source identifier.

Methods

to_tracer_config()

Convert context to tracer configuration dictionary.

Returns:

Configuration dict for HoneyHiveTracer initialization.

Return type:

Dict[str, Any]

Usage Example

from honeyhive.experiments import ExperimentContext

context = ExperimentContext(
    run_id="run-abc-123",
    dataset_id="EXT-dataset-xyz",
    project="my-project",
    source="ci-pipeline"
)

# Convert to tracer config
tracer_config = context.to_tracer_config()

# Use with HoneyHiveTracer
from honeyhive import HoneyHiveTracer
tracer = HoneyHiveTracer(**tracer_config, api_key="key")
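For readers who want a mental model of the class, the sketch below shows one way such a context object could be written. ExperimentContextSketch is a hypothetical stand-in, not the SDK class. The field names come from the parameter list above, but the keys returned by to_tracer_config() are assumptions, since this page does not document them.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ExperimentContextSketch:
    """Hypothetical stand-in for ExperimentContext (illustration only)."""

    run_id: str
    dataset_id: str
    project: str
    source: Optional[str] = None

    def to_tracer_config(self) -> Dict[str, Any]:
        # Assumed key names; the real SDK may use different ones.
        config: Dict[str, Any] = {
            "project": self.project,
            "run_id": self.run_id,
            "dataset_id": self.dataset_id,
        }
        # Only include the optional source when it was provided.
        if self.source is not None:
            config["source"] = self.source
        return config


context = ExperimentContextSketch(
    run_id="run-abc-123",
    dataset_id="EXT-dataset-xyz",
    project="my-project",
)
tracer_config = context.to_tracer_config()
```

Marking the dataclass frozen keeps the run metadata immutable once created, which is a design choice of this sketch rather than a documented property of the SDK class.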

Best Practices

1. Function Signatures

Your function should accept a datapoint dict and return an outputs dict:

from typing import Any, Dict

def my_function(datapoint: Dict[str, Any]) -> Dict[str, Any]:
    """Run the task for a single datapoint.

    Args:
        datapoint: Contains 'inputs' and optionally 'ground_truth'

    Returns:
        Dict with your outputs (e.g., {"answer": "...", "confidence": 0.9})
    """
    inputs = datapoint["inputs"]
    # Process inputs
    return {"answer": process(inputs)}

2. Error Handling

Let exceptions propagate; evaluate() catches and logs them:

import logging

logger = logging.getLogger(__name__)

def my_function(datapoint):
    try:
        result = risky_operation(datapoint["inputs"])
        return {"result": result}
    except SpecificError as e:
        # Log but don't suppress; re-raise so evaluate() can handle it
        logger.warning("Operation failed: %s", e)
        raise

3. Parallel Execution

Use max_workers for I/O-bound workloads:

# Good for API calls
result = evaluate(
    function=api_heavy_function,
    dataset=large_dataset,
    evaluators=[...],
    max_workers=10,  # High concurrency for I/O
    api_key="key",
    project="project"
)

# For CPU-bound work, keep the worker count lower
result = evaluate(
    function=cpu_intensive_function,
    dataset=dataset,
    max_workers=2,  # Lower for CPU work
    api_key="key",
    project="project"
)

4. Dataset Size Management

For large datasets, use batching:

def run_large_experiment(full_dataset, batch_size=100):
    """Process large dataset in batches."""
    results = []

    for i in range(0, len(full_dataset), batch_size):
        batch = full_dataset[i:i+batch_size]

        result = evaluate(
            function=my_function,
            dataset=batch,
            evaluators=[my_evaluator],
            name=f"experiment-batch-{i//batch_size}",
            api_key="key",
            project="project"
        )

        results.append(result)

    return results
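Batching as above produces one result per batch, so an overall figure has to be recombined afterwards. The sketch below assumes each batch reports an average score and that the batch sizes are known. combine_batch_averages is a hypothetical helper that works on plain numbers rather than on ExperimentResultSummary objects.

```python
from typing import List, Tuple


def combine_batch_averages(batches: List[Tuple[int, float]]) -> float:
    """Combine (batch_size, batch_average) pairs into one weighted average."""
    total = sum(size for size, _ in batches)
    if total == 0:
        raise ValueError("No datapoints to combine.")
    # Weight each batch average by the number of datapoints behind it.
    return sum(size * avg for size, avg in batches) / total


# Two full batches of 100 and a final partial batch of 50.
overall = combine_batch_averages([(100, 0.80), (100, 0.90), (50, 0.60)])
```

Weighting by batch size matters because the last batch is usually smaller. A plain mean of the three averages would give the 50-datapoint batch the same influence as the 100-datapoint ones.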

See Also