Core Functions
Primary functions for running experiments and managing execution.
evaluate()
- evaluate(function, dataset=None, dataset_id=None, evaluators=None, api_key=None, project=None, name=None, source=None, max_workers=1, aggregate_function='average', verbose=False)
Run an experiment by executing a function against a dataset and evaluating outputs.
This is the main entry point for the experiments framework. It handles:
Function execution with tracer integration
Evaluator orchestration (sync and async)
Session/event linking
Results aggregation via backend
- Parameters:
function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to test. Should accept Dict[str, Any] (datapoint) and return Dict[str, Any] (outputs).
dataset (Optional[List[Dict[str, Any]]]) – List of test cases with inputs and optional ground_truth. Mutually exclusive with dataset_id.
dataset_id (Optional[str]) – ID of a HoneyHive-managed dataset. Mutually exclusive with dataset.
evaluators (Optional[List[Callable]]) – List of evaluator functions decorated with @evaluator or @aevaluator.
api_key (Optional[str]) – HoneyHive API key. Falls back to the HH_API_KEY environment variable.
project (Optional[str]) – HoneyHive project name. Falls back to the HH_PROJECT environment variable.
name (Optional[str]) – Human-readable name for this experiment run.
source (Optional[str]) – Source identifier for this experiment (e.g., “ci-pipeline”, “local-dev”).
max_workers (int) – Maximum number of concurrent workers for parallel execution.
aggregate_function (str) – Aggregation method for metrics (“average”, “sum”, “min”, “max”).
verbose (bool) – Enable detailed logging.
- Returns:
Experiment result summary with aggregated metrics.
- Return type:
- Raises:
ValueError – If neither dataset nor dataset_id is provided, or if both are provided.
Basic Usage
from honeyhive.experiments import evaluate, evaluator

@evaluator
def accuracy_evaluator(outputs, inputs, ground_truth):
    return {"score": 1.0 if outputs == ground_truth else 0.0}

def my_llm_function(datapoint):
    inputs = datapoint["inputs"]
    # Your LLM logic here
    return {"answer": process(inputs["query"])}

result = evaluate(
    function=my_llm_function,
    dataset=[
        {"inputs": {"query": "Q1"}, "ground_truth": {"answer": "A1"}},
        {"inputs": {"query": "Q2"}, "ground_truth": {"answer": "A2"}},
    ],
    evaluators=[accuracy_evaluator],
    api_key="your-api-key",
    project="your-project",
    name="accuracy-test-v1"
)

print(f"Success: {result.success}")
print(f"Passed: {result.passed} / {result.passed + result.failed}")
print(f"Avg accuracy: {result.metrics.get_metric('accuracy_evaluator')}")
External Dataset (Client-Side Data)
# SDK auto-generates EXT- prefixed IDs
result = evaluate(
    function=my_function,
    dataset=[
        {"inputs": {"x": 1}, "ground_truth": {"y": 2}},
        {"inputs": {"x": 2}, "ground_truth": {"y": 4}},
    ],
    evaluators=[my_evaluator],
    api_key="key",
    project="project"
)
Managed Dataset (HoneyHive-Stored)
# Use existing dataset by ID
result = evaluate(
    function=my_function,
    dataset_id="dataset-abc-123",  # Pre-created in HoneyHive
    evaluators=[my_evaluator],
    api_key="key",
    project="project"
)
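Note that dataset and dataset_id are mutually exclusive; supplying both (or neither) raises ValueError, as documented above. A minimal sketch of catching the validation error (the function and IDs are placeholders):
try:
    # Both dataset and dataset_id supplied; evaluate() rejects this up front
    evaluate(
        function=my_function,
        dataset=[{"inputs": {"x": 1}}],
        dataset_id="dataset-abc-123",
        api_key="key",
        project="project"
    )
except ValueError as e:
    print(f"Invalid configuration: {e}")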
Multiple Evaluators
@evaluator
def accuracy(outputs, inputs, ground_truth):
    return {"score": calculate_accuracy(outputs, ground_truth)}

@evaluator
def relevance(outputs, inputs, ground_truth):
    return {"score": calculate_relevance(outputs, inputs)}

@aevaluator
async def external_check(outputs, inputs, ground_truth):
    result = await external_api.validate(outputs)
    return {"score": result.score}

result = evaluate(
    function=my_function,
    dataset=test_data,
    evaluators=[accuracy, relevance, external_check],
    api_key="key",
    project="project",
    max_workers=4  # Parallel execution
)
Accessing Results
result = evaluate(...)

# Overall status
print(f"Run ID: {result.run_id}")
print(f"Status: {result.status}")
print(f"Success: {result.success}")

# Aggregated metrics
accuracy_score = result.metrics.get_metric("accuracy")
all_metrics = result.metrics.get_all_metrics()

# Individual datapoints
for datapoint in result.datapoints:
    print(f"Datapoint: {datapoint}")
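Custom Aggregation
The aggregate_function parameter controls how per-datapoint evaluator scores are rolled up into the run-level metrics shown above. A minimal sketch using "min" instead of the default "average" (the function, dataset, and evaluator names are placeholders from earlier examples):
# Report the worst-case score across the dataset instead of the mean
result = evaluate(
    function=my_function,
    dataset=test_data,
    evaluators=[accuracy],
    aggregate_function="min",
    api_key="key",
    project="project"
)
print(f"Worst-case accuracy: {result.metrics.get_metric('accuracy')}")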
run_experiment()
- run_experiment(function, dataset, datapoint_ids, experiment_context, api_key, max_workers=1, verbose=False)
Low-level function to execute a function against a dataset with tracer integration.
Warning
This is a low-level API. Most users should use evaluate() instead, which provides a higher-level interface with evaluator support.
- Parameters:
function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to execute for each datapoint.
dataset (List[Dict[str, Any]]) – List of datapoints to process.
datapoint_ids (List[str]) – List of datapoint IDs (must match dataset length).
experiment_context (ExperimentContext) – Context with run_id, dataset_id, project, source.
api_key (str) – HoneyHive API key.
max_workers (int) – Maximum concurrent workers.
verbose (bool) – Enable detailed logging.
- Returns:
List of execution results with outputs, errors, and session IDs.
- Return type:
List[Dict[str, Any]]
Usage Example
from honeyhive.experiments import run_experiment, ExperimentContext

context = ExperimentContext(
    run_id="run-123",
    dataset_id="dataset-456",
    project="my-project",
    source="test"
)

results = run_experiment(
    function=my_function,
    dataset=test_data,
    datapoint_ids=["dp-1", "dp-2", "dp-3"],
    experiment_context=context,
    api_key="key",
    max_workers=2
)

for result in results:
    print(f"Datapoint: {result['datapoint_id']}")
    print(f"Status: {result['status']}")
    print(f"Outputs: {result['outputs']}")
    if result['error']:
        print(f"Error: {result['error']}")
ExperimentContext
- class ExperimentContext
Context object storing experiment metadata for tracer integration.
- Parameters:
run_id (str) – ID of the experiment run.
dataset_id (str) – ID of the dataset being evaluated.
project (str) – HoneyHive project name.
source (str) – Source identifier for this experiment (e.g., "ci-pipeline").
Methods
- to_tracer_config()
Convert context to tracer configuration dictionary.
- Returns:
Configuration dict for HoneyHiveTracer initialization.
- Return type:
Dict[str, Any]
Usage Example
from honeyhive.experiments import ExperimentContext

context = ExperimentContext(
    run_id="run-abc-123",
    dataset_id="EXT-dataset-xyz",
    project="my-project",
    source="ci-pipeline"
)

# Convert to tracer config
tracer_config = context.to_tracer_config()

# Use with HoneyHiveTracer
from honeyhive import HoneyHiveTracer
tracer = HoneyHiveTracer(**tracer_config, api_key="key")
Best Practices
1. Function Signatures
Your function should accept a datapoint dict and return outputs dict:
from typing import Any, Dict

def my_function(datapoint: Dict[str, Any]) -> Dict[str, Any]:
    """
    Args:
        datapoint: Contains 'inputs' and optionally 'ground_truth'

    Returns:
        Dict with your outputs (e.g., {"answer": "...", "confidence": 0.9})
    """
    inputs = datapoint["inputs"]
    # Process inputs
    return {"answer": process(inputs)}
2. Error Handling
Let exceptions bubble up - evaluate() catches and logs them:
def my_function(datapoint):
try:
result = risky_operation(datapoint["inputs"])
return {"result": result}
except SpecificError as e:
# Log but don't suppress - let evaluate() handle it
logger.warning(f"Operation failed: {e}")
raise
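Because evaluate() catches per-datapoint exceptions rather than aborting the run, failed datapoints are reflected in the result counts (see Accessing Results above). A small sketch, reusing the placeholder names from earlier examples:
result = evaluate(
    function=my_function,
    dataset=test_data,
    evaluators=[my_evaluator],
    api_key="key",
    project="project"
)
# Datapoints whose function call raised are counted as failures, not crashes
print(f"Passed: {result.passed}, Failed: {result.failed}")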
3. Parallel Execution
Use max_workers for I/O-bound workloads:
# Good for API calls
result = evaluate(
function=api_heavy_function,
dataset=large_dataset,
evaluators=[...],
max_workers=10, # High concurrency for I/O
api_key="key",
project="project"
)
# For CPU-bound work, keep lower
result = evaluate(
function=cpu_intensive_function,
dataset=dataset,
max_workers=2, # Lower for CPU work
api_key="key",
project="project"
)
4. Dataset Size Management
For large datasets, use batching:
def run_large_experiment(full_dataset, batch_size=100):
"""Process large dataset in batches."""
results = []
for i in range(0, len(full_dataset), batch_size):
batch = full_dataset[i:i+batch_size]
result = evaluate(
function=my_function,
dataset=batch,
evaluators=[my_evaluator],
name=f"experiment-batch-{i//batch_size}",
api_key="key",
project="project"
)
results.append(result)
return results
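Each batch produces its own result object, so dataset-level metrics have to be combined by hand. A rough sketch using get_metric() from Accessing Results (assumes an evaluator named "accuracy" and uses a simple unweighted mean, which slightly misweights a short final batch):
batch_results = run_large_experiment(full_dataset, batch_size=100)

# Unweighted mean of per-batch means; weight by batch size for exact numbers
scores = [r.metrics.get_metric("accuracy") for r in batch_results]
print(f"Overall accuracy across {len(batch_results)} batches: {sum(scores) / len(scores):.3f}")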
See Also
Evaluators - Define custom evaluators
Results Retrieval - Retrieve and compare results
Data Models - Result data models
Evaluation & Analysis Guides - Experiments tutorial