Source code for honeyhive.tracer.integration.error_handling

"""Dynamic error handling and resilience for HoneyHive tracer integration.

This module provides comprehensive error handling using dynamic patterns for
graceful degradation, retry mechanisms, and recovery strategies. All error
handling logic is extensible and configuration-driven.
"""

import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, cast

# Import shared logging utility
from ...utils.logger import safe_log

# pylint: disable=global-statement
# NOTE(review): no `global` statement appears in the visible module; error
# handlers are created per tracer instance by get_error_handler(), so the
# disable above looks like a leftover from an earlier singleton design.


class IntegrationError(Exception):
    """Root of the integration exception hierarchy.

    Besides the human-readable message, every instance carries a
    machine-readable ``error_code``, a ``details`` mapping with extra
    context, and the ``timestamp`` (seconds since the epoch) at which the
    exception object was created.
    """

    def __init__(
        self,
        message: str,
        error_code: str = "INTEGRATION_ERROR",
        details: Optional[Dict[str, Any]] = None,
    ):
        """Create the exception.

        Args:
            message: Text passed to ``Exception.__init__``.
            error_code: Short identifier for the error category.
            details: Optional extra context; a falsy value (``None`` or an
                empty mapping) is replaced by a fresh empty dict.
        """
        super().__init__(message)
        self.timestamp = time.time()
        self.error_code = error_code
        # A new dict per instance avoids sharing one mutable default.
        self.details = details if details else {}


class ProviderIncompatibleError(IntegrationError):
    """Raised when a provider lacks operations the integration requires."""

    def __init__(self, provider_type: str, required_operations: List[str]):
        """Create the exception.

        Args:
            provider_type: Name of the provider that was inspected.
            required_operations: Operations the provider was expected to
                support; stored as-is in ``details``.
        """
        context = {
            "provider_type": provider_type,
            "required_operations": required_operations,
        }
        text = (
            f"Provider {provider_type} doesn't support required operations: "
            f"{required_operations}"
        )
        super().__init__(text, error_code="PROVIDER_INCOMPATIBLE", details=context)


class InitializationError(IntegrationError):
    """Error during tracer initialization."""

    def __init__(self, message: str, cause: Optional[Exception] = None):
        """Create the exception.

        Args:
            message: Human-readable description of the failure.
            cause: Optional underlying exception; only its string form is
                kept, under ``details["cause"]`` (``None`` when absent).
        """
        super().__init__(
            message,
            error_code="INITIALIZATION_ERROR",
            # Identity check: an exception object is a cause even if it
            # happens to evaluate as falsy.
            details={"cause": str(cause) if cause is not None else None},
        )
class SpanProcessingError(IntegrationError):
    """Error during span processing."""

    def __init__(self, span_name: str, cause: Optional[Exception] = None):
        """Create the exception.

        Args:
            span_name: Name of the span being processed; embedded in the
                message and stored under ``details["span_name"]``.
            cause: Optional underlying exception; only its string form is
                kept, under ``details["cause"]`` (``None`` when absent).
        """
        message = f"Error processing span '{span_name}'"
        super().__init__(
            message,
            error_code="SPAN_PROCESSING_ERROR",
            details={
                "span_name": span_name,
                # Identity check: an exception object is a cause even if it
                # happens to evaluate as falsy.
                "cause": str(cause) if cause is not None else None,
            },
        )
class ExportError(IntegrationError):
    """Error during span export."""

    def __init__(self, export_type: str, cause: Optional[Exception] = None):
        """Create the exception.

        Args:
            export_type: Label of the export path that failed; embedded in
                the message and stored under ``details["export_type"]``.
            cause: Optional underlying exception; only its string form is
                kept, under ``details["cause"]`` (``None`` when absent).
        """
        message = f"Error exporting spans via {export_type}"
        super().__init__(
            message,
            error_code="EXPORT_ERROR",
            details={
                "export_type": export_type,
                # Identity check: an exception object is a cause even if it
                # happens to evaluate as falsy.
                "cause": str(cause) if cause is not None else None,
            },
        )
