Source code for honeyhive.tracer.processing.otlp_profiles

"""Environment-aware OTLP configuration profiles with full dynamic logic.

This module provides intelligent OTLP session configuration profiles that
automatically optimize connection pooling based on detected environment
characteristics, leveraging the existing resource detection from the tracer core.

Key Features:
- Reuses existing environment detection from tracer core (no duplication)
- Fully dynamic configuration - no hardcoded values
- Resource-aware optimization based on actual system characteristics
- Environment-specific optimization patterns
- Intelligent scaling based on detected constraints

Architecture:
- Leverages tracer._detect_container_environment_dynamically()
- Leverages tracer._detect_cloud_environment_dynamically()
- Uses dynamic logic throughout - all values calculated from environment
"""

# pylint: disable=duplicate-code
# Pydantic field validators are domain-specific but share identical validation logic

import multiprocessing
import os
from typing import Any, Dict, Optional, Tuple

from pydantic import BaseModel, ConfigDict, Field, field_validator

from ...utils.logger import safe_log
from ..infra.environment import get_comprehensive_environment_analysis
from .otlp_session import OTLPSessionConfig

# Removed unnecessary wrapper - use environment module directly


class EnvironmentProfile(BaseModel):
    """Environment-specific OTLP configuration profile."""

    model_config = ConfigDict(
        validate_assignment=True,
        extra="forbid",
        frozen=False,  # Allow modifications during dynamic adjustments
    )

    name: str = Field(
        ...,
        description="Profile name for identification",
        min_length=1,
        max_length=100,
    )
    pool_connections: int = Field(
        ...,
        description="Number of connection pools to maintain",
        ge=1,
        le=50,
    )
    pool_maxsize: int = Field(
        ...,
        description="Maximum size of each connection pool",
        ge=1,
        le=100,
    )
    max_retries: int = Field(
        ...,
        description="Maximum number of retry attempts",
        ge=0,
        le=10,
    )
    timeout: float = Field(
        ...,
        description="Request timeout in seconds",
        gt=0.0,
        le=300.0,
    )
    backoff_factor: float = Field(
        ...,
        description="Backoff factor for retry delays",
        ge=0.0,
        le=5.0,
    )
    description: str = Field(
        "",
        description="Human-readable profile description",
        max_length=500,
    )
    pool_block: bool = Field(
        False,
        description="Whether connection pool should block when exhausted",
    )
    additional_config: Optional[Dict[str, Any]] = Field(
        None,
        description="Additional configuration parameters",
    )
    @field_validator("pool_maxsize")
    @classmethod
    def validate_pool_maxsize(cls, v: int, info: Any) -> int:
        """Ensure pool_maxsize is at least as large as pool_connections.

        Note: This validation logic is intentionally duplicated between the
        EnvironmentProfile and OTLPSessionConfig classes as both need the same
        pool size validation constraints.
        """
        if hasattr(info, "data") and "pool_connections" in info.data:
            pool_connections = info.data["pool_connections"]
            if v < pool_connections:
                return int(max(pool_connections, v))
        return v
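

# Illustrative sketch (an assumption for documentation, not called anywhere in
# the library): shows how the pool_maxsize validator above bumps an undersized
# pool_maxsize up to pool_connections at construction time.
def _example_pool_maxsize_bump() -> EnvironmentProfile:
    """Sketch: a pool_maxsize below pool_connections is raised to match it."""
    profile = EnvironmentProfile(
        name="example",
        pool_connections=10,
        pool_maxsize=4,  # deliberately smaller than pool_connections
        max_retries=3,
        timeout=30.0,
        backoff_factor=0.5,
    )
    # validate_pool_maxsize sees pool_connections=10 in info.data and returns 10,
    # so profile.pool_maxsize == 10 here.
    return profile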


# EnvironmentAnalyzer class removed - profiles are now pure consumers of
# environment data.
# All environment analysis is handled by the dedicated environment.py module.


def _determine_environment_type(
    container_info: Dict[str, Any], cloud_info: Dict[str, Any]
) -> str:
    """Determine primary environment type from detection results."""
    # Serverless takes highest priority
    if cloud_info.get("faas.name"):
        return "aws_lambda"

    # Container orchestration
    if container_info.get("k8s.cluster.name"):
        return "kubernetes"
    if container_info.get("container.runtime") == "docker":
        return "docker"

    # Cloud providers
    cloud_provider = cloud_info.get("cloud.provider")
    if cloud_provider == "aws":
        return "aws_ec2"
    if cloud_provider == "gcp":
        return "gcp"
    if cloud_provider == "azure":
        return "azure"

    return "standard"


def _analyze_resource_constraints() -> Dict[str, Any]:
    """Dynamically analyze system resource constraints."""
    constraints: Dict[str, Any] = {}

    try:
        # Memory analysis
        if lambda_memory := os.getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE"):
            memory_mb = int(lambda_memory)
            constraints["memory_mb"] = memory_mb
            constraints["memory_tier"] = (
                "low" if memory_mb < 512 else "medium" if memory_mb < 1024 else "high"
            )
        else:
            # Default memory tier based on environment
            constraints["memory_tier"] = "medium"

        # CPU analysis
        try:
            cpu_count = multiprocessing.cpu_count()
            constraints["cpu_count"] = cpu_count
            constraints["cpu_tier"] = (
                "low" if cpu_count <= 2 else "medium" if cpu_count <= 8 else "high"
            )
        except Exception:
            constraints["cpu_count"] = 1
            constraints["cpu_tier"] = "low"

    except Exception as e:
        constraints["analysis_error"] = str(e)

    return constraints


def _analyze_performance_characteristics(environment_type: str) -> Dict[str, Any]:
    """Dynamically analyze performance characteristics."""
    characteristics = {}

    try:
        # Execution model based on environment
        if environment_type == "aws_lambda":
            characteristics.update(
                {
                    "execution_model": "serverless",
                    "cold_start_sensitive": True,
                    "connection_reuse_critical": True,
                    "latency_sensitivity": "high",
                }
            )
        elif environment_type == "kubernetes":
            characteristics.update(
                {
                    "execution_model": "orchestrated",
                    "scaling_dynamic": True,
                    "connection_persistence": "medium",
                    "latency_sensitivity": "standard",
                }
            )
        else:
            characteristics.update(
                {
                    "execution_model": "persistent",
                    "connection_persistence": "high",
                    "latency_sensitivity": "standard",
                }
            )

        # Dynamic concurrency analysis
        if os.getenv("HH_HIGH_CONCURRENCY") == "true":
            characteristics["concurrency_pattern"] = "high"
        elif environment_type == "aws_lambda":
            characteristics["concurrency_pattern"] = "burst"
        else:
            characteristics["concurrency_pattern"] = "standard"

        # Dynamic latency sensitivity
        session_name = os.getenv("HH_SESSION_NAME", "").lower()
        if "benchmark" in session_name or "load" in session_name:
            characteristics["latency_sensitivity"] = "critical"

    except Exception as e:
        characteristics["analysis_error"] = str(e)

    return characteristics


class EnvironmentProfileManager:
    """Manages environment-specific OTLP configuration profiles."""

    # Predefined environment profiles
    PROFILES = {
        "aws_lambda": EnvironmentProfile(
            name="AWS Lambda",
            description="Optimized for AWS Lambda serverless functions",
            pool_connections=3,  # Minimal pools for cold start speed
            pool_maxsize=8,  # Small pools due to memory constraints
            max_retries=2,  # Fast failure for timeout constraints
            timeout=10.0,  # Short timeout for Lambda limits
            backoff_factor=0.1,  # Very fast backoff
            additional_config={
                "connection_reuse_priority": "critical",
                "cold_start_optimization": True,
            },
        ),
"kubernetes": EnvironmentProfile( name="Kubernetes", description="Optimized for Kubernetes orchestrated environments", pool_connections=12, # Moderate pools for scaling pool_maxsize=20, # Balanced for pod resources max_retries=4, # More retries for network resilience timeout=25.0, # Reasonable timeout for orchestrated networking backoff_factor=0.3, # Moderate backoff additional_config={ "graceful_shutdown": True, "scaling_aware": True, }, ), "docker": EnvironmentProfile( name="Docker Container", description="Optimized for Docker containerized applications", pool_connections=8, # Moderate pools pool_maxsize=15, # Container resource aware max_retries=3, # Standard retries timeout=20.0, # Container networking timeout backoff_factor=0.4, # Standard backoff additional_config={ "container_optimized": True, }, ), "gcp": EnvironmentProfile( name="Google Cloud Platform", description="Optimized for GCP environments", pool_connections=10, # GCP networking optimized pool_maxsize=18, # GCP resource patterns max_retries=3, # GCP reliability patterns timeout=22.0, # GCP network characteristics backoff_factor=0.35, # GCP-tuned backoff additional_config={ "gcp_optimized": True, }, ), "azure": EnvironmentProfile( name="Microsoft Azure", description="Optimized for Azure cloud environments", pool_connections=10, # Azure networking patterns pool_maxsize=18, # Azure resource allocation max_retries=4, # Azure resilience patterns timeout=24.0, # Azure network characteristics backoff_factor=0.4, # Azure-tuned backoff additional_config={ "azure_optimized": True, }, ), "aws_ec2": EnvironmentProfile( name="AWS EC2", description="Optimized for AWS EC2 instances", pool_connections=15, # EC2 networking capacity pool_maxsize=25, # EC2 resource availability max_retries=3, # AWS reliability timeout=30.0, # EC2 network performance backoff_factor=0.3, # AWS-tuned backoff additional_config={ "aws_optimized": True, "ec2_instance": True, }, ), "standard": EnvironmentProfile( name="Standard Environment", description="Default profile for standard server environments", pool_connections=10, # Balanced default pool_maxsize=20, # Standard capacity max_retries=3, # Standard resilience timeout=30.0, # Standard timeout backoff_factor=0.5, # Standard backoff additional_config={ "standard_environment": True, }, ), } @classmethod def get_optimal_profile( cls, tracer_instance: Optional[Any] = None ) -> Tuple[EnvironmentProfile, Dict[str, Any]]: """Get the optimal OTLP profile for the current environment. 

        Args:
            tracer_instance: Optional tracer instance for logging context

        Returns:
            Tuple of (selected_profile, environment_analysis)
        """
        # Get comprehensive environment analysis directly from environment module.
        # Profiles are pure consumers - no environment detection logic here.
        safe_log(
            tracer_instance,
            "debug",
            "Getting environment analysis for OTLP profile optimization",
        )

        env_analysis = get_comprehensive_environment_analysis(tracer_instance)
        env_type = env_analysis.get("environment_type", "standard")
        _resource_constraints = env_analysis.get("resource_constraints", {})
        _performance_chars = env_analysis.get("performance_characteristics", {})

        # Use the comprehensive analysis directly
        environment_analysis = env_analysis

        # Select base profile
        base_profile = cls.PROFILES.get(env_type, cls.PROFILES["standard"])

        # Create optimized profile with dynamic adjustments
        optimized_profile = cls._apply_dynamic_adjustments(
            base_profile, environment_analysis, tracer_instance
        )

        safe_log(
            tracer_instance,
            "info",
            f"Selected OTLP profile: {optimized_profile.name}",
            honeyhive_data={
                "profile_name": optimized_profile.name,
                "environment_analysis": environment_analysis,
                "profile_config": {
                    "pool_connections": optimized_profile.pool_connections,
                    "pool_maxsize": optimized_profile.pool_maxsize,
                    "timeout": optimized_profile.timeout,
                    "max_retries": optimized_profile.max_retries,
                },
            },
        )

        return optimized_profile, environment_analysis

    @classmethod
    def _apply_dynamic_adjustments(
        cls,
        base_profile: EnvironmentProfile,
        environment_analysis: Dict[str, Any],
        tracer_instance: Optional[Any] = None,
    ) -> EnvironmentProfile:
        """Apply dynamic adjustments to base profile based on environment analysis."""
        # Create a copy for modification using Pydantic model_copy
        adjusted_profile = base_profile.model_copy(
            update={
                "name": f"{base_profile.name} (Optimized)",
                "description": f"{base_profile.description} with dynamic adjustments",
                "additional_config": (
                    base_profile.additional_config.copy()
                    if base_profile.additional_config
                    else {}
                ),
            }
        )

        try:
            constraints = environment_analysis.get("resource_constraints", {})
            performance = environment_analysis.get("performance_characteristics", {})

            # Memory-based adjustments
            memory_tier = constraints.get("memory_tier", "medium")
            if memory_tier == "low":
                adjusted_profile.pool_connections = max(
                    2, adjusted_profile.pool_connections // 2
                )
                adjusted_profile.pool_maxsize = max(
                    5, adjusted_profile.pool_maxsize // 2
                )
            elif memory_tier == "high":
                adjusted_profile.pool_connections = min(
                    20, int(adjusted_profile.pool_connections * 1.3)
                )
                adjusted_profile.pool_maxsize = min(
                    40, int(adjusted_profile.pool_maxsize * 1.3)
                )

            # CPU-based adjustments
            cpu_tier = constraints.get("cpu_tier", "medium")
            if cpu_tier == "low":
                adjusted_profile.max_retries = max(1, adjusted_profile.max_retries - 1)
            elif cpu_tier == "high":
                adjusted_profile.max_retries = min(6, adjusted_profile.max_retries + 1)

            # Latency sensitivity adjustments
            latency_sensitivity = performance.get("latency_sensitivity", "standard")
            if latency_sensitivity == "critical":
                adjusted_profile.timeout = max(5.0, adjusted_profile.timeout * 0.6)
                adjusted_profile.backoff_factor = max(
                    0.1, adjusted_profile.backoff_factor * 0.5
                )
                adjusted_profile.max_retries = max(1, adjusted_profile.max_retries - 1)
            elif latency_sensitivity == "high":
                adjusted_profile.timeout = max(8.0, adjusted_profile.timeout * 0.8)
                adjusted_profile.backoff_factor = max(
                    0.2, adjusted_profile.backoff_factor * 0.7
                )

            # Concurrency pattern adjustments
            concurrency_pattern = performance.get("concurrency_pattern", "standard")
            if concurrency_pattern == "high":
                adjusted_profile.pool_connections = min(
                    25, int(adjusted_profile.pool_connections * 1.5)
                )
                adjusted_profile.pool_maxsize = min(
                    50, int(adjusted_profile.pool_maxsize * 1.4)
                )
            elif concurrency_pattern == "burst":
                # Optimize for burst scaling (like Lambda)
                adjusted_profile.pool_connections = max(
                    3, adjusted_profile.pool_connections
                )
                adjusted_profile.pool_maxsize = max(8, adjusted_profile.pool_maxsize)

            # Add adjustment metadata
            if adjusted_profile.additional_config is not None:
                adjusted_profile.additional_config["dynamic_adjustments"] = {
                    "memory_tier": memory_tier,
                    "cpu_tier": cpu_tier,
                    "latency_sensitivity": latency_sensitivity,
                    "concurrency_pattern": concurrency_pattern,
                }

        except Exception as e:
            safe_log(
                tracer_instance,
                "warning",
                f"Failed to apply dynamic adjustments: {e}",
                honeyhive_data={"error_type": type(e).__name__},
            )
            # Return base profile if adjustments fail
            return base_profile

        return adjusted_profile

    @classmethod
    def create_otlp_config_from_profile(
        cls,
        profile: EnvironmentProfile,
        _tracer_instance: Optional[Any] = None,
        **overrides: Any,
    ) -> OTLPSessionConfig:
        """Create an OTLPSessionConfig from an environment profile.

        Args:
            profile: Environment profile to use as base
            _tracer_instance: Optional tracer instance for context (unused)
            **overrides: Explicit configuration overrides

        Returns:
            Configured OTLPSessionConfig
        """
        config_params: Dict[str, Any] = {
            "pool_connections": int(profile.pool_connections),
            "pool_maxsize": int(profile.pool_maxsize),
            "max_retries": int(profile.max_retries),
            "timeout": (
                float(profile.timeout) if profile.timeout is not None else None
            ),
            "backoff_factor": float(profile.backoff_factor),
            "pool_block": bool(profile.pool_block),
        }

        # Apply any explicit overrides with proper type conversion
        for key, value in overrides.items():
            if key in ["pool_connections", "pool_maxsize", "max_retries"]:
                config_params[key] = int(value)
            elif key in ["timeout", "backoff_factor"]:
                config_params[key] = float(value) if value is not None else None
            elif key == "pool_block":
                config_params[key] = bool(value)
            elif key == "retry_status_codes":
                config_params[key] = (
                    list(value) if isinstance(value, (list, tuple)) else [int(value)]
                )
            else:
                config_params[key] = value

        return OTLPSessionConfig(**config_params)


def get_environment_optimized_config(
    tracer_instance: Optional[Any] = None, **overrides: Any
) -> OTLPSessionConfig:
    """Get environment-optimized OTLP configuration.

    This is the main entry point for getting an OTLP configuration that's
    automatically optimized for the current environment.

    Args:
        tracer_instance: Optional tracer instance for context
        **overrides: Explicit configuration overrides

    Returns:
        Environment-optimized OTLPSessionConfig
    """
    profile_manager = EnvironmentProfileManager()
    optimal_profile, _environment_analysis = profile_manager.get_optimal_profile(
        tracer_instance
    )

    return profile_manager.create_otlp_config_from_profile(
        optimal_profile, tracer_instance, **overrides
    )
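

# Minimal usage sketch (an illustrative assumption, not part of the shipped API
# surface): executing this module with "python -m" prints the connection-pool
# settings selected for the detected environment. The timeout=15.0 keyword is an
# arbitrary example of an explicit override, and the attribute access below
# assumes OTLPSessionConfig exposes the same fields it is constructed from in
# create_otlp_config_from_profile().
if __name__ == "__main__":
    _demo = get_environment_optimized_config(timeout=15.0)
    print(
        f"pool_connections={_demo.pool_connections} "
        f"pool_maxsize={_demo.pool_maxsize} "
        f"timeout={_demo.timeout}"
    )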