Source code for honeyhive.api.client

"""HoneyHive API Client - HTTP client with retry support."""

import asyncio
import time
from typing import Any, Dict, Optional

import httpx

from ..config.models.api_client import APIClientConfig
from ..utils.connection_pool import ConnectionPool, PoolConfig
from ..utils.error_handler import ErrorContext, get_error_handler
from ..utils.logger import HoneyHiveLogger, get_logger, safe_log
from ..utils.retry import RetryConfig
from .configurations import ConfigurationsAPI
from .datapoints import DatapointsAPI
from .datasets import DatasetsAPI
from .evaluations import EvaluationsAPI
from .events import EventsAPI
from .metrics import MetricsAPI
from .projects import ProjectsAPI
from .session import SessionAPI
from .tools import ToolsAPI


class RateLimiter:
    """Simple sliding-window rate limiter for API calls.

    Keeps the timestamps of recently admitted calls and admits a new call
    only while fewer than ``max_calls`` of them fall inside the last
    ``time_window`` seconds.
    """

    # Polling interval used only when there is no recorded call to wait for.
    _FALLBACK_POLL_SECONDS = 0.1
    # Lower bound on a single sleep so a rounding error cannot cause a hot spin.
    _MIN_SLEEP_SECONDS = 0.001

    def __init__(self, max_calls: int = 100, time_window: float = 60.0):
        """Initialize the rate limiter.

        Args:
            max_calls: Maximum number of calls allowed in the time window
            time_window: Time window in seconds for rate limiting
        """
        self.max_calls = max_calls
        self.time_window = time_window
        # time.time() timestamps of the calls admitted so far, in admission order
        self.calls: list = []

    def can_call(self) -> bool:
        """Check if a call can be made and, if so, record it.

        Note that a successful check consumes a slot: the current timestamp
        is appended to ``self.calls`` before returning True.

        Returns:
            True if a call can be made, False if rate limit is exceeded
        """
        now = time.time()

        # Remove old calls outside the time window
        self.calls = [
            call_time for call_time in self.calls if now - call_time < self.time_window
        ]

        if len(self.calls) < self.max_calls:
            self.calls.append(now)
            return True

        return False

    def wait_if_needed(self) -> None:
        """Wait if rate limit is exceeded.

        Blocks execution until a call can be made. Instead of polling at a
        fixed interval, each sleep lasts until the oldest recorded call is
        due to leave the time window, which is the earliest moment a slot
        can free up.
        """
        while not self.can_call():
            time.sleep(self._seconds_until_next_slot())

    def _seconds_until_next_slot(self) -> float:
        """Return how long to sleep before the next ``can_call`` attempt."""
        if not self.calls:
            # Nothing is recorded (e.g. max_calls <= 0), so there is no expiry
            # to wait for; keep the original fixed polling behaviour.
            return self._FALLBACK_POLL_SECONDS

        remaining = min(self.calls) + self.time_window - time.time()
        return max(remaining, self._MIN_SLEEP_SECONDS)
