Skip to content

honeyhive.tracer.lifecycle.core

Core lifecycle management infrastructure.

This module provides the foundational infrastructure for tracer lifecycle management including safe logging, tracer registration, and thread-safe lock management utilities.

get_lock_strategy

get_lock_strategy() -> str

Detect deployment environment and return optimal lock strategy.

Returns:

Type Description
str

Lock strategy name based on environment detection

Environment Detection
  • AWS Lambda: AWS_LAMBDA_FUNCTION_NAME environment variable
  • Kubernetes: KUBERNETES_SERVICE_HOST environment variable
  • High Concurrency: HH_HIGH_CONCURRENCY environment variable
  • Standard: Default fallback

Examples:

>>> # In AWS Lambda
>>> strategy = get_lock_strategy()
>>> print(strategy)  # 'lambda_optimized'
>>> # In Kubernetes
>>> strategy = get_lock_strategy()
>>> print(strategy)  # 'k8s_optimized'
Source code in src/honeyhive/tracer/lifecycle/core.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def get_lock_strategy() -> str:
    """Detect deployment environment and return optimal lock strategy.

    Returns:
        Lock strategy name based on environment detection

    Environment Detection:
        - AWS Lambda: AWS_LAMBDA_FUNCTION_NAME environment variable
        - Kubernetes: KUBERNETES_SERVICE_HOST environment variable
        - High Concurrency: HH_HIGH_CONCURRENCY environment variable
        - Standard: Default fallback

    Examples:
        >>> # In AWS Lambda
        >>> strategy = get_lock_strategy()
        >>> print(strategy)  # 'lambda_optimized'

        >>> # In Kubernetes
        >>> strategy = get_lock_strategy()
        >>> print(strategy)  # 'k8s_optimized'
    """
    # AWS Lambda detection
    if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"):
        return "lambda_optimized"

    # Kubernetes detection
    if os.environ.get("KUBERNETES_SERVICE_HOST"):
        return "k8s_optimized"

    # High concurrency mode (explicit opt-in)
    if os.environ.get("HH_HIGH_CONCURRENCY", "").lower() in ("true", "1", "yes"):
        return "high_concurrency"

    # Standard environment (default)
    return "standard"

get_lock_config

get_lock_config(
    strategy: Optional[str] = None,
) -> Dict[str, Any]

Get lock configuration for the specified or detected strategy.

Parameters:

Name Type Description Default
strategy Optional[str]

Optional strategy name. If None, auto-detects environment

None

Returns:

Type Description
Dict[str, Any]

Dictionary containing timeout and configuration values

Examples:

>>> # Auto-detect environment
>>> config = get_lock_config()
>>> print(config['lifecycle_timeout'])  # 1.0 (standard)
>>> # Explicit strategy
>>> config = get_lock_config('lambda_optimized')
>>> print(config['lifecycle_timeout'])  # 0.5 (Lambda optimized)
Source code in src/honeyhive/tracer/lifecycle/core.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def get_lock_config(strategy: Optional[str] = None) -> Dict[str, Any]:
    """Get lock configuration for the specified or detected strategy.

    Args:
        strategy: Optional strategy name. If None, auto-detects environment

    Returns:
        Dictionary containing timeout and configuration values

    Examples:
        >>> # Auto-detect environment
        >>> config = get_lock_config()
        >>> print(config['lifecycle_timeout'])  # 1.0 (standard)

        >>> # Explicit strategy
        >>> config = get_lock_config('lambda_optimized')
        >>> print(config['lifecycle_timeout'])  # 0.5 (Lambda optimized)
    """
    if strategy is None:
        strategy = get_lock_strategy()

    return _LOCK_STRATEGIES.get(strategy, _LOCK_STRATEGIES["standard"])

register_tracer_for_atexit_cleanup

register_tracer_for_atexit_cleanup(
    tracer_instance: Any,
) -> None

