HoneyHive Client API Reference
Note
Complete API documentation for the HoneyHive client classes
Direct API clients for interacting with HoneyHive services without tracing middleware.
The HoneyHive SDK provides several client classes for direct interaction with HoneyHive services. These clients are used internally by tracers but can also be used directly for advanced use cases.
HoneyHive Client
- class honeyhive.HoneyHive(*, api_key=None, server_url=None, timeout=None, retry_config=None, rate_limit_calls=100, rate_limit_window=60.0, max_connections=10, max_keepalive=20, test_mode=None, verbose=False, tracer_instance=None)[source]
Bases: object
Main HoneyHive API client.
- Parameters:
- logger: HoneyHiveLogger | None
- property sync_client: Client
Get or create sync HTTP client.
- property async_client: AsyncClient
Get or create async HTTP client.
- get_health()[source]
Get API health status. Returns basic info since health endpoint may not exist.
- async get_health_async()[source]
Get API health status asynchronously. Returns basic info since health endpoint may not exist.
- request(method, path, params=None, json=None, **kwargs)[source]
Make a synchronous HTTP request with rate limiting and retry logic.
The main client class for interacting with HoneyHive’s core services.
Key Features:
Direct API access to HoneyHive services
Session and event management
Project and configuration management
Synchronous and asynchronous operations
Built-in retry logic and error handling
Rate limiting and throttling support
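The constructor's rate_limit_calls and rate_limit_window parameters suggest a limit of N calls per time window. As a rough illustration of how such a limit can behave, here is a minimal sliding-window sketch. The class name SlidingWindowLimiter and its try_acquire method are hypothetical and are not part of the SDK; the client's internal implementation may differ.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical helper: allow at most `calls` requests per `window` seconds."""

    def __init__(self, calls=100, window=60.0, clock=time.monotonic):
        self.calls = calls
        self.window = window
        self.clock = clock        # injectable so the limiter can be tested
        self._stamps = deque()    # timestamps of recently allowed calls

    def try_acquire(self):
        """Return True and record the call if it fits in the current window."""
        now = self.clock()
        # Drop timestamps that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.window:
            self._stamps.popleft()
        if len(self._stamps) < self.calls:
            self._stamps.append(now)
            return True
        return False
```

A caller would check try_acquire() before each request and wait or back off when it returns False.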
Initialization
- honeyhive.__init__(api_key: str | None = None, base_url: str | None = None, timeout: float = 30.0, max_retries: int = 3, test_mode: bool = False, **kwargs)
Initialize a HoneyHive client instance.
Parameters:
api_key (Optional[str]) – HoneyHive API key. If not provided, reads from the HH_API_KEY environment variable.
base_url (Optional[str]) – Base URL for the HoneyHive API. Defaults to “https://api.honeyhive.ai”.
timeout (float) – Request timeout in seconds. Default: 30.0
max_retries (int) – Maximum number of retry attempts for failed requests. Default: 3
test_mode (bool) – Enable test mode (requests are validated but not processed). Default: False
kwargs (Any) – Additional configuration options
Example:
from honeyhive import HoneyHive
from honeyhive.models import EventType

# Basic initialization
client = HoneyHive(api_key="hh_your_api_key_here")  # Or set HH_API_KEY environment variable

# With custom configuration
client = HoneyHive(
    api_key="hh_your_api_key_here",  # Or set HH_API_KEY environment variable
    base_url="https://api.honeyhive.ai",  # Or set HH_API_URL environment variable
    timeout=60.0,
    max_retries=5
)

# Test mode for development
client = HoneyHive(
    api_key="hh_test_key",  # Or set HH_API_KEY environment variable
    test_mode=True  # Or set HH_TEST_MODE=true environment variable
)
Session Management
create_session()
- honeyhive.create_session(project: str, source: str | None = None, session_name: str | None = None, **kwargs) -> dict
Create a new session for grouping related events.
Returns:
- Return type: dict
- Returns: Session information including session_id
Example:
# Create a basic session
session = client.create_session(
    source="development",
    session_name="user-onboarding-flow"
)
print(f"Created session: {session['session_id']}")

# Create session with metadata
session = client.create_session(
    source="development",
    user_id="user_123",
    conversation_type="customer_support",
    priority="high"
)
get_session()
- honeyhive.get_session(session_id: str) -> dict
Retrieve session information by ID.
Parameters:
session_id (str) – Unique session identifier
Returns:
- Return type: dict
- Returns: Session details and metadata
Example:
session_info = client.get_session("session_abc123")
print(f"Session project: {session_info['project']}")
print(f"Session created: {session_info['created_at']}")
print(f"Event count: {session_info['event_count']}")
list_sessions()
- honeyhive.list_sessions(project: str | None = None, source: str | None = None, limit: int = 100, offset: int = 0, **filters) -> dict
List sessions with optional filtering.
Returns:
- Return type: dict
- Returns: List of sessions and pagination info
Example:
# List all sessions for a project
sessions = client.list_sessions(limit=50)
for session in sessions['sessions']:
    print(f"Session {session['session_id']}: {session['session_name']}")

# List with filters
recent_sessions = client.list_sessions(
    source="development",
    created_after="2024-01-01T00:00:00Z",
    limit=20
)
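Because list_sessions() takes limit and offset, collecting every session means walking the pages. The sketch below shows one way to do that. The helper name iter_all_sessions is hypothetical, and it assumes that the response dict carries a 'sessions' list (as in the example above) and that a page shorter than the requested size marks the end; the format of the real pagination info is not documented here.

```python
def iter_all_sessions(list_page, page_size=100, **filters):
    """Yield every session by walking limit/offset pages.

    `list_page` is any callable shaped like `client.list_sessions`.
    """
    offset = 0
    while True:
        page = list_page(limit=page_size, offset=offset, **filters)
        sessions = page.get("sessions", [])
        yield from sessions
        # Assumption: a page shorter than `page_size` is the last one.
        if len(sessions) < page_size:
            break
        offset += page_size
```

With a real client this would be called as iter_all_sessions(client.list_sessions, page_size=50, source="development").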
Event Management
create_event()
- honeyhive.create_event(session_id: str, event_type: str, event_name: str, inputs: dict | None = None, outputs: dict | None = None, metadata: dict | None = None, **kwargs) -> dict
Create a new event within a session.
Parameters:
session_id (str) – Session ID to associate the event with
event_type (str) – Type of event. Must be one of: "model", "tool", or "chain"
event_name (str) – Descriptive name for the event
inputs (Optional[dict]) – Input data for the event
outputs (Optional[dict]) – Output data from the event
metadata (Optional[dict]) – Additional event metadata
kwargs (Any) – Additional event attributes
Returns:
- Return type: dict
- Returns: Created event information
Example:
# Create an LLM call event
event = client.create_event(
    session_id="session_abc123",
    event_type=EventType.model,
    event_name="openai_completion",
    inputs={
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Hello!"}],
        "temperature": 0.7
    },
    outputs={
        "response": "Hello! How can I help you today?",
        "usage": {
            "prompt_tokens": 10,
            "completion_tokens": 12,
            "total_tokens": 22
        }
    },
    metadata={
        "duration_ms": 1500,
        "model_version": "gpt-4-0613"
    }
)
print(f"Created event: {event['event_id']}")
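Since event_type is restricted to "model", "tool", or "chain", it can help to validate payloads locally before sending them. The following is a minimal sketch of such a check. The helper build_event and the constant ALLOWED_EVENT_TYPES are hypothetical names, not part of the SDK, and the payload keys simply mirror the create_event() parameters.

```python
ALLOWED_EVENT_TYPES = {"model", "tool", "chain"}  # values listed for event_type


def build_event(session_id, event_type, event_name,
                inputs=None, outputs=None, metadata=None):
    """Hypothetical helper: validate and assemble a create_event payload."""
    if event_type not in ALLOWED_EVENT_TYPES:
        raise ValueError(
            f"event_type must be one of {sorted(ALLOWED_EVENT_TYPES)}, "
            f"got {event_type!r}"
        )
    if not event_name:
        raise ValueError("event_name must be a non-empty string")
    return {
        "session_id": session_id,
        "event_type": event_type,
        "event_name": event_name,
        "inputs": inputs or {},
        "outputs": outputs or {},
        "metadata": metadata or {},
    }
```

The resulting dict could then be unpacked into the call, for example client.create_event(**build_event(...)).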
get_event()
- honeyhive.get_event(event_id: str) -> dict
Retrieve event information by ID.
Parameters:
event_id (str) – Unique event identifier
Returns:
- Return type: dict
- Returns: Event details and data
Example:
event = client.get_event("event_xyz789")
print(f"Event type: {event['event_type']}")
print(f"Event name: {event['event_name']}")
print(f"Duration: {event['metadata']['duration_ms']}ms")
list_events()
- honeyhive.list_events(session_id: str | None = None, project: str | None = None, event_type: str | None = None, limit: int = 100, offset: int = 0, **filters) -> dict
List events with optional filtering.
Returns:
- Return type: dict
- Returns: List of events and pagination info
Example:
# List events for a session
events = client.list_events(session_id="session_abc123")
for event in events['events']:
    print(f"Event: {event['event_name']} ({event['event_type']})")

# List LLM call events across all sessions
llm_events = client.list_events(
    event_type=EventType.model,
    limit=50
)
Project Management
create_project()
- honeyhive.create_project(name: str, description: str | None = None, **kwargs) -> dict
Create a new project.
Returns:
- Return type: dict
- Returns: Created project information
Example:
project = client.create_project(
    name="customer-support-bot",
    description="AI-powered customer support chatbot",
    team="engineering",
    environment="production"
)
get_project()
- honeyhive.get_project(project_name: str) -> dict
Retrieve project information.
Parameters:
project_name (str) – Name of the project
Returns:
- Return type: dict
- Returns: Project details and configuration
Example:
project_info = client.get_project("customer-support-bot")
print(f"Project: {project_info['name']}")
print(f"Created: {project_info['created_at']}")
print(f"Total events: {project_info['event_count']}")
list_projects()
- honeyhive.list_projects(limit: int = 100, offset: int = 0) -> dict
List all accessible projects.
Returns:
- Return type: dict
- Returns: List of projects and pagination info
Example:
projects = client.list_projects()
for project in projects['projects']:
    print(f"Project: {project['name']} - {project['description']}")
Configuration Management
get_configuration()
- honeyhive.get_configuration(project: str) -> dict
Get project configuration settings.
Parameters:
project (str) – Project name
Returns:
- Return type: dict
- Returns: Project configuration
Example:
config = client.get_configuration("my-app")
print(f"Sampling rate: {config['sampling_rate']}")
print(f"Retention days: {config['retention_days']}")
update_configuration()
- honeyhive.update_configuration(project: str, configuration: dict) -> dict
Update project configuration settings.
Returns:
- Return type: dict
- Returns: Updated configuration
Example:
updated_config = client.update_configuration(
    configuration={
        "sampling_rate": 0.1,  # 10% sampling
        "retention_days": 30,
        "alert_thresholds": {
            "error_rate": 0.05,
            "latency_p95": 5000
        }
    }
)
Async Client
AsyncHoneyHive
Asynchronous version of the HoneyHive client for non-blocking operations.
Key Features:
Non-blocking API calls
Context manager support
Concurrent request handling
Same interface as sync client
Example Usage:
import asyncio
from honeyhive import AsyncHoneyHive
from honeyhive.models import EventType

async def async_example():
    async with AsyncHoneyHive(api_key="your-key") as client:  # Or set HH_API_KEY environment variable
        session = await client.create_session(
            session_name="async-session"
        )
        event = await client.create_event(
            session_id=session['session_id'],
            event_type=EventType.model,
            event_name="async_completion"
        )
Asynchronous version of the HoneyHive client for use in async applications.
Key Features:
All methods are async/await compatible
Built-in connection pooling
Concurrent request handling
Async context manager support
Initialization
- honeyhive.__init__(api_key: str | None = None, base_url: str | None = None, timeout: float = 30.0, max_retries: int = 3, max_connections: int = 100, test_mode: bool = False, **kwargs)
Initialize an async HoneyHive client.
Parameters:
api_key (Optional[str]) – HoneyHive API key
base_url (Optional[str]) – Base URL for HoneyHive API
timeout (float) – Request timeout in seconds
max_retries (int) – Maximum retry attempts
max_connections (int) – Maximum concurrent connections
test_mode (bool) – Enable test mode
kwargs (Any) – Additional configuration
Example:
import asyncio
from honeyhive import AsyncHoneyHive
from honeyhive.models import EventType

async def main():
    async with AsyncHoneyHive(api_key="hh_your_key") as client:  # Or set HH_API_KEY environment variable
        # Use async client
        session = await client.create_session(
            source="production"
        )
        event = await client.create_event(
            session_id=session['session_id'],
            event_type=EventType.model,
            event_name="async_completion",
            inputs={"prompt": "Hello async world!"},
            outputs={"response": "Hello back!"}
        )

asyncio.run(main())
Async Session Management
All session management methods have async equivalents:
async def manage_sessions():
    async with AsyncHoneyHive(api_key="hh_key") as client:  # Or set HH_API_KEY environment variable
        # Create session
        session = await client.create_session(
            source="production"
        )
        # Get session info
        session_info = await client.get_session(session['session_id'])
        # List sessions
        sessions = await client.list_sessions(
            limit=10
        )
Async Event Management
All event management methods have async equivalents:
async def manage_events():
    async with AsyncHoneyHive(api_key="hh_key") as client:  # Or set HH_API_KEY environment variable
        session = await client.create_session(
            source="production"
        )
        # Create multiple events concurrently
        tasks = []
        for i in range(10):
            task = client.create_event(
                session_id=session['session_id'],
                event_type=EventType.tool,
                event_name=f"task_{i}",
                inputs={"task_id": i},
                outputs={"result": f"completed_{i}"}
            )
            tasks.append(task)
        # Wait for all events to be created
        events = await asyncio.gather(*tasks)
        print(f"Created {len(events)} events concurrently")
Batch Operations
For high-throughput scenarios, both clients support batch operations:
Batch Event Creation
- honeyhive.create_events_batch(events: List[dict]) -> dict
Create multiple events in a single API call.
Parameters:
events (List[dict]) – List of event dictionaries
Returns:
- Return type: dict
- Returns: Batch creation results
Example:
# Prepare batch of events
events_batch = []
for i in range(100):
    events_batch.append({
        "session_id": session_id,
        "event_type": "chain",
        "event_name": f"process_item_{i}",
        "inputs": {"item_id": i, "data": f"item_data_{i}"},
        "outputs": {"result": f"processed_{i}"},
        "metadata": {"batch_id": "batch_001", "item_index": i}
    })

# Create all events in one API call
result = client.create_events_batch(events_batch)
print(f"Created {result['created_count']} events")
print(f"Failed: {result['failed_count']} events")
Error Handling
Both clients provide comprehensive error handling:
Exception Types
- exception honeyhive.HoneyHiveError
Base exception for all HoneyHive client errors.
- exception honeyhive.HoneyHiveAPIError
API-related errors (4xx, 5xx HTTP responses).
Attributes:
status_code: HTTP status code
response: Raw API response
message: Error message
- exception honeyhive.HoneyHiveConnectionError
Connection-related errors (network, timeout).
- exception honeyhive.HoneyHiveAuthenticationError
Authentication failures (invalid API key).
- exception honeyhive.HoneyHiveRateLimitError
Rate limiting errors.
Attributes:
retry_after: Recommended retry delay in seconds
Error Handling Examples
import time
from honeyhive import (
    HoneyHive,
    HoneyHiveAPIError,
    HoneyHiveConnectionError,
    HoneyHiveRateLimitError,
)

client = HoneyHive(api_key="hh_your_key")  # Or set HH_API_KEY environment variable

def robust_api_call():
    max_retries = 3
    for attempt in range(max_retries):
        try:
            session = client.create_session(
                source="production"
            )
            return session
        except HoneyHiveRateLimitError as e:
            if attempt < max_retries - 1:
                # Prefer the server's hint; otherwise back off exponentially
                wait_time = e.retry_after or (2 ** attempt)
                print(f"Rate limited, waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except HoneyHiveAPIError as e:
            if e.status_code >= 500 and attempt < max_retries - 1:
                # Retry on server errors
                wait_time = 2 ** attempt
                print(f"Server error {e.status_code}, retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except HoneyHiveConnectionError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Connection error, retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
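The retry loop above repeats the same backoff arithmetic in each except branch. A minimal sketch of how that logic could be factored into reusable helpers follows. The names backoff_delay and call_with_retries are hypothetical and not part of the SDK, and the cap and jitter parameters are illustrative additions rather than documented client behavior.

```python
import random
import time


def backoff_delay(attempt, base=1.0, cap=60.0, jitter=0.0, rng=random.random):
    """Exponential delay for a 0-based attempt, capped, with optional jitter."""
    delay = min(cap, base * (2 ** attempt))
    # Jitter adds up to `jitter` * delay extra seconds to spread out retries.
    return delay + delay * jitter * rng()


def call_with_retries(func, retryable, max_attempts=3, sleep=time.sleep):
    """Call `func`, retrying when it raises one of the `retryable` exception types."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable:
            # Re-raise once the final attempt has failed.
            if attempt == max_attempts - 1:
                raise
            sleep(backoff_delay(attempt))
```

With the SDK's exceptions this might be used as call_with_retries(lambda: client.create_session(source="production"), (HoneyHiveConnectionError,)).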
Client Configuration
Advanced Configuration Options
from honeyhive import HoneyHive

# Production configuration
client = HoneyHive(
    api_key="hh_prod_key",  # Or set HH_API_KEY environment variable
    base_url="https://api.honeyhive.ai",  # Or set HH_API_URL environment variable
    timeout=30.0,
    max_retries=3,
    # Custom headers
    headers={
        "User-Agent": "MyApp/1.0",
        "X-Custom-Header": "custom-value"
    },
    # SSL configuration
    verify_ssl=True,
    ssl_cert_path="/path/to/cert.pem",
    # Proxy configuration
    proxy_url="http://proxy.company.com:8080",
    # Rate limiting
    rate_limit_calls=100,
    rate_limit_period=60,  # 100 calls per minute
    # Connection pooling
    max_connections=50,
    max_keepalive_connections=10,
    keepalive_expiry=30.0,
    # Retry configuration
    retry_backoff_factor=1.0,
    retry_backoff_max=60.0,
    retry_on_status_codes=[429, 502, 503, 504],
    # Debug mode
    debug=True,
    log_requests=True,
    log_responses=True
)
Environment-Based Configuration
import os
from honeyhive import HoneyHive

def create_client_from_env():
    """Create client with environment-based configuration."""
    config = {
        "api_key": os.getenv("HH_API_KEY"),
        "base_url": os.getenv("HH_API_URL", "https://api.honeyhive.ai"),
        "timeout": float(os.getenv("HH_TIMEOUT", "30.0")),
        "max_retries": int(os.getenv("HH_MAX_RETRIES", "3")),
        "test_mode": os.getenv("HH_TEST_MODE", "false").lower() == "true"
    }
    # Optional proxy configuration
    if proxy_url := os.getenv("HH_PROXY_URL"):
        config["proxy_url"] = proxy_url
    # Optional SSL configuration
    if cert_path := os.getenv("HH_SSL_CERT_PATH"):
        config["ssl_cert_path"] = cert_path
    return HoneyHive(**config)

# Usage
client = create_client_from_env()
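The environment-based setup above treats any value other than "true" as false and lets float() raise a bare error on malformed input. A slightly stricter parsing layer might look like the sketch below. The helpers env_bool and env_float are hypothetical, and the set of accepted truthy and falsy spellings is an assumption rather than something the SDK defines.

```python
import os

_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off", ""}


def env_bool(name, default=False, environ=os.environ):
    """Parse a boolean environment variable, rejecting unrecognized values."""
    raw = environ.get(name)
    if raw is None:
        return default
    value = raw.strip().lower()
    if value in _TRUE:
        return True
    if value in _FALSE:
        return False
    raise ValueError(f"{name}={raw!r} is not a recognized boolean")


def env_float(name, default, environ=os.environ):
    """Parse a float environment variable with a clear error message."""
    raw = environ.get(name)
    if raw is None:
        return default
    try:
        return float(raw)
    except ValueError:
        raise ValueError(f"{name}={raw!r} is not a number") from None
```

Passing a plain dict as environ keeps the helpers easy to test without mutating the process environment.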
Integration Patterns
Context Manager Usage
# Automatic resource cleanup
with HoneyHive(api_key="hh_key") as client:  # Or set HH_API_KEY environment variable
    session = client.create_session(
        source="production"
    )
    # Multiple operations
    for i in range(10):
        client.create_event(
            session_id=session['session_id'],
            event_type=EventType.tool,
            event_name=f"iteration_{i}",
            inputs={"iteration": i},
            outputs={"result": i * 2}
        )
# Client automatically closed and cleaned up
Dependency Injection
from typing import Protocol
from honeyhive import HoneyHive
from honeyhive.models import EventType

class HoneyHiveClientProtocol(Protocol):
    def create_session(self, project: str, **kwargs) -> dict: ...
    def create_event(self, session_id: str, **kwargs) -> dict: ...

class MyService:
    def __init__(self, honeyhive_client: HoneyHiveClientProtocol):
        self.client = honeyhive_client

    def process_user_request(self, user_id: str, request_data: dict):
        # Create session for this request
        session = self.client.create_session(
            source="development",
            user_id=user_id
        )
        # Process and log events
        event = self.client.create_event(
            session_id=session['session_id'],
            event_type=EventType.chain,
            event_name="process_request",
            inputs={"user_id": user_id, "request": request_data},
            outputs={"result": "processed"}
        )
        return event

# Dependency injection
client = HoneyHive(api_key="hh_key")  # Or set HH_API_KEY environment variable
service = MyService(honeyhive_client=client)
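A practical benefit of injecting the client is that tests can substitute an in-memory fake. The sketch below shows one such stand-in. FakeHoneyHiveClient is a hypothetical test double, not part of the SDK; it implements only the two methods the service uses, and the ID format is invented for illustration.

```python
import itertools


class FakeHoneyHiveClient:
    """In-memory stand-in that records calls instead of hitting the API."""

    def __init__(self):
        self.sessions = []
        self.events = []
        self._ids = itertools.count(1)  # shared counter for fake IDs

    def create_session(self, **kwargs):
        session = {"session_id": f"session_{next(self._ids)}", **kwargs}
        self.sessions.append(session)
        return session

    def create_event(self, session_id, **kwargs):
        event = {
            "event_id": f"event_{next(self._ids)}",
            "session_id": session_id,
            **kwargs,
        }
        self.events.append(event)
        return event
```

A test can then construct the service with honeyhive_client=FakeHoneyHiveClient() and assert on the recorded sessions and events lists.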
Factory Pattern
import os
from honeyhive import HoneyHive

class HoneyHiveClientFactory:
    """Factory for creating configured HoneyHive clients."""

    @staticmethod
    def create_production_client(api_key: str) -> HoneyHive:
        return HoneyHive(
            api_key=api_key,  # Or set HH_API_KEY environment variable
            timeout=60.0,
            max_retries=5,
            rate_limit_calls=200,
            rate_limit_period=60
        )

    @staticmethod
    def create_development_client(api_key: str) -> HoneyHive:
        return HoneyHive(
            api_key=api_key,  # Or set HH_API_KEY environment variable
            test_mode=True,  # Or set HH_TEST_MODE=true environment variable
            timeout=10.0,
            max_retries=1,
            debug=True,
            log_requests=True
        )

    @staticmethod
    def create_testing_client() -> HoneyHive:
        return HoneyHive(
            api_key="test_key",  # Or set HH_API_KEY environment variable
            test_mode=True,  # Or set HH_TEST_MODE=true environment variable
            timeout=5.0,
            max_retries=0
        )

# Usage
if os.getenv("ENVIRONMENT") == "production":
    client = HoneyHiveClientFactory.create_production_client(
        api_key=os.getenv("HH_API_KEY")
    )
elif os.getenv("ENVIRONMENT") == "development":
    client = HoneyHiveClientFactory.create_development_client(
        api_key=os.getenv("HH_DEV_API_KEY")
    )
else:
    client = HoneyHiveClientFactory.create_testing_client()
Performance Optimization
Connection Pooling
# Configure connection pooling for high-throughput applications
client = HoneyHive(
    api_key="hh_key",  # Or set HH_API_KEY environment variable
    max_connections=100,  # Total connection pool size
    max_keepalive_connections=20,  # Persistent connections
    keepalive_expiry=60.0,  # Connection lifetime
    connection_timeout=10.0,  # Time to establish connection
    read_timeout=30.0,  # Time to read response
    write_timeout=10.0  # Time to send request
)
Request Batching
import asyncio
from honeyhive import AsyncHoneyHive

async def batch_events_efficiently():
    async with AsyncHoneyHive(api_key="hh_key") as client:  # Or set HH_API_KEY environment variable
        session = await client.create_session(
            source="production"
        )
        # Create events in batches for better performance
        batch_size = 50
        all_events = []
        for batch_start in range(0, 1000, batch_size):
            batch_events = []
            for i in range(batch_start, min(batch_start + batch_size, 1000)):
                batch_events.append({
                    "session_id": session['session_id'],
                    "event_type": "chain",  # must be "model", "tool", or "chain"
                    "event_name": f"item_{i}",
                    "inputs": {"item_id": i},
                    "outputs": {"processed": True}
                })
            # Send batch
            result = await client.create_events_batch(batch_events)
            all_events.extend(result['events'])
            print(f"Processed batch {batch_start//batch_size + 1}")
        return all_events
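The index arithmetic in the batching loop can be separated from the API calls with a small chunking generator. The sketch below uses only the standard library. The name chunked is a hypothetical helper, not part of the SDK, and it works on any iterable, including generators whose length is unknown in advance.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

Each yielded list could then be passed to create_events_batch(), for example in a loop such as: for batch in chunked(events, 50).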
See Also
HoneyHiveTracer API Reference - HoneyHiveTracer API reference
Decorators API Reference - Decorator-based APIs
Set Up Your First Tracer - Getting started tutorial
How-to Guides - Client troubleshooting (see Troubleshooting section)
Architecture Overview - Architecture overview