# ConnectionPool is now imported from utils.connection_pool for full feature support
class HoneyHive:  # pylint: disable=too-many-instance-attributes
    """Main HoneyHive API client.

    Wraps lazily created ``httpx`` sync/async clients, applies client-side
    rate limiting and retry handling to every request, and exposes the
    individual API modules (``sessions``, ``events``, ``tools``, ...) as
    attributes.
    """

    # Type annotations for instance attributes
    logger: Optional[HoneyHiveLogger]

    # Polling interval used while an async request waits for a rate-limit slot.
    _ASYNC_RATE_LIMIT_POLL_SECONDS = 0.1

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        api_key: Optional[str] = None,
        server_url: Optional[str] = None,
        timeout: Optional[float] = None,
        retry_config: Optional[RetryConfig] = None,
        rate_limit_calls: int = 100,
        rate_limit_window: float = 60.0,
        max_connections: int = 10,
        max_keepalive: int = 20,
        test_mode: Optional[bool] = None,
        verbose: bool = False,
        tracer_instance: Optional[Any] = None,
    ):
        """Initialize the HoneyHive client.

        Falsy arguments fall back to the values of a freshly created
        ``APIClientConfig``.

        Args:
            api_key: API key for authentication
            server_url: Server URL for the API
            timeout: Request timeout in seconds
            retry_config: Retry configuration
            rate_limit_calls: Maximum calls per time window
            rate_limit_window: Time window in seconds
            max_connections: Maximum connections in pool
            max_keepalive: Maximum keepalive connections
            test_mode: Enable test mode (None = use config default)
            verbose: Enable verbose logging for API debugging
            tracer_instance: Optional tracer instance for multi-instance logging
        """
        # Load fresh config using per-instance configuration
        # Create fresh config instance to pick up environment variables
        fresh_config = APIClientConfig()

        self.api_key = api_key or fresh_config.api_key
        # Allow initialization without API key for degraded mode
        # API calls will fail gracefully if no key is provided

        self.server_url = server_url or fresh_config.server_url
        # pylint: disable=no-member
        # fresh_config.http_config is HTTPClientConfig instance, not FieldInfo
        self.timeout = timeout or fresh_config.http_config.timeout
        self.retry_config = retry_config or RetryConfig()
        self.test_mode = fresh_config.test_mode if test_mode is None else test_mode
        self.verbose = verbose or fresh_config.verbose
        self.tracer_instance = tracer_instance

        # Initialize rate limiter and connection pool with configuration values
        self.rate_limiter = RateLimiter(
            rate_limit_calls or fresh_config.http_config.rate_limit_calls,
            rate_limit_window or fresh_config.http_config.rate_limit_window,
        )

        # ENVIRONMENT-AWARE CONNECTION POOL: Full features in production,
        # safe in pytest-xdist
        # Uses feature-complete connection pool with automatic environment detection
        self.connection_pool = ConnectionPool(
            config=PoolConfig(
                max_connections=max_connections
                or fresh_config.http_config.max_connections,
                max_keepalive_connections=max_keepalive
                or fresh_config.http_config.max_keepalive_connections,
                timeout=self.timeout,
                keepalive_expiry=30.0,  # Default keepalive expiry
                retries=self.retry_config.max_retries,
                pool_timeout=10.0,  # Default pool timeout
            )
        )

        # Initialize logger for independent use (when not used by tracer)
        # When used by tracer, logging goes through tracer's safe_log
        if not self.tracer_instance:
            if self.verbose:
                self.logger = get_logger("honeyhive.client", level="DEBUG")
            else:
                self.logger = get_logger("honeyhive.client")
        else:
            # When used by tracer, we don't need an independent logger
            self.logger = None

        # Lazy initialization of HTTP clients
        self._sync_client: Optional[httpx.Client] = None
        self._async_client: Optional[httpx.AsyncClient] = None

        # Initialize API modules
        self.sessions = SessionAPI(self)  # Changed from self.session to self.sessions
        self.events = EventsAPI(self)
        self.tools = ToolsAPI(self)
        self.datapoints = DatapointsAPI(self)
        self.datasets = DatasetsAPI(self)
        self.configurations = ConfigurationsAPI(self)
        self.projects = ProjectsAPI(self)
        self.metrics = MetricsAPI(self)
        self.evaluations = EvaluationsAPI(self)

        # Log initialization after all setup is complete
        # Enhanced safe_log handles tracer_instance delegation and fallbacks
        safe_log(
            self,
            "info",
            "HoneyHive client initialized",
            honeyhive_data={
                "server_url": self.server_url,
                "test_mode": self.test_mode,
                "verbose": self.verbose,
            },
        )

    def _log(
        self,
        level: str,
        message: str,
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Unified logging method; forwards every call to ``safe_log``.

        The client instance is passed as the first argument so ``safe_log``
        can choose between the tracer instance and the independent logger.

        Args:
            level: Log level (debug, info, warning, error)
            message: Log message
            honeyhive_data: Optional structured data
            **kwargs: Additional keyword arguments
        """
        safe_log(self, level, message, honeyhive_data=honeyhive_data, **kwargs)

    @property
    def client_kwargs(self) -> Dict[str, Any]:
        """Get common client configuration (headers, timeout, pool limits)."""
        # pylint: disable=import-outside-toplevel
        # Justification: Avoids circular import (__init__.py imports this module)
        from .. import __version__

        return {
            "headers": {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                "User-Agent": f"HoneyHive-Python-SDK/{__version__}",
            },
            "timeout": self.timeout,
            "limits": httpx.Limits(
                max_connections=self.connection_pool.config.max_connections,
                max_keepalive_connections=(
                    self.connection_pool.config.max_keepalive_connections
                ),
            ),
        }

    def _loggable_headers(self) -> Dict[str, Any]:
        """Return the default request headers with the API key masked.

        The Authorization header carries the bearer token, which must never
        be written to logs in clear text.
        """
        headers = dict(self.client_kwargs.get("headers", {}))
        if "Authorization" in headers:
            headers["Authorization"] = "Bearer ***REDACTED***"
        return headers

    @property
    def sync_client(self) -> httpx.Client:
        """Get or create sync HTTP client."""
        if self._sync_client is None:
            self._sync_client = httpx.Client(**self.client_kwargs)
        return self._sync_client

    @property
    def async_client(self) -> httpx.AsyncClient:
        """Get or create async HTTP client."""
        if self._async_client is None:
            self._async_client = httpx.AsyncClient(**self.client_kwargs)
        return self._async_client

    def _make_url(self, path: str) -> str:
        """Create full URL from path.

        Absolute ``http://`` / ``https://`` URLs are returned unchanged; any
        other path is joined onto ``server_url`` with exactly one slash.
        """
        # Match the full scheme so a relative path that merely begins with
        # the letters "http" is still resolved against the server URL.
        if path.startswith(("http://", "https://")):
            return path
        return f"{self.server_url.rstrip('/')}/{path.lstrip('/')}"

    def _basic_health_info(self) -> Dict[str, Any]:
        """Build the fallback payload used when the health endpoint is unusable."""
        return {
            "status": "healthy",
            "message": "API client is operational",
            "server_url": self.server_url,
            "timestamp": time.time(),
        }

    def get_health(self) -> Dict[str, Any]:
        """Get API health status.

        Returns the JSON body of ``/api/v1/health`` when it answers with
        status 200. Otherwise (non-200 answer or any exception) returns
        basic info, since the health endpoint may not exist.
        """
        error_handler = get_error_handler()
        context = ErrorContext(
            operation="get_health",
            method="GET",
            url=f"{self.server_url}/api/v1/health",
            client_name="HoneyHive",
        )

        try:
            with error_handler.handle_operation(context):
                response = self.request("GET", "/api/v1/health")
                if response.status_code == 200:
                    return response.json()  # type: ignore[no-any-return]
        except Exception as exc:
            # Health endpoint may not exist, return basic info - but leave a
            # trace so the failure is not completely invisible.
            self._log(
                "debug",
                "Health check request failed, returning basic info",
                honeyhive_data={"error": str(exc)},
            )

        # Return basic health info if health endpoint doesn't exist
        return self._basic_health_info()

    async def get_health_async(self) -> Dict[str, Any]:
        """Get API health status asynchronously.

        Returns the JSON body of ``/api/v1/health`` when it answers with
        status 200. Otherwise (non-200 answer or any exception) returns
        basic info, since the health endpoint may not exist.
        """
        error_handler = get_error_handler()
        context = ErrorContext(
            operation="get_health_async",
            method="GET",
            url=f"{self.server_url}/api/v1/health",
            client_name="HoneyHive",
        )

        try:
            with error_handler.handle_operation(context):
                response = await self.request_async("GET", "/api/v1/health")
                if response.status_code == 200:
                    return response.json()  # type: ignore[no-any-return]
        except Exception as exc:
            # Health endpoint may not exist, return basic info - but leave a
            # trace so the failure is not completely invisible.
            self._log(
                "debug",
                "Health check request failed, returning basic info",
                honeyhive_data={"error": str(exc)},
            )

        # Return basic health info if health endpoint doesn't exist
        return self._basic_health_info()

    def request(
        self,
        method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> httpx.Response:
        """Make a synchronous HTTP request with rate limiting and retry logic.

        Args:
            method: HTTP method
            path: Path relative to ``server_url`` or an absolute URL
            params: Optional query parameters
            json: Optional JSON body
            **kwargs: Extra keyword arguments forwarded to ``httpx``

        Returns:
            The response of the first attempt, or of the retry sequence when
            ``retry_config.should_retry`` flags the first response.
        """
        # Enhanced debug logging for pytest hang investigation
        self._log(
            "debug",
            "🔍 REQUEST START",
            honeyhive_data={
                "method": method,
                "path": path,
                "params": params,
                "json": json,
                "test_mode": self.test_mode,
            },
        )

        # Apply rate limiting
        self._log("debug", "🔍 Applying rate limiting...")
        self.rate_limiter.wait_if_needed()
        self._log("debug", "🔍 Rate limiting completed")

        url = self._make_url(path)
        self._log("debug", f"🔍 URL created: {url}")

        self._log(
            "debug",
            "Making request",
            honeyhive_data={
                "method": method,
                "url": url,
                "params": params,
                "json": json,
            },
        )

        if self.verbose:
            self._log(
                "info",
                "API Request Details",
                honeyhive_data={
                    "method": method,
                    "url": url,
                    "params": params,
                    "json": json,
                    # API key is masked before it reaches the log output
                    "headers": self._loggable_headers(),
                    "timeout": self.timeout,
                },
            )

        self._log("debug", "🔍 Creating error handler...")
        error_handler = get_error_handler()
        context = ErrorContext(
            operation="request",
            method=method,
            url=url,
            params=params,
            json_data=json,
            client_name="HoneyHive",
        )
        self._log("debug", "🔍 Error handler created")

        self._log("debug", "🔍 Starting HTTP request...")
        with error_handler.handle_operation(context):
            self._log("debug", "🔍 Making sync_client.request call...")
            response = self.sync_client.request(
                method, url, params=params, json=json, **kwargs
            )
            self._log(
                "debug",
                f"🔍 HTTP request completed with status: {response.status_code}",
            )

            if self.verbose:
                self._log(
                    "info",
                    "API Response Details",
                    honeyhive_data={
                        "method": method,
                        "url": url,
                        "status_code": response.status_code,
                        "headers": dict(response.headers),
                        "elapsed_time": (
                            response.elapsed.total_seconds()
                            if hasattr(response, "elapsed")
                            else None
                        ),
                    },
                )

            if self.retry_config.should_retry(response):
                return self._retry_request(method, path, params, json, **kwargs)

            return response

    async def request_async(
        self,
        method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> httpx.Response:
        """Make an asynchronous HTTP request with rate limiting and retry logic.

        Args:
            method: HTTP method
            path: Path relative to ``server_url`` or an absolute URL
            params: Optional query parameters
            json: Optional JSON body
            **kwargs: Extra keyword arguments forwarded to ``httpx``

        Returns:
            The response of the first attempt, or of the retry sequence when
            ``retry_config.should_retry`` flags the first response.
        """
        # Apply rate limiting without blocking the event loop:
        # RateLimiter.wait_if_needed() uses time.sleep, which would stall
        # every other coroutine while this request waits for a slot.
        while not self.rate_limiter.can_call():
            await asyncio.sleep(self._ASYNC_RATE_LIMIT_POLL_SECONDS)

        url = self._make_url(path)

        self._log(
            "debug",
            "Making async request",
            honeyhive_data={
                "method": method,
                "url": url,
                "params": params,
                "json": json,
            },
        )

        if self.verbose:
            self._log(
                "info",
                "API Request Details",
                honeyhive_data={
                    "method": method,
                    "url": url,
                    "params": params,
                    "json": json,
                    # API key is masked before it reaches the log output
                    "headers": self._loggable_headers(),
                    "timeout": self.timeout,
                },
            )

        error_handler = get_error_handler()
        context = ErrorContext(
            operation="request_async",
            method=method,
            url=url,
            params=params,
            json_data=json,
            client_name="HoneyHive",
        )

        with error_handler.handle_operation(context):
            response = await self.async_client.request(
                method, url, params=params, json=json, **kwargs
            )

            if self.verbose:
                self._log(
                    "info",
                    "API Async Response Details",
                    honeyhive_data={
                        "method": method,
                        "url": url,
                        "status_code": response.status_code,
                        "headers": dict(response.headers),
                        "elapsed_time": (
                            response.elapsed.total_seconds()
                            if hasattr(response, "elapsed")
                            else None
                        ),
                    },
                )

            if self.retry_config.should_retry(response):
                return await self._retry_request_async(
                    method, path, params, json, **kwargs
                )

            return response

    def _retry_request(
        self,
        method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> httpx.Response:
        """Retry a synchronous request.

        Makes up to ``retry_config.max_retries`` further attempts, sleeping
        for the backoff delay before each one. An attempt is repeated when it
        raises or when its response is still flagged by ``should_retry``; the
        last attempt's response or exception is always passed to the caller.

        Raises:
            httpx.RequestError: If ``max_retries`` allows no attempt at all.
        """
        max_retries = self.retry_config.max_retries
        url = self._make_url(path)

        for attempt in range(1, max_retries + 1):
            delay: float = 0.0
            if self.retry_config.backoff_strategy:
                delay = self.retry_config.backoff_strategy.get_delay(attempt)
                if delay > 0:
                    time.sleep(delay)

            # Use unified logging - safe_log handles shutdown detection automatically
            self._log(
                "info",
                f"Retrying request (attempt {attempt})",
                honeyhive_data={
                    "method": method,
                    "path": path,
                    "attempt": attempt,
                },
            )

            if self.verbose:
                self._log(
                    "info",
                    "Retry Request Details",
                    honeyhive_data={
                        "method": method,
                        "path": path,
                        "attempt": attempt,
                        "delay": delay,
                        "params": params,
                        "json": json,
                    },
                )

            try:
                response = self.sync_client.request(
                    method, url, params=params, json=json, **kwargs
                )
            except Exception:
                if attempt == max_retries:
                    raise
                continue

            # A retry that still yields a retryable response must not end
            # the sequence early while attempts remain.
            if attempt < max_retries and self.retry_config.should_retry(response):
                continue
            return response

        raise httpx.RequestError("Max retries exceeded")

    async def _retry_request_async(
        self,
        method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> httpx.Response:
        """Retry an asynchronous request.

        Makes up to ``retry_config.max_retries`` further attempts, awaiting
        the backoff delay before each one. An attempt is repeated when it
        raises or when its response is still flagged by ``should_retry``; the
        last attempt's response or exception is always passed to the caller.

        Raises:
            httpx.RequestError: If ``max_retries`` allows no attempt at all.
        """
        max_retries = self.retry_config.max_retries
        url = self._make_url(path)

        for attempt in range(1, max_retries + 1):
            delay: float = 0.0
            if self.retry_config.backoff_strategy:
                delay = self.retry_config.backoff_strategy.get_delay(attempt)
                if delay > 0:
                    await asyncio.sleep(delay)

            # Use unified logging - safe_log handles shutdown detection automatically
            self._log(
                "info",
                f"Retrying async request (attempt {attempt})",
                honeyhive_data={
                    "method": method,
                    "path": path,
                    "attempt": attempt,
                },
            )

            if self.verbose:
                self._log(
                    "info",
                    "Retry Async Request Details",
                    honeyhive_data={
                        "method": method,
                        "path": path,
                        "attempt": attempt,
                        "delay": delay,
                        "params": params,
                        "json": json,
                    },
                )

            try:
                response = await self.async_client.request(
                    method, url, params=params, json=json, **kwargs
                )
            except Exception:
                if attempt == max_retries:
                    raise
                continue

            # A retry that still yields a retryable response must not end
            # the sequence early while attempts remain.
            if attempt < max_retries and self.retry_config.should_retry(response):
                continue
            return response

        raise httpx.RequestError("Max retries exceeded")

    def close(self) -> None:
        """Close the HTTP clients.

        The sync client is closed properly. The async client can only be
        dropped here; use ``aclose()`` to close it cleanly.
        """
        if self._sync_client:
            self._sync_client.close()
            self._sync_client = None
        if self._async_client:
            # AsyncClient doesn't have close(), it has aclose()
            # But we can't call aclose() in a sync context
            # So we'll just set it to None and let it be garbage collected
            self._async_client = None

        # Use unified logging - safe_log handles shutdown detection automatically
        self._log("info", "HoneyHive client closed")

    async def aclose(self) -> None:
        """Close the async HTTP client."""
        if self._async_client:
            await self._async_client.aclose()
            self._async_client = None

        # Use unified logging - safe_log handles shutdown detection automatically
        self._log("info", "HoneyHive async client closed")

    def __enter__(self) -> "HoneyHive":
        """Context manager entry."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Context manager exit."""
        self.close()

    async def __aenter__(self) -> "HoneyHive":
        """Async context manager entry."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Async context manager exit.

        Closes the async client first, then the sync client as well: the API
        modules may have created one inside the ``async with`` block, and it
        would otherwise stay open after the block ends.
        """
        await self.aclose()
        self.close()