Register a tracer instance for automatic cleanup on Python exit.

This function provides thread-safe registration of tracer instances for automatic cleanup during Python interpreter shutdown. This prevents race conditions in pytest-xdist workers where logging streams are closed before fixture teardown runs.

:param tracer_instance: The tracer instance to register for cleanup :type tracer_instance: HoneyHiveTracer

Thread Safety:

This function is thread-safe and supports the multi-instance architecture. Multiple tracer instances can be registered concurrently from different threads without conflicts.

Example:

.. code-block:: python

# Register tracer for automatic cleanup
tracer = HoneyHiveTracer.init(api_key="...", project="...")
register_tracer_for_atexit_cleanup(tracer)

# Tracer will be automatically cleaned up on Python exit
# even if pytest-xdist closes logging streams early

Note:

Uses WeakSet to avoid keeping tracers alive just for cleanup. If a tracer is garbage collected, it's automatically removed from the cleanup registry.

Source code in src/honeyhive/tracer/lifecycle/core.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def register_tracer_for_atexit_cleanup(tracer_instance: Any) -> None:
    """Register a tracer instance for automatic cleanup on Python exit.

    This function provides thread-safe registration of tracer instances
    for automatic cleanup during Python interpreter shutdown. This prevents
    race conditions in pytest-xdist workers where logging streams are closed
    before fixture teardown runs.

    :param tracer_instance: The tracer instance to register for cleanup
    :type tracer_instance: HoneyHiveTracer

    **Thread Safety:**

    This function is thread-safe and supports the multi-instance architecture.
    Multiple tracer instances can be registered concurrently from different
    threads without conflicts.

    **Example:**

    .. code-block:: python

        # Register tracer for automatic cleanup
        tracer = HoneyHiveTracer.init(api_key="...", project="...")
        register_tracer_for_atexit_cleanup(tracer)

        # Tracer will be automatically cleaned up on Python exit
        # even if pytest-xdist closes logging streams early

    **Note:**

    Uses WeakSet to avoid keeping tracers alive just for cleanup.
    If a tracer is garbage collected, it's automatically removed
    from the cleanup registry.
    """
    with _lifecycle_lock:
        # Check if already registered to prevent duplicate atexit handlers
        if tracer_instance in _registered_tracers:
            safe_log(
                tracer_instance,
                "debug",
                f"Tracer already registered for atexit cleanup: {id(tracer_instance)}",
            )
            return

        # Add to registry using WeakSet (won't keep tracer alive)
        _registered_tracers.add(tracer_instance)

        # Create cleanup function that captures tracer by weak reference
        tracer_ref = weakref.ref(tracer_instance)

        def cleanup_tracer_on_exit() -> None:
            """Cleanup function that runs during Python shutdown."""
            # safe_log will automatically detect shutdown conditions

            tracer = tracer_ref()
            if tracer is not None:
                try:
                    # Import here to avoid circular imports
                    # pylint: disable=import-outside-toplevel
                    from .flush import force_flush_tracer
                    from .shutdown import shutdown_tracer

                    # Force flush first, then shutdown (no logging during shutdown)
                    force_flush_tracer(tracer, timeout_millis=1000)  # Shorter timeout
                    shutdown_tracer(tracer)

                except Exception as e:
                    # Graceful degradation
                    # Silent failure during shutdown is expected, but log for debugging
                    safe_log(
                        None,
                        "debug",
                        "Expected shutdown exception during cleanup",
                        honeyhive_data={"error_type": type(e).__name__},
                    )

        # Register the cleanup function with atexit
        atexit.register(cleanup_tracer_on_exit)

        safe_log(
            tracer_instance,
            "debug",
            f"Registered tracer for atexit cleanup: {id(tracer_instance)}",
            honeyhive_data={
                "tracer_id": id(tracer_instance),
                "registered_count": len(_registered_tracers),
                "architecture": "multi-instance",
            },
        )

