Skip to content

honeyhive.tracer.integration

Integration framework for HoneyHive tracer with external systems.

This module provides dynamic integration capabilities for OpenTelemetry providers, error handling, compatibility layers, and HTTP instrumentation. All components use dynamic logic patterns for flexible, extensible integration strategies.

IntegrationStrategy

Bases: Enum

Integration strategies for different provider types.

Source code in src/honeyhive/tracer/integration/detection.py
38
39
40
41
42
43
class IntegrationStrategy(Enum):
    """Integration strategies for different provider types."""

    MAIN_PROVIDER = "main_provider"
    INDEPENDENT_PROVIDER = "independent_provider"
    CONSOLE_FALLBACK = "console_fallback"

MAIN_PROVIDER class-attribute instance-attribute

MAIN_PROVIDER = 'main_provider'

INDEPENDENT_PROVIDER class-attribute instance-attribute

INDEPENDENT_PROVIDER = 'independent_provider'

CONSOLE_FALLBACK class-attribute instance-attribute

CONSOLE_FALLBACK = 'console_fallback'

ProviderDetector

Dynamically detects and classifies existing OpenTelemetry TracerProviders.

Source code in src/honeyhive/tracer/integration/detection.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
class ProviderDetector:
    """Dynamically detects and classifies existing OpenTelemetry TracerProviders."""

    def __init__(self, tracer_instance: Any = None) -> None:
        """Initialize the provider detector with dynamic detection patterns.

        Args:
            tracer_instance: Optional tracer instance for logging context
        """
        self.tracer_instance = tracer_instance
        self._detection_patterns = self._build_detection_patterns_dynamically()
        self._strategy_rules = self._build_strategy_rules_dynamically()

    def _build_detection_patterns_dynamically(self) -> Dict[str, List[str]]:
        """Dynamically build provider detection patterns.

        Returns:
            Dictionary mapping provider types to detection patterns
        """
        return {
            "noop": ["NoOp", "NoOpTracerProvider"],
            "proxy_tracer_provider": ["Proxy", "ProxyTracerProvider"],
            "tracer_provider": ["TracerProvider"],
            "custom": [],  # Fallback for unrecognized patterns
        }

    def _build_strategy_rules_dynamically(
        self,
    ) -> Dict[ProviderType, IntegrationStrategy]:
        """Dynamically build integration strategy rules.

        Returns:
            Dictionary mapping provider types to integration strategies
        """
        return {
            ProviderType.NOOP: IntegrationStrategy.MAIN_PROVIDER,
            ProviderType.PROXY_TRACER_PROVIDER: IntegrationStrategy.MAIN_PROVIDER,
            ProviderType.TRACER_PROVIDER: IntegrationStrategy.INDEPENDENT_PROVIDER,
            ProviderType.CUSTOM: IntegrationStrategy.INDEPENDENT_PROVIDER,
        }

    def detect_provider_type(self) -> ProviderType:
        """Dynamically detect the type of existing TracerProvider.

        Returns:
            ProviderType: The detected provider type
        """
        existing_provider = trace.get_tracer_provider()

        # Dynamic provider type detection
        provider_type = self._classify_provider_dynamically(existing_provider)

        safe_log(
            self.tracer_instance,
            "debug",
            "Provider type detected",
            honeyhive_data={
                "provider_class": type(existing_provider).__name__,
                "detected_type": provider_type.value,
            },
        )

        return provider_type

    def _classify_provider_dynamically(self, provider: Any) -> ProviderType:
        """Dynamically classify provider using pattern matching.

        Args:
            provider: The provider instance to classify

        Returns:
            ProviderType: The classified provider type
        """
        if provider is None:
            return ProviderType.NOOP

        provider_name = type(provider).__name__

        # Dynamic pattern matching
        for provider_type, patterns in self._detection_patterns.items():
            if self._matches_patterns_dynamically(provider_name, patterns):
                return ProviderType(provider_type)

        # Special case for TracerProvider - check for exclusions
        if self._is_tracer_provider_dynamically(provider_name):
            return ProviderType.TRACER_PROVIDER

        # Default to custom for unrecognized providers
        return ProviderType.CUSTOM

    def _matches_patterns_dynamically(
        self, provider_name: str, patterns: List[str]
    ) -> bool:
        """Dynamically match provider name against patterns.

        Args:
            provider_name: Name of the provider class
            patterns: List of patterns to match against

        Returns:
            bool: True if provider name matches any pattern
        """
        return any(pattern in provider_name for pattern in patterns)

    def _is_tracer_provider_dynamically(self, provider_name: str) -> bool:
        """Dynamically check if provider is a real TracerProvider.

        Args:
            provider_name: Name of the provider class

        Returns:
            bool: True if provider is a real TracerProvider
        """
        # Dynamic exclusion patterns
        exclusion_patterns = ["Proxy", "NoOp", "Custom"]

        # Check for TracerProvider with exclusions
        return provider_name == "TracerProvider" or (
            "TracerProvider" in provider_name
            and not any(exclusion in provider_name for exclusion in exclusion_patterns)
        )

    def get_integration_strategy(
        self, provider_type: Optional[ProviderType] = None
    ) -> IntegrationStrategy:
        """Dynamically determine integration strategy using Provider Intelligence.

        This implements the Provider Strategy Intelligence:
        - Main Provider Strategy: Replace non-functioning providers (NoOp/Proxy/Empty)
        - Independent Provider Strategy: Coexist with functioning providers
        - Critical: Someone must process instrumentor spans - empty providers lose data

        Args:
            provider_type: Optional provider type. If None, will detect automatically.

        Returns:
            IntegrationStrategy: The recommended integration strategy
        """
        if provider_type is None:
            provider_type = self.detect_provider_type()

        # PROVIDER STRATEGY INTELLIGENCE: Check if current provider is functioning
        current_provider = trace.get_tracer_provider()
        is_functioning = _is_functioning_tracer_provider(
            current_provider, self.tracer_instance
        )

        if is_functioning:
            # Functioning provider exists - use Independent Provider Strategy
            # Coexist with existing observability systems
            strategy = IntegrationStrategy.INDEPENDENT_PROVIDER
            safe_log(
                self.tracer_instance,
                "debug",
                "Using Independent Provider Strategy - functioning provider detected",
                honeyhive_data={
                    "provider_type": provider_type.value,
                    "strategy": "independent_provider",
                    "reason": "functioning_provider_exists",
                },
            )
        else:
            # Non-functioning provider - use Main Provider Strategy
            # Replace empty providers to prevent instrumentor span loss
            strategy = IntegrationStrategy.MAIN_PROVIDER
            safe_log(
                self.tracer_instance,
                "debug",
                "Using Main Provider Strategy - non-functioning provider detected",
                honeyhive_data={
                    "provider_type": provider_type.value,
                    "strategy": "main_provider",
                    "reason": "non_functioning_provider",
                },
            )

        safe_log(
            self.tracer_instance,
            "debug",
            "Integration strategy determined",
            honeyhive_data={
                "provider_type": provider_type.value,
                "strategy": strategy.value,
            },
        )

        return strategy

    def _get_base_strategy_dynamically(
        self, provider_type: ProviderType
    ) -> IntegrationStrategy:
        """Dynamically get base integration strategy.

        Args:
            provider_type: The provider type

        Returns:
            IntegrationStrategy: Base integration strategy
        """
        return self._strategy_rules.get(
            provider_type, IntegrationStrategy.INDEPENDENT_PROVIDER
        )

    def _refine_tracer_provider_strategy_dynamically(
        self, _base_strategy: IntegrationStrategy
    ) -> IntegrationStrategy:
        """Dynamically refine strategy for TracerProvider based on functionality.

        Args:
            base_strategy: Base integration strategy

        Returns:
            IntegrationStrategy: Refined integration strategy
        """
        existing_provider = trace.get_tracer_provider()

        if self._is_functioning_tracer_provider_dynamically(existing_provider):
            # Functioning TracerProvider - maintain independence
            return IntegrationStrategy.INDEPENDENT_PROVIDER

        # Empty TracerProvider - become main provider to capture instrumentor spans
        return IntegrationStrategy.MAIN_PROVIDER

    def _is_functioning_tracer_provider_dynamically(self, provider: Any) -> bool:
        """Dynamically check if TracerProvider is functioning.

        Args:
            provider: The provider instance to check

        Returns:
            bool: True if provider has active processors/exporters
        """
        # Dynamic functionality detection patterns
        functionality_checks = [
            self._has_active_span_processor_dynamically,
            self._has_composite_processors_dynamically,
        ]

        # Apply functionality checks dynamically
        for check in functionality_checks:
            try:
                if check(provider):
                    return True
            except Exception as e:
                safe_log(
                    self.tracer_instance,
                    "debug",
                    "Functionality check failed",
                    honeyhive_data={
                        "check": check.__name__,
                        "error": str(e),
                    },
                )
                continue

        return False

    def _has_active_span_processor_dynamically(self, provider: Any) -> bool:
        """Dynamically check for active span processor.

        Args:
            provider: Provider to check

        Returns:
            bool: True if has active span processor
        """
        if not hasattr(provider, "_active_span_processor"):
            return False

        active_processor = getattr(provider, "_active_span_processor", None)
        return active_processor is not None

    def _has_composite_processors_dynamically(self, provider: Any) -> bool:
        """Dynamically check for composite processors.

        Args:
            provider: Provider to check

        Returns:
            bool: True if has composite processors
        """
        if not hasattr(provider, "_active_span_processor"):
            return False

        active_processor = getattr(provider, "_active_span_processor", None)
        if active_processor is None:
            return False

        if hasattr(active_processor, "_span_processors"):
            processors = getattr(active_processor, "_span_processors", [])
            return len(processors) > 0

        return False

    def can_add_span_processor(self) -> bool:
        """Dynamically check if the current provider supports adding span processors.

        Returns:
            bool: True if span processors can be added
        """
        existing_provider = trace.get_tracer_provider()

        # Dynamic capability detection
        capability_checks = [
            lambda p: hasattr(p, "add_span_processor"),
            lambda p: hasattr(p, "_active_span_processor"),
        ]

        return any(check(existing_provider) for check in capability_checks)

    def get_provider_info(self) -> Dict[str, Any]:
        """Dynamically gather comprehensive provider information.

        Returns:
            dict: Provider information including type, name, and capabilities
        """
        existing_provider = trace.get_tracer_provider()
        provider_type = self.detect_provider_type()
        integration_strategy = self.get_integration_strategy(provider_type)

        # Dynamic information gathering
        info = {
            "provider_instance": existing_provider,
            "provider_class_name": type(existing_provider).__name__,
            "provider_type": provider_type,
            "integration_strategy": integration_strategy,
            "supports_span_processors": self.can_add_span_processor(),
            "is_replaceable": self._is_replaceable_dynamically(provider_type),
            "is_functioning": _is_functioning_tracer_provider(
                existing_provider, self.tracer_instance
            ),
        }

        # Dynamic capability assessment
        info.update(self._assess_capabilities_dynamically(existing_provider))

        return info

    def _is_replaceable_dynamically(self, provider_type: ProviderType) -> bool:
        """Dynamically determine if provider is replaceable.

        Args:
            provider_type: The provider type

        Returns:
            bool: True if provider can be safely replaced
        """
        replaceable_types = {ProviderType.NOOP, ProviderType.PROXY_TRACER_PROVIDER}
        return provider_type in replaceable_types

    def _assess_capabilities_dynamically(self, provider: Any) -> Dict[str, Any]:
        """Dynamically assess provider capabilities.

        Args:
            provider: Provider to assess

        Returns:
            dict: Capability assessment results
        """
        capabilities = {}

        # Dynamic capability assessment patterns
        capability_assessments = [
            ("has_span_processors", lambda p: hasattr(p, "_active_span_processor")),
            ("has_resource", lambda p: hasattr(p, "resource")),
            ("has_sampler", lambda p: hasattr(p, "_sampler")),
            ("is_shutdown", lambda p: getattr(p, "_shutdown", False)),
        ]

        for capability_name, assessment_func in capability_assessments:
            try:
                capabilities[capability_name] = assessment_func(provider)
            except Exception:
                capabilities[capability_name] = False

        return capabilities

