"""Evaluation utilities for HoneyHive."""
# pylint: disable=duplicate-code
# This module contains legitimate code duplication with other modules:
# 1. API export lists (__all__) - intentionally duplicated for consistent public APIs
# 2. Pydantic field validators - shared validation logic across similar model classes
# 3. ErrorContext creation patterns - standard parameter structures for error handling
import asyncio
import contextvars
import functools
import logging
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union
from honeyhive.api.client import HoneyHive
from honeyhive.models.generated import (
CreateRunRequest,
EvaluationRun,
)
# Config import removed - not used in this module
logger = logging.getLogger(__name__)
@dataclass
class EvaluationResult:
"""Result of an evaluation."""
score: float
metrics: Dict[str, Any]
feedback: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
evaluation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: Optional[str] = None
@dataclass
class EvaluationContext:
"""Context for evaluation runs."""
project: str
source: str
session_id: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
class BaseEvaluator:
"""Base class for custom evaluators."""
def __init__(self, name: str, **kwargs: Any) -> None:
"""Initialize the evaluator."""
self.name = name
self.__name__ = name # Add __name__ attribute for compatibility
self.config = kwargs
def evaluate(
self,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
ground_truth: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Evaluate the given inputs and outputs."""
raise NotImplementedError("Subclasses must implement evaluate method")
def __call__(
self,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
ground_truth: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Make the evaluator callable."""
return self.evaluate(inputs, outputs, ground_truth, **kwargs)
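# Example (illustrative sketch, not part of the shipped evaluators): a custom
# evaluator subclasses BaseEvaluator and implements evaluate(); the
# "ContainsEvaluator" name below is hypothetical.
#
#     class ContainsEvaluator(BaseEvaluator):
#         def __init__(self, **kwargs: Any) -> None:
#             super().__init__("contains", **kwargs)
#
#         def evaluate(self, inputs, outputs, ground_truth=None, **kwargs):
#             expected = str(inputs.get("expected", ""))
#             actual = str(outputs.get("response", ""))
#             return {"contains": float(expected.lower() in actual.lower())}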
class ExactMatchEvaluator(BaseEvaluator): # pylint: disable=too-few-public-methods
"""Evaluator for exact string matching."""
def __init__(self, **kwargs: Any) -> None:
"""Initialize the exact match evaluator."""
super().__init__("exact_match", **kwargs)
def evaluate(
self,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
ground_truth: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Evaluate exact match between expected and actual outputs."""
expected = inputs.get("expected", "")
actual = outputs.get("response", "")
# Handle different types
if isinstance(expected, str) and isinstance(actual, str):
score = float(expected.strip().lower() == actual.strip().lower())
else:
score = float(expected == actual)
return {
"exact_match": score,
"expected": expected,
"actual": actual,
}
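# Example (illustrative): string comparison is case- and whitespace-insensitive,
# so the call below scores 1.0.
#
#     ExactMatchEvaluator().evaluate(
#         inputs={"expected": "Paris"}, outputs={"response": "  paris  "}
#     )
#     # -> {"exact_match": 1.0, "expected": "Paris", "actual": "  paris  "}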
class F1ScoreEvaluator(BaseEvaluator): # pylint: disable=too-few-public-methods
"""Evaluator for F1 score calculation."""
def __init__(self, **kwargs: Any) -> None:
"""Initialize the F1 score evaluator."""
super().__init__("f1_score", **kwargs)
def evaluate(
self,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
ground_truth: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Evaluate F1 score between expected and actual outputs."""
expected = inputs.get("expected", "")
actual = outputs.get("response", "")
if not isinstance(expected, str) or not isinstance(actual, str):
return {"f1_score": 0.0, "error": "Both inputs must be strings"}
score = self._compute_f1_score(actual, expected)
return {"f1_score": score}
def _compute_f1_score(self, prediction: str, ground_truth: str) -> float:
"""Compute F1 score between prediction and ground truth."""
pred_words = set(prediction.lower().split())
gt_words = set(ground_truth.lower().split())
if not pred_words or not gt_words:
return 0.0
intersection = pred_words & gt_words
precision = len(intersection) / len(pred_words)
recall = len(intersection) / len(gt_words)
if precision + recall == 0:
return 0.0
return 2 * (precision * recall) / (precision + recall)
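# Worked example (illustrative): for prediction "the cat sat" and ground truth
# "the cat slept", the shared words are {"the", "cat"}, so precision = 2/3,
# recall = 2/3, and F1 = 2 * (2/3 * 2/3) / (2/3 + 2/3) = 2/3.
#
#     F1ScoreEvaluator().evaluate(
#         inputs={"expected": "the cat slept"}, outputs={"response": "the cat sat"}
#     )
#     # -> {"f1_score": 0.666...}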
class LengthEvaluator(BaseEvaluator): # pylint: disable=too-few-public-methods
"""Evaluator for response length analysis."""
def __init__(self, **kwargs: Any) -> None:
"""Initialize the length evaluator."""
super().__init__("length", **kwargs)
def evaluate(
self,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
ground_truth: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Evaluate response length metrics."""
response = outputs.get("response", "")
if isinstance(response, str):
char_count = len(response)
word_count = len(response.split())
line_count = len(response.splitlines())
else:
char_count = len(str(response))
word_count = 1
line_count = 1
return {
"char_count": char_count,
"word_count": word_count,
"line_count": line_count,
}
class SemanticSimilarityEvaluator(BaseEvaluator):
# pylint: disable=too-few-public-methods
"""Evaluator for semantic similarity using basic heuristics."""
def __init__(self, **kwargs: Any) -> None:
"""Initialize the semantic similarity evaluator."""
super().__init__("semantic_similarity", **kwargs)
def evaluate(
self,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
ground_truth: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Evaluate semantic similarity between expected and actual outputs."""
expected = inputs.get("expected", "")
actual = outputs.get("response", "")
if not isinstance(expected, str) or not isinstance(actual, str):
return {"semantic_similarity": 0.0, "error": "Both inputs must be strings"}
# Simple semantic similarity using word overlap and structure
score = self._compute_semantic_similarity(actual, expected)
return {"semantic_similarity": score}
def _compute_semantic_similarity(self, prediction: str, ground_truth: str) -> float:
"""Compute semantic similarity score."""
pred_words = set(prediction.lower().split())
gt_words = set(ground_truth.lower().split())
if not pred_words or not gt_words:
return 0.0
# Word overlap
overlap = len(pred_words & gt_words)
total_unique = len(pred_words | gt_words)
# Structure similarity (simple heuristic)
pred_sentences = len(prediction.split("."))
gt_sentences = len(ground_truth.split("."))
structure_similarity = 1.0 - abs(pred_sentences - gt_sentences) / max(
pred_sentences, gt_sentences, 1
)
# Combined score
word_similarity = overlap / total_unique if total_unique > 0 else 0.0
final_score = (word_similarity * 0.7) + (structure_similarity * 0.3)
return min(1.0, max(0.0, final_score))
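# Worked example (illustrative): for prediction "the cat sat." and ground truth
# "the cat slept.", 2 of the 4 unique tokens overlap (word similarity 0.5) and
# both texts split into the same number of "." segments (structure similarity
# 1.0), so the combined score is 0.5 * 0.7 + 1.0 * 0.3 = 0.65.
#
#     SemanticSimilarityEvaluator().evaluate(
#         inputs={"expected": "the cat slept."}, outputs={"response": "the cat sat."}
#     )
#     # -> {"semantic_similarity": 0.65} (up to floating-point rounding)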
# Built-in evaluators
BUILTIN_EVALUATORS = {
"exact_match": ExactMatchEvaluator,
"f1_score": F1ScoreEvaluator,
"length": LengthEvaluator,
"semantic_similarity": SemanticSimilarityEvaluator,
}
def get_evaluator(evaluator_name: str, **kwargs: Any) -> BaseEvaluator:
"""Get a built-in evaluator by name."""
if evaluator_name not in BUILTIN_EVALUATORS:
raise ValueError(f"Unknown evaluator: {evaluator_name}")
return BUILTIN_EVALUATORS[evaluator_name](**kwargs)
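# Example (illustrative): look up a built-in evaluator by name and call it
# directly (BaseEvaluator instances are callable).
#
#     length_eval = get_evaluator("length")
#     length_eval({"question": "..."}, {"response": "two words"})
#     # -> {"char_count": 9, "word_count": 2, "line_count": 1}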
def evaluate(
prediction: str,
ground_truth: str,
metrics: Optional[List[str]] = None,
**kwargs: Any,
) -> EvaluationResult:
"""Evaluate a prediction against ground truth.
Args:
prediction: Model prediction
ground_truth: Ground truth value
metrics: List of metrics to compute
**kwargs: Additional evaluation parameters
Returns:
Evaluation result
"""
# Default metrics
if metrics is None:
metrics = ["exact_match", "f1_score"]
result_metrics = {}
# Create inputs/outputs dict for evaluators
inputs = {"expected": ground_truth}
outputs = {"response": prediction}
# Run each metric
for metric in metrics:
if metric in BUILTIN_EVALUATORS:
eval_instance = BUILTIN_EVALUATORS[metric]()
try:
metric_result = eval_instance.evaluate(inputs, outputs)
result_metrics.update(metric_result)
except Exception as e:
logger.warning("Failed to compute %s: %s", metric, e)
result_metrics[metric] = 0.0
# Compute overall score (average of numeric metrics)
numeric_metrics = [
v for v in result_metrics.values() if isinstance(v, (int, float))
]
overall_score = (
sum(numeric_metrics) / len(numeric_metrics) if numeric_metrics else 0.0
)
# Ensure score is in 0-1 range
overall_score = max(0.0, min(1.0, overall_score))
return EvaluationResult(score=overall_score, metrics=result_metrics, **kwargs)
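# Example (illustrative): score a prediction with the default metrics
# ("exact_match" and "f1_score"); the overall score averages the numeric
# metric values.
#
#     result = evaluate(prediction="the cat sat", ground_truth="the cat slept")
#     result.metrics["exact_match"]  # -> 0.0
#     result.metrics["f1_score"]     # -> 0.666...
#     result.score                   # -> 0.333... (average of the two)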
def evaluate_decorator(
evaluators: Optional[List[Union[str, BaseEvaluator, Callable]]] = None,
**kwargs: Any,
) -> Callable[[Callable], Callable]:
"""Decorator for functions that should be evaluated.
This is the main @evaluate decorator that can be used with evaluators.
Args:
evaluators: List of evaluators to apply
**kwargs: Additional evaluation parameters
"""
def decorator(func: Callable) -> Callable:
# Check if function is async
if asyncio.iscoroutinefunction(func): # pylint: disable=no-else-return
@functools.wraps(func)
async def async_wrapper(*args: Any, **func_kwargs: Any) -> Any:
# Execute the async function first
result = await func(*args, **func_kwargs)
# If we have evaluators and the first argument is a dict (inputs)
if evaluators and args and isinstance(args[0], dict):
inputs = args[0]
# Convert result to outputs format if it's not already
if isinstance(result, dict):
outputs = result
else:
outputs = {"response": result}
# Run evaluation
try:
eval_result = evaluate_with_evaluators(
evaluators=evaluators,
inputs=inputs,
outputs=outputs,
**kwargs,
)
# Store evaluation result in metadata if result is a dict
if isinstance(result, dict):
if "evaluation" not in result:
result["evaluation"] = {}
result["evaluation"]["result"] = eval_result
else:
# If result is not a dict, we can't easily attach evaluation
# but we could log it or store it elsewhere
logger.info("Evaluation result: %s", eval_result)
except Exception as e:
logger.warning("Evaluation failed: %s", e)
return result
return async_wrapper
else:
@functools.wraps(func)
def sync_wrapper(*args: Any, **func_kwargs: Any) -> Any:
# Execute the function first
result = func(*args, **func_kwargs)
# If we have evaluators and the first argument is a dict (inputs)
if evaluators and args and isinstance(args[0], dict):
inputs = args[0]
# Convert result to outputs format if it's not already
if isinstance(result, dict):
outputs = result
else:
outputs = {"response": result}
# Run evaluation
try:
eval_result = evaluate_with_evaluators(
evaluators=evaluators,
inputs=inputs,
outputs=outputs,
**kwargs,
)
# Store evaluation result in metadata if result is a dict
if isinstance(result, dict):
if "evaluation" not in result:
result["evaluation"] = {}
result["evaluation"]["result"] = eval_result
else:
# If result is not a dict, we can't easily attach evaluation
# but we could log it or store it elsewhere
logger.info("Evaluation result: %s", eval_result)
except Exception as e:
logger.warning("Evaluation failed: %s", e)
return result
return sync_wrapper
return decorator
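# Example (illustrative): the decorator evaluates functions whose first
# positional argument is an inputs dict; the "answer" function below is
# hypothetical.
#
#     @evaluate_decorator(evaluators=["exact_match"])
#     def answer(inputs: Dict[str, Any]) -> Dict[str, Any]:
#         return {"response": inputs["expected"]}  # echoes, so it scores 1.0
#
#     answer({"expected": "42"})
#     # -> {"response": "42", "evaluation": {"result": EvaluationResult(...)}}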
def evaluator(
_name: Optional[str] = None, _session_id: Optional[str] = None, **_kwargs: Any
) -> Callable[[Callable], Callable]:
"""Decorator for synchronous evaluation functions.
Args:
name: Evaluation name
session_id: Session ID for tracing
**kwargs: Additional evaluation parameters
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Execute evaluation
result = func(*args, **kwargs)
# Note: Event creation for evaluation functions is disabled to avoid
# type issues. The evaluation functionality works independently of
# event creation.
return result
return wrapper
return decorator
def aevaluator(
_name: Optional[str] = None, _session_id: Optional[str] = None, **_kwargs: Any
) -> Callable[[Callable], Callable]:
"""Decorator for asynchronous evaluation functions.
Args:
name: Evaluation name
session_id: Session ID for tracing
**kwargs: Additional evaluation parameters
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
# Execute evaluation
result = await func(*args, **kwargs)
# Note: Event creation for evaluation functions is disabled to avoid
# type issues. The evaluation functionality works independently of
# event creation.
return result
return wrapper
return decorator
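# Example (illustrative): @evaluator and @aevaluator currently act as
# transparent pass-throughs around user-defined evaluation functions; the
# "grades_as_pass" function below is hypothetical.
#
#     @evaluator(_name="pass_fail")
#     def grades_as_pass(inputs, outputs, ground_truth=None):
#         return {"pass": outputs.get("response") == inputs.get("expected")}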
def evaluate_with_evaluators(
# pylint: disable=too-many-arguments,too-many-branches
evaluators: List[Union[str, BaseEvaluator, Callable]],
inputs: Dict[str, Any],
outputs: Dict[str, Any],
*,
ground_truth: Optional[Dict[str, Any]] = None,
context: Optional[EvaluationContext] = None,
max_workers: int = 1,
run_concurrently: bool = True,
) -> EvaluationResult:
"""Evaluate outputs using multiple evaluators with optional threading support.
Args:
evaluators: List of evaluators to apply
inputs: Input data for evaluation
outputs: Output data to evaluate
ground_truth: Ground truth data for comparison
context: Evaluation context
max_workers: Maximum number of worker threads for parallel evaluation
run_concurrently: Whether to run evaluators concurrently
Returns:
EvaluationResult with aggregated metrics
"""
if not evaluators:
return EvaluationResult(
score=0.0,
metrics={},
metadata={
"inputs": inputs,
"outputs": outputs,
"ground_truth": ground_truth,
"context": context.__dict__ if context else None,
},
)
metrics: Dict[str, Any] = {}
if run_concurrently and max_workers > 1 and len(evaluators) > 1:
# Run evaluators concurrently using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit evaluation tasks
futures = []
for eval_item in evaluators:
eval_func = _get_evaluator_function(eval_item)
# Create context for each thread
ctx = contextvars.copy_context()
future = executor.submit(
ctx.run,
functools.partial(
_run_single_evaluator, eval_func, inputs, outputs, ground_truth
),
)
futures.append((eval_item, future))
# Collect results
for eval_item, future in futures:
try:
result = future.result()
if isinstance(eval_item, str):
evaluator_name = eval_item
elif isinstance(eval_item, BaseEvaluator):
evaluator_name = eval_item.name
else:
evaluator_name = getattr(eval_item, "__name__", str(eval_item))
metrics[evaluator_name] = result
except Exception as e:
logger.warning("Evaluator %s failed: %s", eval_item, e)
if isinstance(eval_item, str):
evaluator_name = eval_item
elif isinstance(eval_item, BaseEvaluator):
evaluator_name = eval_item.name
else:
evaluator_name = getattr(eval_item, "__name__", str(eval_item))
metrics[evaluator_name] = None
else:
# Run evaluators sequentially
for eval_item in evaluators:
try:
eval_func = _get_evaluator_function(eval_item)
if isinstance(eval_item, str):
evaluator_name = eval_item
elif isinstance(eval_item, BaseEvaluator):
evaluator_name = eval_item.name
else:
evaluator_name = getattr(eval_item, "__name__", str(eval_item))
result = _run_single_evaluator(eval_func, inputs, outputs, ground_truth)
metrics[evaluator_name] = result
except Exception as e:
logger.warning("Evaluator %s failed: %s", eval_item, e)
if isinstance(eval_item, str):
evaluator_name = eval_item
elif isinstance(eval_item, BaseEvaluator):
evaluator_name = eval_item.name
else:
evaluator_name = getattr(eval_item, "__name__", str(eval_item))
metrics[evaluator_name] = None
# Calculate overall score
valid_scores = []
for metric_result in metrics.values():
if metric_result is not None and isinstance(metric_result, dict):
# Extract numeric scores from metric result dictionaries.
# Note: only strictly positive numeric values are collected, so
# zero-valued metrics do not lower the average, and the final score
# is clamped to the 0-1 range below.
for value in metric_result.values():
if isinstance(value, (int, float)) and value > 0:
valid_scores.append(value)
elif isinstance(metric_result, (int, float)) and metric_result > 0:
valid_scores.append(metric_result)
if valid_scores:
overall_score = sum(valid_scores) / len(valid_scores)
# Normalize score to 0-1 range
overall_score = max(0.0, min(1.0, overall_score))
else:
overall_score = 0.0
return EvaluationResult(
score=overall_score,
metrics=metrics,
metadata={
"inputs": inputs,
"outputs": outputs,
"ground_truth": ground_truth,
"context": context.__dict__ if context else None,
},
)
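# Example (illustrative): evaluators can be mixed as built-in names,
# BaseEvaluator instances, or plain callables; metrics are keyed by evaluator
# name.
#
#     result = evaluate_with_evaluators(
#         evaluators=["exact_match", F1ScoreEvaluator()],
#         inputs={"expected": "hello world"},
#         outputs={"response": "hello world"},
#         max_workers=2,
#     )
#     result.metrics
#     # -> {"exact_match": {"exact_match": 1.0, ...}, "f1_score": {"f1_score": 1.0}}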
def _run_single_evaluator(
evaluator_func: Callable,
inputs: Dict[str, Any],
outputs: Dict[str, Any],
ground_truth: Optional[Dict[str, Any]] = None,
) -> Any:
"""Run a single evaluator function in a thread-safe manner.
Args:
evaluator_func: The evaluator function to run
inputs: Input data
outputs: Output data
ground_truth: Ground truth data
Returns:
Evaluation result from the evaluator
"""
try:
if ground_truth is not None:
return evaluator_func(inputs, outputs, ground_truth)
return evaluator_func(inputs, outputs)
except Exception as e:
logger.error("Evaluator %s failed: %s", evaluator_func.__name__, e)
raise
def _get_evaluator_function(eval_item: Union[str, BaseEvaluator, Callable]) -> Callable:
"""Get the evaluator function from different evaluator types.
Args:
evaluator: Evaluator (string name, BaseEvaluator instance, or callable)
Returns:
Callable evaluator function
"""
if isinstance(eval_item, str):
return get_evaluator(eval_item)
if isinstance(eval_item, BaseEvaluator):
return eval_item.evaluate
return eval_item
def evaluate_batch(
evaluators: List[Union[str, BaseEvaluator, Callable]],
dataset: List[Dict[str, Any]],
max_workers: int = 4,
run_concurrently: bool = True,
context: Optional[EvaluationContext] = None,
) -> List[EvaluationResult]:
"""Evaluate a batch of data points using multiple evaluators with threading support.
Args:
evaluators: List of evaluators to apply
dataset: List of data points, each containing inputs, outputs, and
optional ground_truth
max_workers: Maximum number of worker threads for parallel evaluation
run_concurrently: Whether to run evaluations concurrently
context: Evaluation context
Returns:
List of EvaluationResult objects
"""
if not dataset:
return []
if run_concurrently and max_workers > 1 and len(dataset) > 1:
# Run evaluations concurrently using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit evaluation tasks
futures = []
for data_point in dataset:
inputs = data_point.get("inputs", {})
outputs = data_point.get("outputs", {})
ground_truth = data_point.get("ground_truth")
# Create context for each thread
ctx = contextvars.copy_context()
future = executor.submit(
ctx.run,
functools.partial(
evaluate_with_evaluators,
evaluators=evaluators,
inputs=inputs,
outputs=outputs,
ground_truth=ground_truth,
context=context,
max_workers=1, # Single evaluator per thread
run_concurrently=False, # Sequential within thread
),
)
futures.append(future)
# Collect results
results = []
for future in futures:
try:
result = future.result()
results.append(result)
except Exception as e:
logger.warning("Batch evaluation failed: %s", e)
# Create empty result for failed evaluation
results.append(
EvaluationResult(
score=0.0,
metrics={},
metadata={
"inputs": {},
"outputs": {},
"ground_truth": {},
"context": context.__dict__ if context else None,
},
)
)
return results
else:
# Run evaluations sequentially
results = []
for data_point in dataset:
try:
inputs = data_point.get("inputs", {})
outputs = data_point.get("outputs", {})
ground_truth = data_point.get("ground_truth")
result = evaluate_with_evaluators(
evaluators=evaluators,
inputs=inputs,
outputs=outputs,
ground_truth=ground_truth,
context=context,
max_workers=1,
run_concurrently=False,
)
results.append(result)
except Exception as e:
logger.warning("Batch evaluation failed: %s", e)
# Create empty result for failed evaluation
results.append(
EvaluationResult(
score=0.0,
metrics={},
metadata={
"inputs": {},
"outputs": {},
"ground_truth": {},
"context": context.__dict__ if context else None,
},
)
)
return results
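# Example (illustrative): each dataset entry supplies "inputs", "outputs", and
# optionally "ground_truth"; one EvaluationResult is returned per entry.
#
#     results = evaluate_batch(
#         evaluators=["exact_match", "f1_score"],
#         dataset=[
#             {"inputs": {"expected": "4"}, "outputs": {"response": "4"}},
#             {"inputs": {"expected": "4"}, "outputs": {"response": "5"}},
#         ],
#         max_workers=2,
#     )
#     [r.score for r in results]  # -> [1.0, 0.0]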
def create_evaluation_run(
name: str,
project: str,
_results: List[EvaluationResult],
metadata: Optional[Dict[str, Any]] = None,
client: Optional[HoneyHive] = None,
) -> Optional[EvaluationRun]:
"""Create an evaluation run in HoneyHive.
Args:
name: Name of the evaluation run
project: Project name
_results: List of evaluation results (currently unused)
metadata: Additional metadata
client: HoneyHive client instance
Returns:
Created evaluation run or None if failed
"""
if client is None:
try:
client = HoneyHive()
except Exception as e:
logger.warning("Could not create HoneyHive client: %s", e)
return None
try:
# Aggregate results (commented out for future use)
# total_score = sum(r.score for r in results)
# Prepare run data - CreateRunRequest expects specific fields
# For now, we'll create a minimal request with required fields
# Note: This is a simplified version - in production you'd want proper UUIDs
try:
# Create run request with minimal required data
run_request = CreateRunRequest(
name=name,
project=project, # This should be a valid UUID string
event_ids=[],  # Empty list for now; in production you'd pass actual event IDs
dataset_id=None,
datapoint_ids=None,
configuration=None,
status=None,
metadata=metadata or {},
)
except Exception as e:
logger.warning("Could not create CreateRunRequest: %s", e)
# Fallback: return None instead of crashing
return None
# Submit to API
response = client.evaluations.create_run(run_request)
logger.info(
"Created evaluation run: %s",
response.evaluation.run_id if response.evaluation else "unknown",
)
return response.evaluation
except Exception as e:
logger.error("Failed to create evaluation run: %s", e)
return None
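# Example (illustrative; assumes a configured HoneyHive client and a valid
# project UUID; the placeholder below is hypothetical):
#
#     run = create_evaluation_run(
#         name="nightly-regression",
#         project="<project-uuid>",
#         _results=results,
#         metadata={"dataset": "qa-v1"},
#     )
#     # -> EvaluationRun on success, or None if the client or request fails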
# Legacy function for backward compatibility
def _compute_f1_score(prediction: str, ground_truth: str) -> float:
"""Compute F1 score between prediction and ground truth.
Args:
prediction: Model prediction
ground_truth: Ground truth value
Returns:
F1 score between 0 and 1
"""
f1_evaluator = F1ScoreEvaluator()
result = f1_evaluator.evaluate({"expected": ground_truth}, {"response": prediction})
f1_score = result.get("f1_score", 0.0)
if isinstance(f1_score, (int, float)):
return float(f1_score)
return 0.0