mark_stream_closure_detected

mark_stream_closure_detected() -> None

Mark that stream closure has been detected during shutdown.

This function should be called when stream closure is detected during shutdown to disable all logging and prevent race conditions with closed streams. This is useful in multiprocess environments, containers, and test frameworks.

Usage:

.. code-block:: python

# In cleanup handlers
try:
    tracer.shutdown()
except Exception as e:
    # Graceful degradation - never crash host
    safe_log(None, "debug", "Shutdown exception during stream closure",
            honeyhive_data={"error_type": type(e).__name__})
Source code in src/honeyhive/tracer/lifecycle/core.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def mark_stream_closure_detected() -> None:
    """Mark that stream closure has been detected during shutdown.

    This function should be called when stream closure is detected during
    shutdown to disable all logging and prevent race conditions with closed streams.
    This is useful in multiprocess environments, containers, and test frameworks.

    **Usage:**

    .. code-block:: python

        # In cleanup handlers
        try:
            tracer.shutdown()
        except Exception as e:
            # Graceful degradation - never crash host
            safe_log(None, "debug", "Shutdown exception during stream closure",
                    honeyhive_data={"error_type": type(e).__name__})
    """

disable_new_span_creation

disable_new_span_creation() -> None

Disable creation of new spans during shutdown phase.

This function prevents new spans from being created while allowing existing spans to complete naturally. This is part of the graceful shutdown process to prevent data loss.

Usage:

.. code-block:: python

# Phase 1: Graceful drain
disable_new_span_creation()
time.sleep(0.1)  # Allow existing spans to complete

# Phase 2: Force flush
tracer.force_flush()
Source code in src/honeyhive/tracer/lifecycle/core.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def disable_new_span_creation() -> None:
    """Disable creation of new spans during shutdown phase.

    This function prevents new spans from being created while allowing
    existing spans to complete naturally. This is part of the graceful
    shutdown process to prevent data loss.

    **Usage:**

    .. code-block:: python

        # Phase 1: Graceful drain
        disable_new_span_creation()
        time.sleep(0.1)  # Allow existing spans to complete

        # Phase 2: Force flush
        tracer.force_flush()
    """
    _new_spans_disabled.set()

is_new_span_creation_disabled

is_new_span_creation_disabled() -> bool

Check if new span creation is disabled.

Returns:

Name Type Description
bool bool

True if new span creation is disabled, False otherwise

Source code in src/honeyhive/tracer/lifecycle/core.py
261
262
263
264
265
266
267
def is_new_span_creation_disabled() -> bool:
    """Check if new span creation is disabled.

    Returns:
        bool: True if new span creation is disabled, False otherwise
    """
    return _new_spans_disabled.is_set()

unregister_tracer_from_atexit_cleanup

unregister_tracer_from_atexit_cleanup(
    tracer_instance: Any,
) -> None

Unregister a tracer instance from automatic cleanup.

This function removes a tracer from the atexit cleanup registry. Useful when manually shutting down a tracer before Python exit.

:param tracer_instance: The tracer instance to unregister :type tracer_instance: HoneyHiveTracer

Thread Safety:

This function is thread-safe and supports concurrent access.

Example:

.. code-block:: python

# Manual cleanup - unregister from atexit
unregister_tracer_from_atexit_cleanup(tracer)
tracer.force_flush()
tracer.shutdown()
Source code in src/honeyhive/tracer/lifecycle/core.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def unregister_tracer_from_atexit_cleanup(tracer_instance: Any) -> None:
    """Unregister a tracer instance from automatic cleanup.

    This function removes a tracer from the atexit cleanup registry.
    Useful when manually shutting down a tracer before Python exit.

    :param tracer_instance: The tracer instance to unregister
    :type tracer_instance: HoneyHiveTracer

    **Thread Safety:**

    This function is thread-safe and supports concurrent access.

    **Example:**

    .. code-block:: python

        # Manual cleanup - unregister from atexit
        unregister_tracer_from_atexit_cleanup(tracer)
        tracer.force_flush()
        tracer.shutdown()
    """
    with _lifecycle_lock:
        if tracer_instance in _registered_tracers:
            _registered_tracers.discard(tracer_instance)
            safe_log(
                tracer_instance,
                "debug",
                f"Unregistered tracer from atexit cleanup: {id(tracer_instance)}",
                honeyhive_data={
                    "tracer_id": id(tracer_instance),
                    "remaining_count": len(_registered_tracers),
                },
            )