tracer_instance instance-attribute

tracer_instance = tracer_instance

detect_provider_type

detect_provider_type() -> ProviderType

Dynamically detect the type of existing TracerProvider.

Returns:

Name Type Description
ProviderType ProviderType

The detected provider type

Source code in src/honeyhive/tracer/integration/detection.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def detect_provider_type(self) -> ProviderType:
    """Dynamically detect the type of existing TracerProvider.

    Returns:
        ProviderType: The detected provider type
    """
    existing_provider = trace.get_tracer_provider()

    # Dynamic provider type detection
    provider_type = self._classify_provider_dynamically(existing_provider)

    safe_log(
        self.tracer_instance,
        "debug",
        "Provider type detected",
        honeyhive_data={
            "provider_class": type(existing_provider).__name__,
            "detected_type": provider_type.value,
        },
    )

    return provider_type

get_integration_strategy

get_integration_strategy(
    provider_type: Optional[ProviderType] = None,
) -> IntegrationStrategy

Dynamically determine integration strategy using Provider Intelligence.

This implements the Provider Strategy Intelligence: - Main Provider Strategy: Replace non-functioning providers (NoOp/Proxy/Empty) - Independent Provider Strategy: Coexist with functioning providers - Critical: Someone must process instrumentor spans - empty providers lose data

Parameters:

Name Type Description Default
provider_type Optional[ProviderType]

Optional provider type. If None, will detect automatically.

None

Returns:

Name Type Description
IntegrationStrategy IntegrationStrategy

The recommended integration strategy

Source code in src/honeyhive/tracer/integration/detection.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def get_integration_strategy(
    self, provider_type: Optional[ProviderType] = None
) -> IntegrationStrategy:
    """Dynamically determine integration strategy using Provider Intelligence.

    This implements the Provider Strategy Intelligence:
    - Main Provider Strategy: Replace non-functioning providers (NoOp/Proxy/Empty)
    - Independent Provider Strategy: Coexist with functioning providers
    - Critical: Someone must process instrumentor spans - empty providers lose data

    Args:
        provider_type: Optional provider type. If None, will detect automatically.

    Returns:
        IntegrationStrategy: The recommended integration strategy
    """
    if provider_type is None:
        provider_type = self.detect_provider_type()

    # PROVIDER STRATEGY INTELLIGENCE: Check if current provider is functioning
    current_provider = trace.get_tracer_provider()
    is_functioning = _is_functioning_tracer_provider(
        current_provider, self.tracer_instance
    )

    if is_functioning:
        # Functioning provider exists - use Independent Provider Strategy
        # Coexist with existing observability systems
        strategy = IntegrationStrategy.INDEPENDENT_PROVIDER
        safe_log(
            self.tracer_instance,
            "debug",
            "Using Independent Provider Strategy - functioning provider detected",
            honeyhive_data={
                "provider_type": provider_type.value,
                "strategy": "independent_provider",
                "reason": "functioning_provider_exists",
            },
        )
    else:
        # Non-functioning provider - use Main Provider Strategy
        # Replace empty providers to prevent instrumentor span loss
        strategy = IntegrationStrategy.MAIN_PROVIDER
        safe_log(
            self.tracer_instance,
            "debug",
            "Using Main Provider Strategy - non-functioning provider detected",
            honeyhive_data={
                "provider_type": provider_type.value,
                "strategy": "main_provider",
                "reason": "non_functioning_provider",
            },
        )

    safe_log(
        self.tracer_instance,
        "debug",
        "Integration strategy determined",
        honeyhive_data={
            "provider_type": provider_type.value,
            "strategy": strategy.value,
        },
    )

    return strategy

can_add_span_processor

can_add_span_processor() -> bool

Dynamically check if the current provider supports adding span processors.

Returns:

Name Type Description
bool bool

True if span processors can be added

Source code in src/honeyhive/tracer/integration/detection.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def can_add_span_processor(self) -> bool:
    """Dynamically check if the current provider supports adding span processors.

    Returns:
        bool: True if span processors can be added
    """
    existing_provider = trace.get_tracer_provider()

    # Dynamic capability detection
    capability_checks = [
        lambda p: hasattr(p, "add_span_processor"),
        lambda p: hasattr(p, "_active_span_processor"),
    ]

    return any(check(existing_provider) for check in capability_checks)

get_provider_info

get_provider_info() -> Dict[str, Any]

Dynamically gather comprehensive provider information.

Returns:

Name Type Description
dict Dict[str, Any]

Provider information including type, name, and capabilities

Source code in src/honeyhive/tracer/integration/detection.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def get_provider_info(self) -> Dict[str, Any]:
    """Dynamically gather comprehensive provider information.

    Returns:
        dict: Provider information including type, name, and capabilities
    """
    existing_provider = trace.get_tracer_provider()
    provider_type = self.detect_provider_type()
    integration_strategy = self.get_integration_strategy(provider_type)

    # Dynamic information gathering
    info = {
        "provider_instance": existing_provider,
        "provider_class_name": type(existing_provider).__name__,
        "provider_type": provider_type,
        "integration_strategy": integration_strategy,
        "supports_span_processors": self.can_add_span_processor(),
        "is_replaceable": self._is_replaceable_dynamically(provider_type),
        "is_functioning": _is_functioning_tracer_provider(
            existing_provider, self.tracer_instance
        ),
    }

    # Dynamic capability assessment
    info.update(self._assess_capabilities_dynamically(existing_provider))

    return info

ProviderType

Bases: Enum

Types of OpenTelemetry TracerProviders.

Source code in src/honeyhive/tracer/integration/detection.py
29
30
31
32
33
34
35
class ProviderType(Enum):
    """Types of OpenTelemetry TracerProviders."""

    NOOP = "noop"
    TRACER_PROVIDER = "tracer_provider"
    PROXY_TRACER_PROVIDER = "proxy_tracer_provider"
    CUSTOM = "custom"

NOOP class-attribute instance-attribute

NOOP = 'noop'

TRACER_PROVIDER class-attribute instance-attribute

TRACER_PROVIDER = 'tracer_provider'

PROXY_TRACER_PROVIDER class-attribute instance-attribute

PROXY_TRACER_PROVIDER = 'proxy_tracer_provider'

CUSTOM class-attribute instance-attribute

CUSTOM = 'custom'

ErrorHandler

Dynamic error handler with extensible strategies and patterns.

