"""HoneyHive API Client.

This module provides the main HoneyHive client with an ergonomic interface
wrapping the auto-generated API code.

Usage::

    from honeyhive.api import HoneyHive

    client = HoneyHive(api_key="your-api-key")

    # Sync usage
    configs = client.configurations.list(project="my-project")

    # Async usage
    configs = await client.configurations.list_async(project="my-project")
"""
import time
import warnings
from typing import Any, Dict, List, Optional, Union
import httpx
from honeyhive.utils.retry import RetryConfig
from honeyhive._generated.api_config import APIConfig
from honeyhive.models import EventFilter, EventExportRequest, EventExportResponse
# Import models used in type hints
from honeyhive._generated.models import (
AddDatapointsResponse,
AddDatapointsToDatasetRequest,
CreateConfigurationRequest,
CreateConfigurationResponse,
CreateDatapointRequest,
CreateDatapointResponse,
CreateDatasetRequest,
CreateDatasetResponse,
CreateMetricRequest,
CreateMetricResponse,
CreateToolRequest,
CreateToolResponse,
DeleteConfigurationResponse,
DeleteDatapointResponse,
DeleteDatasetResponse,
DeleteExperimentRunResponse,
DeleteMetricResponse,
DeleteSessionResponse,
DeleteToolResponse,
GetConfigurationsResponse,
GetDatapointResponse,
GetDatapointsResponse,
GetDatasetsResponse,
GetEventsBySessionIdResponse,
GetEventsQuery,
GetEventsResponse,
GetExperimentRunResponse,
GetExperimentRunsResponse,
GetExperimentRunsSchemaResponse,
GetMetricsResponse,
GetSessionResponse,
GetToolsResponse,
PostEventRequest,
PostEventResponse,
PostExperimentRunRequest,
PostExperimentRunResponse,
PostSessionStartResponse,
PutExperimentRunRequest,
PutExperimentRunResponse,
RemoveDatapointResponse,
UpdateConfigurationRequest,
UpdateConfigurationResponse,
UpdateDatapointRequest,
UpdateDatapointResponse,
UpdateDatasetRequest,
UpdateDatasetResponse,
UpdateMetricRequest,
UpdateMetricResponse,
UpdateToolRequest,
UpdateToolResponse,
)
# Import sync and async service modules
from honeyhive._generated.services import Configurations_service as configs_svc
from honeyhive._generated.services import Datapoints_service as datapoints_svc
from honeyhive._generated.services import Datasets_service as datasets_svc
from honeyhive._generated.services import Events_service as events_svc
from honeyhive._generated.services import Experiments_service as experiments_svc
from honeyhive._generated.services import Metrics_service as metrics_svc
from honeyhive._generated.services import Projects_service as projects_svc
from honeyhive._generated.services import Session_service as session_svc
from honeyhive._generated.services import Sessions_service as sessions_svc
from honeyhive._generated.services import Tools_service as tools_svc
from honeyhive._generated.services import (
async_Configurations_service as configs_svc_async,
)
from honeyhive._generated.services import (
async_Datapoints_service as datapoints_svc_async,
)
from honeyhive._generated.services import async_Datasets_service as datasets_svc_async
from honeyhive._generated.services import async_Events_service as events_svc_async
from honeyhive._generated.services import (
async_Experiments_service as experiments_svc_async,
)
from honeyhive._generated.services import async_Metrics_service as metrics_svc_async
from honeyhive._generated.services import async_Projects_service as projects_svc_async
from honeyhive._generated.services import async_Session_service as session_svc_async
from honeyhive._generated.services import async_Sessions_service as sessions_svc_async
from honeyhive._generated.services import async_Tools_service as tools_svc_async
from ._base import BaseAPI
[docs]
class ConfigurationsAPI(BaseAPI):
    """Configurations API.

    Thin wrapper over the generated configurations service modules. Each
    method passes ``self._api_config`` to the matching ``configs_svc``
    (sync) or ``configs_svc_async`` (async) function and returns its result.
    """

    # Sync methods

    def list(self, project: Optional[str] = None) -> List[GetConfigurationsResponse]:
        """List configurations.

        Args:
            project: Currently unused; the v1 API doesn't support project
                filtering, so the value is not forwarded.

        Returns:
            The result of ``getConfigurations`` for this client's config.
        """
        return configs_svc.getConfigurations(self._api_config)

    def create(
        self, request: CreateConfigurationRequest
    ) -> CreateConfigurationResponse:
        """Create a configuration.

        Args:
            request: Payload forwarded as ``data`` to ``createConfiguration``.
        """
        return configs_svc.createConfiguration(self._api_config, data=request)

    def update(
        self, id: str, request: UpdateConfigurationRequest
    ) -> UpdateConfigurationResponse:
        """Update a configuration.

        Args:
            id: Identifier of the configuration to update.
            request: Payload forwarded as ``data`` to ``updateConfiguration``.
        """
        return configs_svc.updateConfiguration(self._api_config, id=id, data=request)

    def delete(self, id: str) -> DeleteConfigurationResponse:
        """Delete a configuration.

        Args:
            id: Identifier of the configuration to delete.
        """
        return configs_svc.deleteConfiguration(self._api_config, id=id)

    # Async methods

    async def list_async(
        self, project: Optional[str] = None
    ) -> List[GetConfigurationsResponse]:
        """List configurations asynchronously.

        Args:
            project: Currently unused; the v1 API doesn't support project
                filtering, so the value is not forwarded.
        """
        return await configs_svc_async.getConfigurations(self._api_config)

    async def create_async(
        self, request: CreateConfigurationRequest
    ) -> CreateConfigurationResponse:
        """Create a configuration asynchronously.

        Args:
            request: Payload forwarded as ``data`` to ``createConfiguration``.
        """
        return await configs_svc_async.createConfiguration(
            self._api_config, data=request
        )

    async def update_async(
        self, id: str, request: UpdateConfigurationRequest
    ) -> UpdateConfigurationResponse:
        """Update a configuration asynchronously.

        Args:
            id: Identifier of the configuration to update.
            request: Payload forwarded as ``data`` to ``updateConfiguration``.
        """
        return await configs_svc_async.updateConfiguration(
            self._api_config, id=id, data=request
        )

    async def delete_async(self, id: str) -> DeleteConfigurationResponse:
        """Delete a configuration asynchronously.

        Args:
            id: Identifier of the configuration to delete.
        """
        return await configs_svc_async.deleteConfiguration(self._api_config, id=id)

    # Backwards compatible aliases

    def get_configuration(self, id: str) -> List[GetConfigurationsResponse]:
        """Get configurations (backwards compatible alias).

        No single-get call is made here, so this returns the full listing
        produced by ``list()``. ``id`` is accepted only to keep the legacy
        signature; it is NOT used to filter the result, so callers must pick
        the entry they want from the returned list.

        Args:
            id: Ignored.

        Returns:
            The same list that ``list()`` returns.
        """
        return self.list()  # No single-get endpoint, returns all

    def create_configuration(
        self, request: CreateConfigurationRequest
    ) -> CreateConfigurationResponse:
        """Create a configuration (backwards compatible alias for create())."""
        return self.create(request)

    def update_configuration(
        self, id: str, request: UpdateConfigurationRequest
    ) -> UpdateConfigurationResponse:
        """Update a configuration (backwards compatible alias for update())."""
        return self.update(id, request)

    def delete_configuration(self, id: str) -> DeleteConfigurationResponse:
        """Delete a configuration (backwards compatible alias for delete())."""
        return self.delete(id)

    def list_configurations(
        self, project: Optional[str] = None
    ) -> List[GetConfigurationsResponse]:
        """List configurations (backwards compatible alias for list())."""
        return self.list(project)