acquire_lock_with_timeout

acquire_lock_with_timeout(
    lock: Any, timeout_seconds: float
) -> Iterator[bool]

Context manager for acquiring a lock with timeout.

Source code in src/honeyhive/tracer/lifecycle/core.py
306
307
308
309
310
311
312
313
314
315
316
@contextmanager
def acquire_lock_with_timeout(lock: Any, timeout_seconds: float) -> Iterator[bool]:
    """Context manager for acquiring a lock with timeout."""
    acquired = lock.acquire(timeout=timeout_seconds)
    if not acquired:
        yield False
    else:
        try:
            yield True
        finally:
            lock.release()

acquire_lifecycle_lock_optimized

acquire_lifecycle_lock_optimized(
    operation_type: str = "lifecycle",
    custom_timeout: Optional[float] = None,
) -> Iterator[bool]

Context manager for acquiring lifecycle lock with environment-optimized timeout.

Parameters:

Name Type Description Default
operation_type str

Type of operation ('lifecycle' or 'flush') for timeout selection

'lifecycle'
custom_timeout Optional[float]

Optional custom timeout override

None

Yields:

Name Type Description
bool bool

True if lock was acquired, False if timeout occurred

Examples:

>>> # Auto-optimized for environment
>>> with acquire_lifecycle_lock_optimized('lifecycle') as acquired:
...     if acquired:
...         # Perform lifecycle operation
...         pass
>>> # Custom timeout override
>>> with acquire_lifecycle_lock_optimized('flush', custom_timeout=5.0) as acq:
...     if acq:
...         # Perform flush operation
...         pass
Source code in src/honeyhive/tracer/lifecycle/core.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
@contextmanager
def acquire_lifecycle_lock_optimized(
    operation_type: str = "lifecycle", custom_timeout: Optional[float] = None
) -> Iterator[bool]:
    """Context manager for acquiring lifecycle lock with environment-optimized timeout.

    Args:
        operation_type: Type of operation ('lifecycle' or 'flush') for timeout selection
        custom_timeout: Optional custom timeout override

    Yields:
        bool: True if lock was acquired, False if timeout occurred

    Examples:
        >>> # Auto-optimized for environment
        >>> with acquire_lifecycle_lock_optimized('lifecycle') as acquired:
        ...     if acquired:
        ...         # Perform lifecycle operation
        ...         pass

        >>> # Custom timeout override
        >>> with acquire_lifecycle_lock_optimized('flush', custom_timeout=5.0) as acq:
        ...     if acq:
        ...         # Perform flush operation
        ...         pass
    """
    if custom_timeout is not None:
        timeout = custom_timeout
    else:
        config = get_lock_config()
        timeout_key = f"{operation_type}_timeout"
        timeout = config.get(timeout_key, config.get("lifecycle_timeout", 1.0))

    acquired = _lifecycle_lock.acquire(timeout=timeout)
    if not acquired:
        yield False
    else:
        try:
            yield True
        finally:
            _lifecycle_lock.release()

get_lifecycle_lock

get_lifecycle_lock() -> Lock

Get the global lifecycle lock for external use.

Source code in src/honeyhive/tracer/lifecycle/core.py
362
363
364
def get_lifecycle_lock() -> threading.Lock:
    """Get the global lifecycle lock for external use."""
    return _lifecycle_lock