Source code in src/honeyhive/tracer/integration/error_handling.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
class ErrorHandler:
    """Dynamic error handler with extensible strategies and patterns."""

    def __init__(
        self,
        resilience_level: ResilienceLevel = ResilienceLevel.BALANCED,
        tracer_instance: Any = None,
    ):
        """Initialize error handler with dynamic configuration.

        Args:
            resilience_level: Level of resilience for error handling
            tracer_instance: Optional tracer instance for logging context
        """
        self.resilience_level = resilience_level
        self.tracer_instance = tracer_instance
        self._lock = threading.Lock()
        self._error_history: List[ErrorContext] = []
        self._recovery_strategies = self._build_recovery_strategies_dynamically()
        self._error_patterns = self._build_error_patterns_dynamically()

    def _build_recovery_strategies_dynamically(self) -> List[RecoveryStrategy]:
        """Dynamically build recovery strategies based on resilience level.

        Returns:
            List of recovery strategies
        """
        strategies = []

        # Base strategies available for all resilience levels
        strategies.extend(
            [
                RecoveryStrategy(
                    name="graceful_degradation",
                    handler=self._graceful_degradation_handler,
                    applicable_errors=["PROVIDER_INCOMPATIBLE", "INITIALIZATION_ERROR"],
                    max_attempts=1,
                ),
                RecoveryStrategy(
                    name="retry_with_backoff",
                    handler=self._retry_with_backoff_handler,
                    applicable_errors=["EXPORT_ERROR", "SPAN_PROCESSING_ERROR"],
                    max_attempts=self._get_max_retries_for_level(),
                    backoff_multiplier=1.5,
                    base_delay=0.1,
                ),
            ]
        )

        # Add resilience-level specific strategies
        if self.resilience_level in {
            ResilienceLevel.BALANCED,
            ResilienceLevel.RESILIENT,
        }:
            strategies.append(
                RecoveryStrategy(
                    name="fallback_provider",
                    handler=self._fallback_provider_handler,
                    applicable_errors=["PROVIDER_INCOMPATIBLE"],
                    max_attempts=1,
                )
            )

        if self.resilience_level == ResilienceLevel.RESILIENT:
            strategies.append(
                RecoveryStrategy(
                    name="console_fallback",
                    handler=self._console_fallback_handler,
                    applicable_errors=["EXPORT_ERROR"],
                    max_attempts=1,
                )
            )

        return strategies

    def _build_error_patterns_dynamically(self) -> Dict[str, Dict[str, Any]]:
        """Dynamically build error patterns for classification.

        Returns:
            Dictionary of error patterns and their configurations
        """
        return {
            "connection_errors": {
                "patterns": ["connection", "timeout", "network", "unreachable"],
                "severity": ErrorSeverity.MEDIUM,
                "retry_eligible": True,
            },
            "authentication_errors": {
                "patterns": ["auth", "unauthorized", "forbidden", "api_key"],
                "severity": ErrorSeverity.HIGH,
                "retry_eligible": False,
            },
            "provider_errors": {
                "patterns": ["provider", "incompatible", "unsupported"],
                "severity": ErrorSeverity.HIGH,
                "retry_eligible": False,
            },
            "processing_errors": {
                "patterns": ["processing", "span", "attribute"],
                "severity": ErrorSeverity.LOW,
                "retry_eligible": True,
            },
        }

    def _get_max_retries_for_level(self) -> int:
        """Dynamically get max retries based on resilience level.

        Returns:
            Maximum number of retries
        """
        retry_mapping = {
            ResilienceLevel.STRICT: 0,
            ResilienceLevel.BALANCED: 3,
            ResilienceLevel.RESILIENT: 5,
        }
        return retry_mapping.get(self.resilience_level, 3)

    def handle_error(
        self,
        error: Exception,
        component: str = "unknown",
        operation: str = "unknown",
        **metadata: Any,
    ) -> bool:
        """Dynamically handle error with appropriate recovery strategy.

        Args:
            error: Exception that occurred
            component: Component where error occurred
            operation: Operation that failed
            **metadata: Additional error metadata

        Returns:
            bool: True if error was handled successfully, False otherwise
        """
        with self._lock:
            # Create error context
            error_context = self._create_error_context_dynamically(
                error, component, operation, metadata
            )

            # Record error in history
            self._record_error_dynamically(error_context)

            # Classify error severity
            error_context.severity = self._classify_error_severity_dynamically(error)

            # Apply recovery strategies
            recovery_success = self._apply_recovery_strategies_dynamically(
                error_context
            )

            # Log error handling result
            self._log_error_handling_result_dynamically(error_context, recovery_success)

            return recovery_success

    def _create_error_context_dynamically(
        self,
        error: Exception,
        component: str,
        operation: str,
        metadata: Dict[str, Any],
    ) -> ErrorContext:
        """Dynamically create error context with comprehensive information.

        Args:
            error: Exception that occurred
            component: Component where error occurred
            operation: Operation that failed
            metadata: Additional error metadata

        Returns:
            ErrorContext with comprehensive error information
        """
        return ErrorContext(
            error=error,
            component=component,
            operation=operation,
            metadata=metadata,
            max_retries=self._get_max_retries_for_level(),
        )

    def _record_error_dynamically(self, error_context: ErrorContext) -> None:
        """Dynamically record error in history with size management.

        Args:
            error_context: Error context to record
        """
        self._error_history.append(error_context)

        # Dynamic history size management
        max_history_size = 100
        if len(self._error_history) > max_history_size:
            self._error_history = self._error_history[-max_history_size:]

    def _classify_error_severity_dynamically(self, error: Exception) -> ErrorSeverity:
        """Dynamically classify error severity using pattern matching.

        Args:
            error: Exception to classify

        Returns:
            ErrorSeverity level
        """
        error_message = str(error).lower()
        error_type = type(error).__name__.lower()

        # Dynamic pattern matching
        for _pattern_name, pattern_config in self._error_patterns.items():
            patterns = pattern_config["patterns"]
            if any(
                pattern in error_message or pattern in error_type
                for pattern in patterns
            ):
                return ErrorSeverity(pattern_config["severity"])

        # Default severity for unclassified errors
        return ErrorSeverity.MEDIUM

    def _apply_recovery_strategies_dynamically(
        self, error_context: ErrorContext
    ) -> bool:
        """Dynamically apply recovery strategies based on error context.

        Args:
            error_context: Error context to handle

        Returns:
            bool: True if recovery was successful
        """
        error_code = getattr(error_context.error, "error_code", "UNKNOWN_ERROR")

        # Find applicable strategies
        applicable_strategies = self._find_applicable_strategies_dynamically(error_code)

        # Apply strategies in order
        for strategy in applicable_strategies:
            try:
                if self._execute_recovery_strategy_dynamically(strategy, error_context):
                    return True
            except Exception as strategy_error:
                safe_log(
                    self.tracer_instance,
                    "warning",
                    "Recovery strategy failed",
                    honeyhive_data={
                        "strategy": strategy.name,
                        "error": str(strategy_error),
                        "original_error": str(error_context.error),
                    },
                )
                continue

        return False

    def _find_applicable_strategies_dynamically(
        self, error_code: str
    ) -> List[RecoveryStrategy]:
        """Dynamically find applicable recovery strategies.

        Args:
            error_code: Error code to match against

        Returns:
            List of applicable recovery strategies
        """
        applicable = []

        for strategy in self._recovery_strategies:
            if (
                not strategy.applicable_errors
                or error_code in strategy.applicable_errors
            ):
                applicable.append(strategy)

        # Sort by priority (could be made dynamic in future)
        return applicable

    def _execute_recovery_strategy_dynamically(
        self, strategy: RecoveryStrategy, error_context: ErrorContext
    ) -> bool:
        """Dynamically execute recovery strategy with backoff.

        Args:
            strategy: Recovery strategy to execute
            error_context: Error context

        Returns:
            bool: True if strategy succeeded
        """
        for attempt in range(strategy.max_attempts):
            try:
                if strategy.handler(error_context):
                    return True

                # Dynamic backoff calculation
                if attempt < strategy.max_attempts - 1:
                    delay = strategy.base_delay * (strategy.backoff_multiplier**attempt)
                    time.sleep(delay)

            except Exception as handler_error:
                safe_log(
                    self.tracer_instance,
                    "debug",
                    "Recovery strategy handler failed",
                    honeyhive_data={
                        "strategy": strategy.name,
                        "attempt": attempt + 1,
                        "error": str(handler_error),
                    },
                )
                continue

        return False

    def _log_error_handling_result_dynamically(
        self, error_context: ErrorContext, recovery_success: bool
    ) -> None:
        """Dynamically log error handling result.

        Args:
            error_context: Error context that was handled
            recovery_success: Whether recovery was successful
        """
        log_level = "info" if recovery_success else "warning"
        log_message = (
            "Error handled successfully"
            if recovery_success
            else "Error handling failed"
        )

        safe_log(
            self.tracer_instance,
            log_level,
            log_message,
            honeyhive_data={
                "component": error_context.component,
                "operation": error_context.operation,
                "error_type": type(error_context.error).__name__,
                "severity": error_context.severity.value,
                "recovery_success": recovery_success,
                "retry_count": error_context.retry_count,
            },
        )

    # Recovery strategy handlers
    def _graceful_degradation_handler(self, error_context: ErrorContext) -> bool:
        """Handle error with graceful degradation.

        Args:
            error_context: Error context

        Returns:
            bool: True if degradation successful
        """
        safe_log(
            self.tracer_instance,
            "info",
            "Applying graceful degradation",
            honeyhive_data={
                "component": error_context.component,
                "operation": error_context.operation,
            },
        )
        # Graceful degradation always succeeds by definition
        return True

    def _retry_with_backoff_handler(self, error_context: ErrorContext) -> bool:
        """Handle error with retry and backoff.

        Args:
            error_context: Error context

        Returns:
            bool: True if retry should be attempted
        """
        if error_context.retry_count < error_context.max_retries:
            error_context.retry_count += 1
            return False  # Indicate retry needed
        return True  # Max retries reached, give up

    def _fallback_provider_handler(self, error_context: ErrorContext) -> bool:
        """Handle error by falling back to alternative provider.

        Args:
            error_context: Error context

        Returns:
            bool: True if fallback successful
        """
        safe_log(
            self.tracer_instance,
            "info",
            "Falling back to alternative provider",
            honeyhive_data={
                "component": error_context.component,
                "original_error": str(error_context.error),
            },
        )
        # Implementation would set up fallback provider
        return True

    def _console_fallback_handler(self, error_context: ErrorContext) -> bool:
        """Handle error by falling back to console logging.

        Args:
            error_context: Error context

        Returns:
            bool: True if console fallback successful
        """
        safe_log(
            self.tracer_instance,
            "info",
            "Falling back to console logging",
            honeyhive_data={
                "component": error_context.component,
                "original_error": str(error_context.error),
            },
        )
        # Console fallback always succeeds
        return True

    def get_error_statistics(self) -> Dict[str, Any]:
        """Get dynamic error statistics.

        Returns:
            Dictionary with error statistics
        """
        with self._lock:
            if not self._error_history:
                return {"total_errors": 0}

            # Dynamic statistics calculation
            stats = {
                "total_errors": len(self._error_history),
                "error_types": self._calculate_error_type_distribution(),
                "severity_distribution": self._calculate_severity_distribution(),
                "component_distribution": self._calculate_component_distribution(),
                "recent_errors": len(
                    [
                        e
                        for e in self._error_history
                        if time.time() - e.timestamp < 300  # Last 5 minutes
                    ]
                ),
            }

            return stats

    def _calculate_error_type_distribution(self) -> Dict[str, int]:
        """Calculate error type distribution."""
        distribution: Dict[str, int] = {}
        for error_context in self._error_history:
            error_type = type(error_context.error).__name__
            distribution[error_type] = distribution.get(error_type, 0) + 1
        return distribution

    def _calculate_severity_distribution(self) -> Dict[str, int]:
        """Calculate severity distribution."""
        distribution: Dict[str, int] = {}
        for error_context in self._error_history:
            severity = error_context.severity.value
            distribution[severity] = distribution.get(severity, 0) + 1
        return distribution

    def _calculate_component_distribution(self) -> Dict[str, int]:
        """Calculate component distribution."""
        distribution: Dict[str, int] = {}
        for error_context in self._error_history:
            component = error_context.component
            distribution[component] = distribution.get(component, 0) + 1
        return distribution

