Utilities Reference
Complete reference for utility classes and helper functions.
Caching
Cache
- class honeyhive.utils.cache.Cache(config=None)[source]
Bases: object
In-memory cache with TTL and size limits.
- Parameters:
config (CacheConfig | None)
- property cache: Dict[str, CacheEntry]
Get the underlying cache dictionary.
- Returns:
Cache dictionary
- cleanup_expired()[source]
Clean up expired entries.
- Returns:
Number of entries cleaned up
- Return type:
int
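A minimal usage sketch built only from the members documented above (the constructor, the cache property, CacheEntry, is_expired(), and cleanup_expired()); any higher-level get/set helpers the class may expose are not shown, and default CacheConfig values are assumed:
from honeyhive.utils.cache import Cache, CacheEntry

cache = Cache()  # assumes the default CacheConfig is acceptable

# The underlying dict maps keys to CacheEntry objects (see the cache property above)
entry = CacheEntry("user:42", {"name": "Ada"}, ttl=60.0)
cache.cache["user:42"] = entry

print(entry.is_expired())          # False until the 60-second TTL elapses
removed = cache.cleanup_expired()  # removes expired entries and returns the count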
FunctionCache
AsyncFunctionCache
CacheEntry
- class honeyhive.utils.cache.CacheEntry(key, value, ttl=300.0)[source]
Bases: object
Cache entry with metadata.
- is_expired()[source]
Check if entry is expired.
- Returns:
True if expired, False otherwise
- Return type:
bool
Connection Pooling
ConnectionPool
- class honeyhive.utils.connection_pool.ConnectionPool(config=None, *, max_connections=None, max_keepalive=None, max_keepalive_connections=None, keepalive_expiry=None, retries=None, timeout=None, pool_timeout=None)[source]
Bases: object
Connection pool for HTTP clients.
- Parameters:
- get_async_client(base_url, headers=None, **kwargs)[source]
Get or create an async HTTP client from the pool.
- cleanup_idle_connections(max_idle_time=300.0)[source]
Clean up idle connections.
- Parameters:
max_idle_time (float) – Maximum idle time in seconds
- Return type:
None
- property active_connections: int
Get number of active connections.
- Returns:
Number of active connections
- get_connection(base_url)[source]
Get a connection for a specific base URL.
- Parameters:
base_url (str) – Base URL for the connection
- Returns:
HTTP client instance or None if not found
- Return type:
Client | None
- return_connection(base_url, client)[source]
Return a connection to the pool.
- Parameters:
base_url (str) – Base URL for the connection
client (Client) – HTTP client to return
- Return type:
None
- get_async_connection(base_url)[source]
Get an async connection for a specific base URL.
- Parameters:
base_url (str) – Base URL for the connection
- Returns:
Async HTTP client instance or None if not found
- Return type:
AsyncClient | None
- return_async_connection(base_url, client)[source]
Return an async connection to the pool.
- Parameters:
base_url (str) – Base URL for the connection
client (AsyncClient) – Async HTTP client to return
- Return type:
None
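A usage sketch limited to the methods documented above; it assumes the pooled objects are httpx clients (per the Client / AsyncClient return types) and the base URL shown is illustrative:
from honeyhive.utils.connection_pool import ConnectionPool

pool = ConnectionPool(max_connections=10, keepalive_expiry=30.0)

# Reuse an existing pooled client for this base URL, if any
client = pool.get_connection("https://api.honeyhive.ai")
if client is not None:
    try:
        response = client.get("/health")  # assumes an httpx.Client configured with this base URL
    finally:
        pool.return_connection("https://api.honeyhive.ai", client)

# Drop connections that have sat idle for more than five minutes
pool.cleanup_idle_connections(max_idle_time=300.0)
print(pool.active_connections)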
PooledHTTPClient
- class honeyhive.utils.connection_pool.PooledHTTPClient(pool, **kwargs)[source]
Bases: object
HTTP client that uses connection pooling.
- Parameters:
pool (ConnectionPool)
kwargs (Any)
PooledAsyncHTTPClient
- class honeyhive.utils.connection_pool.PooledAsyncHTTPClient(pool, **kwargs)[source]
Bases: object
Async HTTP client that uses connection pooling.
- Parameters:
pool (ConnectionPool)
kwargs (Any)
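Only the constructors are documented here, so this is just a construction sketch; the assumption that extra keyword arguments are forwarded to the underlying HTTP client is mine:
from honeyhive.utils.connection_pool import (
    ConnectionPool,
    PooledHTTPClient,
    PooledAsyncHTTPClient,
)

pool = ConnectionPool(max_connections=20, keepalive_expiry=30.0)

# Assumption: kwargs are passed through to the wrapped HTTP client
http_client = PooledHTTPClient(pool, timeout=10.0)
async_http_client = PooledAsyncHTTPClient(pool, timeout=10.0)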
Data Structures
DotDict
- class honeyhive.utils.dotdict.DotDict(*args, **kwargs)[source]
Bases: dict
Dictionary with dot notation access.
Example
>>> d = DotDict({'foo': {'bar': 'baz'}})
>>> d.foo.bar
'baz'
>>> d.foo.bar = 'qux'
>>> d['foo']['bar']
'qux'
BaggageDict
- class honeyhive.utils.baggage_dict.BaggageDict(ctx=None)[source]
Bases: object
Dictionary-like interface for OpenTelemetry baggage.
This class provides a convenient way to work with OpenTelemetry baggage as if it were a regular dictionary, while maintaining proper context propagation.
- Parameters:
ctx (Context | None)
- set(key, value)[source]
Set a value in baggage.
- Parameters:
- Returns:
New BaggageDict with updated context
- Return type:
BaggageDict
- delete(key)[source]
Delete a key from baggage.
- Parameters:
key (str) – Baggage key to delete
- Returns:
New BaggageDict with updated context
- Return type:
BaggageDict
- update(**kwargs)[source]
Update multiple baggage values.
- Parameters:
**kwargs (Any) – Key-value pairs to set
- Returns:
New BaggageDict with updated context
- Return type:
BaggageDict
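A short sketch of the immutable-update style documented above; each call returns a new BaggageDict bound to an updated context rather than mutating in place (the keys and values here are illustrative):
from honeyhive.utils.baggage_dict import BaggageDict

baggage = BaggageDict()                         # wraps the current OpenTelemetry context
baggage = baggage.set("session_id", "abc-123")  # returns a new BaggageDict
baggage = baggage.update(project="my-app", source="worker")
baggage = baggage.delete("source")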
Retry Configuration
RetryConfig
- class honeyhive.utils.retry.RetryConfig(strategy='exponential', backoff_strategy=None, max_retries=3, retry_on_status_codes=None)[source]
Bases: object
Configuration for retry behavior.
- Parameters:
- classmethod exponential(initial_delay=1.0, max_delay=60.0, multiplier=2.0, max_retries=3)[source]
Create an exponential backoff retry configuration.
- Parameters:
- Return type:
RetryConfig
- classmethod linear(delay=1.0, max_retries=3)[source]
Create a linear backoff retry configuration.
- Parameters:
- Return type:
RetryConfig
- classmethod constant(delay=1.0, max_retries=3)[source]
Create a constant delay retry configuration.
- Parameters:
- Return type:
RetryConfig
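The three factory classmethods above cover the common backoff shapes; a brief sketch using the documented signatures (the delay values are illustrative, and the conventional meaning of multiplier and max_delay is assumed):
from honeyhive.utils.retry import RetryConfig

# Exponential backoff: roughly 1s, 2s, 4s, ... capped at 30s, up to 5 retries
retry = RetryConfig.exponential(initial_delay=1.0, max_delay=30.0, multiplier=2.0, max_retries=5)

# Linear and constant variants
linear_retry = RetryConfig.linear(delay=2.0, max_retries=3)
constant_retry = RetryConfig.constant(delay=0.5, max_retries=3)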
Logging
HoneyHiveLogger
- class honeyhive.utils.logger.HoneyHiveLogger(name, *, level=None, formatter=None, handler=None, verbose=None)[source]
Bases: object
HoneyHive logger with structured logging.
Provides a structured logging interface with HoneyHive-specific formatting and support for contextual data. Uses per-instance configuration instead of a global config so that multiple tracer instances can be configured independently.
- Parameters:
- update_verbose_setting(verbose)[source]
Dynamically update the logger’s verbose setting.
This allows the tracer to update the logger’s level after initialization based on configuration changes.
- Parameters:
verbose (bool) – New verbose setting
- Return type:
None
- debug(message, *args, honeyhive_data=None, **kwargs)[source]
Log debug message with lazy formatting support.
- info(message, *args, honeyhive_data=None, **kwargs)[source]
Log info message with lazy formatting support.
- warning(message, *args, honeyhive_data=None, **kwargs)[source]
Log warning message with lazy formatting support.
- error(message, *args, honeyhive_data=None, **kwargs)[source]
Log error message with lazy formatting support.
get_logger
- honeyhive.utils.logger.get_logger(name, verbose=None, tracer_instance=None, **kwargs)[source]
Get a HoneyHive logger instance with dynamic configuration.
Uses dynamic logic to determine logger configuration based on tracer instance settings or explicit parameters.
- Parameters:
- Returns:
Configured HoneyHive logger instance
- Return type:
HoneyHiveLogger
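A short sketch of the documented get_logger() and logging-method signatures; the honeyhive_data payloads shown are illustrative:
from honeyhive.utils.logger import get_logger

logger = get_logger(__name__, verbose=True)
logger.info("Processed batch %s", "batch-7", honeyhive_data={"batch_size": 32})
logger.error("Request failed: %s", "timeout", honeyhive_data={"retries": 3})

# Tighten or loosen output later, e.g. when tracer configuration changes
logger.update_verbose_setting(False)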
Distributed Tracing (v1.0+)
Context Propagation Functions
These functions enable distributed tracing by propagating trace context across service boundaries via HTTP headers.
inject_context_into_carrier
- honeyhive.tracer.processing.context.inject_context_into_carrier(carrier, tracer_instance)[source]
Inject OpenTelemetry context into a carrier dictionary.
This function injects the current OpenTelemetry context (including trace context and baggage) into a carrier dictionary for cross-service or cross-process propagation.
- Parameters:
carrier (Dict[str, str]) – Dictionary to inject context into
tracer_instance (HoneyHiveTracer) – The tracer instance for propagator access
- Return type:
None
Example:
headers = {}
inject_context_into_carrier(headers, tracer)
# headers now contains trace context and baggage

# Use headers in HTTP request
response = requests.get(url, headers=headers)
Note:
The carrier dictionary will be modified in-place with context information. This is typically used for HTTP headers or message metadata in distributed systems.
Adds OpenTelemetry trace context (trace ID, span ID, baggage) to a dictionary (typically HTTP headers) for propagation to downstream services.
Example:
from honeyhive.tracer.processing.context import inject_context_into_carrier
import requests
# Inject trace context into HTTP headers
headers = {"Content-Type": "application/json"}
inject_context_into_carrier(headers, tracer)
# Send request with distributed trace context
response = requests.post(
    "http://downstream-service/api/endpoint",
    json=data,
    headers=headers  # Trace context propagates here
)
extract_context_from_carrier
- honeyhive.tracer.processing.context.extract_context_from_carrier(carrier, tracer_instance)[source]
Extract OpenTelemetry context from a carrier dictionary.
This function extracts OpenTelemetry context (including trace context and baggage) from a carrier dictionary, typically received from another service or process.
- Parameters:
carrier (Dict[str, str]) – Dictionary containing context information
tracer_instance (HoneyHiveTracer) – The tracer instance for propagator access
- Returns:
Extracted OpenTelemetry context or None if extraction fails
- Return type:
Optional[Context]
Example:
# Extract context from HTTP headers
extracted_context = extract_context_from_carrier(request.headers, tracer)

# Use extracted context as parent for new spans
with tracer.start_span("operation", context=extracted_context) as span:
    # This span will be a child of the remote span
    pass
Note:
This function is typically used in service endpoints to continue distributed traces from upstream services. The extracted context can be used as a parent context for new spans.
Extracts OpenTelemetry trace context from a dictionary (typically HTTP headers) received from an upstream service.
Example:
from flask import request, jsonify
from honeyhive.tracer.processing.context import extract_context_from_carrier
from opentelemetry import context
@app.route("/api/endpoint", methods=["POST"])
def endpoint():
    # Extract trace context from incoming headers
    incoming_context = extract_context_from_carrier(dict(request.headers), tracer)

    # Attach context so spans become children of the parent trace
    if incoming_context:
        token = context.attach(incoming_context)
    try:
        # Your business logic here
        result = do_work()
        return jsonify(result)
    finally:
        if incoming_context:
            context.detach(token)
with_distributed_trace_context (Recommended)
- honeyhive.tracer.processing.context.with_distributed_trace_context(carrier, tracer_instance, *, session_id=None)[source]
Context manager for distributed tracing that extracts and sets up context.
This function extracts OpenTelemetry context from a carrier (e.g., HTTP headers), extracts session_id from baggage if available, and attaches the context with session_id in baggage. This is the recommended way to handle distributed tracing on the server side.
- Parameters:
carrier (Dict[str, str]) – Dictionary containing trace context (e.g., HTTP headers)
tracer_instance (HoneyHiveTracer) – The tracer instance for propagator access
session_id (Optional[str]) – Optional explicit session_id to use (overrides baggage)
- Returns:
Context manager that yields the extracted context
- Return type:
Iterator[Context]
Example:
@app.route("/api/endpoint", methods=["POST"])
def my_endpoint():
    with with_distributed_trace_context(dict(request.headers), tracer) as ctx:
        # All spans created here will use the propagated session_id
        with tracer.start_span("operation"):
            pass
Note for async functions:
If you need to use this with asyncio.run(), you’ll need to re-attach the context inside the async function since asyncio.run() creates a new event loop:
with with_distributed_trace_context(dict(request.headers), tracer) as ctx:
    async def my_async_function():
        # Re-attach context in new event loop
        token = context.attach(ctx)
        try:
            # Your async code here
            pass
        finally:
            context.detach(token)

    asyncio.run(my_async_function())
New in v1.0+: Simplified context manager for server-side distributed tracing that handles extraction, baggage parsing, and context attachment automatically.
This is the recommended approach for modern Python applications.
Advantages:
✅ Concise: 1 line vs 65 lines of boilerplate
✅ Thread-safe: Automatic context isolation per request
✅ Automatic cleanup: Context detached even on exceptions
✅ Baggage handling: Automatically extracts and preserves session_id, project, source
✅ Works with async: Handles asyncio.run() edge cases
Example:
from flask import Flask, request, jsonify
from honeyhive import HoneyHiveTracer
from honeyhive.tracer.processing.context import with_distributed_trace_context

tracer = HoneyHiveTracer.init(
    project="distributed-app",
    source="api-service"
)

app = Flask(__name__)

@app.route("/api/process", methods=["POST"])
def process():
    """Server endpoint with simplified distributed tracing."""
    # Single line replaces ~65 lines of context management
    with with_distributed_trace_context(dict(request.headers), tracer):
        # All spans created here automatically:
        # - Use the client's session_id
        # - Become children of the parent trace
        # - Inherit the client's project and source
        with tracer.start_span("process_request") as span:
            data = request.get_json()
            result = process_data(data)
            return jsonify(result)
Works seamlessly with the @trace decorator:
from honeyhive import trace
@app.route("/api/endpoint", methods=["POST"])
def endpoint():
    with with_distributed_trace_context(dict(request.headers), tracer):
        return handle_request()

@trace(event_type="chain")
def handle_request():
    # Decorator automatically uses the distributed context
    return {"status": "success"}
Note
The @trace decorator in v1.0+ preserves existing baggage from distributed traces, so you don’t need to manually set session_id or other baggage items inside decorated functions.
For async functions with asyncio.run():
If you need to use asyncio.run() inside your handler, you’ll need to re-attach the context in the async function since asyncio.run() creates a new event loop:
import asyncio

from opentelemetry import context

@app.route("/api/async-endpoint", methods=["POST"])
def async_endpoint():
    with with_distributed_trace_context(dict(request.headers), tracer) as ctx:
        async def process():
            # Re-attach context in new event loop
            token = context.attach(ctx)
            try:
                # Your async code here
                result = await async_operation()
                return result
            finally:
                context.detach(token)

        return jsonify(asyncio.run(process()))
See Also
API Client Classes - API client reference
Configuration Options Reference - Configuration options
End-to-End Distributed Tracing - Distributed tracing tutorial