[docs]
class DatapointsAPI(BaseAPI):
    """Datapoints API.

    Each method delegates to the generated ``datapoints_svc`` (sync) or
    ``datapoints_svc_async`` (async) module, passing ``self._api_config``.
    """

    # Sync methods

    def list(
        self,
        datapoint_ids: Optional[List[str]] = None,
        dataset_name: Optional[str] = None,
    ) -> GetDatapointsResponse:
        """List datapoints.

        Args:
            datapoint_ids: Optional list of datapoint IDs to fetch.
            dataset_name: Optional dataset name to filter by.
        """
        selection = {"datapoint_ids": datapoint_ids, "dataset_name": dataset_name}
        return datapoints_svc.getDatapoints(self._api_config, **selection)

    def get(self, id: str) -> GetDatapointResponse:
        """Fetch one datapoint by its ID."""
        config = self._api_config
        return datapoints_svc.getDatapoint(config, id=id)

    def create(self, request: CreateDatapointRequest) -> CreateDatapointResponse:
        """Create a datapoint from ``request``."""
        config = self._api_config
        return datapoints_svc.createDatapoint(config, data=request)

    def update(
        self, id: str, request: UpdateDatapointRequest
    ) -> UpdateDatapointResponse:
        """Update the datapoint identified by ``id`` with ``request``."""
        config = self._api_config
        return datapoints_svc.updateDatapoint(config, id=id, data=request)

    def delete(self, id: str) -> DeleteDatapointResponse:
        """Delete the datapoint identified by ``id``."""
        config = self._api_config
        return datapoints_svc.deleteDatapoint(config, id=id)

    # Async methods

    async def list_async(
        self,
        datapoint_ids: Optional[List[str]] = None,
        dataset_name: Optional[str] = None,
    ) -> GetDatapointsResponse:
        """List datapoints asynchronously.

        Args:
            datapoint_ids: Optional list of datapoint IDs to fetch.
            dataset_name: Optional dataset name to filter by.
        """
        selection = {"datapoint_ids": datapoint_ids, "dataset_name": dataset_name}
        return await datapoints_svc_async.getDatapoints(self._api_config, **selection)

    async def get_async(self, id: str) -> GetDatapointResponse:
        """Fetch one datapoint by its ID asynchronously."""
        config = self._api_config
        return await datapoints_svc_async.getDatapoint(config, id=id)

    async def create_async(
        self, request: CreateDatapointRequest
    ) -> CreateDatapointResponse:
        """Create a datapoint from ``request`` asynchronously."""
        config = self._api_config
        return await datapoints_svc_async.createDatapoint(config, data=request)

    async def update_async(
        self, id: str, request: UpdateDatapointRequest
    ) -> UpdateDatapointResponse:
        """Update the datapoint identified by ``id`` asynchronously."""
        config = self._api_config
        return await datapoints_svc_async.updateDatapoint(config, id=id, data=request)

    async def delete_async(self, id: str) -> DeleteDatapointResponse:
        """Delete the datapoint identified by ``id`` asynchronously."""
        config = self._api_config
        return await datapoints_svc_async.deleteDatapoint(config, id=id)

    # Backwards compatible aliases

    def get_datapoint(self, id: str) -> GetDatapointResponse:
        """Legacy name for get(); forwards ``id`` unchanged."""
        return self.get(id)

    def create_datapoint(
        self, request: CreateDatapointRequest
    ) -> CreateDatapointResponse:
        """Legacy name for create(); forwards ``request`` unchanged."""
        return self.create(request)

    def update_datapoint(
        self, id: str, request: UpdateDatapointRequest
    ) -> UpdateDatapointResponse:
        """Legacy name for update(); forwards both arguments positionally."""
        return self.update(id, request)

    def delete_datapoint(self, id: str) -> DeleteDatapointResponse:
        """Legacy name for delete(); forwards ``id`` unchanged."""
        return self.delete(id)

    def list_datapoints(
        self,
        datapoint_ids: Optional[List[str]] = None,
        dataset_name: Optional[str] = None,
    ) -> GetDatapointsResponse:
        """Legacy name for list(); forwards both filters by keyword."""
        return self.list(
            datapoint_ids=datapoint_ids,
            dataset_name=dataset_name,
        )
[docs]
class DatasetsAPI(BaseAPI):
    """Datasets API.

    Each method delegates to the generated ``datasets_svc`` (sync) or
    ``datasets_svc_async`` (async) module, passing ``self._api_config``.
    """

    # Sync methods

    def list(
        self,
        dataset_id: Optional[str] = None,
        name: Optional[str] = None,
        include_datapoints: Optional[bool] = None,
    ) -> GetDatasetsResponse:
        """List datasets.

        Args:
            dataset_id: Optional dataset ID to fetch.
            name: Optional dataset name to filter by.
            include_datapoints: Whether to include datapoints in the response.
        """
        selection = {
            "dataset_id": dataset_id,
            "name": name,
            "include_datapoints": include_datapoints,
        }
        return datasets_svc.getDatasets(self._api_config, **selection)

    def create(self, request: CreateDatasetRequest) -> CreateDatasetResponse:
        """Create a dataset from ``request``."""
        config = self._api_config
        return datasets_svc.createDataset(config, data=request)

    def update(self, request: UpdateDatasetRequest) -> UpdateDatasetResponse:
        """Update a dataset; only ``request`` is forwarded to the service."""
        config = self._api_config
        return datasets_svc.updateDataset(config, data=request)

    def delete(self, id: str) -> DeleteDatasetResponse:
        """Delete a dataset; ``id`` is forwarded as ``dataset_id``."""
        config = self._api_config
        return datasets_svc.deleteDataset(config, dataset_id=id)

    def add_datapoints(
        self, dataset_id: str, request: AddDatapointsToDatasetRequest
    ) -> AddDatapointsResponse:
        """Add datapoints to a dataset.

        Args:
            dataset_id: The unique identifier of the dataset to add datapoints to.
            request: The request containing data and mapping for the datapoints.

        Returns:
            AddDatapointsResponse with inserted status and datapoint IDs.
        """
        config = self._api_config
        return datasets_svc.addDatapoints(config, dataset_id=dataset_id, data=request)

    def remove_datapoint(
        self, dataset_id: str, datapoint_id: str
    ) -> RemoveDatapointResponse:
        """Remove a datapoint from a dataset.

        Args:
            dataset_id: The unique identifier of the dataset.
            datapoint_id: The unique identifier of the datapoint to remove.

        Returns:
            RemoveDatapointResponse with dereferenced status and message.
        """
        target = {"dataset_id": dataset_id, "datapoint_id": datapoint_id}
        return datasets_svc.removeDatapoint(self._api_config, **target)

    # Async methods

    async def list_async(
        self,
        dataset_id: Optional[str] = None,
        name: Optional[str] = None,
        include_datapoints: Optional[bool] = None,
    ) -> GetDatasetsResponse:
        """List datasets asynchronously.

        Args:
            dataset_id: Optional dataset ID to fetch.
            name: Optional dataset name to filter by.
            include_datapoints: Whether to include datapoints in the response.
        """
        selection = {
            "dataset_id": dataset_id,
            "name": name,
            "include_datapoints": include_datapoints,
        }
        return await datasets_svc_async.getDatasets(self._api_config, **selection)

    async def create_async(
        self, request: CreateDatasetRequest
    ) -> CreateDatasetResponse:
        """Create a dataset from ``request`` asynchronously."""
        config = self._api_config
        return await datasets_svc_async.createDataset(config, data=request)

    async def update_async(
        self, request: UpdateDatasetRequest
    ) -> UpdateDatasetResponse:
        """Update a dataset asynchronously; only ``request`` is forwarded."""
        config = self._api_config
        return await datasets_svc_async.updateDataset(config, data=request)

    async def delete_async(self, id: str) -> DeleteDatasetResponse:
        """Delete a dataset asynchronously; ``id`` is sent as ``dataset_id``."""
        config = self._api_config
        return await datasets_svc_async.deleteDataset(config, dataset_id=id)

    async def add_datapoints_async(
        self, dataset_id: str, request: AddDatapointsToDatasetRequest
    ) -> AddDatapointsResponse:
        """Add datapoints to a dataset asynchronously.

        Args:
            dataset_id: The unique identifier of the dataset to add datapoints to.
            request: The request containing data and mapping for the datapoints.

        Returns:
            AddDatapointsResponse with inserted status and datapoint IDs.
        """
        config = self._api_config
        return await datasets_svc_async.addDatapoints(
            config, dataset_id=dataset_id, data=request
        )

    async def remove_datapoint_async(
        self, dataset_id: str, datapoint_id: str
    ) -> RemoveDatapointResponse:
        """Remove a datapoint from a dataset asynchronously.

        Args:
            dataset_id: The unique identifier of the dataset.
            datapoint_id: The unique identifier of the datapoint to remove.

        Returns:
            RemoveDatapointResponse with dereferenced status and message.
        """
        target = {"dataset_id": dataset_id, "datapoint_id": datapoint_id}
        return await datasets_svc_async.removeDatapoint(self._api_config, **target)

    # Backwards compatible aliases

    def get_dataset(self, id: str) -> GetDatasetsResponse:
        """Get a dataset by ID (backwards compatible alias).

        Implemented as ``list(dataset_id=id)`` because no single-get call is
        used here, so the return value is a list-style response.
        """
        return self.list(dataset_id=id)

    def create_dataset(self, request: CreateDatasetRequest) -> CreateDatasetResponse:
        """Legacy name for create(); forwards ``request`` unchanged."""
        return self.create(request)

    def update_dataset(self, request: UpdateDatasetRequest) -> UpdateDatasetResponse:
        """Legacy name for update(); forwards ``request`` unchanged."""
        return self.update(request)

    def delete_dataset(self, id: str) -> DeleteDatasetResponse:
        """Legacy name for delete(); forwards ``id`` unchanged."""
        return self.delete(id)

    def list_datasets(
        self,
        dataset_id: Optional[str] = None,
        name: Optional[str] = None,
        include_datapoints: Optional[bool] = None,
    ) -> GetDatasetsResponse:
        """Legacy name for list(); forwards all three filters by keyword."""
        return self.list(
            dataset_id=dataset_id,
            name=name,
            include_datapoints=include_datapoints,
        )