resilience_level instance-attribute

resilience_level = resilience_level

tracer_instance instance-attribute

tracer_instance = tracer_instance

handle_error

handle_error(
    error: Exception,
    component: str = "unknown",
    operation: str = "unknown",
    **metadata: Any
) -> bool

Dynamically handle error with appropriate recovery strategy.

Parameters:

Name Type Description Default
error Exception

Exception that occurred

required
component str

Component where error occurred

'unknown'
operation str

Operation that failed

'unknown'
**metadata Any

Additional error metadata

{}

Returns:

Name Type Description
bool bool

True if error was handled successfully, False otherwise

Source code in src/honeyhive/tracer/integration/error_handling.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def handle_error(
    self,
    error: Exception,
    component: str = "unknown",
    operation: str = "unknown",
    **metadata: Any,
) -> bool:
    """Dynamically handle error with appropriate recovery strategy.

    Args:
        error: Exception that occurred
        component: Component where error occurred
        operation: Operation that failed
        **metadata: Additional error metadata

    Returns:
        bool: True if error was handled successfully, False otherwise
    """
    with self._lock:
        # Create error context
        error_context = self._create_error_context_dynamically(
            error, component, operation, metadata
        )

        # Record error in history
        self._record_error_dynamically(error_context)

        # Classify error severity
        error_context.severity = self._classify_error_severity_dynamically(error)

        # Apply recovery strategies
        recovery_success = self._apply_recovery_strategies_dynamically(
            error_context
        )

        # Log error handling result
        self._log_error_handling_result_dynamically(error_context, recovery_success)

        return recovery_success

get_error_statistics

get_error_statistics() -> Dict[str, Any]

Get dynamic error statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with error statistics

Source code in src/honeyhive/tracer/integration/error_handling.py
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
def get_error_statistics(self) -> Dict[str, Any]:
    """Get dynamic error statistics.

    Returns:
        Dictionary with error statistics
    """
    with self._lock:
        if not self._error_history:
            return {"total_errors": 0}

        # Dynamic statistics calculation
        stats = {
            "total_errors": len(self._error_history),
            "error_types": self._calculate_error_type_distribution(),
            "severity_distribution": self._calculate_severity_distribution(),
            "component_distribution": self._calculate_component_distribution(),
            "recent_errors": len(
                [
                    e
                    for e in self._error_history
                    if time.time() - e.timestamp < 300  # Last 5 minutes
                ]
            ),
        }

        return stats

IntegrationError

Bases: Exception

Base exception for integration errors with dynamic context.

