"""HoneyHive API evaluations module."""
from typing import Any, Dict, Optional, cast
from uuid import UUID
from ..models import (
CreateRunRequest,
CreateRunResponse,
DeleteRunResponse,
GetRunResponse,
GetRunsResponse,
UpdateRunRequest,
UpdateRunResponse,
)
from ..models.generated import UUIDType
from ..utils.error_handler import APIError, ErrorContext, ErrorResponse
from .base import BaseAPI
def _convert_uuid_string(value: str) -> Any:
"""Convert a single UUID string to UUIDType, or return original on error."""
try:
return cast(Any, UUIDType(UUID(value)))
except ValueError:
return value
def _convert_uuid_list(items: list) -> list:
"""Convert a list of UUID strings to UUIDType objects."""
converted = []
for item in items:
if isinstance(item, str):
converted.append(_convert_uuid_string(item))
else:
converted.append(item)
return converted
def _convert_uuids_recursively(data: Any) -> Any:
"""Recursively convert string UUIDs to UUIDType objects in response data."""
if isinstance(data, dict):
result = {}
for key, value in data.items():
if key in ["run_id", "id"] and isinstance(value, str):
result[key] = _convert_uuid_string(value)
elif key == "event_ids" and isinstance(value, list):
result[key] = _convert_uuid_list(value)
else:
result[key] = _convert_uuids_recursively(value)
return result
if isinstance(data, list):
return [_convert_uuids_recursively(item) for item in data]
return data
[docs]
class EvaluationsAPI(BaseAPI):
"""API client for HoneyHive evaluations."""
[docs]
def create_run(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create a new evaluation run using CreateRunRequest model."""
response = self.client.request(
"POST",
"/runs",
json={"run": request.model_dump(mode="json", exclude_none=True)},
)
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return CreateRunResponse(**data)
[docs]
def create_run_from_dict(self, run_data: dict) -> CreateRunResponse:
"""Create a new evaluation run from dictionary (legacy method)."""
response = self.client.request("POST", "/runs", json={"run": run_data})
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return CreateRunResponse(**data)
[docs]
async def create_run_async(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create a new evaluation run asynchronously using CreateRunRequest model."""
response = await self.client.request_async(
"POST",
"/runs",
json={"run": request.model_dump(mode="json", exclude_none=True)},
)
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return CreateRunResponse(**data)
[docs]
async def create_run_from_dict_async(self, run_data: dict) -> CreateRunResponse:
"""Create a new evaluation run asynchronously from dictionary
(legacy method)."""
response = await self.client.request_async(
"POST", "/runs", json={"run": run_data}
)
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return CreateRunResponse(**data)
[docs]
def get_run(self, run_id: str) -> GetRunResponse:
"""Get an evaluation run by ID."""
response = self.client.request("GET", f"/runs/{run_id}")
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return GetRunResponse(**data)
[docs]
async def get_run_async(self, run_id: str) -> GetRunResponse:
"""Get an evaluation run asynchronously."""
response = await self.client.request_async("GET", f"/runs/{run_id}")
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return GetRunResponse(**data)
[docs]
def list_runs(
self, project: Optional[str] = None, limit: int = 100
) -> GetRunsResponse:
"""List evaluation runs with optional filtering."""
params: dict = {"limit": limit}
if project:
params["project"] = project
response = self.client.request("GET", "/runs", params=params)
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return GetRunsResponse(**data)
[docs]
async def list_runs_async(
self, project: Optional[str] = None, limit: int = 100
) -> GetRunsResponse:
"""List evaluation runs asynchronously."""
params: dict = {"limit": limit}
if project:
params["project"] = project
response = await self.client.request_async("GET", "/runs", params=params)
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return GetRunsResponse(**data)
[docs]
def update_run(self, run_id: str, request: UpdateRunRequest) -> UpdateRunResponse:
"""Update an evaluation run using UpdateRunRequest model."""
response = self.client.request(
"PUT",
f"/runs/{run_id}",
json=request.model_dump(mode="json", exclude_none=True),
)
data = response.json()
return UpdateRunResponse(**data)
[docs]
def update_run_from_dict(self, run_id: str, run_data: dict) -> UpdateRunResponse:
"""Update an evaluation run from dictionary (legacy method)."""
response = self.client.request("PUT", f"/runs/{run_id}", json=run_data)
# Check response status before parsing
if response.status_code >= 400:
error_body = {}
try:
error_body = response.json()
except Exception:
try:
error_body = {"error_text": response.text[:500]}
except Exception:
pass
# Create ErrorResponse for proper error handling
error_response = ErrorResponse(
error_type="APIError",
error_message=(
f"HTTP {response.status_code}: Failed to update run {run_id}"
),
error_code=(
"CLIENT_ERROR" if response.status_code < 500 else "SERVER_ERROR"
),
status_code=response.status_code,
details={
"run_id": run_id,
"update_data": run_data,
"error_response": error_body,
},
context=ErrorContext(
operation="update_run_from_dict",
method="PUT",
url=f"/runs/{run_id}",
json_data=run_data,
),
)
raise APIError(
f"HTTP {response.status_code}: Failed to update run {run_id}",
error_response=error_response,
original_exception=None,
)
data = response.json()
return UpdateRunResponse(**data)
[docs]
async def update_run_async(
self, run_id: str, request: UpdateRunRequest
) -> UpdateRunResponse:
"""Update an evaluation run asynchronously using UpdateRunRequest model."""
response = await self.client.request_async(
"PUT",
f"/runs/{run_id}",
json=request.model_dump(mode="json", exclude_none=True),
)
data = response.json()
return UpdateRunResponse(**data)
[docs]
async def update_run_from_dict_async(
self, run_id: str, run_data: dict
) -> UpdateRunResponse:
"""Update an evaluation run asynchronously from dictionary (legacy method)."""
response = await self.client.request_async(
"PUT", f"/runs/{run_id}", json=run_data
)
data = response.json()
return UpdateRunResponse(**data)
[docs]
def delete_run(self, run_id: str) -> DeleteRunResponse:
"""Delete an evaluation run by ID."""
context = self._create_error_context(
operation="delete_run",
method="DELETE",
path=f"/runs/{run_id}",
additional_context={"run_id": run_id},
)
with self.error_handler.handle_operation(context):
response = self.client.request("DELETE", f"/runs/{run_id}")
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return DeleteRunResponse(**data)
[docs]
async def delete_run_async(self, run_id: str) -> DeleteRunResponse:
"""Delete an evaluation run by ID asynchronously."""
context = self._create_error_context(
operation="delete_run_async",
method="DELETE",
path=f"/runs/{run_id}",
additional_context={"run_id": run_id},
)
with self.error_handler.handle_operation(context):
response = await self.client.request_async("DELETE", f"/runs/{run_id}")
data = response.json()
# Convert string UUIDs to UUIDType objects recursively
data = _convert_uuids_recursively(data)
return DeleteRunResponse(**data)
[docs]
def get_run_result(
self, run_id: str, aggregate_function: str = "average"
) -> Dict[str, Any]:
"""
Get aggregated result for a run from backend.
Backend Endpoint: GET /runs/:run_id/result?aggregate_function=<function>
The backend computes all aggregations, pass/fail status, and composite metrics.
Args:
run_id: Experiment run ID
aggregate_function: Aggregation function ("average", "sum", "min", "max")
Returns:
Dictionary with aggregated results from backend
Example:
>>> results = client.evaluations.get_run_result("run-123", "average")
>>> results["success"]
True
>>> results["metrics"]["accuracy"]
{'aggregate': 0.85, 'values': [0.8, 0.9, 0.85]}
"""
response = self.client.request(
"GET",
f"/runs/{run_id}/result",
params={"aggregate_function": aggregate_function},
)
return cast(Dict[str, Any], response.json())
[docs]
async def get_run_result_async(
self, run_id: str, aggregate_function: str = "average"
) -> Dict[str, Any]:
"""Get aggregated result for a run asynchronously."""
response = await self.client.request_async(
"GET",
f"/runs/{run_id}/result",
params={"aggregate_function": aggregate_function},
)
return cast(Dict[str, Any], response.json())
[docs]
def get_run_metrics(self, run_id: str) -> Dict[str, Any]:
"""
Get raw metrics for a run (without aggregation).
Backend Endpoint: GET /runs/:run_id/metrics
Args:
run_id: Experiment run ID
Returns:
Dictionary with raw metrics data
Example:
>>> metrics = client.evaluations.get_run_metrics("run-123")
>>> metrics["events"]
[{'event_id': '...', 'metrics': {...}}, ...]
"""
response = self.client.request("GET", f"/runs/{run_id}/metrics")
return cast(Dict[str, Any], response.json())
[docs]
async def get_run_metrics_async(self, run_id: str) -> Dict[str, Any]:
"""Get raw metrics for a run asynchronously."""
response = await self.client.request_async("GET", f"/runs/{run_id}/metrics")
return cast(Dict[str, Any], response.json())
[docs]
def compare_runs(
self, new_run_id: str, old_run_id: str, aggregate_function: str = "average"
) -> Dict[str, Any]:
"""
Compare two experiment runs using backend aggregated comparison.
Backend Endpoint: GET /runs/:new_run_id/compare-with/:old_run_id
The backend computes metric deltas, percent changes, and datapoint differences.
Args:
new_run_id: New experiment run ID
old_run_id: Old experiment run ID
aggregate_function: Aggregation function ("average", "sum", "min", "max")
Returns:
Dictionary with aggregated comparison data
Example:
>>> comparison = client.evaluations.compare_runs("run-new", "run-old")
>>> comparison["metric_deltas"]["accuracy"]
{'new_value': 0.85, 'old_value': 0.80, 'delta': 0.05}
"""
response = self.client.request(
"GET",
f"/runs/{new_run_id}/compare-with/{old_run_id}",
params={"aggregate_function": aggregate_function},
)
return cast(Dict[str, Any], response.json())
[docs]
async def compare_runs_async(
self, new_run_id: str, old_run_id: str, aggregate_function: str = "average"
) -> Dict[str, Any]:
"""Compare two experiment runs asynchronously (aggregated)."""
response = await self.client.request_async(
"GET",
f"/runs/{new_run_id}/compare-with/{old_run_id}",
params={"aggregate_function": aggregate_function},
)
return cast(Dict[str, Any], response.json())
[docs]
def compare_run_events(
self,
new_run_id: str,
old_run_id: str,
*,
event_name: Optional[str] = None,
event_type: Optional[str] = None,
limit: int = 100,
page: int = 1,
) -> Dict[str, Any]:
"""
Compare events between two experiment runs with datapoint-level matching.
Backend Endpoint: GET /runs/compare/events
The backend matches events by datapoint_id and provides detailed
per-datapoint comparison with improved/degraded/same classification.
Args:
new_run_id: New experiment run ID (run_id_1)
old_run_id: Old experiment run ID (run_id_2)
event_name: Optional event name filter (e.g., "initialization")
event_type: Optional event type filter (e.g., "session")
limit: Pagination limit (default: 100)
page: Pagination page (default: 1)
Returns:
Dictionary with detailed comparison including:
- commonDatapoints: List of common datapoint IDs
- metrics: Per-metric comparison with improved/degraded/same lists
- events: Paired events (event_1, event_2) for each datapoint
- event_details: Event presence information
- old_run: Old run metadata
- new_run: New run metadata
Example:
>>> comparison = client.evaluations.compare_run_events(
... "run-new", "run-old",
... event_name="initialization",
... event_type="session"
... )
>>> len(comparison["commonDatapoints"])
3
>>> comparison["metrics"][0]["improved"]
["EXT-c1aed4cf0dfc3f16"]
"""
params = {
"run_id_1": new_run_id,
"run_id_2": old_run_id,
"limit": limit,
"page": page,
}
if event_name:
params["event_name"] = event_name
if event_type:
params["event_type"] = event_type
response = self.client.request("GET", "/runs/compare/events", params=params)
return cast(Dict[str, Any], response.json())
[docs]
async def compare_run_events_async(
self,
new_run_id: str,
old_run_id: str,
*,
event_name: Optional[str] = None,
event_type: Optional[str] = None,
limit: int = 100,
page: int = 1,
) -> Dict[str, Any]:
"""Compare events between two experiment runs asynchronously."""
params = {
"run_id_1": new_run_id,
"run_id_2": old_run_id,
"limit": limit,
"page": page,
}
if event_name:
params["event_name"] = event_name
if event_type:
params["event_type"] = event_type
response = await self.client.request_async(
"GET", "/runs/compare/events", params=params
)
return cast(Dict[str, Any], response.json())