class ErrorSeverity(Enum):
    """Error severity levels for dynamic handling.

    Members are listed from least to most severe; each value is the
    lowercase string form of the member name.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class ResilienceLevel(Enum):
    """Resilience levels for dynamic error handling strategies."""

    # Fail fast, no retries
    STRICT = "strict"
    # Some retries, graceful degradation
    BALANCED = "balanced"
    # Maximum retries, always degrade gracefully
    RESILIENT = "resilient"
@dataclass
class ErrorContext:
    """Dynamic error context with extensible metadata.

    One instance is created per call to ``ErrorHandler.handle_error`` and is
    passed to every recovery strategy handler that is tried for that error.
    """

    # The exception being handled.
    error: Exception
    # Severity assigned by the handler's pattern classification.
    severity: ErrorSeverity = ErrorSeverity.MEDIUM
    # Free-form labels describing where the error happened.
    component: str = "unknown"
    operation: str = "unknown"
    # Extra keyword arguments supplied by the caller of handle_error().
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Creation time in seconds since the epoch (time.time()).
    timestamp: float = field(default_factory=time.time)
    # Retry bookkeeping, updated by the retry-with-backoff handler.
    retry_count: int = 0
    max_retries: int = 3


@dataclass
class RecoveryStrategy:
    """Dynamic recovery strategy configuration."""

    # Label used in log output.
    name: str
    # Callable invoked with the error context; True means "recovered".
    handler: Callable[[ErrorContext], bool]
    # Error codes this strategy applies to; an empty list matches every code.
    applicable_errors: List[str] = field(default_factory=list)
    # Number of times the handler is invoked before the strategy gives up.
    max_attempts: int = 3
    # Delay before attempt k+1 is base_delay * backoff_multiplier ** k.
    backoff_multiplier: float = 1.5
    base_delay: float = 0.1


class ErrorHandler:
    """Dynamic error handler with extensible strategies and patterns."""

    def __init__(
        self,
        resilience_level: ResilienceLevel = ResilienceLevel.BALANCED,
        tracer_instance: Any = None,
    ):
        """Initialize error handler with dynamic configuration.

        Args:
            resilience_level: Level of resilience for error handling
            tracer_instance: Optional tracer instance for logging context
        """
        self.resilience_level = resilience_level
        self.tracer_instance = tracer_instance
        # Guards _error_history only.
        self._lock = threading.Lock()
        self._error_history: List[ErrorContext] = []
        # Both builders read self.resilience_level, so it must be set first.
        self._recovery_strategies = self._build_recovery_strategies_dynamically()
        self._error_patterns = self._build_error_patterns_dynamically()

    def _build_recovery_strategies_dynamically(self) -> List[RecoveryStrategy]:
        """Dynamically build recovery strategies based on resilience level.

        Returns:
            List of recovery strategies, in the order they will be tried
        """
        # Base strategies available for all resilience levels
        strategies: List[RecoveryStrategy] = [
            RecoveryStrategy(
                name="graceful_degradation",
                handler=self._graceful_degradation_handler,
                applicable_errors=["PROVIDER_INCOMPATIBLE", "INITIALIZATION_ERROR"],
                max_attempts=1,
            ),
            RecoveryStrategy(
                name="retry_with_backoff",
                handler=self._retry_with_backoff_handler,
                applicable_errors=["EXPORT_ERROR", "SPAN_PROCESSING_ERROR"],
                max_attempts=self._get_max_retries_for_level(),
                backoff_multiplier=1.5,
                base_delay=0.1,
            ),
        ]

        # Add resilience-level specific strategies
        if self.resilience_level in {
            ResilienceLevel.BALANCED,
            ResilienceLevel.RESILIENT,
        }:
            strategies.append(
                RecoveryStrategy(
                    name="fallback_provider",
                    handler=self._fallback_provider_handler,
                    applicable_errors=["PROVIDER_INCOMPATIBLE"],
                    max_attempts=1,
                )
            )

        if self.resilience_level == ResilienceLevel.RESILIENT:
            strategies.append(
                RecoveryStrategy(
                    name="console_fallback",
                    handler=self._console_fallback_handler,
                    applicable_errors=["EXPORT_ERROR"],
                    max_attempts=1,
                )
            )

        return strategies

    def _build_error_patterns_dynamically(self) -> Dict[str, Dict[str, Any]]:
        """Dynamically build error patterns for classification.

        Returns:
            Dictionary of error patterns and their configurations
        """
        return {
            "connection_errors": {
                "patterns": ["connection", "timeout", "network", "unreachable"],
                "severity": ErrorSeverity.MEDIUM,
                "retry_eligible": True,
            },
            "authentication_errors": {
                "patterns": ["auth", "unauthorized", "forbidden", "api_key"],
                "severity": ErrorSeverity.HIGH,
                "retry_eligible": False,
            },
            "provider_errors": {
                "patterns": ["provider", "incompatible", "unsupported"],
                "severity": ErrorSeverity.HIGH,
                "retry_eligible": False,
            },
            "processing_errors": {
                "patterns": ["processing", "span", "attribute"],
                "severity": ErrorSeverity.LOW,
                "retry_eligible": True,
            },
        }

    def _get_max_retries_for_level(self) -> int:
        """Dynamically get max retries based on resilience level.

        Returns:
            Maximum number of retries (3 for an unrecognized level)
        """
        retry_mapping = {
            ResilienceLevel.STRICT: 0,
            ResilienceLevel.BALANCED: 3,
            ResilienceLevel.RESILIENT: 5,
        }
        return retry_mapping.get(self.resilience_level, 3)

    def handle_error(
        self,
        error: Exception,
        component: str = "unknown",
        operation: str = "unknown",
        **metadata: Any,
    ) -> bool:
        """Dynamically handle error with appropriate recovery strategy.

        Args:
            error: Exception that occurred
            component: Component where error occurred
            operation: Operation that failed
            **metadata: Additional error metadata

        Returns:
            bool: True if error was handled successfully, False otherwise
        """
        # Build and classify the context before touching shared state, so the
        # history never exposes an entry with a not-yet-classified severity.
        error_context = self._create_error_context_dynamically(
            error, component, operation, metadata
        )
        error_context.severity = self._classify_error_severity_dynamically(error)

        # Only the shared history needs the lock. Recovery strategies may
        # sleep for backoff, and threading.Lock is not re-entrant, so running
        # them under the lock would stall every other thread using this
        # handler (including get_error_statistics) for the whole backoff.
        with self._lock:
            self._record_error_dynamically(error_context)

        recovery_success = self._apply_recovery_strategies_dynamically(error_context)

        # Log error handling result
        self._log_error_handling_result_dynamically(error_context, recovery_success)

        return recovery_success

    def _create_error_context_dynamically(
        self,
        error: Exception,
        component: str,
        operation: str,
        metadata: Dict[str, Any],
    ) -> ErrorContext:
        """Dynamically create error context with comprehensive information.

        Args:
            error: Exception that occurred
            component: Component where error occurred
            operation: Operation that failed
            metadata: Additional error metadata

        Returns:
            ErrorContext with comprehensive error information
        """
        return ErrorContext(
            error=error,
            component=component,
            operation=operation,
            metadata=metadata,
            max_retries=self._get_max_retries_for_level(),
        )

    def _record_error_dynamically(self, error_context: ErrorContext) -> None:
        """Dynamically record error in history with size management.

        The caller is expected to hold ``self._lock``.

        Args:
            error_context: Error context to record
        """
        self._error_history.append(error_context)

        # Keep only the most recent entries so the history stays bounded.
        max_history_size = 100
        if len(self._error_history) > max_history_size:
            self._error_history = self._error_history[-max_history_size:]

    def _classify_error_severity_dynamically(self, error: Exception) -> ErrorSeverity:
        """Dynamically classify error severity using pattern matching.

        The lowercased message and exception class name are searched for each
        configured substring; the first pattern group that matches wins.

        Args:
            error: Exception to classify

        Returns:
            ErrorSeverity level (MEDIUM when nothing matches)
        """
        error_message = str(error).lower()
        error_type = type(error).__name__.lower()

        for pattern_config in self._error_patterns.values():
            if any(
                pattern in error_message or pattern in error_type
                for pattern in pattern_config["patterns"]
            ):
                return ErrorSeverity(pattern_config["severity"])

        # Default severity for unclassified errors
        return ErrorSeverity.MEDIUM

    def _apply_recovery_strategies_dynamically(
        self, error_context: ErrorContext
    ) -> bool:
        """Dynamically apply recovery strategies based on error context.

        Args:
            error_context: Error context to handle

        Returns:
            bool: True if recovery was successful
        """
        # Exceptions without an ``error_code`` attribute only match strategies
        # whose applicable_errors list is empty.
        error_code = getattr(error_context.error, "error_code", "UNKNOWN_ERROR")

        applicable_strategies = self._find_applicable_strategies_dynamically(error_code)

        # Apply strategies in order; the first one that succeeds wins.
        for strategy in applicable_strategies:
            try:
                if self._execute_recovery_strategy_dynamically(strategy, error_context):
                    return True
            except Exception as strategy_error:
                safe_log(
                    self.tracer_instance,
                    "warning",
                    "Recovery strategy failed",
                    honeyhive_data={
                        "strategy": strategy.name,
                        "error": str(strategy_error),
                        "original_error": str(error_context.error),
                    },
                )
                continue

        return False

    def _find_applicable_strategies_dynamically(
        self, error_code: str
    ) -> List[RecoveryStrategy]:
        """Dynamically find applicable recovery strategies.

        Args:
            error_code: Error code to match against

        Returns:
            List of applicable recovery strategies
        """
        # No priority sort is applied: strategies keep the order in which
        # they were registered (could be made dynamic in future).
        return [
            strategy
            for strategy in self._recovery_strategies
            if not strategy.applicable_errors
            or error_code in strategy.applicable_errors
        ]

    def _execute_recovery_strategy_dynamically(
        self, strategy: RecoveryStrategy, error_context: ErrorContext
    ) -> bool:
        """Dynamically execute recovery strategy with backoff.

        Args:
            strategy: Recovery strategy to execute
            error_context: Error context

        Returns:
            bool: True if strategy succeeded
        """
        for attempt in range(strategy.max_attempts):
            try:
                if strategy.handler(error_context):
                    return True

                # Sleep between attempts, but not after the last one.
                if attempt < strategy.max_attempts - 1:
                    delay = strategy.base_delay * (strategy.backoff_multiplier**attempt)
                    time.sleep(delay)

            except Exception as handler_error:
                # A raising handler counts as a failed attempt; the next
                # attempt follows immediately, without backoff.
                safe_log(
                    self.tracer_instance,
                    "debug",
                    "Recovery strategy handler failed",
                    honeyhive_data={
                        "strategy": strategy.name,
                        "attempt": attempt + 1,
                        "error": str(handler_error),
                    },
                )
                continue

        return False

    def _log_error_handling_result_dynamically(
        self, error_context: ErrorContext, recovery_success: bool
    ) -> None:
        """Dynamically log error handling result.

        Args:
            error_context: Error context that was handled
            recovery_success: Whether recovery was successful
        """
        log_level = "info" if recovery_success else "warning"
        log_message = (
            "Error handled successfully" if recovery_success else "Error handling failed"
        )

        safe_log(
            self.tracer_instance,
            log_level,
            log_message,
            honeyhive_data={
                "component": error_context.component,
                "operation": error_context.operation,
                "error_type": type(error_context.error).__name__,
                "severity": error_context.severity.value,
                "recovery_success": recovery_success,
                "retry_count": error_context.retry_count,
            },
        )

    # Recovery strategy handlers

    def _graceful_degradation_handler(self, error_context: ErrorContext) -> bool:
        """Handle error with graceful degradation.

        Args:
            error_context: Error context

        Returns:
            bool: True if degradation successful
        """
        safe_log(
            self.tracer_instance,
            "info",
            "Applying graceful degradation",
            honeyhive_data={
                "component": error_context.component,
                "operation": error_context.operation,
            },
        )
        # Graceful degradation always succeeds by definition
        return True

    def _retry_with_backoff_handler(self, error_context: ErrorContext) -> bool:
        """Handle error with retry and backoff.

        This handler only updates the retry counter on the context; it does
        not re-run the failed operation itself.

        Args:
            error_context: Error context

        Returns:
            bool: False while retries remain (the counter is incremented),
                True once ``retry_count`` has reached ``max_retries``
        """
        # NOTE(review): returning True on "give up" makes the strategy report
        # success to its caller; confirm this is the intended contract.
        if error_context.retry_count < error_context.max_retries:
            error_context.retry_count += 1
            return False  # Indicate retry needed
        return True  # Max retries reached, give up

    def _fallback_provider_handler(self, error_context: ErrorContext) -> bool:
        """Handle error by falling back to alternative provider.

        Args:
            error_context: Error context

        Returns:
            bool: True if fallback successful
        """
        safe_log(
            self.tracer_instance,
            "info",
            "Falling back to alternative provider",
            honeyhive_data={
                "component": error_context.component,
                "original_error": str(error_context.error),
            },
        )
        # Implementation would set up fallback provider
        return True

    def _console_fallback_handler(self, error_context: ErrorContext) -> bool:
        """Handle error by falling back to console logging.

        Args:
            error_context: Error context

        Returns:
            bool: True if console fallback successful
        """
        safe_log(
            self.tracer_instance,
            "info",
            "Falling back to console logging",
            honeyhive_data={
                "component": error_context.component,
                "original_error": str(error_context.error),
            },
        )
        # Console fallback always succeeds
        return True

    def get_error_statistics(self) -> Dict[str, Any]:
        """Get dynamic error statistics.

        Returns:
            Dictionary with error statistics; only ``{"total_errors": 0}``
            when no error has been recorded
        """
        with self._lock:
            if not self._error_history:
                return {"total_errors": 0}

            # One reference time for the whole "recent" window computation.
            now = time.time()
            stats = {
                "total_errors": len(self._error_history),
                "error_types": self._calculate_error_type_distribution(),
                "severity_distribution": self._calculate_severity_distribution(),
                "component_distribution": self._calculate_component_distribution(),
                "recent_errors": sum(
                    1
                    for e in self._error_history
                    if now - e.timestamp < 300  # Last 5 minutes
                ),
            }

            return stats

    def _calculate_error_type_distribution(self) -> Dict[str, int]:
        """Calculate error type distribution."""
        distribution: Dict[str, int] = {}
        for error_context in self._error_history:
            error_type = type(error_context.error).__name__
            distribution[error_type] = distribution.get(error_type, 0) + 1
        return distribution

    def _calculate_severity_distribution(self) -> Dict[str, int]:
        """Calculate severity distribution."""
        distribution: Dict[str, int] = {}
        for error_context in self._error_history:
            severity = error_context.severity.value
            distribution[severity] = distribution.get(severity, 0) + 1
        return distribution

    def _calculate_component_distribution(self) -> Dict[str, int]:
        """Calculate component distribution."""
        distribution: Dict[str, int] = {}
        for error_context in self._error_history:
            component = error_context.component
            distribution[component] = distribution.get(component, 0) + 1
        return distribution


def get_error_handler(
    resilience_level: ResilienceLevel = ResilienceLevel.BALANCED,
    tracer_instance: Any = None,
) -> ErrorHandler:
    """Get or create per-tracer-instance error handler with dynamic configuration.

    When the tracer already carries a handler, that handler is returned as-is
    and ``resilience_level`` is not applied to it.

    Args:
        resilience_level: Resilience level for error handling
        tracer_instance: Tracer instance for logging context and isolation

    Returns:
        ErrorHandler instance (per-tracer-instance for proper isolation)
    """
    # Multi-instance architecture: Each tracer gets its own error handler
    if tracer_instance is not None:
        # Treat "attribute present but None" like "attribute missing", so a
        # tracer that merely declares the slot still gets a usable handler.
        error_handler = getattr(tracer_instance, "_error_handler", None)
        if error_handler is None:
            error_handler = ErrorHandler(resilience_level, tracer_instance)
            # Internal SDK code accessing tracer's error handler attribute
            # Protected access is required for multi-instance architecture
            tracer_instance._error_handler = (  # pylint: disable=protected-access
                error_handler
            )
        return cast(ErrorHandler, error_handler)

    # Fallback: Create new handler for cases without tracer instance
    return ErrorHandler(resilience_level, tracer_instance)


def with_error_handling(
    component: str = "unknown",
    operation: str = "unknown",
    resilience_level: ResilienceLevel = ResilienceLevel.BALANCED,
    tracer_instance: Any = None,
) -> Any:
    """Decorator for dynamic error handling.

    Exceptions raised by the wrapped function are passed to the error
    handler. The exception is re-raised only when it was not handled and the
    resilience level is STRICT; in every other case the wrapper returns None.

    Args:
        component: Component name for error context
        operation: Operation name for error context
        resilience_level: Resilience level for error handling
        tracer_instance: Optional tracer instance for logging context

    Returns:
        Decorator function
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                error_handler = get_error_handler(resilience_level, tracer_instance)

                handled = error_handler.handle_error(
                    exc,
                    component=component,
                    operation=operation,
                    function_name=func.__name__,
                    args_count=len(args),
                    kwargs_keys=list(kwargs.keys()),
                )

                if not handled and resilience_level == ResilienceLevel.STRICT:
                    raise

                # Return None or appropriate default for graceful degradation
                return None

        return wrapper

    return decorator