Source code in src/honeyhive/tracer/integration/error_handling.py
23
24
25
26
27
28
29
30
31
32
33
34
35
class IntegrationError(Exception):
    """Base exception for integration errors with dynamic context."""

    def __init__(
        self,
        message: str,
        error_code: str = "INTEGRATION_ERROR",
        details: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(message)
        self.error_code = error_code
        self.details = details or {}
        self.timestamp = time.time()

error_code instance-attribute

error_code = error_code

details instance-attribute

details = details or {}

timestamp instance-attribute

timestamp = time()

ResilienceLevel

Bases: Enum

Resilience levels for dynamic error handling strategies.

Source code in src/honeyhive/tracer/integration/error_handling.py
103
104
105
106
107
108
class ResilienceLevel(Enum):
    """Resilience levels for dynamic error handling strategies."""

    STRICT = "strict"  # Fail fast, no retries
    BALANCED = "balanced"  # Some retries, graceful degradation
    RESILIENT = "resilient"  # Maximum retries, always degrade gracefully

STRICT class-attribute instance-attribute

STRICT = 'strict'

BALANCED class-attribute instance-attribute

BALANCED = 'balanced'

RESILIENT class-attribute instance-attribute

RESILIENT = 'resilient'

ErrorProviderIncompatibleError

Bases: IntegrationError

Provider doesn't support required operations.

Source code in src/honeyhive/tracer/integration/error_handling.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class ProviderIncompatibleError(IntegrationError):
    """Provider doesn't support required operations."""

    def __init__(self, provider_type: str, required_operations: List[str]):
        message = (
            f"Provider {provider_type} doesn't support required operations: "
            f"{required_operations}"
        )
        super().__init__(
            message,
            error_code="PROVIDER_INCOMPATIBLE",
            details={
                "provider_type": provider_type,
                "required_operations": required_operations,
            },
        )

HTTPInstrumentation

Dynamic HTTP instrumentation for automatic request tracing.

Source code in src/honeyhive/tracer/integration/http.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
class HTTPInstrumentation:
    """Dynamic HTTP instrumentation for automatic request tracing."""

    def __init__(self, tracer_instance: Any = None) -> None:
        """Initialize HTTP instrumentation with dynamic library detection.

        Args:
            tracer_instance: Optional tracer instance for logging context
        """
        self.tracer_instance = tracer_instance
        self._library_availability = self._detect_libraries_dynamically()
        self._original_methods: Dict[str, Any] = {}
        self._is_instrumented = False
        self._instrumentation_config = self._build_instrumentation_config_dynamically()

    def _detect_libraries_dynamically(self) -> Dict[str, bool]:
        """Dynamically detect available HTTP libraries.

        Returns:
            Dictionary mapping library names to availability status
        """
        libraries = {}

        # Dynamic library detection patterns
        library_detection_map = {
            "httpx": "httpx",
            "requests": "requests",
            "aiohttp": "aiohttp",
            "urllib3": "urllib3",
        }

        for lib_name, import_name in library_detection_map.items():
            try:
                __import__(import_name)
                libraries[lib_name] = True
                safe_log(
                    self.tracer_instance,
                    "debug",
                    f"HTTP library detected: {lib_name}",
                    honeyhive_data={"library": lib_name},
                )
            except ImportError:
                libraries[lib_name] = False

        return libraries

    def _build_instrumentation_config_dynamically(self) -> Dict[str, Any]:
        """Dynamically build instrumentation configuration.

        Returns:
            Configuration dictionary for instrumentation
        """
        return {
            "enabled": not self._is_http_tracing_disabled_dynamically(),
            "libraries": {
                "httpx": {
                    "enabled": self._library_availability.get("httpx", False),
                    "methods": ["request"],
                    "classes": ["Client", "AsyncClient"],
                },
                "requests": {
                    "enabled": self._library_availability.get("requests", False),
                    "methods": ["request"],
                    "classes": ["Session"],
                },
            },
            "span_attributes": {
                "http.method": True,
                "http.url": True,
                "http.status_code": True,
                "http.user_agent": False,  # Privacy consideration
            },
            "error_handling": {
                "graceful_degradation": True,
                "log_failures": True,
                "fallback_to_original": True,
            },
        }

    def _is_http_tracing_disabled_dynamically(self) -> bool:
        """Dynamically check if HTTP tracing is disabled.

        Returns:
            True if HTTP tracing is disabled
        """
        # Dynamic environment variable patterns
        disable_patterns = [
            "HH_DISABLE_HTTP_TRACING",
            "HONEYHIVE_DISABLE_HTTP_TRACING",
            "DISABLE_HTTP_TRACING",
        ]

        for pattern in disable_patterns:
            value = os.getenv(pattern, "false").lower()
            if value in {"true", "1", "yes", "on"}:
                return True

        return False

    def instrument(self) -> None:
        """Dynamically instrument HTTP libraries for automatic tracing."""
        if self._is_instrumented:
            safe_log(
                self.tracer_instance, "debug", "HTTP instrumentation already active"
            )
            return

        if not self._instrumentation_config["enabled"]:
            safe_log(
                self.tracer_instance, "info", "HTTP tracing disabled by configuration"
            )
            return

        # Dynamic instrumentation execution
        instrumentation_results = self._execute_instrumentation_dynamically()

        self._is_instrumented = any(instrumentation_results.values())

        safe_log(
            self.tracer_instance,
            "info",
            "HTTP instrumentation completed",
            honeyhive_data={
                "instrumented_libraries": [
                    lib for lib, success in instrumentation_results.items() if success
                ],
                "total_instrumented": sum(instrumentation_results.values()),
            },
        )

    def _execute_instrumentation_dynamically(self) -> Dict[str, bool]:
        """Dynamically execute instrumentation for available libraries.

        Returns:
            Dictionary mapping library names to instrumentation success status
        """
        results = {}

        # Dynamic instrumentation strategies
        instrumentation_strategies = {
            "httpx": self._instrument_httpx_dynamically,
            "requests": self._instrument_requests_dynamically,
        }

        for library_name, strategy in instrumentation_strategies.items():
            if self._should_instrument_library_dynamically(library_name):
                try:
                    success = strategy()
                    results[library_name] = success

                    if success:
                        safe_log(
                            self.tracer_instance,
                            "debug",
                            f"Successfully instrumented {library_name}",
                            honeyhive_data={"library": library_name},
                        )
                    else:
                        safe_log(
                            self.tracer_instance,
                            "warning",
                            f"Failed to instrument {library_name}",
                            honeyhive_data={"library": library_name},
                        )

                except Exception as e:
                    results[library_name] = False
                    safe_log(
                        self.tracer_instance,
                        "error",
                        f"Error instrumenting {library_name}",
                        honeyhive_data={
                            "library": library_name,
                            "error": str(e),
                            "error_type": type(e).__name__,
                        },
                    )
            else:
                results[library_name] = False

        return results

    def _should_instrument_library_dynamically(self, library_name: str) -> bool:
        """Dynamically determine if library should be instrumented.

        Args:
            library_name: Name of the library to check

        Returns:
            True if library should be instrumented
        """
        library_config = self._instrumentation_config["libraries"].get(library_name, {})

        return library_config.get("enabled", False) and self._library_availability.get(
            library_name, False
        )

    def _instrument_httpx_dynamically(self) -> bool:
        """Dynamically instrument httpx library.

        Returns:
            True if instrumentation successful
        """
        try:
            import httpx  # pylint: disable=import-outside-toplevel

            # Store original methods dynamically
            original_methods = self._store_original_methods_dynamically(
                httpx, ["Client", "AsyncClient"], ["request"]
            )

            if not original_methods:
                return False

            self._original_methods["httpx"] = original_methods

            # Create instrumented methods dynamically
            instrumented_methods = self._create_instrumented_methods_dynamically(
                "httpx", original_methods
            )

            # Apply instrumentation dynamically
            return self._apply_instrumentation_dynamically(
                httpx, instrumented_methods, ["Client", "AsyncClient"]
            )

        except ImportError:
            safe_log(
                self.tracer_instance, "debug", "httpx not available for instrumentation"
            )
            return False
        except Exception as e:
            safe_log(
                self.tracer_instance,
                "error",
                "Failed to instrument httpx",
                honeyhive_data={"error": str(e)},
            )
            return False

    def _instrument_httpx(self) -> None:
        """Instrument httpx for automatic tracing (compatibility method)."""
        # This is a compatibility method for tests that expect the original naming
        self._instrument_httpx_dynamically()

    def _instrument_requests(self) -> None:
        """Instrument requests for automatic tracing (compatibility method)."""
        # This is a compatibility method for tests that expect the original naming
        self._instrument_requests_dynamically()

    def _instrument_requests_dynamically(self) -> bool:
        """Dynamically instrument requests library.

        Returns:
            True if instrumentation successful
        """
        try:
            import requests  # pylint: disable=import-outside-toplevel

            # Store original methods dynamically
            original_methods = self._store_original_methods_dynamically(
                requests, ["Session"], ["request"]
            )

            if not original_methods:
                return False

            self._original_methods["requests"] = original_methods

            # Create instrumented methods dynamically
            instrumented_methods = self._create_instrumented_methods_dynamically(
                "requests", original_methods
            )

            # Apply instrumentation dynamically
            return self._apply_instrumentation_dynamically(
                requests, instrumented_methods, ["Session"]
            )

        except ImportError:
            safe_log(
                self.tracer_instance,
                "debug",
                "requests not available for instrumentation",
            )
            return False
        except Exception as e:
            safe_log(
                self.tracer_instance,
                "error",
                "Failed to instrument requests",
                honeyhive_data={"error": str(e)},
            )
            return False

    def _store_original_methods_dynamically(
        self, module: Any, class_names: List[str], method_names: List[str]
    ) -> Dict[str, Any]:
        """Dynamically store original methods before instrumentation.

        Args:
            module: Module containing classes to instrument
            class_names: Names of classes to instrument
            method_names: Names of methods to instrument

        Returns:
            Dictionary of original methods
        """
        original_methods = {}

        for class_name in class_names:
            if not hasattr(module, class_name):
                continue

            class_obj = getattr(module, class_name)

            for method_name in method_names:
                if hasattr(class_obj, method_name):
                    method_key = f"{class_name}.{method_name}"
                    original_methods[method_key] = getattr(class_obj, method_name)

        return original_methods

    def _create_instrumented_methods_dynamically(
        self, library_name: str, original_methods: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Dynamically create instrumented methods.

        Args:
            library_name: Name of the library being instrumented
            original_methods: Dictionary of original methods

        Returns:
            Dictionary of instrumented methods
        """
        instrumented_methods = {}

        for method_key, original_method in original_methods.items():
            # Create instrumented wrapper dynamically
            instrumented_method = self._create_method_wrapper_dynamically(
                library_name, method_key, original_method
            )
            instrumented_methods[method_key] = instrumented_method

        return instrumented_methods

    def _create_method_wrapper_dynamically(
        self, library_name: str, method_key: str, original_method: Any
    ) -> Any:
        """Dynamically create method wrapper with tracing.

        Args:
            library_name: Name of the library
            method_key: Key identifying the method
            original_method: Original method to wrap

        Returns:
            Wrapped method with tracing
        """

        def instrumented_wrapper(self_obj: Any, *args: Any, **kwargs: Any) -> Any:
            """Dynamically instrumented method wrapper."""
            try:
                # Dynamic span creation and tracing
                return self._execute_with_tracing_dynamically(
                    library_name,
                    method_key,
                    original_method=original_method,
                    self_obj=self_obj,
                    args=args,
                    kwargs=kwargs,
                )
            except Exception as e:
                # Graceful degradation on instrumentation failure
                safe_log(
                    self.tracer_instance,
                    "debug",
                    "Instrumentation wrapper failed, falling back to original",
                    honeyhive_data={
                        "library": library_name,
                        "method": method_key,
                        "error": str(e),
                    },
                )
                return original_method(self_obj, *args, **kwargs)

        return instrumented_wrapper

    # pylint: disable=too-many-arguments
    # Justification: HTTP tracing execution requires multiple parameters for
    # library identification, method handling, and argument processing.
    def _execute_with_tracing_dynamically(
        self,
        library_name: str,
        _method_key: str,
        *,
        original_method: Any,
        self_obj: Any,
        args: tuple,
        kwargs: dict,
    ) -> Any:
        """Dynamically execute method with tracing.

        Args:
            library_name: Name of the library
            method_key: Method identifier
            original_method: Original method to call
            self_obj: Instance object
            args: Method arguments
            kwargs: Method keyword arguments

        Returns:
            Result of original method call
        """
        # For now, just call the original method
        # Future enhancement: Add actual tracing logic here

        # Dynamic attribute extraction for tracing
        trace_attributes = self._extract_trace_attributes_dynamically(
            library_name, args, kwargs
        )

        safe_log(
            self.tracer_instance,
            "debug",
            "HTTP request traced",
            honeyhive_data={
                "library": library_name,
                "method": _method_key,
                "attributes": trace_attributes,
            },
        )

        return original_method(self_obj, *args, **kwargs)

    def _extract_trace_attributes_dynamically(
        self, library_name: str, args: tuple, _kwargs: dict
    ) -> Dict[str, Any]:
        """Dynamically extract trace attributes from method arguments.

        Args:
            library_name: Name of the library
            args: Method arguments
            kwargs: Method keyword arguments

        Returns:
            Dictionary of trace attributes
        """
        attributes = {}

        # Dynamic attribute extraction patterns
        if library_name in {"httpx", "requests"}:
            # Extract HTTP method and URL
            if len(args) >= 1:
                attributes["http.method"] = str(args[0]).upper()
            if len(args) >= 2:
                attributes["http.url"] = str(args[1])

        return attributes

    def _apply_instrumentation_dynamically(
        self, module: Any, instrumented_methods: Dict[str, Any], class_names: List[str]
    ) -> bool:
        """Dynamically apply instrumentation to module classes.

        Args:
            module: Module to instrument
            instrumented_methods: Dictionary of instrumented methods
            class_names: Names of classes to instrument

        Returns:
            True if instrumentation applied successfully
        """
        try:
            for method_key, instrumented_method in instrumented_methods.items():
                class_name, method_name = method_key.split(".", 1)

                if class_name in class_names and hasattr(module, class_name):
                    class_obj = getattr(module, class_name)
                    setattr(class_obj, method_name, instrumented_method)

            return True

        except Exception as e:
            safe_log(
                self.tracer_instance,
                "error",
                "Failed to apply instrumentation",
                honeyhive_data={
                    "module": module.__name__,
                    "error": str(e),
                },
            )
            return False

    def uninstrument(self) -> None:
        """Dynamically remove HTTP instrumentation."""
        if not self._is_instrumented:
            safe_log(self.tracer_instance, "debug", "HTTP instrumentation not active")
            return

        # Dynamic uninstrumentation
        uninstrumentation_results = self._execute_uninstrumentation_dynamically()

        self._is_instrumented = False

        safe_log(
            self.tracer_instance,
            "info",
            "HTTP uninstrumentation completed",
            honeyhive_data={
                "uninstrumented_libraries": [
                    lib for lib, success in uninstrumentation_results.items() if success
                ],
                "total_uninstrumented": sum(uninstrumentation_results.values()),
            },
        )

    def _execute_uninstrumentation_dynamically(self) -> Dict[str, bool]:
        """Dynamically execute uninstrumentation for all libraries.

        Returns:
            Dictionary mapping library names to uninstrumentation success status
        """
        results = {}

        for library_name, original_methods in self._original_methods.items():
            try:
                success = self._restore_original_methods_dynamically(
                    library_name, original_methods
                )
                results[library_name] = success
            except Exception as e:
                results[library_name] = False
                safe_log(
                    self.tracer_instance,
                    "error",
                    f"Error uninstrumenting {library_name}",
                    honeyhive_data={
                        "library": library_name,
                        "error": str(e),
                    },
                )

        return results

    def _restore_original_methods_dynamically(
        self, library_name: str, original_methods: Dict[str, Any]
    ) -> bool:
        """Dynamically restore original methods.

        Args:
            library_name: Name of the library
            original_methods: Dictionary of original methods

        Returns:
            True if restoration successful
        """
        try:
            # Dynamic module import
            module = __import__(library_name)

            # Restore methods dynamically
            for method_key, original_method in original_methods.items():
                class_name, method_name = method_key.split(".", 1)

                if hasattr(module, class_name):
                    class_obj = getattr(module, class_name)
                    setattr(class_obj, method_name, original_method)

            return True

        except Exception as e:
            safe_log(
                self.tracer_instance,
                "error",
                f"Failed to restore original methods for {library_name}",
                honeyhive_data={"error": str(e)},
            )
            return False

    def get_instrumentation_status(self) -> Dict[str, Any]:
        """Get dynamic instrumentation status.

        Returns:
            Dictionary with instrumentation status information
        """
        return {
            "is_instrumented": self._is_instrumented,
            "enabled": self._instrumentation_config["enabled"],
            "library_availability": self._library_availability,
            "instrumented_libraries": list(self._original_methods.keys()),
            "configuration": self._instrumentation_config,
        }

tracer_instance instance-attribute

tracer_instance = tracer_instance

instrument

instrument() -> None

Dynamically instrument HTTP libraries for automatic tracing.

Source code in src/honeyhive/tracer/integration/http.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def instrument(self) -> None:
    """Dynamically instrument HTTP libraries for automatic tracing."""
    if self._is_instrumented:
        safe_log(
            self.tracer_instance, "debug", "HTTP instrumentation already active"
        )
        return

    if not self._instrumentation_config["enabled"]:
        safe_log(
            self.tracer_instance, "info", "HTTP tracing disabled by configuration"
        )
        return

    # Dynamic instrumentation execution
    instrumentation_results = self._execute_instrumentation_dynamically()

    self._is_instrumented = any(instrumentation_results.values())

    safe_log(
        self.tracer_instance,
        "info",
        "HTTP instrumentation completed",
        honeyhive_data={
            "instrumented_libraries": [
                lib for lib, success in instrumentation_results.items() if success
            ],
            "total_instrumented": sum(instrumentation_results.values()),
        },
    )

uninstrument

uninstrument() -> None

Dynamically remove HTTP instrumentation.

Source code in src/honeyhive/tracer/integration/http.py
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
def uninstrument(self) -> None:
    """Dynamically remove HTTP instrumentation."""
    if not self._is_instrumented:
        safe_log(self.tracer_instance, "debug", "HTTP instrumentation not active")
        return

    # Dynamic uninstrumentation
    uninstrumentation_results = self._execute_uninstrumentation_dynamically()

    self._is_instrumented = False

    safe_log(
        self.tracer_instance,
        "info",
        "HTTP uninstrumentation completed",
        honeyhive_data={
            "uninstrumented_libraries": [
                lib for lib, success in uninstrumentation_results.items() if success
            ],
            "total_uninstrumented": sum(uninstrumentation_results.values()),
        },
    )

get_instrumentation_status

get_instrumentation_status() -> Dict[str, Any]

Get dynamic instrumentation status.

Returns:

Type Description
Dict[str, Any]

Dictionary with instrumentation status information

Source code in src/honeyhive/tracer/integration/http.py
604
605
606
607
608
609
610
611
612
613
614
615
616
def get_instrumentation_status(self) -> Dict[str, Any]:
    """Get dynamic instrumentation status.

    Returns:
        Dictionary with instrumentation status information
    """
    return {
        "is_instrumented": self._is_instrumented,
        "enabled": self._instrumentation_config["enabled"],
        "library_availability": self._library_availability,
        "instrumented_libraries": list(self._original_methods.keys()),
        "configuration": self._instrumentation_config,
    }

ProcessorIntegrationError

Bases: Exception

Base exception for processor integration errors.

Source code in src/honeyhive/tracer/integration/processor.py
22
23
class ProcessorIntegrationError(Exception):
    """Base exception for processor integration errors."""

ProcessorIntegrator

Dynamically manages integration of HoneyHive processors with providers.

Source code in src/honeyhive/tracer/integration/processor.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
class ProcessorIntegrator:
    """Dynamically manages integration of HoneyHive processors with providers."""

    def __init__(self, tracer_instance: Any = None) -> None:
        """Initialize the processor integrator with dynamic configuration.

        Args:
            tracer_instance: Optional tracer instance for logging context
        """
        self.tracer_instance = tracer_instance
        self._lock = threading.Lock()
        self._integrated_processors: List["SpanProcessor"] = []
        self._integration_strategies = self._build_integration_strategies_dynamically()

    def _build_integration_strategies_dynamically(self) -> Dict[str, Any]:
        """Dynamically build integration strategies.

        Returns:
            Dictionary of integration strategies and their configurations
        """
        return {
            "processor_validation": {
                "required_methods": ["add_span_processor"],
                "optional_methods": ["remove_span_processor", "get_span_processors"],
            },
            "context_enrichment": {
                "baggage_keys": ["source", "project", "honeyhive_tracer_id"],
                "context_timeout": 30.0,
            },
            "processor_ordering": {
                "default_position": -1,  # Append to end
                "priority_processors": [],  # Future: priority-based ordering
            },
        }

    def integrate_with_provider(
        self,
        provider: "TracerProvider",
        source: str = "dev",
        project: Optional[str] = None,
        **kwargs: Any,
    ) -> bool:
        """Dynamically add HoneyHive processor to existing provider.

        Args:
            provider: The TracerProvider to integrate with
            source: Source environment for span enrichment
            project: Optional project for span enrichment
            **kwargs: Additional integration parameters

        Returns:
            bool: True if integration successful, False otherwise

        Raises:
            ProviderIncompatibleError: If provider doesn't support span processors
        """
        with self._lock:
            try:
                # Dynamic compatibility validation
                if not self._validate_provider_compatibility_dynamically(provider):
                    safe_log(
                        self.tracer_instance,
                        "warning",
                        "Provider doesn't support span processors",
                        honeyhive_data={
                            "provider_class": type(provider).__name__,
                            "missing_capabilities": (
                                self._get_missing_capabilities_dynamically(provider)
                            ),
                        },
                    )
                    return False

                # Dynamic context setup
                self._setup_integration_context_dynamically(source, project, **kwargs)

                # Dynamic processor creation and integration
                processor = self._create_processor_dynamically(provider, **kwargs)
                integration_success = self._integrate_processor_dynamically(
                    provider, processor
                )

                if integration_success:
                    self._integrated_processors.append(processor)
                    self._log_integration_success_dynamically(provider, source, project)

                return integration_success

            except Exception as e:
                self._log_integration_failure_dynamically(provider, e)
                return False

    def _validate_provider_compatibility_dynamically(
        self, provider: "TracerProvider"
    ) -> bool:
        """Dynamically validate provider compatibility.

        Args:
            provider: The TracerProvider to check

        Returns:
            bool: True if provider supports required operations
        """
        required_methods = self._integration_strategies["processor_validation"][
            "required_methods"
        ]

        # Dynamic method validation
        for method_name in required_methods:
            if not self._has_callable_method_dynamically(provider, method_name):
                return False

        return True

    def _has_callable_method_dynamically(self, obj: Any, method_name: str) -> bool:
        """Dynamically check if object has callable method.

        Args:
            obj: Object to check
            method_name: Name of method to check

        Returns:
            bool: True if object has callable method
        """
        return hasattr(obj, method_name) and callable(getattr(obj, method_name))

    def _get_missing_capabilities_dynamically(
        self, provider: "TracerProvider"
    ) -> List[str]:
        """Dynamically identify missing provider capabilities.

        Args:
            provider: Provider to analyze

        Returns:
            List of missing capability names
        """
        required_methods = self._integration_strategies["processor_validation"][
            "required_methods"
        ]

        missing = []
        for method_name in required_methods:
            if not self._has_callable_method_dynamically(provider, method_name):
                missing.append(method_name)

        return missing

    def _setup_integration_context_dynamically(
        self, source: str, project: Optional[str], **kwargs: Any
    ) -> None:
        """Dynamically set up integration context with baggage.

        Args:
            source: Source environment for span enrichment
            project: Optional project for span enrichment
            **kwargs: Additional context parameters
        """
        ctx = context.get_current()

        # Dynamic baggage setup
        _baggage_config = self._integration_strategies["context_enrichment"]
        baggage_mappings = {
            "source": source,
            "project": project,
        }

        # Add additional baggage from kwargs
        for key, value in kwargs.items():
            if key.startswith("honeyhive_") and value is not None:
                baggage_mappings[key] = value

        # Apply baggage dynamically
        for key, value in baggage_mappings.items():
            if value is not None:
                ctx = baggage.set_baggage(key, str(value), ctx)

        # Attach the updated context
        context.attach(ctx)

        safe_log(
            self.tracer_instance,
            "debug",
            "Integration context set up",
            honeyhive_data={
                "baggage_keys": list(baggage_mappings.keys()),
                "source": source,
                "project": project,
            },
        )

    def _create_processor_dynamically(
        self, _provider: "TracerProvider", **kwargs: Any
    ) -> "SpanProcessor":
        """Dynamically create HoneyHive span processor.

        Args:
            provider: The TracerProvider for context
            **kwargs: Additional processor configuration

        Returns:
            SpanProcessor: Configured HoneyHive span processor
        """
        # Import here to avoid circular imports
        # pylint: disable=import-outside-toplevel
        from ..processing.span_processor import HoneyHiveSpanProcessor

        # Dynamic processor configuration
        processor_config = {
            "client": kwargs.get("client"),
            "disable_batch": kwargs.get("disable_batch", False),
            "otlp_exporter": kwargs.get("otlp_exporter"),
            "tracer_instance": kwargs.get("tracer_instance"),
        }

        # Filter out None values
        processor_config = {k: v for k, v in processor_config.items() if v is not None}

        return HoneyHiveSpanProcessor(**processor_config)

    def _integrate_processor_dynamically(
        self, provider: "TracerProvider", processor: "SpanProcessor"
    ) -> bool:
        """Dynamically integrate processor with provider.

        Args:
            provider: Provider to integrate with
            processor: Processor to integrate

        Returns:
            bool: True if integration successful
        """
        try:
            # Dynamic insertion point determination
            insertion_point = self._get_processor_insertion_point_dynamically(provider)

            # Add processor to provider
            if insertion_point == -1:
                # Append to end (default)
                provider.add_span_processor(processor)
            else:
                # Future: Support for specific insertion points
                provider.add_span_processor(processor)

            return True

        except Exception as e:
            safe_log(
                self.tracer_instance,
                "error",
                "Failed to integrate processor",
                honeyhive_data={
                    "provider_class": type(provider).__name__,
                    "processor_class": type(processor).__name__,
                    "error": str(e),
                },
            )
            return False

    def _get_processor_insertion_point_dynamically(
        self, _provider: "TracerProvider"
    ) -> int:
        """Dynamically determine optimal processor insertion point.

        Args:
            provider: The TracerProvider to analyze

        Returns:
            int: Index where processor should be inserted (-1 for append)
        """
        # Dynamic insertion point strategies
        ordering_config = self._integration_strategies["processor_ordering"]

        # For now, use default position
        # Future: Implement sophisticated ordering based on existing processors
        return int(ordering_config["default_position"])

    def _log_integration_success_dynamically(
        self, provider: "TracerProvider", source: str, project: Optional[str]
    ) -> None:
        """Dynamically log successful integration.

        Args:
            provider: Integrated provider
            source: Source environment
            project: Project name
        """
        safe_log(
            self.tracer_instance,
            "info",
            "Successfully integrated HoneyHive span processor",
            honeyhive_data={
                "provider_class": type(provider).__name__,
                "source": source,
                "project": project,
                "total_processors": len(self._integrated_processors) + 1,
            },
        )

    def _log_integration_failure_dynamically(
        self, provider: "TracerProvider", error: Exception
    ) -> None:
        """Dynamically log integration failure.

        Args:
            provider: Provider that failed integration
            error: Exception that occurred
        """
        safe_log(
            self.tracer_instance,
            "error",
            "Failed to integrate with provider",
            honeyhive_data={
                "provider_class": type(provider).__name__,
                "error": str(error),
                "error_type": type(error).__name__,
            },
        )

    def get_integrated_processors(self) -> List["SpanProcessor"]:
        """Get list of processors that have been integrated.

        Returns:
            List[SpanProcessor]: List of integrated HoneyHive processors
        """
        with self._lock:
            return self._integrated_processors.copy()

    def cleanup_processors(self) -> None:
        """Dynamically clean up integrated processors.

        This should be called during shutdown to ensure proper cleanup.
        """
        with self._lock:
            cleanup_results = self._cleanup_processors_dynamically()

            self._integrated_processors.clear()

            safe_log(
                self.tracer_instance,
                "info",
                "Cleaned up integrated processors",
                honeyhive_data={
                    "total_cleaned": cleanup_results["total_cleaned"],
                    "cleanup_errors": cleanup_results["cleanup_errors"],
                },
            )

    def _cleanup_processors_dynamically(self) -> Dict[str, int]:
        """Dynamically clean up all integrated processors.

        Returns:
            Dictionary with cleanup statistics
        """
        cleanup_stats = {"total_cleaned": 0, "cleanup_errors": 0}

        for processor in self._integrated_processors:
            try:
                if hasattr(processor, "shutdown"):
                    processor.shutdown()
                cleanup_stats["total_cleaned"] += 1
            except Exception as e:
                cleanup_stats["cleanup_errors"] += 1
                safe_log(
                    self.tracer_instance,
                    "warning",
                    "Error shutting down processor",
                    honeyhive_data={
                        "processor_class": type(processor).__name__,
                        "error": str(e),
                    },
                )

        return cleanup_stats

tracer_instance instance-attribute

tracer_instance = tracer_instance

integrate_with_provider

integrate_with_provider(
    provider: TracerProvider,
    source: str = "dev",
    project: Optional[str] = None,
    **kwargs: Any
) -> bool

Dynamically add HoneyHive processor to existing provider.

Parameters:

Name Type Description Default
provider TracerProvider

The TracerProvider to integrate with

required
source str

Source environment for span enrichment

'dev'
project Optional[str]

Optional project for span enrichment

None
**kwargs Any

Additional integration parameters

{}

Returns:

Name Type Description
bool bool

True if integration successful, False otherwise

Raises:

Type Description
ProviderIncompatibleError

If provider doesn't support span processors

Source code in src/honeyhive/tracer/integration/processor.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def integrate_with_provider(
    self,
    provider: "TracerProvider",
    source: str = "dev",
    project: Optional[str] = None,
    **kwargs: Any,
) -> bool:
    """Dynamically add HoneyHive processor to existing provider.

    Args:
        provider: The TracerProvider to integrate with
        source: Source environment for span enrichment
        project: Optional project for span enrichment
        **kwargs: Additional integration parameters

    Returns:
        bool: True if integration successful, False otherwise

    Raises:
        ProviderIncompatibleError: If provider doesn't support span processors
    """
    with self._lock:
        try:
            # Dynamic compatibility validation
            if not self._validate_provider_compatibility_dynamically(provider):
                safe_log(
                    self.tracer_instance,
                    "warning",
                    "Provider doesn't support span processors",
                    honeyhive_data={
                        "provider_class": type(provider).__name__,
                        "missing_capabilities": (
                            self._get_missing_capabilities_dynamically(provider)
                        ),
                    },
                )
                return False

            # Dynamic context setup
            self._setup_integration_context_dynamically(source, project, **kwargs)

            # Dynamic processor creation and integration
            processor = self._create_processor_dynamically(provider, **kwargs)
            integration_success = self._integrate_processor_dynamically(
                provider, processor
            )

            if integration_success:
                self._integrated_processors.append(processor)
                self._log_integration_success_dynamically(provider, source, project)

            return integration_success

        except Exception as e:
            self._log_integration_failure_dynamically(provider, e)
            return False

get_integrated_processors

get_integrated_processors() -> List[SpanProcessor]

Get list of processors that have been integrated.

Returns:

Type Description
List[SpanProcessor]

List[SpanProcessor]: List of integrated HoneyHive processors

Source code in src/honeyhive/tracer/integration/processor.py
349
350
351
352
353
354
355
356
def get_integrated_processors(self) -> List["SpanProcessor"]:
    """Get list of processors that have been integrated.

    Returns:
        List[SpanProcessor]: List of integrated HoneyHive processors
    """
    with self._lock:
        return self._integrated_processors.copy()

cleanup_processors

cleanup_processors() -> None

Dynamically clean up integrated processors.

This should be called during shutdown to ensure proper cleanup.

Source code in src/honeyhive/tracer/integration/processor.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def cleanup_processors(self) -> None:
    """Dynamically clean up integrated processors.

    This should be called during shutdown to ensure proper cleanup.
    """
    with self._lock:
        cleanup_results = self._cleanup_processors_dynamically()

        self._integrated_processors.clear()

        safe_log(
            self.tracer_instance,
            "info",
            "Cleaned up integrated processors",
            honeyhive_data={
                "total_cleaned": cleanup_results["total_cleaned"],
                "cleanup_errors": cleanup_results["cleanup_errors"],
            },
        )

ProviderIncompatibleError

Bases: ProcessorIntegrationError

Provider doesn't support required operations.

Source code in src/honeyhive/tracer/integration/processor.py
26
27
class ProviderIncompatibleError(ProcessorIntegrationError):
    """Provider doesn't support required operations."""

enrich_session

enrich_session(
    session_id: str,
    metadata: Optional[Dict[str, Any]] = None,
    tracer: Optional[Any] = None,
    tracer_instance: Optional[Any] = None,
) -> None

LEGACY (v1.0+): Dynamically enrich session with metadata.

.. deprecated:: 1.0 This free function pattern is provided for backward compatibility only. Use instance methods instead: tracer.enrich_session() This pattern will be removed in v2.0.

Recommended Pattern (v1.0+): Use the tracer instance method for explicit tracer reference::

tracer = HoneyHiveTracer.init(api_key="...", project="...")
tracer.enrich_session(
    metadata={"user_id": "user-456"},
    user_properties={"plan": "premium"}
)

This function provides backward compatibility for the global enrich_session function using dynamic tracer discovery and flexible metadata handling.

Parameters:

Name Type Description Default
session_id str

The session ID to enrich

required
metadata Optional[Dict[str, Any]]

Metadata dictionary to add to the session

None
tracer Optional[Any]

Optional tracer instance to use

None
tracer_instance Optional[Any]

Optional tracer instance for logging context

None
Legacy Example

Using default tracer (backward compatibility)

enrich_session("session-123", {"user_id": "user-456"})

Using specific tracer (backward compatibility)

enrich_session("session-123", {"user_id": "user-456"}, tracer=my_tracer)

See Also
  • :meth:HoneyHiveTracer.enrich_session - Primary pattern (v1.0+)
  • :meth:HoneyHiveTracer.enrich_span - Span enrichment
Source code in src/honeyhive/tracer/integration/compatibility.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def enrich_session(
    session_id: str,
    metadata: Optional[Dict[str, Any]] = None,
    tracer: Optional[Any] = None,
    tracer_instance: Optional[Any] = None,
) -> None:
    """**LEGACY (v1.0+):** Dynamically enrich session with metadata.

    .. deprecated:: 1.0
       This free function pattern is provided for backward compatibility only.
       **Use instance methods instead:** ``tracer.enrich_session()``
       This pattern will be removed in v2.0.

    **Recommended Pattern (v1.0+):**
    Use the tracer instance method for explicit tracer reference::

        tracer = HoneyHiveTracer.init(api_key="...", project="...")
        tracer.enrich_session(
            metadata={"user_id": "user-456"},
            user_properties={"plan": "premium"}
        )

    This function provides backward compatibility for the global enrich_session
    function using dynamic tracer discovery and flexible metadata handling.

    Args:
        session_id: The session ID to enrich
        metadata: Metadata dictionary to add to the session
        tracer: Optional tracer instance to use
        tracer_instance: Optional tracer instance for logging context

    Legacy Example:
        >>> # Using default tracer (backward compatibility)
        >>> enrich_session("session-123", {"user_id": "user-456"})
        >>>
        >>> # Using specific tracer (backward compatibility)
        >>> enrich_session("session-123", {"user_id": "user-456"}, tracer=my_tracer)

    See Also:
        - :meth:`HoneyHiveTracer.enrich_session` - Primary pattern (v1.0+)
        - :meth:`HoneyHiveTracer.enrich_span` - Span enrichment
    """
    # Dynamic tracer discovery
    active_tracer = _discover_tracer_dynamically(tracer, tracer_instance)

    if active_tracer is None:
        safe_log(
            tracer_instance,
            "warning",
            "No tracer available for session enrichment",
            honeyhive_data={
                "session_id": session_id,
                "metadata_keys": list(metadata.keys()) if metadata else [],
            },
        )
        return

    # Dynamic session enrichment
    try:
        _enrich_session_dynamically(
            active_tracer, session_id, metadata, tracer_instance
        )

        safe_log(
            tracer_instance,
            "debug",
            "Session enriched successfully",
            honeyhive_data={
                "session_id": session_id,
                "tracer_type": type(active_tracer).__name__,
                "metadata_count": len(metadata) if metadata else 0,
            },
        )

    except Exception as e:
        safe_log(
            tracer_instance,
            "error",
            "Failed to enrich session",
            honeyhive_data={
                "session_id": session_id,
                "error": str(e),
                "error_type": type(e).__name__,
            },
        )
    return

get_global_provider

get_global_provider(tracer_instance: Any = None) -> Any

Dynamically get the current global OpenTelemetry tracer provider.

Returns:

Type Description
Any

The current global TracerProvider instance

Example

from honeyhive.tracer.integration import get_global_provider provider = get_global_provider()

Source code in src/honeyhive/tracer/integration/detection.py
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
def get_global_provider(tracer_instance: Any = None) -> Any:
    """Dynamically get the current global OpenTelemetry tracer provider.

    Returns:
        The current global TracerProvider instance

    Example:
        >>> from honeyhive.tracer.integration import get_global_provider
        >>> provider = get_global_provider()
    """
    provider = trace.get_tracer_provider()

    safe_log(
        tracer_instance,
        "debug",
        "Retrieved global provider",
        honeyhive_data={
            "provider_class": type(provider).__name__,
        },
    )

    return provider

set_global_provider

set_global_provider(
    provider: Any,
    force_override: bool = False,
    tracer_instance: Any = None,
) -> None

Dynamically set the global OpenTelemetry tracer provider.

This function properly handles OpenTelemetry's internal warnings when setting a tracer provider, using dynamic provider management techniques.

Parameters:

Name Type Description Default
provider Any

The TracerProvider instance to set as global

required
force_override bool

If True, allows overriding existing real providers (intended for test utilities and clean state management)

False
tracer_instance Any

Optional tracer instance for logging context

None
Example

from opentelemetry.sdk.trace import TracerProvider from honeyhive.tracer.integration import set_global_provider provider = TracerProvider() set_global_provider(provider)

For test utilities - force override existing provider

set_global_provider(NoOpTracerProvider(), force_override=True)

Source code in src/honeyhive/tracer/integration/detection.py
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
def set_global_provider(
    provider: Any, force_override: bool = False, tracer_instance: Any = None
) -> None:
    """Dynamically set the global OpenTelemetry tracer provider.

    This function properly handles OpenTelemetry's internal warnings when
    setting a tracer provider, using dynamic provider management techniques.

    Args:
        provider: The TracerProvider instance to set as global
        force_override: If True, allows overriding existing real providers
                       (intended for test utilities and clean state management)
        tracer_instance: Optional tracer instance for logging context

    Example:
        >>> from opentelemetry.sdk.trace import TracerProvider
        >>> from honeyhive.tracer.integration import set_global_provider
        >>> provider = TracerProvider()
        >>> set_global_provider(provider)

        # For test utilities - force override existing provider
        >>> set_global_provider(NoOpTracerProvider(), force_override=True)
    """
    try:
        # Check if a provider is already set to avoid the override warning
        current_provider = trace.get_tracer_provider()
        provider_type = type(current_provider).__name__

        # Determine if we should set the provider
        should_set = False
        reason = ""

        if provider_type in ["NoOpTracerProvider", "ProxyTracerProvider"]:
            # Safe to set as global provider - no real provider exists
            should_set = True
            reason = "no_real_provider_exists"
            # Reset SET_ONCE flag for both NoOp and Proxy providers
            # Both set the flag but should be replaceable
            _reset_provider_flag_dynamically(tracer_instance)
        elif force_override:
            # Force override requested (for test utilities)
            should_set = True
            reason = "force_override_requested"
            # Reset the SET_ONCE flag to allow clean override
            _reset_provider_flag_dynamically(tracer_instance)
        else:
            # Another real provider exists and no force override
            should_set = False
            reason = "real_provider_exists_no_force"

        if should_set:
            _set_tracer_provider(provider, log=False)
            safe_log(
                tracer_instance,
                "debug",
                "Global provider set successfully",
                honeyhive_data={
                    "provider_class": type(provider).__name__,
                    "replaced_provider": provider_type,
                    "reason": reason,
                    "force_override": force_override,
                },
            )
        else:
            # Another real provider is already set, don't override
            safe_log(
                tracer_instance,
                "debug",
                "Real TracerProvider already exists, skipping global provider set",
                honeyhive_data={
                    "existing_provider": provider_type,
                    "requested_provider": type(provider).__name__,
                    "reason": reason,
                    "force_override": force_override,
                },
            )

    except Exception as e:
        safe_log(
            tracer_instance,
            "warning",
            "Failed to set global provider",
            honeyhive_data={
                "provider_class": type(provider).__name__,
                "error": str(e),
            },
        )
        raise

with_error_handling

with_error_handling(
    component: str = "unknown",
    operation: str = "unknown",
    resilience_level: ResilienceLevel = BALANCED,
    tracer_instance: Any = None,
) -> Any

Decorator for dynamic error handling.

Parameters:

Name Type Description Default
component str

Component name for error context

'unknown'
operation str

Operation name for error context

'unknown'
resilience_level ResilienceLevel

Resilience level for error handling

BALANCED
tracer_instance Any

Optional tracer instance for logging context

None

Returns:

Type Description
Any

Decorator function

Source code in src/honeyhive/tracer/integration/error_handling.py
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def with_error_handling(
    component: str = "unknown",
    operation: str = "unknown",
    resilience_level: ResilienceLevel = ResilienceLevel.BALANCED,
    tracer_instance: Any = None,
) -> Any:
    """Decorator for dynamic error handling.

    Args:
        component: Component name for error context
        operation: Operation name for error context
        resilience_level: Resilience level for error handling
        tracer_instance: Optional tracer instance for logging context

    Returns:
        Decorator function
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error_handler = get_error_handler(resilience_level, tracer_instance)
                handled = error_handler.handle_error(
                    e,
                    component=component,
                    operation=operation,
                    function_name=func.__name__,
                    args_count=len(args),
                    kwargs_keys=list(kwargs.keys()),
                )

                if not handled and resilience_level == ResilienceLevel.STRICT:
                    raise

                # Return None or appropriate default for graceful degradation
                return None

        return wrapper

    return decorator