# Source code for honeyhive.tracer.core.base

"""Base tracer implementation with initialization and core infrastructure.

This module provides the foundational HoneyHive tracer class with dynamic
initialization, configuration management, and utility classes. It uses
dynamic logic for flexible configuration handling and graceful degradation.
"""

# pylint: disable=too-many-lines
# Justification: Base tracer class requires extensive functionality including
# dynamic configuration, cache management, resource detection, attribute
# normalization, and backward compatibility. Splitting would break cohesion.

# pylint: disable=duplicate-code
# Justification: Legitimate shared patterns with decorator and operations mixins.
# Duplicate code represents common dynamic attribute normalization patterns
# shared across core mixin classes for consistent behavior.

import os
import platform
import threading
from typing import Any, Dict, Optional, Self, Union

from opentelemetry.trace import INVALID_SPAN_CONTEXT, SpanKind

from ...api.client import HoneyHive
from ...api.session import SessionAPI
from ...config import create_unified_config
from ...config.models import EvaluationConfig, SessionConfig, TracerConfig
from ...utils.cache import CacheConfig, CacheManager
from ...utils.dotdict import DotDict
from ...utils.logger import safe_log
from ..infra import build_otel_resources
from ..instrumentation.initialization import initialize_tracer_instance
from ..lifecycle.core import get_lock_config

# Removed TracerConfigInterface - replaced with DotDict config


# Sentinel type for detecting explicitly passed parameters
class _ExplicitType:  # pylint: disable=too-few-public-methods
    """Marker type used to tell "argument omitted" apart from a real value.

    A dedicated type is used because ``None`` is itself a legitimate value
    for many tracer parameters. Note that, despite the name, a parameter
    still holding the sentinel instance is one the caller did NOT pass.
    """

    def __repr__(self) -> str:
        """Return a fixed, recognizable label for debugging output."""
        label = "<EXPLICIT>"
        return label


# Single shared sentinel instance; always compare against it by identity.
_EXPLICIT = _ExplicitType()