[docs]
class EventsAPI(BaseAPI):
    """Events API.

    ``list``/``list_async`` call the generated ``getEvents`` service with a
    Control Plane config built by ``_get_cp_config``. ``create``, ``update``
    and ``create_batch`` call the generated service with ``self._api_config``.
    ``export`` and everything built on it (``get_by_session_id``,
    ``list_events``, ``get_events`` and their async twins) POST directly to
    ``/v1/events/export`` on ``self._api_config.base_path`` using httpx.
    """

    # Supported parameters for getEvents() method
    _GET_EVENTS_SUPPORTED_PARAMS = {
        "dateRange",
        "filters",
        "projections",
        "ignore_order",
        "limit",
        "page",
        "evaluation_id",
    }

    # Path of the export endpoint, relative to the API base path
    _EXPORT_PATH = "/v1/events/export"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _get_cp_config(self) -> APIConfig:
        """Get APIConfig configured for Control Plane endpoints."""
        return APIConfig(
            base_path=self._api_config.get_cp_base_path(),
            access_token=self._api_config.access_token,
            verify=self._api_config.verify,
        )

    @staticmethod
    def _as_payload(value: Any) -> Any:
        """Return ``value.model_dump(exclude_none=True)`` if available, else ``value``."""
        if hasattr(value, "model_dump"):
            return value.model_dump(exclude_none=True)
        return value

    def _get_events_kwargs(
        self, query: Union[GetEventsQuery, Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Reduce a query to the keyword arguments accepted by getEvents()."""
        data = self._as_payload(query)
        return {
            key: value
            for key, value in data.items()
            if key in self._GET_EVENTS_SUPPORTED_PARAMS
        }

    @staticmethod
    def _warn_project_deprecated(project: Optional[str], inferred_from: str) -> None:
        """Emit the ``project`` deprecation warning once, blamed on the caller.

        Must be called directly from a public method: ``stacklevel=3`` skips
        this helper and that public method so the warning points at user code.
        """
        if project is None:
            return
        warnings.warn(
            "The 'project' parameter is deprecated and will be removed in v2.0. "
            f"The backend now infers project from {inferred_from}.",
            DeprecationWarning,
            stacklevel=3,
        )

    @staticmethod
    def _session_filter(session_id: str) -> EventFilter:
        """Build the filter that matches events of a single session."""
        return EventFilter(
            field="session_id",
            operator="is",
            value=session_id,
            type="string",
        )

    @staticmethod
    def _build_export_body(
        project: Optional[str],
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]],
        date_range: Optional[Dict[str, str]],
        projections: Optional[List[str]],
        limit: int,
        page: int,
    ) -> Dict[str, Any]:
        """Assemble the JSON body for POST /v1/events/export."""
        filters_data: List[Dict[str, Any]] = []
        for item in filters or []:
            if isinstance(item, EventFilter):
                filters_data.append(item.to_dict())
            elif isinstance(item, dict):
                filters_data.append(item)
            # Entries of any other type are skipped, as before.

        body: Dict[str, Any] = {
            "filters": filters_data,
            "limit": limit,
            "page": page,
        }
        # Only include project if provided (for backwards compatibility)
        if project is not None:
            body["project"] = project
        if date_range:
            body["dateRange"] = date_range
        if projections:
            body["projections"] = projections
        return body

    def _export_headers(self) -> Dict[str, str]:
        """Headers for the direct export request, including the bearer token."""
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {self._api_config.get_access_token()}",
        }

    @staticmethod
    def _export_response_ok(
        op_name: str,
        response: httpx.Response,
        retry_config: RetryConfig,
        attempt: int,
    ) -> bool:
        """Classify an export response.

        Returns:
            True on HTTP 200; False when the status is retryable and another
            attempt is allowed.

        Raises:
            Exception: On a non-retryable status, or on a retryable status
                once ``attempt`` has reached ``retry_config.max_retries``.
        """
        if response.status_code == 200:
            return True
        detail = (
            f"with status code: {response.status_code}, "
            f"response: {response.text}"
        )
        if not retry_config.should_retry(response):
            # Non-retryable error, raise immediately
            raise Exception(f"{op_name}() failed {detail}")
        if attempt >= retry_config.max_retries:
            # All retries exhausted; report the most recent response
            raise Exception(f"{op_name}() failed after {attempt + 1} attempts {detail}")
        return False

    def _post_export(self, request_body: Dict[str, Any]) -> httpx.Response:
        """POST the export request, retrying transient failures.

        Uses ``RetryConfig.default()``. At least one attempt is always made.
        A retryable ``httpx.HTTPError`` is re-raised unchanged once retries
        are exhausted; status-code failures raise ``Exception``.
        """
        headers = self._export_headers()
        retry_config = RetryConfig.default()
        # One client for all attempts avoids rebuilding it on every retry.
        with httpx.Client(
            base_url=self._api_config.base_path, verify=self._api_config.verify
        ) as client:
            attempt = 0
            while True:
                try:
                    response = client.request(
                        "POST",
                        self._EXPORT_PATH,
                        headers=headers,
                        json=request_body,
                    )
                except httpx.HTTPError as exc:
                    if (
                        not retry_config.should_retry_exception(exc)
                        or attempt >= retry_config.max_retries
                    ):
                        raise
                else:
                    if self._export_response_ok(
                        "export", response, retry_config, attempt
                    ):
                        return response
                time.sleep(retry_config.backoff_strategy.get_delay(attempt + 1))
                attempt += 1

    async def _post_export_async(self, request_body: Dict[str, Any]) -> httpx.Response:
        """Async counterpart of ``_post_export`` with the same retry policy."""
        # Local import: asyncio is only needed for the non-blocking backoff.
        import asyncio

        headers = self._export_headers()
        retry_config = RetryConfig.default()
        async with httpx.AsyncClient(
            base_url=self._api_config.base_path, verify=self._api_config.verify
        ) as client:
            attempt = 0
            while True:
                try:
                    response = await client.request(
                        "POST",
                        self._EXPORT_PATH,
                        headers=headers,
                        json=request_body,
                    )
                except httpx.HTTPError as exc:
                    if (
                        not retry_config.should_retry_exception(exc)
                        or attempt >= retry_config.max_retries
                    ):
                        raise
                else:
                    if self._export_response_ok(
                        "export_async", response, retry_config, attempt
                    ):
                        return response
                await asyncio.sleep(retry_config.backoff_strategy.get_delay(attempt + 1))
                attempt += 1

    def _to_export_response(
        self, response: httpx.Response, sort_by_time: bool
    ) -> EventExportResponse:
        """Convert a successful export HTTP response into EventExportResponse."""
        data = response.json()
        events = data.get("events", [])
        # Sort events by start_time if requested (fixes jumbled order issue)
        if sort_by_time and events:
            events = self._sort_events_by_time(events)
        return EventExportResponse(
            events=events,
            total_events=data.get("totalEvents", data.get("count", 0)),
        )

    def _export_events(
        self,
        project: Optional[str],
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]],
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
        sort_by_time: bool = False,
    ) -> EventExportResponse:
        """Run an export without emitting any deprecation warning."""
        body = self._build_export_body(
            project, filters, date_range, projections, limit, page
        )
        return self._to_export_response(self._post_export(body), sort_by_time)

    async def _export_events_async(
        self,
        project: Optional[str],
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]],
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
        sort_by_time: bool = False,
    ) -> EventExportResponse:
        """Async counterpart of ``_export_events``."""
        body = self._build_export_body(
            project, filters, date_range, projections, limit, page
        )
        response = await self._post_export_async(body)
        return self._to_export_response(response, sort_by_time)

    def _sort_events_by_time(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Sort events by start_time in chronological order.

        The sort key is taken from the first truthy value among
        ``start_time``, ``startTime`` and ``created_at``. Numbers are used
        as-is; strings containing ``"T"`` are parsed with
        ``datetime.fromisoformat`` (a trailing ``Z`` is read as ``+00:00``).
        Anything missing or unparseable gets key ``0.0`` and therefore sorts
        first.

        Args:
            events: List of event dictionaries.

        Returns:
            A new list of the same events ordered by that key.
        """
        # Imported once per sort rather than once per event.
        from datetime import datetime

        def get_start_time(event: Dict[str, Any]) -> float:
            """Extract start_time from event, handling various formats."""
            start_time = (
                event.get("start_time")
                or event.get("startTime")
                or event.get("created_at")
            )
            if isinstance(start_time, (int, float)):
                return float(start_time)
            if isinstance(start_time, str) and "T" in start_time:
                try:
                    parsed = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
                    return parsed.timestamp()
                except (ValueError, TypeError):
                    pass
            return 0.0

        return sorted(events, key=get_start_time)

    # ------------------------------------------------------------------
    # Sync methods
    # ------------------------------------------------------------------

    def list(
        self, query: Union[GetEventsQuery, Dict[str, Any]]
    ) -> GetEventsResponse:
        """Get events (uses Control Plane endpoint).

        Args:
            query: Query parameters as GetEventsQuery model or dict.
                Supported fields: dateRange, filters, projections,
                ignore_order, limit, page, evaluation_id. Other keys are
                dropped before the call.

        Returns:
            GetEventsResponse with matching events
        """
        return events_svc.getEvents(
            self._get_cp_config(), **self._get_events_kwargs(query)
        )

    def get_by_session_id(
        self,
        session_id: str,
        project: Optional[str] = None,
        *,
        limit: int = 1000,
    ) -> EventExportResponse:
        """Get events by session ID using the Data Plane export endpoint.

        Runs the same request as export() with a single ``session_id``
        filter. Events are returned sorted by start_time in chronological
        order.

        Args:
            session_id: The session ID to fetch events for.
            project: Project name associated with the events. Deprecated in v1.0
                and will be removed in v2.0. The backend now infers project from
                session_id.
            limit: Maximum number of events to return (default 1000).

        Returns:
            EventExportResponse with events for the session, sorted by start_time.

        Raises:
            Exception: If the export request ends with a non-200 status.
            httpx.HTTPError: If the request itself fails and is not retried
                further.

        Example::

            response = client.events.get_by_session_id(
                session_id="abc-123"
            )
            for event in response.events:
                print(event["event_name"])
        """
        self._warn_project_deprecated(project, "session_id")
        return self._export_events(
            project,
            [self._session_filter(session_id)],
            limit=limit,
            sort_by_time=True,  # Enable time-based sorting
        )

    def create(self, request: PostEventRequest) -> PostEventResponse:
        """Create an event.

        ``request`` is dumped with ``model_dump(exclude_none=True)`` when it
        offers that method; otherwise it is sent as given.
        """
        return events_svc.createEvent(
            self._api_config, data=self._as_payload(request)
        )

    def update(self, data: Dict[str, Any]) -> None:
        """Update an event."""
        return events_svc.updateEvent(self._api_config, data=data)

    def create_batch(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create events in batch."""
        return events_svc.createEventBatch(self._api_config, data=data)

    def export(
        self,
        project: Optional[str] = None,
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]] = None,
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
        _sort_by_time: bool = False,
    ) -> EventExportResponse:
        """Export events via POST /events/export (Data Plane).

        This is the primary method for retrieving events from HoneyHive.
        It uses the Data Plane endpoint which supports filtering by session_id,
        event_type, and other fields. Transient failures are retried
        according to ``RetryConfig.default()``.

        Args:
            project: Project name associated with the events. Deprecated in v1.0
                and will be removed in v2.0. The backend now infers project from
                filters (e.g., session_id).
            filters: List of EventFilter objects or dicts with filter criteria.
                Each filter should have: field, operator, value, type.
                Entries that are neither EventFilter nor dict are ignored.
            date_range: Optional date range filter with '$gte' and '$lte' keys
                containing ISO timestamp strings.
            projections: Optional list of fields to include in the response.
            limit: Maximum number of results (default 1000, max 7500).
            page: Page number for pagination (default 1).
            _sort_by_time: Internal flag to sort events by start_time (default False).

        Returns:
            EventExportResponse with events list and total_events count.

        Raises:
            Exception: On a non-retryable status code, or when a retryable
                status persists after all attempts.
            httpx.HTTPError: If the request itself fails and is not retried
                further.

        Example::

            from honeyhive.models import EventFilter

            # Export events for a session
            response = client.events.export(
                filters=[
                    EventFilter(
                        field="session_id",
                        operator="is",
                        value="abc-123",
                        type="string"
                    )
                ],
                limit=100
            )
            for event in response.events:
                print(event["event_name"])

            # Export with date range
            response = client.events.export(
                filters=[],
                date_range={
                    "$gte": "2024-01-01T00:00:00Z",
                    "$lte": "2024-01-31T23:59:59Z"
                }
            )
        """
        self._warn_project_deprecated(project, "filters (e.g., session_id)")
        return self._export_events(
            project,
            filters,
            date_range=date_range,
            projections=projections,
            limit=limit,
            page=page,
            sort_by_time=_sort_by_time,
        )

    # ------------------------------------------------------------------
    # Async methods
    # ------------------------------------------------------------------

    async def list_async(
        self, query: Union[GetEventsQuery, Dict[str, Any]]
    ) -> GetEventsResponse:
        """Get events asynchronously (uses Control Plane endpoint).

        Args:
            query: Query parameters as GetEventsQuery model or dict. Keys not
                supported by getEvents() are dropped before the call.

        Returns:
            GetEventsResponse with matching events
        """
        return await events_svc_async.getEvents(
            self._get_cp_config(), **self._get_events_kwargs(query)
        )

    async def get_by_session_id_async(
        self,
        session_id: str,
        project: Optional[str] = None,
        *,
        limit: int = 1000,
    ) -> EventExportResponse:
        """Get events by session ID asynchronously using the Data Plane export endpoint.

        Async version of get_by_session_id(). Events are returned sorted by
        start_time in chronological order.

        Args:
            session_id: The session ID to fetch events for.
            project: Project name associated with the events. Deprecated in v1.0
                and will be removed in v2.0. The backend now infers project from
                session_id.
            limit: Maximum number of events to return (default 1000).

        Returns:
            EventExportResponse with events for the session, sorted by start_time.
        """
        self._warn_project_deprecated(project, "session_id")
        return await self._export_events_async(
            project,
            [self._session_filter(session_id)],
            limit=limit,
            sort_by_time=True,  # Enable time-based sorting
        )

    async def create_async(self, request: PostEventRequest) -> PostEventResponse:
        """Create an event asynchronously.

        ``request`` is dumped with ``model_dump(exclude_none=True)`` when it
        offers that method; otherwise it is sent as given.
        """
        return await events_svc_async.createEvent(
            self._api_config, data=self._as_payload(request)
        )

    async def update_async(self, data: Dict[str, Any]) -> None:
        """Update an event asynchronously."""
        return await events_svc_async.updateEvent(self._api_config, data=data)

    async def create_batch_async(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create events in batch asynchronously."""
        return await events_svc_async.createEventBatch(self._api_config, data=data)

    async def export_async(
        self,
        project: Optional[str] = None,
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]] = None,
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
        _sort_by_time: bool = False,
    ) -> EventExportResponse:
        """Export events via POST /events/export asynchronously (Data Plane).

        Async version of export(), including the same retry policy for
        transient failures (backoff waits use ``asyncio.sleep``).

        Args:
            project: Project name associated with the events. Deprecated in v1.0
                and will be removed in v2.0. The backend now infers project from
                filters (e.g., session_id).
            filters: List of EventFilter objects or dicts with filter criteria.
            date_range: Optional date range filter.
            projections: Optional list of fields to include in the response.
            limit: Maximum number of results (default 1000, max 7500).
            page: Page number for pagination (default 1).
            _sort_by_time: Internal flag to sort events by start_time (default False).

        Returns:
            EventExportResponse with events list and total_events count.

        Raises:
            Exception: On a non-retryable status code, or when a retryable
                status persists after all attempts.
            httpx.HTTPError: If the request itself fails and is not retried
                further.
        """
        self._warn_project_deprecated(project, "filters (e.g., session_id)")
        return await self._export_events_async(
            project,
            filters,
            date_range=date_range,
            projections=projections,
            limit=limit,
            page=page,
            sort_by_time=_sort_by_time,
        )

    # ------------------------------------------------------------------
    # Backwards compatible aliases
    # ------------------------------------------------------------------

    def create_event(self, request: PostEventRequest) -> PostEventResponse:
        """Create an event (backwards compatible alias for create())."""
        return self.create(request)

    def update_event(self, data: Dict[str, Any]) -> None:
        """Update an event (backwards compatible alias for update())."""
        return self.update(data)

    def list_events(
        self,
        project: Optional[str] = None,
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]] = None,
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
    ) -> EventExportResponse:
        """List events via export endpoint (backwards compatible alias).

        Performs the same request as export(), using the Data Plane
        POST /events/export endpoint.

        Args:
            project: Project name. Deprecated in v1.0 and will be removed in v2.0.
            filters: List of EventFilter objects or dicts.
            date_range: Optional date range filter.
            projections: Optional list of fields to include.
            limit: Maximum number of results (default 1000).
            page: Page number (default 1).

        Returns:
            EventExportResponse with events and total count.
        """
        self._warn_project_deprecated(project, "filters (e.g., session_id)")
        return self._export_events(
            project,
            filters,
            date_range=date_range,
            projections=projections,
            limit=limit,
            page=page,
        )

    def get_events(
        self,
        project: Optional[str] = None,
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]] = None,
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
    ) -> EventExportResponse:
        """Get events via export endpoint (backwards compatible alias).

        Performs the same request as export(), using the Data Plane
        POST /events/export endpoint.

        Args:
            project: Project name. Deprecated in v1.0 and will be removed in v2.0.
            filters: List of EventFilter objects or dicts.
            date_range: Optional date range filter.
            projections: Optional list of fields to include.
            limit: Maximum number of results (default 1000).
            page: Page number (default 1).

        Returns:
            EventExportResponse with events and total count.
        """
        self._warn_project_deprecated(project, "filters (e.g., session_id)")
        return self._export_events(
            project,
            filters,
            date_range=date_range,
            projections=projections,
            limit=limit,
            page=page,
        )

    async def list_events_async(
        self,
        project: Optional[str] = None,
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]] = None,
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
    ) -> EventExportResponse:
        """List events asynchronously (backwards compatible alias for export_async).

        Args:
            project: Project name. Deprecated in v1.0 and will be removed in v2.0.
            filters: List of EventFilter objects or dicts.
            date_range: Optional date range filter.
            projections: Optional list of fields to include.
            limit: Maximum number of results (default 1000).
            page: Page number (default 1).
        """
        self._warn_project_deprecated(project, "filters (e.g., session_id)")
        return await self._export_events_async(
            project,
            filters,
            date_range=date_range,
            projections=projections,
            limit=limit,
            page=page,
        )

    async def get_events_async(
        self,
        project: Optional[str] = None,
        filters: Optional[List[Union[EventFilter, Dict[str, Any]]]] = None,
        *,
        date_range: Optional[Dict[str, str]] = None,
        projections: Optional[List[str]] = None,
        limit: int = 1000,
        page: int = 1,
    ) -> EventExportResponse:
        """Get events asynchronously (backwards compatible alias for export_async).

        Args:
            project: Project name. Deprecated in v1.0 and will be removed in v2.0.
            filters: List of EventFilter objects or dicts.
            date_range: Optional date range filter.
            projections: Optional list of fields to include.
            limit: Maximum number of results (default 1000).
            page: Page number (default 1).
        """
        self._warn_project_deprecated(project, "filters (e.g., session_id)")
        return await self._export_events_async(
            project,
            filters,
            date_range=date_range,
            projections=projections,
            limit=limit,
            page=page,
        )