class NoOpSpan:
    """No-op span implementation for graceful degradation.

    This class provides a safe default span that implements the same
    interface as a real span but performs no operations. This follows
    OpenTelemetry best practices for error handling - never return None,
    always return a usable object.

    The span can also be used as a context manager (``with span:``), so
    call sites written against a real span keep working when tracing has
    degraded to no-op mode.
    """

    def __init__(self) -> None:
        """Initialize no-op span with safe defaults."""
        self.kind = SpanKind.INTERNAL
        self._attributes: Dict[str, Any] = {}

    def __enter__(self) -> "NoOpSpan":
        """Enter the context manager and return this span unchanged."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Leave the context manager.

        Ends the span (a no-op) and returns ``None`` so that any exception
        raised inside the ``with`` body propagates to the caller.
        """
        self.end()

    def set_attribute(self, key: str, value: Any) -> None:
        """Set attribute (no-op)."""

    def set_attributes(self, attributes: Dict[str, Any]) -> None:
        """Set multiple attributes (no-op)."""

    def add_event(
        self,
        name: str,
        attributes: Optional[Dict[str, Any]] = None,
        timestamp: Optional[int] = None,
    ) -> None:
        """Add event (no-op)."""

    def add_link(
        self,
        context: Any,
        attributes: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Add link to another span (no-op)."""

    def record_exception(
        self,
        exception: Exception,
        attributes: Optional[Dict[str, Any]] = None,
        timestamp: Optional[int] = None,
        escaped: bool = False,
    ) -> None:
        """Record exception (no-op)."""

    def set_status(self, status: Any, description: Optional[str] = None) -> None:
        """Set status (no-op)."""

    def update_name(self, name: str) -> None:
        """Update name (no-op)."""

    def end(self, end_time: Optional[int] = None) -> None:
        """End span (no-op)."""

    def is_recording(self) -> bool:
        """Check if span is recording (always False for no-op)."""
        return False

    def get_span_context(self) -> Any:
        """Get span context (returns invalid context)."""
        return INVALID_SPAN_CONTEXT
class HoneyHiveTracerBase:  # pylint: disable=too-many-instance-attributes
    """Base HoneyHive tracer with dynamic initialization and configuration.

    This base class provides the core infrastructure for HoneyHive tracing
    including dynamic configuration handling, initialization logic, and
    foundational properties. It uses dynamic patterns for flexible setup.

    Note:
        too-many-instance-attributes disabled - Base tracer class requires
        extensive attributes for configuration management, state tracking,
        API clients, threading locks, and backward compatibility support.
    """

    # Type annotations for instance attributes
    config: DotDict
    client: Optional["HoneyHive"]
    session_api: Optional["SessionAPI"]
    _baggage_lock: "threading.Lock"
    _session_id: Optional[str]
    tracer: Any  # OpenTelemetry Tracer instance

    # Sentinel object moved to module level with proper typing

    # pylint: disable=too-many-arguments
    # Justification: This __init__ supports both new Pydantic config approach
    # and backwards compatibility. High argument count necessary for API compatibility.
    def __init__(
        self,
        # New Pydantic config approach (recommended)
        config: Optional["TracerConfig"] = None,
        session_config: Optional["SessionConfig"] = None,
        evaluation_config: Optional["EvaluationConfig"] = None,
        *,  # Force all remaining arguments to be keyword-only
        # Backwards compatibility - all original parameters (keyword-only)
        # Use _EXPLICIT as sentinel to detect explicitly passed vs default values
        api_key: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        project: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        session_name: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        source: Union[str, _ExplicitType] = _EXPLICIT,
        server_url: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        session_id: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        disable_http_tracing: Union[Optional[bool], _ExplicitType] = _EXPLICIT,
        disable_batch: Union[bool, _ExplicitType] = _EXPLICIT,
        verbose: Union[bool, _ExplicitType] = _EXPLICIT,
        inputs: Union[Optional[Dict[str, Any]], _ExplicitType] = _EXPLICIT,
        is_evaluation: Union[bool, _ExplicitType] = _EXPLICIT,
        run_id: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        dataset_id: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        datapoint_id: Union[Optional[str], _ExplicitType] = _EXPLICIT,
        link_carrier: Union[Optional[Dict[str, Any]], _ExplicitType] = _EXPLICIT,
        test_mode: Union[bool, _ExplicitType] = _EXPLICIT,
        **kwargs: Any,
    ) -> None:
        """Initialize the HoneyHive tracer using dynamic configuration merging.

        This constructor uses dynamic logic to merge Pydantic config objects
        with backward-compatible parameters, allowing flexible initialization
        patterns.

        New Pydantic Config Approach (Recommended):
            config = TracerConfig(api_key="...", project="...", verbose=True)
            tracer = HoneyHiveTracer(config=config)

        Backwards Compatible Approach (Still Supported):
            tracer = HoneyHiveTracer(api_key="...", project="...", verbose=True)

        :param config: Pydantic tracer configuration object (recommended)
        :type config: Optional[TracerConfig]
        :param session_config: Session-specific configuration
        :type session_config: Optional[SessionConfig]
        :param evaluation_config: Evaluation-specific configuration
        :type evaluation_config: Optional[EvaluationConfig]
        """
        # Multi-instance architecture uses safe_log() for all logging
        # No direct logger assignment needed - safe_log handles per-instance logging

        # Dynamic configuration merging - handles both new and legacy patterns
        # Map of parameter names to their values - only include if not sentinel
        param_mapping = {
            "api_key": api_key,
            "project": project,
            "session_name": session_name,
            "source": source,
            "server_url": server_url,
            "session_id": session_id,
            "disable_http_tracing": disable_http_tracing,
            "disable_batch": disable_batch,
            "verbose": verbose,
            "inputs": inputs,
            "is_evaluation": is_evaluation,
            "run_id": run_id,
            "dataset_id": dataset_id,
            "datapoint_id": datapoint_id,
            "link_carrier": link_carrier,
            "test_mode": test_mode,
        }

        # Only forward parameters the caller actually supplied: a value that
        # is still the _EXPLICIT sentinel was left at its default.
        explicit_params = {
            param_name: value
            for param_name, value in param_mapping.items()
            if value is not _EXPLICIT
        }

        # Use centralized config merging from config module
        self.config = create_unified_config(
            config=config,
            session_config=session_config,
            evaluation_config=evaluation_config,
            **explicit_params,
            **kwargs,
        )

        # Initialize core instance attributes dynamically
        self._initialize_core_attributes()

        # Initialize OpenTelemetry components using dynamic initialization
        self._initialize_otel_components()

        # Set up API clients using dynamic configuration
        self._initialize_api_clients()

    # Configuration merging moved to config module - see create_unified_config()

    def _initialize_core_attributes(self) -> None:
        """Initialize core tracer attributes using dynamic configuration.

        Reads values from ``self.config`` and sets every instance attribute
        to a safe default so later initialization steps can rely on the
        attributes existing.
        """
        # Extract configuration values dynamically
        config = self.config

        # Core tracer state
        self._initialized = False
        self._instance_shutdown = (
            False  # Instance-specific shutdown flag for multi-instance architecture
        )
        self.test_mode = config.get("test_mode", False)

        # Core configuration attributes
        self.api_key = config.get("api_key")
        self.server_url = config.get("server_url")
        self.verbose = config.get("verbose", False)

        # API clients are created by _initialize_api_clients(), which runs
        # after OpenTelemetry setup. Default them here so that anything
        # reading them during that setup sees None rather than raising
        # AttributeError (which the broad handler in
        # _initialize_otel_components would swallow, leaving a no-op tracer).
        self.client = None
        self.session_api = None

        # Session management attributes (both public and private for compatibility)
        self.session_name = config.get("session_name")
        # session_id is now properly promoted to root by create_unified_config()
        # Fallback to nested location for extra safety
        self.session_id = config.get("session_id") or (
            config.get("session", {}).get("session_id")
            if isinstance(config.get("session"), dict)
            else None
        )
        self._session_name = self.session_name  # Private version for internal use
        self._session_id = self.session_id  # Private version for internal use

        # Evaluation attributes
        self.is_evaluation = config.get("is_evaluation", False)
        self.run_id = config.get("run_id")
        self.dataset_id = config.get("dataset_id")
        self.datapoint_id = config.get("datapoint_id")

        # Legacy compatibility attributes
        self.project = config.get("project")
        self.source = config.get("source")

        # Dynamic Cache Management - Initialize per-instance cache manager
        self._cache_manager = self._initialize_cache_manager(config)

        # Initialize evaluation context
        self._evaluation_context: Dict[str, Any] = {}

        # Dynamic evaluation context setup
        if self.is_evaluation:
            self._setup_evaluation_context_dynamically(config)

        # OpenTelemetry components (initialized later)
        self.provider = None
        self.tracer = None
        self.span_processor = None
        self.propagator = None

        # Provider management for multi-instance architecture
        self.is_main_provider = False
        self._tracer_id = None

        # Per-instance locking for high-concurrency scenarios
        self._baggage_lock = threading.Lock()
        self._instance_lock = threading.RLock()  # Reentrant for same thread
        self._flush_lock = threading.Lock()  # Separate lock for flush operations

    def _initialize_otel_components(self) -> None:
        """Initialize OpenTelemetry components using dynamic initialization.

        Any exception raised by the initialization helper is logged and
        swallowed; ``_initialized`` then stays False.
        """
        try:
            # Use dynamic initialization helper
            initialize_tracer_instance(self)
            self._initialized = True

            safe_log(
                self,
                "info",
                "HoneyHive tracer initialized successfully",
                honeyhive_data={
                    "architecture": "multi-instance",
                    "test_mode": self.test_mode,
                    "has_session": bool(self._session_id),
                },
            )
        except Exception as e:
            safe_log(
                self,
                "error",
                "Failed to initialize tracer: %s",
                str(e),
                honeyhive_data={"error_type": type(e).__name__},
            )
            # Graceful degradation - tracer remains usable but in no-op mode
            self._initialized = False

    def _initialize_api_clients(self) -> None:
        """Initialize API clients using dynamic configuration.

        Sets ``client`` and ``session_api``; both end up None when the
        required parameters are missing or client construction fails.
        """
        config = self.config

        # Initialize HoneyHive API client dynamically
        api_params = self._extract_api_parameters_dynamically(config)

        if api_params:
            try:
                self.client = HoneyHive(**api_params, tracer_instance=self)
                self.session_api = SessionAPI(self.client)
            except Exception as e:
                safe_log(
                    self,
                    "warning",
                    "Failed to initialize API client: %s",
                    str(e),
                    honeyhive_data={"error_type": type(e).__name__},
                )
                # Graceful degradation
                self.client = None
                self.session_api = None
        else:
            self.client = None
            self.session_api = None

    def _extract_api_parameters_dynamically(
        self, config: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Dynamically extract API parameters from configuration.

        Returns:
            Keyword arguments for the API client, or None when ``api_key``
            or ``project`` is missing or empty.
        """
        # Required parameters for tracer (api_key for API client, project for tracer)
        api_key = config.get("api_key")
        project = config.get("project")

        # Both api_key and project are required for tracer functionality
        if not api_key or not project:
            return None

        # Build API parameters dynamically (only params accepted by HoneyHive API)
        api_params = {}

        # Map configuration keys to API client parameters (excluding project)
        param_mapping = {
            "api_key": "api_key",
            "server_url": "server_url",
            "timeout": "timeout",
            "test_mode": "test_mode",
            "verbose": "verbose",
        }

        for config_key, api_key_param in param_mapping.items():
            value = config.get(config_key)
            if value is not None:
                api_params[api_key_param] = value

        return api_params

    # Legacy config resolution methods removed - all consumers should use
    # self.config.get() directly. The unified DotDict config object from
    # create_unified_config() handles all resolution

    def _initialize_cache_manager(self, config: Any) -> Optional[CacheManager]:
        """Initialize cache manager with config-driven defaults.

        Args:
            config: Configuration object (dict or Pydantic model)

        Returns:
            CacheManager instance if caching is enabled, None otherwise
        """
        # Check if caching is enabled
        cache_enabled = config.get("cache_enabled", True)
        if not cache_enabled:
            safe_log(self, "debug", "Cache disabled via configuration")
            return None

        # Generate unique instance ID for multi-instance isolation
        instance_id = f"tracer_{id(self)}_{getattr(self, '_tracer_id', 'unknown')}"

        # Use config-driven defaults with sensible fallbacks
        cache_config = CacheConfig(
            max_size=config.get("cache_max_size", 1000),
            default_ttl=config.get("cache_ttl", 300.0),
            cleanup_interval=60.0,  # Static - no need for dynamic calculation
            enable_stats=True,
        )

        try:
            cache_manager = CacheManager(instance_id=instance_id, config=cache_config)
            safe_log(
                self, "debug", "Initialized cache manager for instance %s", instance_id
            )
            return cache_manager
        except Exception as e:
            # Graceful degradation - cache failures should not break tracer
            safe_log(self, "warning", "Failed to initialize cache manager: %s", e)
            return None

    def _setup_evaluation_context_dynamically(self, config: Dict[str, Any]) -> None:
        """Dynamically set up evaluation context from configuration.

        Copies each evaluation field whose value is not None into
        ``self._evaluation_context``.
        """
        # Extract evaluation-specific fields dynamically
        evaluation_fields = ["run_id", "dataset_id", "datapoint_id", "is_evaluation"]

        for field in evaluation_fields:
            value = config.get(field)
            if value is not None:
                self._evaluation_context[field] = value

    @staticmethod
    def _apply_field_overrides(
        model: Any, model_cls: Any, individual_params: Dict[str, Any]
    ) -> Any:
        """Return ``model`` updated with the params that match ``model_cls`` fields.

        The original object is returned untouched when no parameter name
        matches a field declared on ``model_cls``.
        """
        overrides = {
            field: individual_params[field]
            for field in model_cls.model_fields
            if field in individual_params
        }
        return model.model_copy(update=overrides) if overrides else model

    def _merge_configs_internally(
        self,
        config: Optional[TracerConfig] = None,
        session_config: Optional[SessionConfig] = None,
        evaluation_config: Optional[EvaluationConfig] = None,
        **individual_params: Any,
    ) -> tuple[TracerConfig, SessionConfig, EvaluationConfig]:
        """Internal method to merge config objects with individual parameters.

        This method encapsulates the hybrid assembly logic within the class,
        ensuring that the external interface only exposes the final merged
        result. Individual parameters take precedence over config object
        values for backwards compatibility.

        Returns:
            Tuple of (tracer config, session config, evaluation config).
        """
        # Start with defaults or provided configs, then layer the matching
        # individual parameters on top of each one.
        tracer_config = self._apply_field_overrides(
            config or TracerConfig(), TracerConfig, individual_params
        )
        session_cfg = self._apply_field_overrides(
            session_config or SessionConfig(), SessionConfig, individual_params
        )
        eval_cfg = self._apply_field_overrides(
            evaluation_config or EvaluationConfig(),
            EvaluationConfig,
            individual_params,
        )

        return tracer_config, session_cfg, eval_cfg

    def _acquire_instance_lock_with_timeout(
        self, timeout: Optional[float] = None
    ) -> bool:
        """Acquire per-instance lock with environment-optimized timeout.

        Args:
            timeout: Optional custom timeout. If None, uses environment-optimized value

        Returns:
            bool: True if lock was acquired, False if timeout occurred

        Examples:
            >>> # Auto-optimized timeout
            >>> if tracer._acquire_instance_lock_with_timeout():
            ...     try:
            ...         # Perform instance-specific operation
            ...         pass
            ...     finally:
            ...         tracer._release_instance_lock()
        """
        if timeout is None:
            config = get_lock_config()
            timeout = config.get("lifecycle_timeout", 1.0)

        # Ensure timeout is not None for type safety
        effective_timeout = timeout if timeout is not None else 1.0
        return self._instance_lock.acquire(timeout=effective_timeout)

    def _release_instance_lock(self) -> None:
        """Release per-instance lock.

        A failed release is logged at debug level instead of raised.
        """
        try:
            self._instance_lock.release()
        except Exception as e:
            # Graceful degradation following Agent OS standards - never crash host app
            safe_log(
                self,
                "debug",
                "Failed to release instance lock",
                honeyhive_data={"error_type": type(e).__name__},
            )

    @classmethod
    def reset(cls) -> None:
        """Reset static state for testing purposes.

        This method provides backward compatibility for test environments
        that expect a reset capability. In the multi-instance architecture,
        this primarily delegates to the lifecycle management system.
        """
        # In multi-instance architecture, reset is handled by lifecycle management
        # This is a no-op for backward compatibility
        return None

    @classmethod
    def init(
        cls,
        config: Optional["TracerConfig"] = None,
        session_config: Optional["SessionConfig"] = None,
        evaluation_config: Optional["EvaluationConfig"] = None,
        **kwargs: Any,
    ) -> Self:
        """Factory method for creating tracer instances with dynamic configuration.

        This is a simple pass-through to __init__ for backwards compatibility.

        Args:
            config: Pydantic tracer configuration
            session_config: Session-specific configuration
            evaluation_config: Evaluation-specific configuration
            **kwargs: Backward-compatible parameters

        Returns:
            Initialized HoneyHive tracer instance
        """
        # Simple pass-through to constructor
        return cls(
            config=config,
            session_config=session_config,
            evaluation_config=evaluation_config,
            **kwargs,
        )

    def _should_create_session_automatically(self) -> bool:
        """Dynamically determine if session should be created automatically."""
        # Check if we have the necessary components and configuration
        return (
            self.session_api is not None
            and self._session_name is not None
            and self._session_id is None  # Don't create if already have session_id
            and not self.test_mode  # Skip in test mode
        )

    def _create_session_dynamically(self) -> None:
        """Dynamically create a session using available configuration.

        Does nothing without a session API client or session name. Failures
        are logged as warnings rather than raised.
        """
        if not self.session_api or not self._session_name:
            return

        try:
            # Build session creation parameters dynamically
            session_params = self._build_session_parameters_dynamically()

            # Create session via API
            response = self.session_api.create_session_from_dict(session_params)

            if hasattr(response, "session_id"):
                # pylint: disable=attribute-defined-outside-init
                # Justification: _session_id is properly initialized in __init__.
                # This is legitimate reassignment during dynamic session creation,
                # not a first-time attribute definition.
                self._session_id = response.session_id
                # Keep the public mirror in sync with the private attribute,
                # matching how both are set together during initialization.
                self.session_id = self._session_id
                safe_log(
                    self,
                    "info",
                    "Created session automatically: %s",
                    str(self._session_id),
                    honeyhive_data={"session_name": self._session_name},
                )
        except Exception as e:
            safe_log(
                self,
                "warning",
                "Failed to create session automatically: %s",
                str(e),
                honeyhive_data={"session_name": self._session_name},
            )

    def _build_session_parameters_dynamically(self) -> Dict[str, Any]:
        """Dynamically build session creation parameters.

        Returns:
            Dict with the session name, any evaluation context, and the
            ``source`` / ``inputs`` config values that are not None.
        """
        params: Dict[str, Any] = {"session_name": self._session_name}

        # Add evaluation context if available
        if self._evaluation_context:
            params.update(self._evaluation_context)

        # Add other dynamic parameters from configuration
        config = self.config
        optional_params = ["source", "inputs"]

        for param in optional_params:
            value = config.get(param)
            if value is not None:
                params[param] = value

        return params

    # Properties with dynamic access patterns

    @property
    def project_name(self) -> Optional[str]:
        """Get project name from unified configuration."""
        result = self.config.get("project")
        return str(result) if result is not None else None

    @property
    def source_environment(self) -> str:
        """Get source environment from unified configuration."""
        result = self.config.get("source", "dev")
        return str(result)

    @property
    def is_initialized(self) -> bool:
        """Check if tracer is properly initialized."""
        return self._initialized

    @property
    def is_test_mode(self) -> bool:
        """Check if tracer is in test mode."""
        return bool(self.test_mode)

    # Removed config_interface property - replaced with DotDict config
    # Users should now use tracer.config directly for all configuration access

    def _normalize_attribute_key_dynamically(self, key: str) -> str:
        """Dynamically normalize attribute keys with caching for performance.

        This method uses dynamic caching to optimize repeated attribute key
        normalization, which is critical for high-throughput tracing scenarios.
        """
        if not isinstance(key, str):
            key = str(key)

        if not self._is_caching_enabled() or not self._cache_manager:
            return self._perform_key_normalization(key)

        # Use cache manager's domain-specific method
        attr_key = f"key_norm:{hash(key)}"
        result = self._cache_manager.get_cached_attributes(
            attr_key=attr_key,
            normalizer_func=lambda: self._perform_key_normalization(key),
        )
        return str(result)  # Ensure string return type

    def _perform_key_normalization(self, key: str) -> str:
        """Perform the actual key normalization logic.

        Replaces ``.``, ``-`` and spaces with underscores, and prefixes
        ``attr_`` when the result is empty or starts with a digit.
        """
        # Replace invalid characters dynamically
        normalized = key.replace(".", "_").replace("-", "_").replace(" ", "_")

        # Ensure valid identifier
        if not normalized or normalized[0].isdigit():
            normalized = f"attr_{normalized}"

        return normalized

    def _normalize_attribute_value_dynamically(self, value: Any) -> Any:
        """Dynamically normalize attribute values with caching for performance.

        This method uses dynamic caching to optimize repeated attribute value
        normalization, especially for complex objects that require string
        conversion.
        """
        # Handle None values immediately (no caching needed)
        if value is None:
            return None

        # Handle basic types that don't need normalization (no caching needed)
        if isinstance(value, (str, int, float, bool)):
            return value

        # Use caching for complex types only
        if not self._is_caching_enabled() or not self._cache_manager:
            return self._perform_value_normalization(value)

        # Use cache manager's domain-specific method
        try:
            value_type = type(value).__name__
            attr_key = f"val_norm:{hash(str(value))}:{value_type}"
            return self._cache_manager.get_cached_attributes(
                attr_key=attr_key,
                normalizer_func=lambda: self._perform_value_normalization(value),
            )
        except Exception:
            # If hashing fails, skip caching and normalize directly
            return self._perform_value_normalization(value)

    def _perform_value_normalization(self, value: Any) -> Any:
        """Perform the actual value normalization logic.

        Returns ``value.value`` for objects exposing a ``value`` attribute
        (e.g. enums); otherwise returns ``str(value)``, falling back to
        ``"<unserializable>"`` if that conversion raises.
        """
        # Handle enum values dynamically
        if hasattr(value, "value"):
            return value.value

        # Convert complex types to strings
        try:
            return str(value)
        except Exception as e:
            # Graceful degradation following Agent OS standards - never crash host app
            safe_log(
                self,
                "debug",
                "Failed to serialize attribute value",
                honeyhive_data={"error_type": type(e).__name__},
            )
            return "<unserializable>"

    # Cache getter methods removed - using CacheManager domain methods directly

    def _is_caching_enabled(self) -> bool:
        """Check if caching is enabled via configuration.

        Returns:
            True if caching is enabled and cache manager is available,
            False otherwise
        """
        # Check if cache manager exists
        if not hasattr(self, "_cache_manager") or not self._cache_manager:
            return False

        # Direct config resolution (never use caching for this check)
        if hasattr(self, "config") and self.config:
            return bool(self.config.get("cache_enabled", True))

        # No merged config available, default to True
        return True

    def _detect_resources_with_cache(self) -> Dict[str, Any]:
        """Detect system resources with dynamic caching for performance.

        This method performs expensive system resource detection and caches
        the results for improved performance on subsequent calls.

        Returns:
            Dictionary containing detected resource information
        """
        if not self._is_caching_enabled() or not self._cache_manager:
            return self._perform_resource_detection()

        # Use cache manager's domain-specific method
        resource_key = self._build_resource_cache_key()
        return self._cache_manager.get_cached_resources(
            resource_key=resource_key, detector_func=self._perform_resource_detection
        )

    def _build_resource_cache_key(self) -> str:
        """Build cache key for resource detection based on system characteristics.

        Returns:
            Cache key string derived from system identifiers. The key is
            consistent within one running process only.
        """
        # Dynamic key based on system characteristics that affect resources
        key_components = [
            platform.system(),  # OS type (Linux, Darwin, Windows)
            platform.machine(),  # Architecture (x86_64, arm64)
            str(os.getpid()),  # Process ID (changes per process)
            os.getenv("HOSTNAME", "unknown"),  # Hostname for containerized environments
            os.getenv("KUBERNETES_SERVICE_HOST", ""),  # K8s detection
            os.getenv("AWS_LAMBDA_FUNCTION_NAME", ""),  # Lambda detection
        ]

        # Condense the components into a short key. NOTE: hash() of a str is
        # salted per interpreter process, so the value differs between
        # processes; the key already embeds the PID, so it is process-scoped
        # either way.
        key_string = "|".join(str(c) for c in key_components)
        return f"resources:{hash(key_string)}"

    def _perform_resource_detection(self) -> Dict[str, Any]:
        """Perform resource detection using the infra module.

        Returns:
            Dictionary containing detected resource attributes
        """
        return build_otel_resources(self)

    # Resource detection methods moved to infra module

    # Backwards compatibility methods for context propagation

    def inject(self, carrier: Dict[str, Any]) -> Dict[str, Any]:
        """Inject context into carrier for backwards compatibility.

        Args:
            carrier: Dictionary to inject context into

        Returns:
            The carrier with injected context
        """
        # Use existing context injection if available
        if hasattr(self, "inject_context"):
            self.inject_context(carrier)
        return carrier