[docs]
class ExperimentsAPI(BaseAPI):
    """Experiment run operations, in synchronous and ``*_async`` flavours.

    Synchronous methods delegate to ``experiments_svc`` and asynchronous ones
    to ``experiments_svc_async``. Every call passes ``self._api_config`` as
    the first positional argument.
    """

    # ------------------------------------------------------------------
    # Synchronous methods
    # ------------------------------------------------------------------

    def get_schema(
        self,
        dateRange: Optional[Any] = None,
        evaluation_id: Optional[str] = None,
    ) -> GetExperimentRunsSchemaResponse:
        """Fetch the experiment runs schema.

        Args:
            dateRange: Filter by date range (string or dict with $gte/$lte).
            evaluation_id: Filter by evaluation/run ID.
        """
        schema = experiments_svc.getExperimentRunsSchema(
            self._api_config,
            dateRange=dateRange,
            evaluation_id=evaluation_id,
        )
        return schema

    def list_runs(
        self,
        dataset_id: Optional[str] = None,
        page: Optional[int] = None,
        limit: Optional[int] = None,
        run_ids: Optional[List[str]] = None,
        name: Optional[str] = None,
        status: Optional[str] = None,
        dateRange: Optional[Any] = None,
        sort_by: Optional[str] = None,
        sort_order: Optional[str] = None,
    ) -> GetExperimentRunsResponse:
        """Fetch experiment runs, optionally filtered, paged and sorted.

        Args:
            dataset_id: Filter by dataset ID.
            page: Page number for pagination.
            limit: Number of results per page.
            run_ids: Filter by specific run IDs.
            name: Filter by run name.
            status: Filter by run status.
            dateRange: Filter by date range.
            sort_by: Sort by field.
            sort_order: Sort order (asc/desc).
        """
        runs = experiments_svc.getRuns(
            self._api_config,
            dataset_id=dataset_id,
            page=page,
            limit=limit,
            run_ids=run_ids,
            name=name,
            status=status,
            dateRange=dateRange,
            sort_by=sort_by,
            sort_order=sort_order,
        )
        return runs

    def get_run(self, run_id: str) -> GetExperimentRunResponse:
        """Fetch a single experiment run by its ID."""
        return experiments_svc.getRun(
            self._api_config,
            run_id=run_id,
        )

    def create_run(
        self,
        request: PostExperimentRunRequest,
    ) -> PostExperimentRunResponse:
        """Create an experiment run; ``request`` is sent as the ``data`` payload."""
        return experiments_svc.createRun(
            self._api_config,
            data=request,
        )

    def update_run(
        self,
        run_id: str,
        request: PutExperimentRunRequest,
    ) -> PutExperimentRunResponse:
        """Update the run identified by ``run_id`` with ``request`` as payload."""
        return experiments_svc.updateRun(
            self._api_config,
            run_id=run_id,
            data=request,
        )

    def delete_run(self, run_id: str) -> DeleteExperimentRunResponse:
        """Delete the experiment run identified by ``run_id``."""
        return experiments_svc.deleteRun(
            self._api_config,
            run_id=run_id,
        )

    # ------------------------------------------------------------------
    # Asynchronous methods
    # ------------------------------------------------------------------

    async def get_schema_async(
        self,
        dateRange: Optional[Any] = None,
        evaluation_id: Optional[str] = None,
    ) -> GetExperimentRunsSchemaResponse:
        """Fetch the experiment runs schema asynchronously.

        Args:
            dateRange: Filter by date range (string or dict with $gte/$lte).
            evaluation_id: Filter by evaluation/run ID.
        """
        schema = await experiments_svc_async.getExperimentRunsSchema(
            self._api_config,
            dateRange=dateRange,
            evaluation_id=evaluation_id,
        )
        return schema

    async def list_runs_async(
        self,
        dataset_id: Optional[str] = None,
        page: Optional[int] = None,
        limit: Optional[int] = None,
        run_ids: Optional[List[str]] = None,
        name: Optional[str] = None,
        status: Optional[str] = None,
        dateRange: Optional[Any] = None,
        sort_by: Optional[str] = None,
        sort_order: Optional[str] = None,
    ) -> GetExperimentRunsResponse:
        """Fetch experiment runs asynchronously.

        Args:
            dataset_id: Filter by dataset ID.
            page: Page number for pagination.
            limit: Number of results per page.
            run_ids: Filter by specific run IDs.
            name: Filter by run name.
            status: Filter by run status.
            dateRange: Filter by date range.
            sort_by: Sort by field.
            sort_order: Sort order (asc/desc).
        """
        runs = await experiments_svc_async.getRuns(
            self._api_config,
            dataset_id=dataset_id,
            page=page,
            limit=limit,
            run_ids=run_ids,
            name=name,
            status=status,
            dateRange=dateRange,
            sort_by=sort_by,
            sort_order=sort_order,
        )
        return runs

    async def get_run_async(self, run_id: str) -> GetExperimentRunResponse:
        """Fetch a single experiment run by its ID asynchronously."""
        return await experiments_svc_async.getRun(
            self._api_config,
            run_id=run_id,
        )

    async def create_run_async(
        self,
        request: PostExperimentRunRequest,
    ) -> PostExperimentRunResponse:
        """Create an experiment run asynchronously (``request`` is the payload)."""
        return await experiments_svc_async.createRun(
            self._api_config,
            data=request,
        )

    async def update_run_async(
        self,
        run_id: str,
        request: PutExperimentRunRequest,
    ) -> PutExperimentRunResponse:
        """Update the run identified by ``run_id`` asynchronously."""
        return await experiments_svc_async.updateRun(
            self._api_config,
            run_id=run_id,
            data=request,
        )

    async def delete_run_async(self, run_id: str) -> DeleteExperimentRunResponse:
        """Delete the experiment run identified by ``run_id`` asynchronously."""
        return await experiments_svc_async.deleteRun(
            self._api_config,
            run_id=run_id,
        )

    # ------------------------------------------------------------------
    # Results and comparisons
    # ------------------------------------------------------------------

    def get_result(
        self,
        run_id: str,
        aggregate_function: Optional[str] = None,
        filters: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Fetch the result of an experiment run as a plain dict.

        Args:
            run_id: The experiment run ID.
            aggregate_function: Aggregation function to apply.
            filters: Optional filters to apply.

        Returns:
            ``result.model_dump()`` when the service result exposes
            ``model_dump``; otherwise ``dict(result)``.
        """
        result = experiments_svc.getExperimentResult(
            self._api_config,
            run_id=run_id,
            aggregate_function=aggregate_function,
            filters=filters,
        )
        # GetExperimentRunResultResponse is a pass-through dict model
        if hasattr(result, "model_dump"):
            return result.model_dump()
        return dict(result)

    def compare_runs(
        self,
        new_run_id: str,
        old_run_id: str,
        aggregate_function: Optional[str] = None,
        filters: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Compare two experiment runs and return the comparison as a plain dict.

        Args:
            new_run_id: The new run ID to compare.
            old_run_id: The old run ID to compare against.
            aggregate_function: Aggregation function to apply.
            filters: Optional filters to apply.

        Returns:
            ``result.model_dump()`` when the service result exposes
            ``model_dump``; otherwise ``dict(result)``.
        """
        result = experiments_svc.getExperimentComparison(
            self._api_config,
            new_run_id=new_run_id,
            old_run_id=old_run_id,
            aggregate_function=aggregate_function,
            filters=filters,
        )
        # GetExperimentRunCompareResponse is a pass-through dict model
        if hasattr(result, "model_dump"):
            return result.model_dump()
        return dict(result)

    async def get_result_async(
        self,
        run_id: str,
        aggregate_function: Optional[str] = None,
        filters: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Fetch the result of an experiment run asynchronously as a plain dict.

        Args:
            run_id: The experiment run ID.
            aggregate_function: Aggregation function to apply.
            filters: Optional filters to apply.

        Returns:
            ``result.model_dump()`` when the service result exposes
            ``model_dump``; otherwise ``dict(result)``.
        """
        result = await experiments_svc_async.getExperimentResult(
            self._api_config,
            run_id=run_id,
            aggregate_function=aggregate_function,
            filters=filters,
        )
        if hasattr(result, "model_dump"):
            return result.model_dump()
        return dict(result)

    async def compare_runs_async(
        self,
        new_run_id: str,
        old_run_id: str,
        aggregate_function: Optional[str] = None,
        filters: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Compare two experiment runs asynchronously.

        Args:
            new_run_id: The new run ID to compare.
            old_run_id: The old run ID to compare against.
            aggregate_function: Aggregation function to apply.
            filters: Optional filters to apply.

        Returns:
            ``result.model_dump()`` when the service result exposes
            ``model_dump``; otherwise ``dict(result)``.
        """
        result = await experiments_svc_async.getExperimentComparison(
            self._api_config,
            new_run_id=new_run_id,
            old_run_id=old_run_id,
            aggregate_function=aggregate_function,
            filters=filters,
        )
        if hasattr(result, "model_dump"):
            return result.model_dump()
        return dict(result)

    # ------------------------------------------------------------------
    # Aliases for backwards compatibility (evaluations naming)
    # ------------------------------------------------------------------

    def get_run_result(
        self,
        run_id: str,
        aggregate_function: Optional[str] = None,
        filters: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Alias for ``get_result``; arguments are passed through positionally."""
        return self.get_result(run_id, aggregate_function, filters)

    def compare_run_events(
        self,
        new_run_id: str,
        old_run_id: str,
        event_name: Optional[str] = None,
        event_type: Optional[str] = None,
        filter: Optional[Any] = None,
        limit: Optional[int] = None,
        page: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Compare events between two experiment runs.

        The service result is returned as-is; unlike ``get_result`` and
        ``compare_runs`` no ``model_dump``/``dict`` conversion is applied here.

        Args:
            new_run_id: The new run ID to compare (sent as ``run_id_1``).
            old_run_id: The old run ID to compare against (sent as ``run_id_2``).
            event_name: Filter by event name.
            event_type: Filter by event type.
            filter: Additional filter criteria.
            limit: Maximum number of results.
            page: Page number for pagination.
        """
        comparison = experiments_svc.getExperimentCompareEvents(
            self._api_config,
            run_id_1=new_run_id,
            run_id_2=old_run_id,
            event_name=event_name,
            event_type=event_type,
            filter=filter,
            limit=limit,
            page=page,
        )
        return comparison
[docs]
class MetricsAPI(BaseAPI):
    """Metric operations, in synchronous and ``*_async`` flavours.

    Synchronous methods delegate to ``metrics_svc`` and asynchronous ones to
    ``metrics_svc_async``; each call passes ``self._api_config`` first.
    """

    # ------------------------------------------------------------------
    # Synchronous methods
    # ------------------------------------------------------------------

    def list(
        self,
        project: Optional[str] = None,
        name: Optional[str] = None,
        type: Optional[str] = None,
    ) -> GetMetricsResponse:
        """Fetch metrics, forwarding ``project``, ``name`` and ``type`` as filters."""
        return metrics_svc.getMetrics(
            self._api_config,
            project=project,
            name=name,
            type=type,
        )

    def create(self, request: CreateMetricRequest) -> CreateMetricResponse:
        """Create a metric; ``request`` is sent as the ``data`` payload."""
        return metrics_svc.createMetric(
            self._api_config,
            data=request,
        )

    def update(self, request: UpdateMetricRequest) -> UpdateMetricResponse:
        """Update a metric; ``request`` is sent as the ``data`` payload."""
        return metrics_svc.updateMetric(
            self._api_config,
            data=request,
        )

    def delete(self, id: str) -> DeleteMetricResponse:
        """Delete the metric whose ID is ``id`` (sent as ``metric_id``)."""
        return metrics_svc.deleteMetric(
            self._api_config,
            metric_id=id,
        )

    # ------------------------------------------------------------------
    # Asynchronous methods
    # ------------------------------------------------------------------

    async def list_async(
        self,
        project: Optional[str] = None,
        name: Optional[str] = None,
        type: Optional[str] = None,
    ) -> GetMetricsResponse:
        """Fetch metrics asynchronously with the same filters as ``list``."""
        return await metrics_svc_async.getMetrics(
            self._api_config,
            project=project,
            name=name,
            type=type,
        )

    async def create_async(self, request: CreateMetricRequest) -> CreateMetricResponse:
        """Create a metric asynchronously; ``request`` is the ``data`` payload."""
        return await metrics_svc_async.createMetric(
            self._api_config,
            data=request,
        )

    async def update_async(self, request: UpdateMetricRequest) -> UpdateMetricResponse:
        """Update a metric asynchronously; ``request`` is the ``data`` payload."""
        return await metrics_svc_async.updateMetric(
            self._api_config,
            data=request,
        )

    async def delete_async(self, id: str) -> DeleteMetricResponse:
        """Delete the metric whose ID is ``id`` asynchronously."""
        return await metrics_svc_async.deleteMetric(
            self._api_config,
            metric_id=id,
        )

    # ------------------------------------------------------------------
    # Backwards compatible aliases
    # ------------------------------------------------------------------

    def get_metric(self, id: str) -> GetMetricsResponse:
        """Backwards compatible alias that returns the unfiltered metric list.

        ``id`` is accepted but not used: there is no single-get endpoint, so
        this simply calls ``list()`` with no filters.
        """
        all_metrics = self.list()  # No single-get endpoint
        return all_metrics

    def create_metric(self, request: CreateMetricRequest) -> CreateMetricResponse:
        """Backwards compatible alias for ``create``."""
        return self.create(request)

    def update_metric(self, request: UpdateMetricRequest) -> UpdateMetricResponse:
        """Backwards compatible alias for ``update``."""
        return self.update(request)

    def delete_metric(self, id: str) -> DeleteMetricResponse:
        """Backwards compatible alias for ``delete``."""
        return self.delete(id)

    def list_metrics(
        self,
        project: Optional[str] = None,
        name: Optional[str] = None,
        type: Optional[str] = None,
    ) -> GetMetricsResponse:
        """Backwards compatible alias for ``list`` with the same filters."""
        return self.list(
            project=project,
            name=name,
            type=type,
        )
[docs]
class ProjectsAPI(BaseAPI):
    """Project operations, in synchronous and ``*_async`` flavours.

    Synchronous methods delegate to ``projects_svc`` and asynchronous ones to
    ``projects_svc_async``; each call passes ``self._api_config`` first.
    """

    # ------------------------------------------------------------------
    # Synchronous methods
    # ------------------------------------------------------------------

    def list(self, name: Optional[str] = None) -> Dict[str, Any]:
        """Fetch projects, optionally filtered by ``name``."""
        return projects_svc.getProjects(
            self._api_config,
            name=name,
        )

    def create(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a project from the ``data`` payload."""
        return projects_svc.createProject(
            self._api_config,
            data=data,
        )

    def update(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Update a project using the ``data`` payload."""
        return projects_svc.updateProject(
            self._api_config,
            data=data,
        )

    def delete(self, name: str) -> Dict[str, Any]:
        """Delete the project called ``name``."""
        return projects_svc.deleteProject(
            self._api_config,
            name=name,
        )

    # ------------------------------------------------------------------
    # Asynchronous methods
    # ------------------------------------------------------------------

    async def list_async(self, name: Optional[str] = None) -> Dict[str, Any]:
        """Fetch projects asynchronously, optionally filtered by ``name``."""
        return await projects_svc_async.getProjects(
            self._api_config,
            name=name,
        )

    async def create_async(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a project asynchronously from the ``data`` payload."""
        return await projects_svc_async.createProject(
            self._api_config,
            data=data,
        )

    async def update_async(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Update a project asynchronously using the ``data`` payload."""
        return await projects_svc_async.updateProject(
            self._api_config,
            data=data,
        )

    async def delete_async(self, name: str) -> Dict[str, Any]:
        """Delete the project called ``name`` asynchronously."""
        return await projects_svc_async.deleteProject(
            self._api_config,
            name=name,
        )

    # ------------------------------------------------------------------
    # Backwards compatible aliases
    # ------------------------------------------------------------------

    def get_project(self, id: str) -> Dict[str, Any]:
        """Backwards compatible alias: look a project up via ``list``.

        There is no single-get call, so ``id`` is passed to ``list`` as the
        ``name`` filter.
        """
        matches = self.list(name=id)  # Use name filter since no single-get
        return matches

    def create_project(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Backwards compatible alias for ``create``."""
        return self.create(data)

    def update_project(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Backwards compatible alias for ``update``."""
        return self.update(data)

    def delete_project(self, name: str) -> Dict[str, Any]:
        """Backwards compatible alias for ``delete``."""
        return self.delete(name)

    def list_projects(self, name: Optional[str] = None) -> Dict[str, Any]:
        """Backwards compatible alias for ``list``."""
        return self.list(name=name)
[docs]
class SessionsAPI(BaseAPI):
    """Session operations, in synchronous and ``*_async`` flavours.

    ``get``/``delete`` delegate to ``sessions_svc`` (and its ``_async``
    counterpart) while ``start`` delegates to ``session_svc`` (singular).
    NOTE(review): presumably these are two distinct generated service
    modules; confirm both aliases exist in the file's import block.
    """

    # ------------------------------------------------------------------
    # Synchronous methods
    # ------------------------------------------------------------------

    def get(self, session_id: str) -> GetSessionResponse:
        """Fetch the session identified by ``session_id``."""
        return sessions_svc.getSession(
            self._api_config,
            session_id=session_id,
        )

    def delete(self, session_id: str) -> DeleteSessionResponse:
        """Delete the session identified by ``session_id``."""
        return sessions_svc.deleteSession(
            self._api_config,
            session_id=session_id,
        )

    def start(self, data: Dict[str, Any]) -> PostSessionStartResponse:
        """Start a new session from the ``data`` payload."""
        return session_svc.startSession(
            self._api_config,
            data=data,
        )

    # ------------------------------------------------------------------
    # Asynchronous methods
    # ------------------------------------------------------------------

    async def get_async(self, session_id: str) -> GetSessionResponse:
        """Fetch the session identified by ``session_id`` asynchronously."""
        session = await sessions_svc_async.getSession(
            self._api_config,
            session_id=session_id,
        )
        return session

    async def delete_async(self, session_id: str) -> DeleteSessionResponse:
        """Delete the session identified by ``session_id`` asynchronously."""
        outcome = await sessions_svc_async.deleteSession(
            self._api_config,
            session_id=session_id,
        )
        return outcome

    async def start_async(self, data: Dict[str, Any]) -> PostSessionStartResponse:
        """Start a new session asynchronously from the ``data`` payload."""
        return await session_svc_async.startSession(
            self._api_config,
            data=data,
        )

    # ------------------------------------------------------------------
    # Backwards compatible aliases
    # ------------------------------------------------------------------

    def create_session(self, request: Dict[str, Any]) -> PostSessionStartResponse:
        """Backwards compatible alias for ``start``; ``request`` is the payload."""
        return self.start(request)

    def start_session(self, request: Dict[str, Any]) -> PostSessionStartResponse:
        """Backwards compatible alias for ``start``; ``request`` is the payload."""
        return self.start(request)

    def get_session(self, session_id: str) -> GetSessionResponse:
        """Backwards compatible alias for ``get``."""
        return self.get(session_id)

    def delete_session(self, session_id: str) -> DeleteSessionResponse:
        """Backwards compatible alias for ``delete``."""
        return self.delete(session_id)
[docs]
class HoneyHive:
    """Main HoneyHive API client.

    Bundles one namespace object per API area, all sharing a single
    ``APIConfig``. Each namespace offers sync methods and ``*_async`` twins.

    Example::

        client = HoneyHive(api_key="your-api-key")

        # Sync
        configs = client.configurations.list(project="my-project")

        # Async
        configs = await client.configurations.list_async(project="my-project")

    Attributes:
        configurations: API for managing configurations.
        datapoints: API for managing datapoints.
        datasets: API for managing datasets.
        events: API for managing events.
        experiments: API for managing experiment runs.
        evaluations: Same object as ``experiments`` (backwards compatible name).
        metrics: API for managing metrics.
        projects: API for managing projects.
        sessions: API for managing sessions.
        tools: API for managing tools.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        *,
        # Primary URL parameters
        base_url: Optional[str] = None,
        cp_base_url: Optional[str] = None,
        # Backwards compatible alias for base_url
        server_url: Optional[str] = None,
        # Backwards compatible parameters (accepted but not used in new client)
        timeout: Optional[float] = None,
        retry_config: Optional[Any] = None,
        rate_limit_calls: Optional[int] = None,
        rate_limit_window: Optional[float] = None,
        max_connections: Optional[int] = None,
        max_keepalive: Optional[int] = None,
        test_mode: Optional[bool] = None,
        verbose: Optional[bool] = None,
        tracer_instance: Optional[Any] = None,
    ) -> None:
        """Build the client and its API namespaces.

        Args:
            api_key: HoneyHive API key (typically starts with ``hh_``).
                When falsy, the HH_API_KEY environment variable is used, and
                an empty string if that is unset too.
            base_url: API base URL for Data Plane/ingestion.
                Falls back to ``server_url``, then the HH_API_URL env var,
                then https://api.honeyhive.ai.
            cp_base_url: Control Plane API URL for query endpoints.
                Falls back to HH_CP_API_URL, then HH_API_URL, then the
                resolved base URL.
            server_url: Deprecated alias for base_url (for backwards compatibility).
            timeout: Request timeout in seconds. Stored and exposed through
                the ``timeout`` property; not otherwise used here.
            retry_config: Accepted for backwards compatibility; discarded.
            rate_limit_calls: Accepted for backwards compatibility; discarded.
            rate_limit_window: Accepted for backwards compatibility; discarded.
            max_connections: Accepted for backwards compatibility; discarded.
            max_keepalive: Accepted for backwards compatibility; discarded.
            test_mode: Stored (``None`` becomes ``False``) and exposed through
                the ``test_mode`` property; not otherwise used here.
            verbose: Stored (``None`` becomes ``False``) and exposed through
                the ``verbose`` property; not otherwise used here.
            tracer_instance: Stored on the instance; not otherwise used here.
        """
        import os

        environ = os.environ

        # API key: explicit argument wins, then the environment, then "".
        self._api_key = api_key or environ.get("HH_API_KEY", "")

        # Data-plane URL: base_url > server_url (legacy) > env var > default.
        env_api_url = environ.get("HH_API_URL")
        resolved_base_url = (
            base_url or server_url or env_api_url or "https://api.honeyhive.ai"
        )

        # Control-plane URL: cp_base_url > HH_CP_API_URL > HH_API_URL > base_url.
        # NOTE(review): with this order HH_API_URL outranks an explicitly
        # passed base_url/server_url for the control-plane URL - confirm that
        # this is the intended precedence.
        resolved_cp_base_url = (
            cp_base_url
            or environ.get("HH_CP_API_URL")
            or env_api_url
            or resolved_base_url
        )

        # Backwards compatible parameters that are kept (silently accepted).
        self._timeout = timeout
        self._test_mode = False if test_mode is None else test_mode
        self._verbose = False if verbose is None else verbose
        self._tracer_instance = tracer_instance

        # One shared configuration object for every namespace.
        config = APIConfig(
            base_path=resolved_base_url,
            cp_base_path=resolved_cp_base_url,
            access_token=self._api_key,
        )
        self._api_config = config

        # API namespaces
        self.configurations = ConfigurationsAPI(config)
        self.datapoints = DatapointsAPI(config)
        self.datasets = DatasetsAPI(config)
        self.events = EventsAPI(config)
        self.experiments = ExperimentsAPI(config)
        self.metrics = MetricsAPI(config)
        self.projects = ProjectsAPI(config)
        self.sessions = SessionsAPI(config)
        self.tools = ToolsAPI(config)

        # Backwards compatible name for the experiments namespace.
        self.evaluations = self.experiments

    @property
    def test_mode(self) -> bool:
        """Whether the client was created in test mode."""
        return self._test_mode

    @property
    def verbose(self) -> bool:
        """Whether verbose mode was requested."""
        return self._verbose

    @property
    def timeout(self) -> Optional[float]:
        """The timeout given at construction time, if any."""
        return self._timeout

    @property
    def api_config(self) -> APIConfig:
        """The underlying API configuration shared by all namespaces."""
        return self._api_config

    @property
    def api_key(self) -> str:
        """The resolved HoneyHive API key."""
        return self._api_key

    @property
    def server_url(self) -> str:
        """The API server URL (``base_path`` of the API configuration)."""
        return self._api_config.base_path

    @server_url.setter
    def server_url(self, value: str) -> None:
        """Set the API server URL.

        Only ``base_path`` is updated; ``cp_base_path`` is left as it was.
        """
        self._api_config.base_path = value