Skip to content

honeyhive.utils

HoneyHive utilities package.

BaggageDict

Dictionary-like interface for OpenTelemetry baggage.

This class provides a convenient way to work with OpenTelemetry baggage as if it were a regular dictionary, while maintaining proper context propagation.

Source code in src/honeyhive/utils/baggage_dict.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class BaggageDict:
    """Dictionary-like interface for OpenTelemetry baggage.

    This class provides a convenient way to work with OpenTelemetry baggage
    as if it were a regular dictionary, while maintaining proper context
    propagation.
    """

    def __init__(self, ctx: Optional[Context] = None):
        """Initialize BaggageDict with optional context.

        Args:
            ctx: OpenTelemetry context. If None, uses current context.
        """

        self._context = ctx or context.get_current()

    @property
    def context(self) -> Context:
        """Get the current context."""
        return self._context

    def get(self, key: str, default: Any = None) -> Any:
        """Get a value from baggage.

        Args:
            key: Baggage key
            default: Default value if key not found

        Returns:
            Value from baggage or default
        """

        value = baggage.get_baggage(key, self._context)
        return value if value is not None else default

    def set(self, key: str, value: Any) -> "BaggageDict":
        """Set a value in baggage.

        Args:
            key: Baggage key
            value: Value to set

        Returns:
            New BaggageDict with updated context
        """

        new_context = baggage.set_baggage(key, str(value), self._context)
        return BaggageDict(new_context)

    def delete(self, key: str) -> "BaggageDict":
        """Delete a key from baggage.

        Args:
            key: Baggage key to delete

        Returns:
            New BaggageDict with updated context
        """

        new_context = baggage.set_baggage(key, None, self._context)
        return BaggageDict(new_context)

    def update(self, **kwargs: Any) -> "BaggageDict":
        """Update multiple baggage values.

        Args:
            **kwargs: Key-value pairs to set

        Returns:
            New BaggageDict with updated context
        """

        new_context = self._context
        for key, value in kwargs.items():
            new_context = baggage.set_baggage(key, str(value), new_context)

        return BaggageDict(new_context)

    def clear(self) -> "BaggageDict":
        """Clear all baggage.

        Returns:
            New BaggageDict with empty baggage
        """

        # Create new context without baggage
        new_context = context.get_current()
        return BaggageDict(new_context)

    def items(self) -> Dict[str, str]:
        """Get all baggage items as a dictionary.

        Returns:
            Dictionary of baggage key-value pairs
        """

        try:
            # Get current baggage context
            current_baggage = baggage.get_all()
            if current_baggage:
                # Convert to string values to match the expected return type
                return {k: str(v) for k, v in current_baggage.items()}
            return {}
        except Exception:
            return {}

    def keys(self) -> KeysView[str]:
        """Get all baggage keys."""
        return self.items().keys()

    def values(self) -> ValuesView[str]:
        """Get all baggage values."""
        return self.items().values()

    def __getitem__(self, key: str) -> str:
        """Get baggage value using bracket notation."""
        value = self.get(key)
        if value is None:
            raise KeyError(key)
        return str(value)  # Ensure we return a string

    def __setitem__(self, key: str, value: Any) -> None:
        """Set baggage value using bracket notation."""
        self.set(key, value)

    def __delitem__(self, key: str) -> None:
        """Delete baggage key using bracket notation."""
        self.delete(key)

    def __contains__(self, key: str) -> bool:
        """Check if key exists in baggage."""
        return self.get(key) is not None

    def __len__(self) -> int:
        """Get number of baggage items."""
        return len(self.items())

    def __iter__(self) -> Iterator[str]:
        """Iterate over baggage keys."""
        return iter(self.keys())

    def __repr__(self) -> str:
        """String representation."""
        items = self.items()
        return f"BaggageDict({items})"

    @classmethod
    def from_dict(
        cls, data: Dict[str, Any], ctx: Optional[Context] = None
    ) -> "BaggageDict":
        """Create BaggageDict from dictionary.

        Args:
            data: Dictionary of key-value pairs
            ctx: Optional OpenTelemetry context

        Returns:
            New BaggageDict with baggage from dictionary
        """
        baggage_dict = cls(ctx)
        return baggage_dict.update(**data)

    @contextmanager
    def as_context(self) -> Iterator[None]:
        """Context manager to temporarily set baggage in current context.

        Example:
            with BaggageDict().set("user_id", "123").as_context():
                # baggage is available in this context
                pass
        """
        token = context.attach(self._context)
        try:
            yield
        finally:
            context.detach(token)

context property

context: Context

Get the current context.

get

get(key: str, default: Any = None) -> Any

Get a value from baggage.

Parameters:

Name Type Description Default
key str

Baggage key

required
default Any

Default value if key not found

None

Returns:

Type Description
Any

Value from baggage or default

Source code in src/honeyhive/utils/baggage_dict.py
32
33
34
35
36
37
38
39
40
41
42
43
44
def get(self, key: str, default: Any = None) -> Any:
    """Get a value from baggage.

    Args:
        key: Baggage key
        default: Default value if key not found

    Returns:
        Value from baggage or default
    """

    value = baggage.get_baggage(key, self._context)
    return value if value is not None else default

set

set(key: str, value: Any) -> BaggageDict

Set a value in baggage.

Parameters:

Name Type Description Default
key str

Baggage key

required
value Any

Value to set

required

Returns:

Type Description
BaggageDict

New BaggageDict with updated context

Source code in src/honeyhive/utils/baggage_dict.py
46
47
48
49
50
51
52
53
54
55
56
57
58
def set(self, key: str, value: Any) -> "BaggageDict":
    """Set a value in baggage.

    Args:
        key: Baggage key
        value: Value to set

    Returns:
        New BaggageDict with updated context
    """

    new_context = baggage.set_baggage(key, str(value), self._context)
    return BaggageDict(new_context)

delete

delete(key: str) -> BaggageDict

Delete a key from baggage.

Parameters:

Name Type Description Default
key str

Baggage key to delete

required

Returns:

Type Description
BaggageDict

New BaggageDict with updated context

Source code in src/honeyhive/utils/baggage_dict.py
60
61
62
63
64
65
66
67
68
69
70
71
def delete(self, key: str) -> "BaggageDict":
    """Delete a key from baggage.

    Args:
        key: Baggage key to delete

    Returns:
        New BaggageDict with updated context
    """

    new_context = baggage.set_baggage(key, None, self._context)
    return BaggageDict(new_context)

update

update(**kwargs: Any) -> BaggageDict

Update multiple baggage values.

Parameters:

Name Type Description Default
**kwargs Any

Key-value pairs to set

{}

Returns:

Type Description
BaggageDict

New BaggageDict with updated context

Source code in src/honeyhive/utils/baggage_dict.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def update(self, **kwargs: Any) -> "BaggageDict":
    """Update multiple baggage values.

    Args:
        **kwargs: Key-value pairs to set

    Returns:
        New BaggageDict with updated context
    """

    new_context = self._context
    for key, value in kwargs.items():
        new_context = baggage.set_baggage(key, str(value), new_context)

    return BaggageDict(new_context)

clear

clear() -> BaggageDict

Clear all baggage.

Returns:

Type Description
BaggageDict

New BaggageDict with empty baggage

Source code in src/honeyhive/utils/baggage_dict.py
89
90
91
92
93
94
95
96
97
98
def clear(self) -> "BaggageDict":
    """Clear all baggage.

    Returns:
        New BaggageDict with empty baggage
    """

    # Create new context without baggage
    new_context = context.get_current()
    return BaggageDict(new_context)

items

items() -> Dict[str, str]

Get all baggage items as a dictionary.

Returns:

Type Description
Dict[str, str]

Dictionary of baggage key-value pairs

Source code in src/honeyhive/utils/baggage_dict.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def items(self) -> Dict[str, str]:
    """Get all baggage items as a dictionary.

    Returns:
        Dictionary of baggage key-value pairs
    """

    try:
        # Get current baggage context
        current_baggage = baggage.get_all()
        if current_baggage:
            # Convert to string values to match the expected return type
            return {k: str(v) for k, v in current_baggage.items()}
        return {}
    except Exception:
        return {}

keys

keys() -> KeysView[str]

Get all baggage keys.

Source code in src/honeyhive/utils/baggage_dict.py
117
118
119
def keys(self) -> KeysView[str]:
    """Get all baggage keys."""
    return self.items().keys()

values

values() -> ValuesView[str]

Get all baggage values.

Source code in src/honeyhive/utils/baggage_dict.py
121
122
123
def values(self) -> ValuesView[str]:
    """Get all baggage values."""
    return self.items().values()

from_dict classmethod

from_dict(
    data: Dict[str, Any], ctx: Optional[Context] = None
) -> BaggageDict

Create BaggageDict from dictionary.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary of key-value pairs

required
ctx Optional[Context]

Optional OpenTelemetry context

None

Returns:

Type Description
BaggageDict

New BaggageDict with baggage from dictionary

Source code in src/honeyhive/utils/baggage_dict.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@classmethod
def from_dict(
    cls, data: Dict[str, Any], ctx: Optional[Context] = None
) -> "BaggageDict":
    """Create BaggageDict from dictionary.

    Args:
        data: Dictionary of key-value pairs
        ctx: Optional OpenTelemetry context

    Returns:
        New BaggageDict with baggage from dictionary
    """
    baggage_dict = cls(ctx)
    return baggage_dict.update(**data)

as_context

as_context() -> Iterator[None]

Context manager to temporarily set baggage in current context.

Example

with BaggageDict().set("user_id", "123").as_context(): # baggage is available in this context pass

Source code in src/honeyhive/utils/baggage_dict.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@contextmanager
def as_context(self) -> Iterator[None]:
    """Context manager to temporarily set baggage in current context.

    Example:
        with BaggageDict().set("user_id", "123").as_context():
            # baggage is available in this context
            pass
    """
    token = context.attach(self._context)
    try:
        yield
    finally:
        context.detach(token)

Cache

In-memory cache with TTL and size limits.

Source code in src/honeyhive/utils/cache.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
class Cache:
    """In-memory cache with TTL and size limits."""

    def __init__(self, config: Optional[CacheConfig] = None):
        """Initialize cache.

        Args:
            config: Cache configuration
        """
        self.config = config or CacheConfig()

        # Cache storage
        self._cache: Dict[str, CacheEntry] = {}
        self._lock = threading.RLock()

        # Statistics
        self._stats = {
            "hits": 0,
            "misses": 0,
            "sets": 0,
            "deletes": 0,
            "expired": 0,
            "evictions": 0,
        }

        # Cleanup thread
        self._cleanup_thread: Optional[threading.Thread] = None
        self._stop_cleanup = threading.Event()
        self._start_cleanup_thread()

    @property
    def cache(self) -> Dict[str, CacheEntry]:
        """Get the underlying cache dictionary.

        Returns:
            Cache dictionary
        """
        return self._cache

    @property
    def hits(self) -> int:
        """Get cache hit count.

        Returns:
            Number of cache hits
        """
        return self._stats["hits"]

    @property
    def misses(self) -> int:
        """Get cache miss count.

        Returns:
            Number of cache misses
        """
        return self._stats["misses"]

    def _start_cleanup_thread(self) -> None:
        """Start cleanup thread."""
        if self.config.cleanup_interval > 0:
            self._cleanup_thread = threading.Thread(
                target=self._cleanup_worker, daemon=True
            )
            self._cleanup_thread.start()

    def _cleanup_worker(self) -> None:
        """Cleanup worker thread."""
        while not self._stop_cleanup.wait(self.config.cleanup_interval):
            self.cleanup_expired()

    def _generate_key(self, *args: Any, **kwargs: Any) -> str:
        """Generate cache key from arguments.

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Cache key string
        """
        # Create a string representation of the arguments
        key_parts = [str(arg) for arg in args]
        key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
        key_string = "|".join(key_parts)

        # Hash the key string for consistent length
        return hashlib.md5(key_string.encode()).hexdigest()

    def generate_key(self, *args: Any, **kwargs: Any) -> str:
        """Generate cache key from arguments (public method).

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Cache key string
        """
        return self._generate_key(*args, **kwargs)

    def get(self, key: str, default: Any = None) -> Any:
        """Get value from cache.

        Args:
            key: Cache key
            default: Default value if key not found

        Returns:
            Cached value or default
        """
        with self._lock:
            if key in self._cache:
                entry = self._cache[key]

                if entry.is_expired():
                    # Remove expired entry
                    del self._cache[key]
                    self._stats["expired"] += 1
                    self._stats["misses"] += 1
                    return default

                # Mark as accessed
                entry.access()
                self._stats["hits"] += 1
                return entry.value

            self._stats["misses"] += 1
            return default

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        """Set value in cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (uses default if None)
        """
        if ttl is None:
            ttl = self.config.default_ttl

        with self._lock:
            # Check if we need to evict entries
            if len(self._cache) >= self.config.max_size:
                self._evict_entries()

            # Create cache entry
            entry = CacheEntry(key, value, ttl)
            self._cache[key] = entry
            self._stats["sets"] += 1

    def delete(self, key: str) -> bool:
        """Delete key from cache.

        Args:
            key: Cache key to delete

        Returns:
            True if key was deleted, False if not found
        """
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                self._stats["deletes"] += 1
                return True
            return False

    def exists(self, key: str) -> bool:
        """Check if key exists in cache.

        Args:
            key: Cache key to check

        Returns:
            True if key exists and not expired, False otherwise
        """
        with self._lock:
            if key in self._cache:
                entry = self._cache[key]
                if entry.is_expired():
                    del self._cache[key]
                    self._stats["expired"] += 1
                    return False
                return True
            return False

    def clear(self) -> None:
        """Clear all entries from cache."""
        with self._lock:
            self._cache.clear()
            self._reset_stats()

    def cleanup_expired(self) -> int:
        """Clean up expired entries.

        Returns:
            Number of entries cleaned up
        """
        cleaned = 0
        current_time = time.time()

        with self._lock:
            expired_keys = [
                key
                for key, entry in self._cache.items()
                if current_time - entry.created_at > entry.ttl
            ]

            for key in expired_keys:
                del self._cache[key]
                cleaned += 1
                self._stats["expired"] += 1

        return cleaned

    def _evict_entries(self, count: int = 1) -> None:
        """Evict entries based on LRU policy.

        Args:
            count: Number of entries to evict
        """
        if len(self._cache) < count:
            return

        # Sort entries by last accessed time (LRU)
        entries = sorted(self._cache.items(), key=lambda x: x[1].last_accessed)

        # Remove oldest entries
        for i in range(count):
            if i < len(entries):
                key, _ = entries[i]
                del self._cache[key]
                self._stats["evictions"] += 1

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with cache statistics
        """
        with self._lock:
            stats = self._stats.copy()
            stats["size"] = len(self._cache)
            stats["max_size"] = self.config.max_size
            stats["hit_rate"] = int(
                self._stats["hits"]
                / max(1, self._stats["hits"] + self._stats["misses"])
                * 100
            )
            return stats

    def _reset_stats(self) -> None:
        """Reset cache statistics."""
        for key in self._stats:
            self._stats[key] = 0

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with cache statistics
        """
        with self._lock:
            total_requests = self._stats["hits"] + self._stats["misses"]
            return {
                "size": len(self._cache),
                "max_size": self.config.max_size,
                "hits": self._stats["hits"],
                "misses": self._stats["misses"],
                "total_requests": total_requests,
                "hit_rate": self._stats["hits"] / max(1, total_requests),
                "sets": self._stats["sets"],
                "deletes": self._stats["deletes"],
                "expired": self._stats["expired"],
                "evictions": self._stats["evictions"],
            }

    def cleanup(self) -> None:
        """Clean up expired entries and perform maintenance."""
        self.cleanup_expired()

    def close(self) -> None:
        """Close cache and cleanup resources."""
        self._stop_cleanup.set()
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self._cleanup_thread.join(timeout=1.0)
        self.clear()

    def __enter__(self) -> "Cache":
        """Context manager entry."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Context manager exit."""
        self.close()

config instance-attribute

config = config or CacheConfig()

cache property

cache: Dict[str, CacheEntry]

Get the underlying cache dictionary.

Returns:

Type Description
Dict[str, CacheEntry]

Cache dictionary

hits property

hits: int

Get cache hit count.

Returns:

Type Description
int

Number of cache hits

misses property

misses: int

Get cache miss count.

Returns:

Type Description
int

Number of cache misses

generate_key

generate_key(*args: Any, **kwargs: Any) -> str

Generate cache key from arguments (public method).

Parameters:

Name Type Description Default
*args Any

Positional arguments

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
str

Cache key string

Source code in src/honeyhive/utils/cache.py
166
167
168
169
170
171
172
173
174
175
176
def generate_key(self, *args: Any, **kwargs: Any) -> str:
    """Generate cache key from arguments (public method).

    Args:
        *args: Positional arguments
        **kwargs: Keyword arguments

    Returns:
        Cache key string
    """
    return self._generate_key(*args, **kwargs)

get

get(key: str, default: Any = None) -> Any

Get value from cache.

Parameters:

Name Type Description Default
key str

Cache key

required
default Any

Default value if key not found

None

Returns:

Type Description
Any

Cached value or default

Source code in src/honeyhive/utils/cache.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def get(self, key: str, default: Any = None) -> Any:
    """Get value from cache.

    Args:
        key: Cache key
        default: Default value if key not found

    Returns:
        Cached value or default
    """
    with self._lock:
        if key in self._cache:
            entry = self._cache[key]

            if entry.is_expired():
                # Remove expired entry
                del self._cache[key]
                self._stats["expired"] += 1
                self._stats["misses"] += 1
                return default

            # Mark as accessed
            entry.access()
            self._stats["hits"] += 1
            return entry.value

        self._stats["misses"] += 1
        return default

set

set(
    key: str, value: Any, ttl: Optional[float] = None
) -> None

Set value in cache.

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to cache

required
ttl Optional[float]

Time to live in seconds (uses default if None)

None
Source code in src/honeyhive/utils/cache.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
    """Set value in cache.

    Args:
        key: Cache key
        value: Value to cache
        ttl: Time to live in seconds (uses default if None)
    """
    if ttl is None:
        ttl = self.config.default_ttl

    with self._lock:
        # Check if we need to evict entries
        if len(self._cache) >= self.config.max_size:
            self._evict_entries()

        # Create cache entry
        entry = CacheEntry(key, value, ttl)
        self._cache[key] = entry
        self._stats["sets"] += 1

delete

delete(key: str) -> bool

Delete key from cache.

Parameters:

Name Type Description Default
key str

Cache key to delete

required

Returns:

Type Description
bool

True if key was deleted, False if not found

Source code in src/honeyhive/utils/cache.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def delete(self, key: str) -> bool:
    """Delete key from cache.

    Args:
        key: Cache key to delete

    Returns:
        True if key was deleted, False if not found
    """
    with self._lock:
        if key in self._cache:
            del self._cache[key]
            self._stats["deletes"] += 1
            return True
        return False

exists

exists(key: str) -> bool

Check if key exists in cache.

Parameters:

Name Type Description Default
key str

Cache key to check

required

Returns:

Type Description
bool

True if key exists and not expired, False otherwise

Source code in src/honeyhive/utils/cache.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def exists(self, key: str) -> bool:
    """Check if key exists in cache.

    Args:
        key: Cache key to check

    Returns:
        True if key exists and not expired, False otherwise
    """
    with self._lock:
        if key in self._cache:
            entry = self._cache[key]
            if entry.is_expired():
                del self._cache[key]
                self._stats["expired"] += 1
                return False
            return True
        return False

clear

clear() -> None

Clear all entries from cache.

Source code in src/honeyhive/utils/cache.py
263
264
265
266
267
def clear(self) -> None:
    """Clear all entries from cache."""
    with self._lock:
        self._cache.clear()
        self._reset_stats()

cleanup_expired

cleanup_expired() -> int

Clean up expired entries.

Returns:

Type Description
int

Number of entries cleaned up

Source code in src/honeyhive/utils/cache.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def cleanup_expired(self) -> int:
    """Clean up expired entries.

    Returns:
        Number of entries cleaned up
    """
    cleaned = 0
    current_time = time.time()

    with self._lock:
        expired_keys = [
            key
            for key, entry in self._cache.items()
            if current_time - entry.created_at > entry.ttl
        ]

        for key in expired_keys:
            del self._cache[key]
            cleaned += 1
            self._stats["expired"] += 1

    return cleaned

get_stats

get_stats() -> Dict[str, Any]

Get cache statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with cache statistics

Source code in src/honeyhive/utils/cache.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def get_stats(self) -> Dict[str, Any]:
    """Get cache statistics.

    Returns:
        Dictionary with cache statistics
    """
    with self._lock:
        stats = self._stats.copy()
        stats["size"] = len(self._cache)
        stats["max_size"] = self.config.max_size
        stats["hit_rate"] = int(
            self._stats["hits"]
            / max(1, self._stats["hits"] + self._stats["misses"])
            * 100
        )
        return stats

stats

stats() -> Dict[str, Any]

Get cache statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with cache statistics

Source code in src/honeyhive/utils/cache.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def stats(self) -> Dict[str, Any]:
    """Get cache statistics.

    Returns:
        Dictionary with cache statistics
    """
    with self._lock:
        total_requests = self._stats["hits"] + self._stats["misses"]
        return {
            "size": len(self._cache),
            "max_size": self.config.max_size,
            "hits": self._stats["hits"],
            "misses": self._stats["misses"],
            "total_requests": total_requests,
            "hit_rate": self._stats["hits"] / max(1, total_requests),
            "sets": self._stats["sets"],
            "deletes": self._stats["deletes"],
            "expired": self._stats["expired"],
            "evictions": self._stats["evictions"],
        }

cleanup

cleanup() -> None

Clean up expired entries and perform maintenance.

Source code in src/honeyhive/utils/cache.py
354
355
356
def cleanup(self) -> None:
    """Clean up expired entries and perform maintenance."""
    self.cleanup_expired()

close

close() -> None

Close cache and cleanup resources.

Source code in src/honeyhive/utils/cache.py
358
359
360
361
362
363
def close(self) -> None:
    """Close cache and cleanup resources."""
    self._stop_cleanup.set()
    if self._cleanup_thread and self._cleanup_thread.is_alive():
        self._cleanup_thread.join(timeout=1.0)
    self.clear()

CacheConfig dataclass

Configuration for cache.

Source code in src/honeyhive/utils/cache.py
10
11
12
13
14
15
16
17
@dataclass
class CacheConfig:
    """Configuration for cache."""

    max_size: int = 1000
    default_ttl: float = 300.0  # 5 minutes
    cleanup_interval: float = 60.0  # 1 minute
    enable_stats: bool = True

max_size class-attribute instance-attribute

max_size: int = 1000

default_ttl class-attribute instance-attribute

default_ttl: float = 300.0

cleanup_interval class-attribute instance-attribute

cleanup_interval: float = 60.0

enable_stats class-attribute instance-attribute

enable_stats: bool = True

CacheEntry

Cache entry with metadata.

Source code in src/honeyhive/utils/cache.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class CacheEntry:
    """Cache entry with metadata."""

    def __init__(self, key: str, value: Any, ttl: float = 300.0):
        """Initialize cache entry.

        Args:
            key: Cache key
            value: Cached value
            ttl: Time to live in seconds
        """
        self.key = key
        self.value = value
        self.created_at = time.time()
        self.ttl = ttl
        self.access_count = 0
        self.last_accessed = self.created_at

    def is_expired(self) -> bool:
        """Check if entry is expired.

        Returns:
            True if expired, False otherwise
        """
        return time.time() - self.created_at > self.ttl

    def access(self) -> None:
        """Mark entry as accessed."""
        self.access_count += 1
        self.last_accessed = time.time()

    def get_age(self) -> float:
        """Get age of entry in seconds.

        Returns:
            Age in seconds
        """
        return time.time() - self.created_at

    def get_remaining_ttl(self) -> float:
        """Get remaining TTL in seconds.

        Returns:
            Remaining TTL in seconds
        """
        remaining = self.ttl - self.get_age()
        return max(0, remaining)

    @property
    def expiry(self) -> float:
        """Get expiry timestamp.

        Returns:
            Timestamp when entry expires
        """
        return self.created_at + self.ttl

key instance-attribute

key = key

value instance-attribute

value = value

created_at instance-attribute

created_at = time()

ttl instance-attribute

ttl = ttl

access_count instance-attribute

access_count = 0

last_accessed instance-attribute

last_accessed = created_at

expiry property

expiry: float

Get expiry timestamp.

Returns:

Type Description
float

Timestamp when entry expires

is_expired

is_expired() -> bool

Check if entry is expired.

Returns:

Type Description
bool

True if expired, False otherwise

Source code in src/honeyhive/utils/cache.py
38
39
40
41
42
43
44
def is_expired(self) -> bool:
    """Check if entry is expired.

    Returns:
        True if expired, False otherwise
    """
    return time.time() - self.created_at > self.ttl

access

access() -> None

Mark entry as accessed.

Source code in src/honeyhive/utils/cache.py
46
47
48
49
def access(self) -> None:
    """Mark entry as accessed."""
    self.access_count += 1
    self.last_accessed = time.time()

get_age

get_age() -> float

Get age of entry in seconds.

Returns:

Type Description
float

Age in seconds

Source code in src/honeyhive/utils/cache.py
51
52
53
54
55
56
57
def get_age(self) -> float:
    """Get age of entry in seconds.

    Returns:
        Age in seconds
    """
    return time.time() - self.created_at

get_remaining_ttl

get_remaining_ttl() -> float

Get remaining TTL in seconds.

Returns:

Type Description
float

Remaining TTL in seconds

Source code in src/honeyhive/utils/cache.py
59
60
61
62
63
64
65
66
def get_remaining_ttl(self) -> float:
    """Get remaining TTL in seconds.

    Returns:
        Remaining TTL in seconds
    """
    remaining = self.ttl - self.get_age()
    return max(0, remaining)

CacheManager

Multi-instance cache manager for tracer instances.

This class provides per-instance cache management that aligns with the multi-instance tracer architecture. Each tracer instance can have its own isolated cache instances.

Source code in src/honeyhive/utils/cache.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
class CacheManager:
    """Multi-instance cache manager for tracer instances.

    This class provides per-instance cache management that aligns with
    the multi-instance tracer architecture. Each tracer instance can
    have its own isolated cache instances.
    """

    def __init__(self, instance_id: str, config: Optional[CacheConfig] = None):
        """Initialize cache manager for a specific instance.

        Args:
            instance_id: Unique identifier for the instance (e.g., tracer ID)
            config: Cache configuration
        """
        self.instance_id = instance_id
        self.config = config or CacheConfig()
        self._caches: Dict[str, Cache] = {}

    def get_cache(self, cache_name: str, config: Optional[CacheConfig] = None) -> Cache:
        """Get or create a named cache for this instance.

        Args:
            cache_name: Name of the cache (e.g., 'attributes', 'resources')
            config: Optional cache-specific configuration

        Returns:
            Cache instance for the specified name
        """
        if cache_name not in self._caches:
            cache_config = config or self.config
            self._caches[cache_name] = Cache(cache_config)

        return self._caches[cache_name]

    def close_all(self) -> None:
        """Close all caches managed by this instance."""
        for cache in self._caches.values():
            cache.close()
        self._caches.clear()

    def get_stats(self) -> Dict[str, Dict[str, Any]]:
        """Get statistics for all caches in this instance.

        Returns:
            Dictionary mapping cache names to their statistics
        """
        return {name: cache.get_stats() for name, cache in self._caches.items()}

    # Domain-specific cache methods for tracer functionality
    def get_config_value(
        self,
        config_hash: str,
        key: str,
        default: Any,
        resolver_func: Callable[[], Any],
    ) -> Any:
        """Get cached configuration value or resolve and cache it.

        Args:
            config_hash: Hash of the configuration object
            key: Configuration key
            default: Default value if not found
            resolver_func: Function to resolve the value if not cached

        Returns:
            Cached or resolved configuration value
        """
        cache = self.get_cache(
            "config",
            CacheConfig(
                max_size=100,
                default_ttl=900.0,  # 15-minute TTL for config stability
                cleanup_interval=180.0,
            ),
        )

        cache_key = f"config:{config_hash}:{key}:{hash(str(default))}"

        # Check cache first
        if cached := cache.get(cache_key):
            return cached

        # Resolve and cache
        try:
            value = resolver_func()
            cache.set(cache_key, value)
            return value
        except Exception:
            return default

    def get_cached_attributes(
        self,
        attr_key: str,
        normalizer_func: Callable[[], Any],
    ) -> Any:
        """Get cached normalized attributes or normalize and cache them.

        Args:
            attr_key: Attribute cache key
            normalizer_func: Function to normalize the attribute if not cached

        Returns:
            Cached or normalized attribute value
        """
        cache = self.get_cache(
            "attributes",
            CacheConfig(
                max_size=1000,  # High frequency operations
                default_ttl=300.0,  # 5-minute TTL
                cleanup_interval=60.0,
            ),
        )

        # Check cache first
        if cached := cache.get(attr_key):
            return cached

        # Normalize and cache
        try:
            value = normalizer_func()
            cache.set(attr_key, value)
            return value
        except Exception:
            return None

    def get_cached_resources(
        self,
        resource_key: str,
        detector_func: Callable[[], Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Get cached resource detection results or detect and cache them.

        Args:
            resource_key: Resource cache key
            detector_func: Function to detect resources if not cached

        Returns:
            Cached or detected resource information
        """
        cache = self.get_cache(
            "resources",
            CacheConfig(
                max_size=50,  # Lower frequency, stable data
                default_ttl=3600.0,  # 1-hour TTL for system info
                cleanup_interval=300.0,
            ),
        )

        # Check cache first
        if cached := cache.get(resource_key):
            return cached  # type: ignore[no-any-return]

        # Detect and cache
        try:
            resources = detector_func()
            cache.set(resource_key, resources)
            return resources
        except Exception:
            return {}

instance_id instance-attribute

instance_id = instance_id

config instance-attribute

config = config or CacheConfig()

get_cache

get_cache(
    cache_name: str, config: Optional[CacheConfig] = None
) -> Cache

Get or create a named cache for this instance.

Parameters:

Name Type Description Default
cache_name str

Name of the cache (e.g., 'attributes', 'resources')

required
config Optional[CacheConfig]

Optional cache-specific configuration

None

Returns:

Type Description
Cache

Cache instance for the specified name

Source code in src/honeyhive/utils/cache.py
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
def get_cache(self, cache_name: str, config: Optional[CacheConfig] = None) -> Cache:
    """Get or create a named cache for this instance.

    Args:
        cache_name: Name of the cache (e.g., 'attributes', 'resources')
        config: Optional cache-specific configuration

    Returns:
        Cache instance for the specified name
    """
    if cache_name not in self._caches:
        cache_config = config or self.config
        self._caches[cache_name] = Cache(cache_config)

    return self._caches[cache_name]

close_all

close_all() -> None

Close all caches managed by this instance.

Source code in src/honeyhive/utils/cache.py
524
525
526
527
528
def close_all(self) -> None:
    """Close all caches managed by this instance."""
    for cache in self._caches.values():
        cache.close()
    self._caches.clear()

get_stats

get_stats() -> Dict[str, Dict[str, Any]]

Get statistics for all caches in this instance.

Returns:

Type Description
Dict[str, Dict[str, Any]]

Dictionary mapping cache names to their statistics

Source code in src/honeyhive/utils/cache.py
530
531
532
533
534
535
536
def get_stats(self) -> Dict[str, Dict[str, Any]]:
    """Get statistics for all caches in this instance.

    Returns:
        Dictionary mapping cache names to their statistics
    """
    return {name: cache.get_stats() for name, cache in self._caches.items()}

get_config_value

get_config_value(
    config_hash: str,
    key: str,
    default: Any,
    resolver_func: Callable[[], Any],
) -> Any

Get cached configuration value or resolve and cache it.

Parameters:

Name Type Description Default
config_hash str

Hash of the configuration object

required
key str

Configuration key

required
default Any

Default value if not found

required
resolver_func Callable[[], Any]

Function to resolve the value if not cached

required

Returns:

Type Description
Any

Cached or resolved configuration value

Source code in src/honeyhive/utils/cache.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
def get_config_value(
    self,
    config_hash: str,
    key: str,
    default: Any,
    resolver_func: Callable[[], Any],
) -> Any:
    """Get cached configuration value or resolve and cache it.

    Args:
        config_hash: Hash of the configuration object
        key: Configuration key
        default: Default value if not found
        resolver_func: Function to resolve the value if not cached

    Returns:
        Cached or resolved configuration value
    """
    cache = self.get_cache(
        "config",
        CacheConfig(
            max_size=100,
            default_ttl=900.0,  # 15-minute TTL for config stability
            cleanup_interval=180.0,
        ),
    )

    cache_key = f"config:{config_hash}:{key}:{hash(str(default))}"

    # Check cache first
    if cached := cache.get(cache_key):
        return cached

    # Resolve and cache
    try:
        value = resolver_func()
        cache.set(cache_key, value)
        return value
    except Exception:
        return default

get_cached_attributes

get_cached_attributes(
    attr_key: str, normalizer_func: Callable[[], Any]
) -> Any

Get cached normalized attributes or normalize and cache them.

Parameters:

Name Type Description Default
attr_key str

Attribute cache key

required
normalizer_func Callable[[], Any]

Function to normalize the attribute if not cached

required

Returns:

Type Description
Any

Cached or normalized attribute value

Source code in src/honeyhive/utils/cache.py
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
def get_cached_attributes(
    self,
    attr_key: str,
    normalizer_func: Callable[[], Any],
) -> Any:
    """Get cached normalized attributes or normalize and cache them.

    Args:
        attr_key: Attribute cache key
        normalizer_func: Function to normalize the attribute if not cached

    Returns:
        Cached or normalized attribute value
    """
    cache = self.get_cache(
        "attributes",
        CacheConfig(
            max_size=1000,  # High frequency operations
            default_ttl=300.0,  # 5-minute TTL
            cleanup_interval=60.0,
        ),
    )

    # Check cache first
    if cached := cache.get(attr_key):
        return cached

    # Normalize and cache
    try:
        value = normalizer_func()
        cache.set(attr_key, value)
        return value
    except Exception:
        return None

get_cached_resources

get_cached_resources(
    resource_key: str,
    detector_func: Callable[[], Dict[str, Any]],
) -> Dict[str, Any]

Get cached resource detection results or detect and cache them.

Parameters:

Name Type Description Default
resource_key str

Resource cache key

required
detector_func Callable[[], Dict[str, Any]]

Function to detect resources if not cached

required

Returns:

Type Description
Dict[str, Any]

Cached or detected resource information

Source code in src/honeyhive/utils/cache.py
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
def get_cached_resources(
    self,
    resource_key: str,
    detector_func: Callable[[], Dict[str, Any]],
) -> Dict[str, Any]:
    """Get cached resource detection results or detect and cache them.

    Args:
        resource_key: Resource cache key
        detector_func: Function to detect resources if not cached

    Returns:
        Cached or detected resource information
    """
    cache = self.get_cache(
        "resources",
        CacheConfig(
            max_size=50,  # Lower frequency, stable data
            default_ttl=3600.0,  # 1-hour TTL for system info
            cleanup_interval=300.0,
        ),
    )

    # Check cache first
    if cached := cache.get(resource_key):
        return cached  # type: ignore[no-any-return]

    # Detect and cache
    try:
        resources = detector_func()
        cache.set(resource_key, resources)
        return resources
    except Exception:
        return {}

ConnectionPool

Connection pool for HTTP clients.

Source code in src/honeyhive/utils/connection_pool.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
class ConnectionPool:
    """Connection pool for HTTP clients."""

    # Type annotations for instance attributes
    _lock: Union[threading.Lock, "_NoOpLock"]

    def __init__(
        self,
        config: Optional[PoolConfig] = None,
        *,
        # Backwards compatibility parameters
        max_connections: Optional[int] = None,
        max_keepalive: Optional[int] = None,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        retries: Optional[int] = None,
        timeout: Optional[float] = None,
        pool_timeout: Optional[float] = None,
    ):
        """Initialize connection pool with hybrid config approach.

        Args:
            config: Pool configuration object (recommended)
            max_connections: Maximum number of connections (backwards compatibility)
            max_keepalive: Alias for max_keepalive_connections (backwards compatibility)
            max_keepalive_connections: Maximum keepalive connections
            keepalive_expiry: Keepalive expiry time in seconds
            retries: Number of retries
            timeout: Request timeout in seconds
            pool_timeout: Pool acquisition timeout in seconds
        """
        if not HTTPX_AVAILABLE:
            raise ImportError("httpx is required for connection pooling")

        # Hybrid approach: merge config object with individual parameters
        if config is None:
            config = PoolConfig()

        # Override config with any explicitly provided parameters
        if max_connections is not None:
            config.max_connections = max_connections
        if max_keepalive is not None:
            config.max_keepalive_connections = max_keepalive
        if max_keepalive_connections is not None:
            config.max_keepalive_connections = max_keepalive_connections
        if keepalive_expiry is not None:
            config.keepalive_expiry = keepalive_expiry
        if retries is not None:
            config.retries = retries
        if timeout is not None:
            config.timeout = timeout
        if pool_timeout is not None:
            config.pool_timeout = pool_timeout

        self.config = config
        self.logger = get_logger(__name__)

        # Backwards compatibility attributes
        self.max_connections = self.config.max_connections
        self.max_keepalive = self.config.max_keepalive_connections
        self.max_keepalive_connections = self.config.max_keepalive_connections
        self.keepalive_expiry = self.config.keepalive_expiry
        self.retries = self.config.retries
        self.timeout = self.config.timeout
        self.pool_timeout = self.config.pool_timeout

        # Pool state
        self._clients: Dict[str, httpx.Client] = {}
        self._async_clients: Dict[str, httpx.AsyncClient] = {}

        # ENVIRONMENT-AWARE LOCKING: Use appropriate locking strategy
        # Production: Full threading.Lock() for thread safety
        # pytest-xdist: Simplified locking to prevent cross-process deadlocks
        self._use_locking = not _is_pytest_xdist_worker()
        if self._use_locking:
            self._lock = threading.Lock()
        else:
            # In pytest-xdist, each worker is isolated, so we can use a no-op lock
            self._lock = _NoOpLock()

        self._last_used: Dict[str, float] = {}

        # Statistics
        self._stats = {
            "total_requests": 0,
            "pool_hits": 0,
            "pool_misses": 0,
            "connections_created": 0,
            "connections_reused": 0,
        }

    def get_client(
        self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
    ) -> httpx.Client:
        """Get or create an HTTP client from the pool.

        Args:
            base_url: Base URL for the client
            headers: Default headers
            **kwargs: Additional client configuration

        Returns:
            HTTP client instance
        """
        with self._lock:
            # Check if we have a client for this base URL
            if base_url in self._clients:
                client = self._clients[base_url]
                if self._is_client_healthy(client):
                    self._last_used[base_url] = time.time()
                    self._stats["pool_hits"] += 1
                    self._stats["connections_reused"] += 1
                    return client

                # Remove unhealthy client
                del self._clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

            # Create new client
            self._stats["pool_misses"] += 1
            self._stats["connections_created"] += 1
            self._stats["total_requests"] += 1

            # Remove timeout from kwargs if it exists to avoid duplicate
            client_kwargs = kwargs.copy()
            if "timeout" in client_kwargs:
                del client_kwargs["timeout"]

            client = httpx.Client(
                base_url=base_url,
                headers=headers,
                limits=httpx.Limits(
                    max_connections=self.config.max_connections,
                    max_keepalive_connections=self.config.max_keepalive_connections,
                    keepalive_expiry=self.config.keepalive_expiry,
                ),
                timeout=self.config.timeout,
                **client_kwargs,
            )

            self._clients[base_url] = client
            self._last_used[base_url] = time.time()

            self.logger.debug(f"Created new HTTP client for {base_url}")
            return client

    def get_async_client(
        self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
    ) -> httpx.AsyncClient:
        """Get or create an async HTTP client from the pool.

        Args:
            base_url: Base URL for the client
            headers: Default headers
            **kwargs: Additional client configuration

        Returns:
            Async HTTP client instance
        """
        with self._lock:
            # Check if we have a client for this base URL
            if base_url in self._async_clients:
                client = self._async_clients[base_url]
                if self._is_async_client_healthy(client):
                    self._last_used[base_url] = time.time()
                    self._stats["pool_hits"] += 1
                    self._stats["connections_reused"] += 1
                    return client

                # Remove unhealthy client
                del self._async_clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

            # Create new client
            self._stats["pool_misses"] += 1
            self._stats["connections_created"] += 1
            self._stats["total_requests"] += 1

            # Remove timeout from kwargs if it exists to avoid duplicate
            client_kwargs = kwargs.copy()
            if "timeout" in client_kwargs:
                del client_kwargs["timeout"]

            client = httpx.AsyncClient(
                base_url=base_url,
                headers=headers,
                limits=httpx.Limits(
                    max_connections=self.config.max_connections,
                    max_keepalive_connections=self.config.max_keepalive_connections,
                    keepalive_expiry=self.config.keepalive_expiry,
                ),
                timeout=self.config.timeout,
                **client_kwargs,
            )

            self._async_clients[base_url] = client
            self._last_used[base_url] = time.time()

            self.logger.debug(f"Created new async HTTP client for {base_url}")
            return client

    def _is_client_healthy(self, client: httpx.Client) -> bool:
        """Check if a client is healthy and can be reused."""
        try:
            # Check if client is closed
            if hasattr(client, "is_closed") and client.is_closed:
                return False

            # Check if client has been idle too long
            if hasattr(client, "_transport"):
                transport = client._transport
                if hasattr(transport, "pool"):
                    pool = transport.pool
                    if hasattr(pool, "connections"):
                        # Check if pool has available connections
                        return len(pool.connections) > 0

            # If we can't determine health from transport, assume it's healthy
            # This covers cases where the client is open but transport details
            # are not accessible
            return True
        except Exception:
            return False

    def _is_async_client_healthy(self, client: httpx.AsyncClient) -> bool:
        """Check if an async client is healthy and can be reused."""
        try:
            # Check if client is closed
            if hasattr(client, "is_closed") and client.is_closed:
                return False

            # For async clients, we can't easily check transport state
            # So we assume they're healthy if not explicitly closed
            return True
        except Exception:
            return False

    def cleanup_idle_connections(self, max_idle_time: float = 300.0) -> None:
        """Clean up idle connections.

        Args:
            max_idle_time: Maximum idle time in seconds
        """
        current_time = time.time()
        to_remove = []

        with self._lock:
            for base_url, last_used in self._last_used.items():
                if current_time - last_used > max_idle_time:
                    to_remove.append(base_url)

            for base_url in to_remove:
                if base_url in self._clients:
                    try:
                        self._clients[base_url].close()
                    except Exception:
                        pass
                    del self._clients[base_url]

                if base_url in self._async_clients:
                    try:
                        # Note: AsyncClient doesn't have close() method
                        pass
                    except Exception:
                        pass
                    del self._async_clients[base_url]

                if base_url in self._last_used:
                    del self._last_used[base_url]

                self.logger.debug(f"Cleaned up idle connection for {base_url}")

    def get_stats(self) -> Dict[str, Any]:
        """Get pool statistics.

        Returns:
            Dictionary with pool statistics
        """
        with self._lock:
            stats = self._stats.copy()
            stats.update(
                {
                    "active_connections": len(self._clients),
                    "active_async_connections": len(self._async_clients),
                    "total_connections": len(self._clients) + len(self._async_clients),
                }
            )
            return stats

    @property
    def active_connections(self) -> int:
        """Get number of active connections.

        Returns:
            Number of active connections
        """
        with self._lock:
            return len(self._clients)

    def get_connection(self, base_url: str) -> Optional[httpx.Client]:
        """Get a connection for a specific base URL.

        Args:
            base_url: Base URL for the connection

        Returns:
            HTTP client instance or None if not found
        """
        with self._lock:
            if base_url in self._clients:
                client = self._clients[base_url]
                if self._is_client_healthy(client):
                    return client
        return None

    def return_connection(self, base_url: str, client: httpx.Client) -> None:
        """Return a connection to the pool.

        Args:
            base_url: Base URL for the connection
            client: HTTP client to return
        """
        with self._lock:
            if base_url not in self._clients:
                self._clients[base_url] = client
                self._last_used[base_url] = time.time()

    def get_async_connection(self, base_url: str) -> Optional[httpx.AsyncClient]:
        """Get an async connection for a specific base URL.

        Args:
            base_url: Base URL for the connection

        Returns:
            Async HTTP client instance or None if not found
        """
        with self._lock:
            if base_url in self._async_clients:
                client = self._async_clients[base_url]
                if self._is_async_client_healthy(client):
                    return client
        return None

    def return_async_connection(self, base_url: str, client: httpx.AsyncClient) -> None:
        """Return an async connection to the pool.

        Args:
            base_url: Base URL for the connection
            client: Async HTTP client to return
        """
        with self._lock:
            if base_url not in self._async_clients:
                self._async_clients[base_url] = client
                self._last_used[base_url] = time.time()

    def close_connection(self, base_url: str) -> None:
        """Close a specific connection.

        Args:
            base_url: Base URL for the connection
        """
        with self._lock:
            if base_url in self._clients:
                try:
                    self._clients[base_url].close()
                except Exception as e:
                    self.logger.warning(f"Failed to close client: {e}")
                finally:
                    del self._clients[base_url]
                    if base_url in self._last_used:
                        del self._last_used[base_url]

    def cleanup(self) -> None:
        """Clean up expired connections."""
        current_time = time.time()

        # First, identify expired URLs while holding the lock
        with self._lock:
            expired_urls = []
            for base_url, last_used in self._last_used.items():
                if current_time - last_used > self.config.keepalive_expiry:
                    expired_urls.append(base_url)

        # Then close expired connections without holding the lock
        for base_url in expired_urls:
            self.close_connection(base_url)

    def close_all(self) -> None:
        """Close all connections in the pool."""
        with self._lock:
            # Close sync clients
            for client in self._clients.values():
                try:
                    client.close()
                except Exception as e:
                    self.logger.warning(f"Failed to close client: {e}")

            # Note: AsyncClient doesn't have close() method
            # They should be closed by the user when done

            self._clients.clear()
            self._async_clients.clear()
            self._last_used.clear()

            self.logger.info("Closed all connections in pool")

    def reset_stats(self) -> None:
        """Reset pool statistics."""
        with self._lock:
            self._stats = {
                "pool_hits": 0,
                "pool_misses": 0,
                "connections_created": 0,
                "connections_reused": 0,
                "total_requests": 0,
            }

    def close_all_clients(self) -> None:
        """Close all clients in the pool (alias for close_all)."""
        self.close_all()

    async def aclose_all_clients(self) -> None:
        """Close all async clients in the pool."""
        with self._lock:
            for client in self._async_clients.values():
                try:
                    await client.aclose()
                except Exception as e:
                    self.logger.warning(f"Error closing async client: {e}")

            self._async_clients.clear()
            # Remove async clients from last_used
            keys_to_remove = [
                k for k, v in self._last_used.items() if k in self._async_clients
            ]
            for key in keys_to_remove:
                del self._last_used[key]

    async def __aenter__(self) -> "ConnectionPool":
        """Async context manager entry."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Async context manager exit."""
        await self.aclose_all_clients()

    def __enter__(self) -> "ConnectionPool":
        """Context manager entry."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Context manager exit."""
        self.close_all()

config instance-attribute

config = config

logger instance-attribute

logger = get_logger(__name__)

max_connections instance-attribute

max_connections = max_connections

max_keepalive instance-attribute

max_keepalive = max_keepalive_connections

max_keepalive_connections instance-attribute

max_keepalive_connections = max_keepalive_connections

keepalive_expiry instance-attribute

keepalive_expiry = keepalive_expiry

retries instance-attribute

retries = retries

timeout instance-attribute

timeout = timeout

pool_timeout instance-attribute

pool_timeout = pool_timeout

active_connections property

active_connections: int

Get number of active connections.

Returns:

Type Description
int

Number of active connections

get_client

get_client(
    base_url: str,
    headers: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> Client

Get or create an HTTP client from the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the client

required
headers Optional[Dict[str, str]]

Default headers

None
**kwargs Any

Additional client configuration

{}

Returns:

Type Description
Client

HTTP client instance

Source code in src/honeyhive/utils/connection_pool.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def get_client(
    self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
) -> httpx.Client:
    """Get or create an HTTP client from the pool.

    Args:
        base_url: Base URL for the client
        headers: Default headers
        **kwargs: Additional client configuration

    Returns:
        HTTP client instance
    """
    with self._lock:
        # Check if we have a client for this base URL
        if base_url in self._clients:
            client = self._clients[base_url]
            if self._is_client_healthy(client):
                self._last_used[base_url] = time.time()
                self._stats["pool_hits"] += 1
                self._stats["connections_reused"] += 1
                return client

            # Remove unhealthy client
            del self._clients[base_url]
            if base_url in self._last_used:
                del self._last_used[base_url]

        # Create new client
        self._stats["pool_misses"] += 1
        self._stats["connections_created"] += 1
        self._stats["total_requests"] += 1

        # Remove timeout from kwargs if it exists to avoid duplicate
        client_kwargs = kwargs.copy()
        if "timeout" in client_kwargs:
            del client_kwargs["timeout"]

        client = httpx.Client(
            base_url=base_url,
            headers=headers,
            limits=httpx.Limits(
                max_connections=self.config.max_connections,
                max_keepalive_connections=self.config.max_keepalive_connections,
                keepalive_expiry=self.config.keepalive_expiry,
            ),
            timeout=self.config.timeout,
            **client_kwargs,
        )

        self._clients[base_url] = client
        self._last_used[base_url] = time.time()

        self.logger.debug(f"Created new HTTP client for {base_url}")
        return client

get_async_client

get_async_client(
    base_url: str,
    headers: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> AsyncClient

Get or create an async HTTP client from the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the client

required
headers Optional[Dict[str, str]]

Default headers

None
**kwargs Any

Additional client configuration

{}

Returns:

Type Description
AsyncClient

Async HTTP client instance

Source code in src/honeyhive/utils/connection_pool.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def get_async_client(
    self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
) -> httpx.AsyncClient:
    """Get or create an async HTTP client from the pool.

    Args:
        base_url: Base URL for the client
        headers: Default headers
        **kwargs: Additional client configuration

    Returns:
        Async HTTP client instance
    """
    with self._lock:
        # Check if we have a client for this base URL
        if base_url in self._async_clients:
            client = self._async_clients[base_url]
            if self._is_async_client_healthy(client):
                self._last_used[base_url] = time.time()
                self._stats["pool_hits"] += 1
                self._stats["connections_reused"] += 1
                return client

            # Remove unhealthy client
            del self._async_clients[base_url]
            if base_url in self._last_used:
                del self._last_used[base_url]

        # Create new client
        self._stats["pool_misses"] += 1
        self._stats["connections_created"] += 1
        self._stats["total_requests"] += 1

        # Remove timeout from kwargs if it exists to avoid duplicate
        client_kwargs = kwargs.copy()
        if "timeout" in client_kwargs:
            del client_kwargs["timeout"]

        client = httpx.AsyncClient(
            base_url=base_url,
            headers=headers,
            limits=httpx.Limits(
                max_connections=self.config.max_connections,
                max_keepalive_connections=self.config.max_keepalive_connections,
                keepalive_expiry=self.config.keepalive_expiry,
            ),
            timeout=self.config.timeout,
            **client_kwargs,
        )

        self._async_clients[base_url] = client
        self._last_used[base_url] = time.time()

        self.logger.debug(f"Created new async HTTP client for {base_url}")
        return client

cleanup_idle_connections

cleanup_idle_connections(
    max_idle_time: float = 300.0,
) -> None

Clean up idle connections.

Parameters:

Name Type Description Default
max_idle_time float

Maximum idle time in seconds

300.0
Source code in src/honeyhive/utils/connection_pool.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def cleanup_idle_connections(self, max_idle_time: float = 300.0) -> None:
    """Clean up idle connections.

    Args:
        max_idle_time: Maximum idle time in seconds
    """
    current_time = time.time()
    to_remove = []

    with self._lock:
        for base_url, last_used in self._last_used.items():
            if current_time - last_used > max_idle_time:
                to_remove.append(base_url)

        for base_url in to_remove:
            if base_url in self._clients:
                try:
                    self._clients[base_url].close()
                except Exception:
                    pass
                del self._clients[base_url]

            if base_url in self._async_clients:
                try:
                    # Note: AsyncClient doesn't have close() method
                    pass
                except Exception:
                    pass
                del self._async_clients[base_url]

            if base_url in self._last_used:
                del self._last_used[base_url]

            self.logger.debug(f"Cleaned up idle connection for {base_url}")

get_stats

get_stats() -> Dict[str, Any]

Get pool statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with pool statistics

Source code in src/honeyhive/utils/connection_pool.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def get_stats(self) -> Dict[str, Any]:
    """Get pool statistics.

    Returns:
        Dictionary with pool statistics
    """
    with self._lock:
        stats = self._stats.copy()
        stats.update(
            {
                "active_connections": len(self._clients),
                "active_async_connections": len(self._async_clients),
                "total_connections": len(self._clients) + len(self._async_clients),
            }
        )
        return stats

get_connection

get_connection(base_url: str) -> Optional[Client]

Get a connection for a specific base URL.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required

Returns:

Type Description
Optional[Client]

HTTP client instance or None if not found

Source code in src/honeyhive/utils/connection_pool.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def get_connection(self, base_url: str) -> Optional[httpx.Client]:
    """Get a connection for a specific base URL.

    Args:
        base_url: Base URL for the connection

    Returns:
        HTTP client instance or None if not found
    """
    with self._lock:
        if base_url in self._clients:
            client = self._clients[base_url]
            if self._is_client_healthy(client):
                return client
    return None

return_connection

return_connection(base_url: str, client: Client) -> None

Return a connection to the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required
client Client

HTTP client to return

required
Source code in src/honeyhive/utils/connection_pool.py
398
399
400
401
402
403
404
405
406
407
408
def return_connection(self, base_url: str, client: httpx.Client) -> None:
    """Return a connection to the pool.

    Args:
        base_url: Base URL for the connection
        client: HTTP client to return
    """
    with self._lock:
        if base_url not in self._clients:
            self._clients[base_url] = client
            self._last_used[base_url] = time.time()

get_async_connection

get_async_connection(
    base_url: str,
) -> Optional[AsyncClient]

Get an async connection for a specific base URL.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required

Returns:

Type Description
Optional[AsyncClient]

Async HTTP client instance or None if not found

Source code in src/honeyhive/utils/connection_pool.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
def get_async_connection(self, base_url: str) -> Optional[httpx.AsyncClient]:
    """Get an async connection for a specific base URL.

    Args:
        base_url: Base URL for the connection

    Returns:
        Async HTTP client instance or None if not found
    """
    with self._lock:
        if base_url in self._async_clients:
            client = self._async_clients[base_url]
            if self._is_async_client_healthy(client):
                return client
    return None

return_async_connection

return_async_connection(
    base_url: str, client: AsyncClient
) -> None

Return an async connection to the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required
client AsyncClient

Async HTTP client to return

required
Source code in src/honeyhive/utils/connection_pool.py
426
427
428
429
430
431
432
433
434
435
436
def return_async_connection(self, base_url: str, client: httpx.AsyncClient) -> None:
    """Return an async connection to the pool.

    Args:
        base_url: Base URL for the connection
        client: Async HTTP client to return
    """
    with self._lock:
        if base_url not in self._async_clients:
            self._async_clients[base_url] = client
            self._last_used[base_url] = time.time()

close_connection

close_connection(base_url: str) -> None

Close a specific connection.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required
Source code in src/honeyhive/utils/connection_pool.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def close_connection(self, base_url: str) -> None:
    """Close a specific connection.

    Args:
        base_url: Base URL for the connection
    """
    with self._lock:
        if base_url in self._clients:
            try:
                self._clients[base_url].close()
            except Exception as e:
                self.logger.warning(f"Failed to close client: {e}")
            finally:
                del self._clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

cleanup

cleanup() -> None

Clean up expired connections.

Source code in src/honeyhive/utils/connection_pool.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def cleanup(self) -> None:
    """Clean up expired connections."""
    current_time = time.time()

    # First, identify expired URLs while holding the lock
    with self._lock:
        expired_urls = []
        for base_url, last_used in self._last_used.items():
            if current_time - last_used > self.config.keepalive_expiry:
                expired_urls.append(base_url)

    # Then close expired connections without holding the lock
    for base_url in expired_urls:
        self.close_connection(base_url)

close_all

close_all() -> None

Close all connections in the pool.

Source code in src/honeyhive/utils/connection_pool.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def close_all(self) -> None:
    """Close all connections in the pool."""
    with self._lock:
        # Close sync clients
        for client in self._clients.values():
            try:
                client.close()
            except Exception as e:
                self.logger.warning(f"Failed to close client: {e}")

        # Note: AsyncClient doesn't have close() method
        # They should be closed by the user when done

        self._clients.clear()
        self._async_clients.clear()
        self._last_used.clear()

        self.logger.info("Closed all connections in pool")

reset_stats

reset_stats() -> None

Reset pool statistics.

Source code in src/honeyhive/utils/connection_pool.py
489
490
491
492
493
494
495
496
497
498
def reset_stats(self) -> None:
    """Reset pool statistics."""
    with self._lock:
        self._stats = {
            "pool_hits": 0,
            "pool_misses": 0,
            "connections_created": 0,
            "connections_reused": 0,
            "total_requests": 0,
        }

close_all_clients

close_all_clients() -> None

Close all clients in the pool (alias for close_all).

Source code in src/honeyhive/utils/connection_pool.py
500
501
502
def close_all_clients(self) -> None:
    """Close all clients in the pool (alias for close_all)."""
    self.close_all()

aclose_all_clients async

aclose_all_clients() -> None

Close all async clients in the pool.

Source code in src/honeyhive/utils/connection_pool.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
async def aclose_all_clients(self) -> None:
    """Close all async clients in the pool."""
    with self._lock:
        for client in self._async_clients.values():
            try:
                await client.aclose()
            except Exception as e:
                self.logger.warning(f"Error closing async client: {e}")

        self._async_clients.clear()
        # Remove async clients from last_used
        keys_to_remove = [
            k for k, v in self._last_used.items() if k in self._async_clients
        ]
        for key in keys_to_remove:
            del self._last_used[key]

PoolConfig dataclass

Configuration for connection pool.

Source code in src/honeyhive/utils/connection_pool.py
69
70
71
72
73
74
75
76
77
78
@dataclass
class PoolConfig:
    """Configuration for connection pool."""

    max_connections: int = 100
    max_keepalive_connections: int = 20
    keepalive_expiry: float = 30.0
    retries: int = 3
    timeout: float = 30.0
    pool_timeout: float = 10.0

max_connections class-attribute instance-attribute

max_connections: int = 100

max_keepalive_connections class-attribute instance-attribute

max_keepalive_connections: int = 20

keepalive_expiry class-attribute instance-attribute

keepalive_expiry: float = 30.0

retries class-attribute instance-attribute

retries: int = 3

timeout class-attribute instance-attribute

timeout: float = 30.0

pool_timeout class-attribute instance-attribute

pool_timeout: float = 10.0

DotDict

Bases: dict

Dictionary with dot notation access.

Example

d = DotDict({'foo': {'bar': 'baz'}}) d.foo.bar 'baz' d.foo.bar = 'qux' d['foo']['bar'] 'qux'

Source code in src/honeyhive/utils/dotdict.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class DotDict(dict):
    """Dictionary with dot notation access.

    Example:
        >>> d = DotDict({'foo': {'bar': 'baz'}})
        >>> d.foo.bar
        'baz'
        >>> d.foo.bar = 'qux'
        >>> d['foo']['bar']
        'qux'
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the dotdict."""
        super().__init__(*args, **kwargs)
        # Convert nested dictionaries to dotdict
        for key, value in self.items():
            if isinstance(value, dict):
                self[key] = DotDict(value)

    def __getattr__(self, key: str) -> Any:
        """Get attribute using dot notation."""
        try:
            return self[key]
        except KeyError as exc:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{key}'"
            ) from exc

    def __setattr__(self, key: str, value: Any) -> None:
        """Set attribute using dot notation."""
        if isinstance(value, dict):
            value = DotDict(value)
        self[key] = value

    def __delattr__(self, key: str) -> None:
        """Delete attribute using dot notation."""
        try:
            del self[key]
        except KeyError as exc:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{key}'"
            ) from exc

    def __getitem__(self, key: str) -> Any:
        """Get item with dot notation support."""
        if "." in key:
            keys = key.split(".")
            value = self
            for k in keys:
                value = value[k]
            return value
        return super().__getitem__(key)

    def __setitem__(self, key: str, value: Any) -> None:
        """Set item with dot notation support."""
        if "." in key:
            keys = key.split(".")
            target = self
            for k in keys[:-1]:
                if k not in target:
                    target[k] = DotDict()
                target = target[k]
            target[keys[-1]] = value
        else:
            if isinstance(value, dict):
                value = DotDict(value)
            super().__setitem__(key, value)

    def get(self, key: str, default: Any = None) -> Any:
        """Get item with default value, supporting dot notation."""
        try:
            return self[key]
        except (KeyError, AttributeError):
            return default

    def setdefault(self, key: str, default: Any = None) -> Any:
        """Set default value for key, supporting dot notation."""
        if "." in key:
            keys = key.split(".")
            target = self
            for k in keys[:-1]:
                if k not in target:
                    target[k] = DotDict()
                target = target[k]
            if keys[-1] not in target:
                target[keys[-1]] = default
            return target[keys[-1]]

        return super().setdefault(key, default)

    def update(self, other: Any = None, /, **kwargs: Any) -> None:
        """Update dictionary with dot notation support."""
        if other is not None:
            for key, value in other.items():
                self[key] = value
        for key, value in kwargs.items():
            self[key] = value

    def to_dict(self) -> Dict[str, Any]:
        """Convert dotdict back to regular dictionary."""
        result = {}
        for key, value in self.items():
            if isinstance(value, DotDict):
                result[key] = value.to_dict()
            else:
                result[key] = value
        return result

    def copy(self) -> "DotDict":
        """Create a shallow copy."""
        return DotDict(super().copy())

    def deepcopy(self) -> "DotDict":
        """Create a deep copy."""
        return copy.deepcopy(self)

get

get(key: str, default: Any = None) -> Any

Get item with default value, supporting dot notation.

Source code in src/honeyhive/utils/dotdict.py
76
77
78
79
80
81
def get(self, key: str, default: Any = None) -> Any:
    """Get item with default value, supporting dot notation."""
    try:
        return self[key]
    except (KeyError, AttributeError):
        return default

setdefault

setdefault(key: str, default: Any = None) -> Any

Set default value for key, supporting dot notation.

Source code in src/honeyhive/utils/dotdict.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def setdefault(self, key: str, default: Any = None) -> Any:
    """Set default value for key, supporting dot notation."""
    if "." in key:
        keys = key.split(".")
        target = self
        for k in keys[:-1]:
            if k not in target:
                target[k] = DotDict()
            target = target[k]
        if keys[-1] not in target:
            target[keys[-1]] = default
        return target[keys[-1]]

    return super().setdefault(key, default)

update

update(other: Any = None, /, **kwargs: Any) -> None

Update dictionary with dot notation support.

Source code in src/honeyhive/utils/dotdict.py
 98
 99
100
101
102
103
104
def update(self, other: Any = None, /, **kwargs: Any) -> None:
    """Update dictionary with dot notation support."""
    if other is not None:
        for key, value in other.items():
            self[key] = value
    for key, value in kwargs.items():
        self[key] = value

to_dict

to_dict() -> Dict[str, Any]

Convert dotdict back to regular dictionary.

Source code in src/honeyhive/utils/dotdict.py
106
107
108
109
110
111
112
113
114
def to_dict(self) -> Dict[str, Any]:
    """Convert dotdict back to regular dictionary."""
    result = {}
    for key, value in self.items():
        if isinstance(value, DotDict):
            result[key] = value.to_dict()
        else:
            result[key] = value
    return result

copy

copy() -> DotDict

Create a shallow copy.

Source code in src/honeyhive/utils/dotdict.py
116
117
118
def copy(self) -> "DotDict":
    """Create a shallow copy."""
    return DotDict(super().copy())

deepcopy

deepcopy() -> DotDict

Create a deep copy.

Source code in src/honeyhive/utils/dotdict.py
120
121
122
def deepcopy(self) -> "DotDict":
    """Create a deep copy."""
    return copy.deepcopy(self)

APIError

Bases: HoneyHiveError

API-related errors.

Source code in src/honeyhive/utils/error_handler.py
81
82
class APIError(HoneyHiveError):
    """API-related errors."""

AuthenticationError

Bases: HoneyHiveError

Authentication and authorization errors.

Source code in src/honeyhive/utils/error_handler.py
97
98
class AuthenticationError(HoneyHiveError):
    """Authentication and authorization errors."""

ErrorContext dataclass

Context information for error handling.

Source code in src/honeyhive/utils/error_handler.py
14
15
16
17
18
19
20
21
22
23
24
@dataclass
class ErrorContext:
    """Context information for error handling."""

    operation: str
    method: Optional[str] = None
    url: Optional[str] = None
    params: Optional[Dict[str, Any]] = None
    json_data: Optional[Dict[str, Any]] = None
    client_name: Optional[str] = None
    additional_context: Dict[str, Any] = field(default_factory=dict)

operation instance-attribute

operation: str

method class-attribute instance-attribute

method: Optional[str] = None

url class-attribute instance-attribute

url: Optional[str] = None

params class-attribute instance-attribute

params: Optional[Dict[str, Any]] = None

json_data class-attribute instance-attribute

json_data: Optional[Dict[str, Any]] = None

client_name class-attribute instance-attribute

client_name: Optional[str] = None

additional_context class-attribute instance-attribute

additional_context: Dict[str, Any] = field(
    default_factory=dict
)

ErrorHandler

Standardized error handling middleware.

This class provides a single public method for error handling, which is appropriate for its focused responsibility.

Source code in src/honeyhive/utils/error_handler.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
class ErrorHandler:  # pylint: disable=too-few-public-methods
    """Standardized error handling middleware.

    This class provides a single public method for error handling,
    which is appropriate for its focused responsibility.
    """

    def __init__(self, logger_name: str = "honeyhive.error_handler"):
        """Initialize error handler.

        Args:
            logger_name: Name for the logger instance
        """
        self.logger = get_logger(logger_name)
        self._error_handlers: Dict[
            Type[Exception], Callable[[Any, ErrorContext], ErrorResponse]
        ] = {
            httpx.ConnectError: self._handle_connection_error,
            httpx.ConnectTimeout: self._handle_connection_error,
            httpx.ReadTimeout: self._handle_connection_error,
            httpx.WriteTimeout: self._handle_connection_error,
            httpx.PoolTimeout: self._handle_connection_error,
            httpx.HTTPStatusError: self._handle_http_error,
            httpx.RequestError: self._handle_request_error,
            ValueError: self._handle_validation_error,
            TypeError: self._handle_validation_error,
            KeyError: self._handle_validation_error,
            json.JSONDecodeError: self._handle_json_error,
        }

    @contextmanager
    def handle_operation(
        self,
        context: ErrorContext,
        raise_on_error: bool = True,
        return_error_response: bool = False,
    ) -> Generator[None, None, None]:
        """Context manager for handling operations with standardized error handling.

        Args:
            context: Error context information
            raise_on_error: Whether to raise exceptions or return error responses
            return_error_response: Whether to return ErrorResponse objects \
                instead of raising

        Yields:
            None

        Raises:
            HoneyHiveError: If raise_on_error is True and an error occurs
        """
        try:
            yield
        except Exception as e:
            error_response = self._process_error(e, context)

            # Log the error
            self._log_error(error_response, e)

            if return_error_response:
                # Return the error response instead of raising
                # This is handled by the calling code
                return

            if raise_on_error:
                # Convert to appropriate HoneyHive exception
                honeyhive_error = self._create_honeyhive_error(error_response, e)
                raise honeyhive_error from e

    def _process_error(
        self, exception: Exception, context: ErrorContext
    ) -> ErrorResponse:
        """Process an exception and create a standardized error response.

        Args:
            exception: The exception that occurred
            context: Context information

        Returns:
            Standardized error response
        """
        # Try to find a specific handler for this exception type
        for exc_type, handler in self._error_handlers.items():
            if isinstance(exception, exc_type):
                return handler(exception, context)

        # Default handler for unknown exceptions
        return self._handle_unknown_error(exception, context)

    def _handle_connection_error(
        self, exception: Exception, context: ErrorContext
    ) -> ErrorResponse:
        """Handle connection-related errors."""
        return ErrorResponse(
            error_type="ConnectionError",
            error_message=f"Connection failed: {str(exception)}",
            error_code="CONNECTION_FAILED",
            details={
                "operation": context.operation,
                "url": context.url,
                "exception_type": type(exception).__name__,
            },
            context=context,
            retry_after=1.0,  # Suggest retry after 1 second
        )

    def _handle_http_error(
        self, exception: httpx.HTTPStatusError, context: ErrorContext
    ) -> ErrorResponse:
        """Handle HTTP status errors."""
        response = exception.response

        # Try to parse error details from response
        details = {"operation": context.operation, "url": context.url}
        try:
            if response.headers.get("content-type", "").startswith("application/json"):
                error_data = response.json()
                details.update(error_data)
        except Exception:
            # If we can't parse the response, include the raw text
            details["response_text"] = response.text

        # Determine error type based on status code
        error_type = "APIError"
        error_code = f"HTTP_{response.status_code}"

        if response.status_code == 401:
            error_type = "AuthenticationError"
            error_code = "UNAUTHORIZED"
        elif response.status_code == 403:
            error_type = "AuthenticationError"
            error_code = "FORBIDDEN"
        elif response.status_code == 429:
            error_type = "RateLimitError"
            error_code = "RATE_LIMITED"
        elif response.status_code >= 500:
            error_type = "APIError"
            error_code = "SERVER_ERROR"
        elif response.status_code >= 400:
            error_type = "APIError"
            error_code = "CLIENT_ERROR"

        # Extract retry-after header if present
        retry_after = None
        if "retry-after" in response.headers:
            try:
                retry_after = float(response.headers["retry-after"])
            except ValueError:
                pass

        return ErrorResponse(
            error_type=error_type,
            error_message=f"HTTP {response.status_code}: {response.reason_phrase}",
            error_code=error_code,
            status_code=response.status_code,
            details=details,
            context=context,
            retry_after=retry_after,
        )

    def _handle_request_error(
        self, exception: httpx.RequestError, context: ErrorContext
    ) -> ErrorResponse:
        """Handle general request errors."""
        return ErrorResponse(
            error_type="RequestError",
            error_message=f"Request failed: {str(exception)}",
            error_code="REQUEST_FAILED",
            details={
                "operation": context.operation,
                "url": context.url,
                "exception_type": type(exception).__name__,
            },
            context=context,
            retry_after=1.0,
        )

    def _handle_validation_error(
        self, exception: Exception, context: ErrorContext
    ) -> ErrorResponse:
        """Handle validation errors (ValueError, TypeError, KeyError)."""
        return ErrorResponse(
            error_type="ValidationError",
            error_message=f"Validation failed: {str(exception)}",
            error_code="VALIDATION_FAILED",
            details={
                "operation": context.operation,
                "exception_type": type(exception).__name__,
                "params": context.params,
                "json_data": context.json_data,
            },
            context=context,
        )

    def _handle_json_error(
        self, exception: json.JSONDecodeError, context: ErrorContext
    ) -> ErrorResponse:
        """Handle JSON decode errors."""
        return ErrorResponse(
            error_type="JSONError",
            error_message=f"Failed to parse JSON response: {str(exception)}",
            error_code="JSON_PARSE_FAILED",
            details={
                "operation": context.operation,
                "url": context.url,
                "exception_type": type(exception).__name__,
                "json_position": exception.pos if hasattr(exception, "pos") else None,
            },
            context=context,
        )

    def _handle_unknown_error(
        self, exception: Exception, context: ErrorContext
    ) -> ErrorResponse:
        """Handle unknown/unexpected errors."""
        return ErrorResponse(
            error_type="UnknownError",
            error_message=f"Unexpected error: {str(exception)}",
            error_code="UNKNOWN_ERROR",
            details={
                "operation": context.operation,
                "exception_type": type(exception).__name__,
                "traceback": traceback.format_exc(),
            },
            context=context,
        )

    def _create_honeyhive_error(
        self, error_response: ErrorResponse, original_exception: Exception
    ) -> HoneyHiveError:
        """Create appropriate HoneyHive exception from error response.

        Args:
            error_response: Standardized error response
            original_exception: Original exception

        Returns:
            Appropriate HoneyHive exception
        """
        message = error_response.error_message

        if error_response.error_type == "ConnectionError":
            return HoneyHiveConnectionError(message, error_response, original_exception)
        if error_response.error_type == "AuthenticationError":
            return AuthenticationError(message, error_response, original_exception)
        if error_response.error_type == "RateLimitError":
            return RateLimitError(message, error_response, original_exception)
        if error_response.error_type == "ValidationError":
            return ValidationError(message, error_response, original_exception)
        if error_response.error_type in ("APIError", "RequestError", "JSONError"):
            return APIError(message, error_response, original_exception)
        return HoneyHiveError(message, error_response, original_exception)

    def _log_error(self, error_response: ErrorResponse, exception: Exception) -> None:
        """Log error details.

        Args:
            error_response: Standardized error response
            exception: Original exception
        """
        log_data: Dict[str, Any] = {
            "error_type": error_response.error_type,
            "error_code": error_response.error_code,
            "error_message": error_response.error_message,
            "operation": (
                error_response.context.operation if error_response.context else None
            ),
            "method": error_response.context.method if error_response.context else None,
            "url": error_response.context.url if error_response.context else None,
            "status_code": error_response.status_code,
            "exception_type": type(exception).__name__,
        }

        if error_response.details:
            log_data["details"] = error_response.details

        # Log at appropriate level based on error type
        if error_response.error_type in ("RateLimitError", "ConnectionError"):
            self.logger.warning("API operation failed", honeyhive_data=log_data)
        elif error_response.error_type == "ValidationError":
            self.logger.error("Validation error", honeyhive_data=log_data)
        else:
            self.logger.error("API error", honeyhive_data=log_data)

logger instance-attribute

logger = get_logger(logger_name)

handle_operation

handle_operation(
    context: ErrorContext,
    raise_on_error: bool = True,
    return_error_response: bool = False,
) -> Generator[None, None, None]

Context manager for handling operations with standardized error handling.

Parameters:

Name Type Description Default
context ErrorContext

Error context information

required
raise_on_error bool

Whether to raise exceptions or return error responses

True
return_error_response bool

Whether to return ErrorResponse objects instead of raising

False

Yields:

Type Description
None

None

Raises:

Type Description
HoneyHiveError

If raise_on_error is True and an error occurs

Source code in src/honeyhive/utils/error_handler.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@contextmanager
def handle_operation(
    self,
    context: ErrorContext,
    raise_on_error: bool = True,
    return_error_response: bool = False,
) -> Generator[None, None, None]:
    """Context manager for handling operations with standardized error handling.

    Args:
        context: Error context information
        raise_on_error: Whether to raise exceptions or return error responses
        return_error_response: Whether to return ErrorResponse objects \
            instead of raising

    Yields:
        None

    Raises:
        HoneyHiveError: If raise_on_error is True and an error occurs
    """
    try:
        yield
    except Exception as e:
        error_response = self._process_error(e, context)

        # Log the error
        self._log_error(error_response, e)

        if return_error_response:
            # Return the error response instead of raising
            # This is handled by the calling code
            return

        if raise_on_error:
            # Convert to appropriate HoneyHive exception
            honeyhive_error = self._create_honeyhive_error(error_response, e)
            raise honeyhive_error from e

ErrorResponse dataclass

Standardized error response.

Source code in src/honeyhive/utils/error_handler.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@dataclass
class ErrorResponse:
    """Standardized error response."""

    success: bool = False
    error_type: str = "UnknownError"
    error_message: str = "An unknown error occurred"
    error_code: Optional[str] = None
    status_code: Optional[int] = None
    details: Optional[Dict[str, Any]] = None
    context: Optional[ErrorContext] = None
    retry_after: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert error response to dictionary."""
        result = {
            "success": self.success,
            "error_type": self.error_type,
            "error_message": self.error_message,
        }

        if self.error_code:
            result["error_code"] = self.error_code
        if self.status_code:
            result["status_code"] = self.status_code
        if self.details:
            result["details"] = self.details
        if self.retry_after:
            result["retry_after"] = self.retry_after

        return result

success class-attribute instance-attribute

success: bool = False

error_type class-attribute instance-attribute

error_type: str = 'UnknownError'

error_message class-attribute instance-attribute

error_message: str = 'An unknown error occurred'

error_code class-attribute instance-attribute

error_code: Optional[str] = None

status_code class-attribute instance-attribute

status_code: Optional[int] = None

details class-attribute instance-attribute

details: Optional[Dict[str, Any]] = None

context class-attribute instance-attribute

context: Optional[ErrorContext] = None

retry_after class-attribute instance-attribute

retry_after: Optional[float] = None

to_dict

to_dict() -> Dict[str, Any]

Convert error response to dictionary.

Source code in src/honeyhive/utils/error_handler.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def to_dict(self) -> Dict[str, Any]:
    """Convert error response to dictionary."""
    result = {
        "success": self.success,
        "error_type": self.error_type,
        "error_message": self.error_message,
    }

    if self.error_code:
        result["error_code"] = self.error_code
    if self.status_code:
        result["status_code"] = self.status_code
    if self.details:
        result["details"] = self.details
    if self.retry_after:
        result["retry_after"] = self.retry_after

    return result

HoneyHiveError

Bases: Exception

Base exception for HoneyHive errors.

Source code in src/honeyhive/utils/error_handler.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class HoneyHiveError(Exception):
    """Base exception for HoneyHive errors."""

    def __init__(
        self,
        message: str,
        error_response: Optional[ErrorResponse] = None,
        original_exception: Optional[Exception] = None,
    ):
        """Initialize HoneyHive error.

        Args:
            message: Error message
            error_response: Structured error response
            original_exception: Original exception that caused this error
        """
        super().__init__(message)
        self.error_response = error_response
        self.original_exception = original_exception

error_response instance-attribute

error_response = error_response

original_exception instance-attribute

original_exception = original_exception

RateLimitError

Bases: HoneyHiveError

Rate limiting errors.

Source code in src/honeyhive/utils/error_handler.py
93
94
class RateLimitError(HoneyHiveError):
    """Rate limiting errors."""

ValidationError

Bases: HoneyHiveError

Data validation errors.

Source code in src/honeyhive/utils/error_handler.py
85
86
class ValidationError(HoneyHiveError):
    """Data validation errors."""

HoneyHiveFormatter

Bases: Formatter

Custom formatter for HoneyHive logs.

Provides structured JSON logging with configurable fields including timestamps, log levels, and HoneyHive-specific data.

Source code in src/honeyhive/utils/logger.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class HoneyHiveFormatter(logging.Formatter):
    """Custom formatter for HoneyHive logs.

    Provides structured JSON logging with configurable fields
    including timestamps, log levels, and HoneyHive-specific data.
    """

    def __init__(
        self, include_timestamp: bool = True, include_level: bool = True
    ) -> None:
        """Initialize the formatter.

        Args:
            include_timestamp: Whether to include timestamp in log output
            include_level: Whether to include log level in log output
        """
        super().__init__()
        self.include_timestamp = include_timestamp
        self.include_level = include_level

    def format(self, record: logging.LogRecord) -> str:
        """Format log record with HoneyHive structure.

        Args:
            record: Log record to format

        Returns:
            JSON-formatted log string
        """
        log_data = {
            "timestamp": (
                datetime.now(timezone.utc).isoformat()
                if self.include_timestamp
                else None
            ),
            "level": record.levelname if self.include_level else None,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Add extra fields if present
        if hasattr(record, "honeyhive_data"):
            log_data.update(getattr(record, "honeyhive_data", {}))

        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Remove None values
        log_data = {k: v for k, v in log_data.items() if v is not None}

        return json.dumps(log_data, default=str)

include_timestamp instance-attribute

include_timestamp = include_timestamp

include_level instance-attribute

include_level = include_level

format

format(record: LogRecord) -> str

Format log record with HoneyHive structure.

Parameters:

Name Type Description Default
record LogRecord

Log record to format

required

Returns:

Type Description
str

JSON-formatted log string

Source code in src/honeyhive/utils/logger.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def format(self, record: logging.LogRecord) -> str:
    """Format log record with HoneyHive structure.

    Args:
        record: Log record to format

    Returns:
        JSON-formatted log string
    """
    log_data = {
        "timestamp": (
            datetime.now(timezone.utc).isoformat()
            if self.include_timestamp
            else None
        ),
        "level": record.levelname if self.include_level else None,
        "logger": record.name,
        "message": record.getMessage(),
    }

    # Add extra fields if present
    if hasattr(record, "honeyhive_data"):
        log_data.update(getattr(record, "honeyhive_data", {}))

    # Add exception info if present
    if record.exc_info:
        log_data["exception"] = self.formatException(record.exc_info)

    # Remove None values
    log_data = {k: v for k, v in log_data.items() if v is not None}

    return json.dumps(log_data, default=str)

HoneyHiveLogger

HoneyHive logger with structured logging.

Provides a structured logging interface with HoneyHive-specific formatting and context data support. Uses per-instance configuration instead of global config for multi-instance architecture.

Source code in src/honeyhive/utils/logger.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
class HoneyHiveLogger:
    """HoneyHive logger with structured logging.

    Provides a structured logging interface with HoneyHive-specific
    formatting and context data support. Uses per-instance configuration
    instead of global config for multi-instance architecture.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        name: str,
        *,
        level: Optional[Union[str, int]] = None,
        formatter: Optional[logging.Formatter] = None,
        handler: Optional[logging.Handler] = None,
        verbose: Optional[bool] = None,
    ):
        """Initialize the logger.

        Note: too-many-positional-arguments disabled - Logger class requires multiple
        configuration parameters (name, level, formatter, handler, verbose) for
        proper initialization and flexibility.

        Args:
            name: Logger name
            level: Log level (string or integer)
            formatter: Custom formatter to use
            handler: Custom handler to use
            verbose: Whether to enable debug logging (overrides level if provided)
        """
        self.logger = logging.getLogger(name)
        self.verbose = verbose

        # Dynamic level determination with verbose parameter priority
        effective_level = self._determine_log_level_dynamically(level, verbose)
        self.logger.setLevel(effective_level)

        # Add handler if not already present
        if not self.logger.handlers:
            if handler is None:
                handler = logging.StreamHandler(sys.stdout)
                if formatter is None:
                    formatter = HoneyHiveFormatter()
                handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        # Prevent propagation to root logger
        self.logger.propagate = False

    def _determine_log_level_dynamically(
        self, level: Optional[Union[str, int]], verbose: Optional[bool]
    ) -> int:
        """Dynamically determine the appropriate log level.

        Uses dynamic logic to prioritize:
        1. Explicit level parameter
        2. Verbose parameter (True = DEBUG, False = WARNING)
        3. Default to WARNING

        Args:
            level: Explicit log level
            verbose: Verbose flag from tracer

        Returns:
            Resolved log level as integer
        """
        # Priority 1: Explicit level parameter
        if level is not None:
            if isinstance(level, str):
                return getattr(logging, level.upper(), logging.WARNING)
            if isinstance(level, int):
                return level

        # Priority 2: Verbose parameter from tracer
        if verbose is True:
            return logging.DEBUG
        if verbose is False:
            return logging.WARNING

        # Priority 3: Default to WARNING (suppress INFO/DEBUG, show
        # WARNING/ERROR/CRITICAL)
        return logging.WARNING

    def update_verbose_setting(self, verbose: bool) -> None:
        """Dynamically update the logger's verbose setting.

        This allows the tracer to update the logger's level
        after initialization based on configuration changes.

        Args:
            verbose: New verbose setting
        """
        self.verbose = verbose
        new_level = logging.DEBUG if verbose else logging.WARNING
        self.logger.setLevel(new_level)

    def _log_with_context(
        self,
        level: int,
        message: str,
        args: tuple = (),
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log with HoneyHive context data and lazy formatting support.

        Args:
            level: Log level
            message: Log message format string
            args: Arguments for lazy string formatting
            honeyhive_data: Additional HoneyHive context data
            **kwargs: Additional logging parameters
        """
        extra = kwargs.copy()
        if honeyhive_data:
            extra["honeyhive_data"] = honeyhive_data

        # Use Python's standard logging lazy formatting
        self.logger.log(level, message, *args, extra=extra)

    def debug(
        self,
        message: str,
        *args: Any,
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log debug message with lazy formatting support.

        Args:
            message: Debug message format string (supports % formatting)
            *args: Arguments for lazy string formatting
            honeyhive_data: Additional HoneyHive context data
            **kwargs: Additional logging parameters
        """
        self._log_with_context(logging.DEBUG, message, args, honeyhive_data, **kwargs)

    def info(
        self,
        message: str,
        *args: Any,
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log info message with lazy formatting support."""
        self._log_with_context(logging.INFO, message, args, honeyhive_data, **kwargs)

    def warning(
        self,
        message: str,
        *args: Any,
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log warning message with lazy formatting support."""
        self._log_with_context(logging.WARNING, message, args, honeyhive_data, **kwargs)

    def error(
        self,
        message: str,
        *args: Any,
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log error message with lazy formatting support."""
        self._log_with_context(logging.ERROR, message, args, honeyhive_data, **kwargs)

    def critical(
        self,
        message: str,
        *args: Any,
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log critical message with lazy formatting support."""
        self._log_with_context(
            logging.CRITICAL, message, args, honeyhive_data, **kwargs
        )

    def exception(
        self,
        message: str,
        *args: Any,
        honeyhive_data: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log exception message with traceback and lazy formatting support."""
        extra = kwargs.copy()
        if honeyhive_data:
            extra["honeyhive_data"] = honeyhive_data

        self.logger.exception(message, *args, extra=extra)

logger instance-attribute

logger = getLogger(name)

verbose instance-attribute

verbose = verbose

update_verbose_setting

update_verbose_setting(verbose: bool) -> None

Dynamically update the logger's verbose setting.

This allows the tracer to update the logger's level after initialization based on configuration changes.

Parameters:

Name Type Description Default
verbose bool

New verbose setting

required
Source code in src/honeyhive/utils/logger.py
213
214
215
216
217
218
219
220
221
222
223
224
def update_verbose_setting(self, verbose: bool) -> None:
    """Dynamically update the logger's verbose setting.

    This allows the tracer to update the logger's level
    after initialization based on configuration changes.

    Args:
        verbose: New verbose setting
    """
    self.verbose = verbose
    new_level = logging.DEBUG if verbose else logging.WARNING
    self.logger.setLevel(new_level)

debug

debug(
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> None

Log debug message with lazy formatting support.

Parameters:

Name Type Description Default
message str

Debug message format string (supports % formatting)

required
*args Any

Arguments for lazy string formatting

()
honeyhive_data Optional[Dict[str, Any]]

Additional HoneyHive context data

None
**kwargs Any

Additional logging parameters

{}
Source code in src/honeyhive/utils/logger.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def debug(
    self,
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Log debug message with lazy formatting support.

    Args:
        message: Debug message format string (supports % formatting)
        *args: Arguments for lazy string formatting
        honeyhive_data: Additional HoneyHive context data
        **kwargs: Additional logging parameters
    """
    self._log_with_context(logging.DEBUG, message, args, honeyhive_data, **kwargs)

info

info(
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> None

Log info message with lazy formatting support.

Source code in src/honeyhive/utils/logger.py
267
268
269
270
271
272
273
274
275
def info(
    self,
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Log info message with lazy formatting support."""
    self._log_with_context(logging.INFO, message, args, honeyhive_data, **kwargs)

warning

warning(
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> None

Log warning message with lazy formatting support.

Source code in src/honeyhive/utils/logger.py
277
278
279
280
281
282
283
284
285
def warning(
    self,
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Log warning message with lazy formatting support."""
    self._log_with_context(logging.WARNING, message, args, honeyhive_data, **kwargs)

error

error(
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> None

Log error message with lazy formatting support.

Source code in src/honeyhive/utils/logger.py
287
288
289
290
291
292
293
294
295
def error(
    self,
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Log error message with lazy formatting support."""
    self._log_with_context(logging.ERROR, message, args, honeyhive_data, **kwargs)

critical

critical(
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> None

Log critical message with lazy formatting support.

Source code in src/honeyhive/utils/logger.py
297
298
299
300
301
302
303
304
305
306
307
def critical(
    self,
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Log critical message with lazy formatting support."""
    self._log_with_context(
        logging.CRITICAL, message, args, honeyhive_data, **kwargs
    )

exception

exception(
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> None

Log exception message with traceback and lazy formatting support.

Source code in src/honeyhive/utils/logger.py
309
310
311
312
313
314
315
316
317
318
319
320
321
def exception(
    self,
    message: str,
    *args: Any,
    honeyhive_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Log exception message with traceback and lazy formatting support."""
    extra = kwargs.copy()
    if honeyhive_data:
        extra["honeyhive_data"] = honeyhive_data

    self.logger.exception(message, *args, extra=extra)

BackoffStrategy dataclass

Backoff strategy for retries.

Source code in src/honeyhive/utils/retry.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class BackoffStrategy:
    """Backoff strategy for retries."""

    initial_delay: float = 1.0
    max_delay: float = 60.0
    multiplier: float = 2.0
    jitter: float = 0.1

    def get_delay(self, attempt: int) -> float:
        """Calculate delay for the given attempt."""
        if attempt == 0:
            return 0

        # Exponential backoff with jitter
        delay = min(
            self.initial_delay * (self.multiplier ** (attempt - 1)), self.max_delay
        )

        # Add jitter to prevent thundering herd
        if self.jitter > 0:
            jitter_amount = delay * self.jitter
            delay += random.uniform(-jitter_amount, jitter_amount)

        return max(0, delay)

initial_delay class-attribute instance-attribute

initial_delay: float = 1.0

max_delay class-attribute instance-attribute

max_delay: float = 60.0

multiplier class-attribute instance-attribute

multiplier: float = 2.0

jitter class-attribute instance-attribute

jitter: float = 0.1

get_delay

get_delay(attempt: int) -> float

Calculate delay for the given attempt.

Source code in src/honeyhive/utils/retry.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def get_delay(self, attempt: int) -> float:
    """Calculate delay for the given attempt."""
    if attempt == 0:
        return 0

    # Exponential backoff with jitter
    delay = min(
        self.initial_delay * (self.multiplier ** (attempt - 1)), self.max_delay
    )

    # Add jitter to prevent thundering herd
    if self.jitter > 0:
        jitter_amount = delay * self.jitter
        delay += random.uniform(-jitter_amount, jitter_amount)

    return max(0, delay)

RetryConfig dataclass

Configuration for retry behavior.

Source code in src/honeyhive/utils/retry.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
@dataclass
class RetryConfig:
    """Configuration for retry behavior."""

    strategy: str = "exponential"  # "exponential", "linear", "constant"
    backoff_strategy: Optional[BackoffStrategy] = None
    max_retries: int = 3
    retry_on_status_codes: Optional[set] = None

    def __post_init__(self) -> None:
        """Initialize default values."""
        if self.backoff_strategy is None:
            self.backoff_strategy = BackoffStrategy()

        if self.retry_on_status_codes is None:
            self.retry_on_status_codes = {408, 429, 500, 502, 503, 504}

    @classmethod
    def default(cls) -> "RetryConfig":
        """Create a default retry configuration."""
        return cls()

    @classmethod
    def exponential(
        cls,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        multiplier: float = 2.0,
        max_retries: int = 3,
    ) -> "RetryConfig":
        """Create an exponential backoff retry configuration."""
        backoff = BackoffStrategy(
            initial_delay=initial_delay,
            max_delay=max_delay,
            multiplier=multiplier,
        )
        return cls(
            strategy="exponential",
            backoff_strategy=backoff,
            max_retries=max_retries,
        )

    @classmethod
    def linear(
        cls,
        delay: float = 1.0,
        max_retries: int = 3,
    ) -> "RetryConfig":
        """Create a linear backoff retry configuration."""
        backoff = BackoffStrategy(
            initial_delay=delay,
            max_delay=delay,
            multiplier=1.0,
        )
        return cls(
            strategy="linear",
            backoff_strategy=backoff,
            max_retries=max_retries,
        )

    @classmethod
    def constant(
        cls,
        delay: float = 1.0,
        max_retries: int = 3,
    ) -> "RetryConfig":
        """Create a constant delay retry configuration."""
        backoff = BackoffStrategy(
            initial_delay=delay,
            max_delay=delay,
            multiplier=1.0,
        )
        return cls(
            strategy="constant",
            backoff_strategy=backoff,
            max_retries=max_retries,
        )

    def should_retry(self, response: httpx.Response) -> bool:
        """Determine if a response should be retried."""
        # Check status code
        if (
            self.retry_on_status_codes
            and response.status_code in self.retry_on_status_codes
        ):
            return True

        # Check for connection errors
        if response.status_code == 0:  # Connection error
            return True

        return False

    def should_retry_exception(self, exc: Exception) -> bool:
        """Determine if an exception should be retried."""
        # Retry on connection errors
        if isinstance(
            exc,
            (
                httpx.ConnectError,
                httpx.ConnectTimeout,
                httpx.ReadTimeout,
                httpx.WriteTimeout,
                httpx.PoolTimeout,
            ),
        ):
            return True

        # Retry on HTTP errors that are retryable
        if isinstance(exc, httpx.HTTPStatusError):
            return bool(
                self.retry_on_status_codes
                and exc.response.status_code in self.retry_on_status_codes
            )

        return False

    def _raise_for_failure(
        self,
        operation: str,
        last_response: Optional[httpx.Response],
        last_exception: Optional[Exception],
    ) -> NoReturn:
        """Raise an appropriate error after all retries are exhausted.

        Args:
            operation: Name of the operation that failed (for error messages).
            last_response: The last HTTP response received (if any).
            last_exception: The last exception caught (if any).
        """
        if last_response is not None:
            raise APIError(
                f"{operation} failed after {self.max_retries + 1} attempts "
                f"with status code: {last_response.status_code}, "
                f"response: {last_response.text}",
                error_response=ErrorResponse(
                    error_type="APIError",
                    error_message=last_response.text,
                    status_code=last_response.status_code,
                ),
            )
        if last_exception is not None:
            raise last_exception
        # Should never happen — guard against unbound response after the loop.
        raise RuntimeError(f"{operation} retry loop exited unexpectedly")

    @staticmethod
    def _raise_non_retryable(operation: str, response: httpx.Response) -> NoReturn:
        """Raise an APIError for a non-retryable HTTP error.

        Args:
            operation: Name of the operation that failed.
            response: The failed HTTP response.
        """
        raise APIError(
            f"{operation} failed with status code: {response.status_code}, "
            f"response: {response.text}",
            error_response=ErrorResponse(
                error_type="APIError",
                error_message=response.text,
                status_code=response.status_code,
            ),
        )

    def execute(
        self,
        request_fn: Callable[[], httpx.Response],
        operation: str = "request",
    ) -> httpx.Response:
        """Execute a sync HTTP request with retries.

        Args:
            request_fn: A callable that performs the HTTP request and returns
                an httpx.Response.
            operation: Name of the operation (used in error messages).

        Returns:
            The successful httpx.Response.

        Raises:
            APIError: On non-retryable errors or after all retries exhausted.
        """
        last_exception: Optional[Exception] = None
        last_response: Optional[httpx.Response] = None

        for attempt in range(self.max_retries + 1):
            try:
                response = request_fn()

                if response.status_code == 200:
                    return response

                # Check if we should retry this status code
                if self.should_retry(response):
                    last_response = response
                    if attempt < self.max_retries:
                        delay = self.backoff_strategy.get_delay(attempt + 1)
                        time.sleep(delay)
                        continue
                    # Last attempt exhausted — break to _raise_for_failure
                    break

                # Non-retryable error — raise immediately
                self._raise_non_retryable(operation, response)

            except httpx.HTTPError as e:
                if self.should_retry_exception(e):
                    last_exception = e
                    if attempt < self.max_retries:
                        delay = self.backoff_strategy.get_delay(attempt + 1)
                        time.sleep(delay)
                        continue
                    # Last attempt exhausted — break to _raise_for_failure
                    break
                raise

        # All retries exhausted (loop finished without return/raise)
        self._raise_for_failure(operation, last_response, last_exception)

    async def execute_async(
        self,
        request_fn: Callable[[], Awaitable[httpx.Response]],
        operation: str = "request",
    ) -> httpx.Response:
        """Execute an async HTTP request with retries.

        Same contract as execute(), but awaits the request_fn and uses
        asyncio.sleep for backoff delays.

        Args:
            request_fn: An async callable that performs the HTTP request and
                returns an httpx.Response.
            operation: Name of the operation (used in error messages).

        Returns:
            The successful httpx.Response.

        Raises:
            APIError: On non-retryable errors or after all retries exhausted.
        """
        last_exception: Optional[Exception] = None
        last_response: Optional[httpx.Response] = None

        for attempt in range(self.max_retries + 1):
            try:
                response = await request_fn()

                if response.status_code == 200:
                    return response

                # Check if we should retry this status code
                if self.should_retry(response):
                    last_response = response
                    if attempt < self.max_retries:
                        delay = self.backoff_strategy.get_delay(attempt + 1)
                        await asyncio.sleep(delay)
                        continue
                    # Last attempt exhausted — break to _raise_for_failure
                    break

                # Non-retryable error — raise immediately
                self._raise_non_retryable(operation, response)

            except httpx.HTTPError as e:
                if self.should_retry_exception(e):
                    last_exception = e
                    if attempt < self.max_retries:
                        delay = self.backoff_strategy.get_delay(attempt + 1)
                        await asyncio.sleep(delay)
                        continue
                    # Last attempt exhausted — break to _raise_for_failure
                    break
                raise

        # All retries exhausted (loop finished without return/raise)
        self._raise_for_failure(operation, last_response, last_exception)

strategy class-attribute instance-attribute

strategy: str = 'exponential'

backoff_strategy class-attribute instance-attribute

backoff_strategy: Optional[BackoffStrategy] = None

max_retries class-attribute instance-attribute

max_retries: int = 3

retry_on_status_codes class-attribute instance-attribute

retry_on_status_codes: Optional[set] = None

default classmethod

default() -> RetryConfig

Create a default retry configuration.

Source code in src/honeyhive/utils/retry.py
60
61
62
63
@classmethod
def default(cls) -> "RetryConfig":
    """Create a default retry configuration."""
    return cls()

exponential classmethod

exponential(
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    max_retries: int = 3,
) -> RetryConfig

Create an exponential backoff retry configuration.

Source code in src/honeyhive/utils/retry.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@classmethod
def exponential(
    cls,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    max_retries: int = 3,
) -> "RetryConfig":
    """Create an exponential backoff retry configuration."""
    backoff = BackoffStrategy(
        initial_delay=initial_delay,
        max_delay=max_delay,
        multiplier=multiplier,
    )
    return cls(
        strategy="exponential",
        backoff_strategy=backoff,
        max_retries=max_retries,
    )

linear classmethod

linear(
    delay: float = 1.0, max_retries: int = 3
) -> RetryConfig

Create a linear backoff retry configuration.

Source code in src/honeyhive/utils/retry.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@classmethod
def linear(
    cls,
    delay: float = 1.0,
    max_retries: int = 3,
) -> "RetryConfig":
    """Create a linear backoff retry configuration."""
    backoff = BackoffStrategy(
        initial_delay=delay,
        max_delay=delay,
        multiplier=1.0,
    )
    return cls(
        strategy="linear",
        backoff_strategy=backoff,
        max_retries=max_retries,
    )

constant classmethod

constant(
    delay: float = 1.0, max_retries: int = 3
) -> RetryConfig

Create a constant delay retry configuration.

Source code in src/honeyhive/utils/retry.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@classmethod
def constant(
    cls,
    delay: float = 1.0,
    max_retries: int = 3,
) -> "RetryConfig":
    """Create a constant delay retry configuration."""
    backoff = BackoffStrategy(
        initial_delay=delay,
        max_delay=delay,
        multiplier=1.0,
    )
    return cls(
        strategy="constant",
        backoff_strategy=backoff,
        max_retries=max_retries,
    )

should_retry

should_retry(response: Response) -> bool

Determine if a response should be retried.

Source code in src/honeyhive/utils/retry.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def should_retry(self, response: httpx.Response) -> bool:
    """Determine if a response should be retried."""
    # Check status code
    if (
        self.retry_on_status_codes
        and response.status_code in self.retry_on_status_codes
    ):
        return True

    # Check for connection errors
    if response.status_code == 0:  # Connection error
        return True

    return False

should_retry_exception

should_retry_exception(exc: Exception) -> bool

Determine if an exception should be retried.

Source code in src/honeyhive/utils/retry.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def should_retry_exception(self, exc: Exception) -> bool:
    """Determine if an exception should be retried."""
    # Retry on connection errors
    if isinstance(
        exc,
        (
            httpx.ConnectError,
            httpx.ConnectTimeout,
            httpx.ReadTimeout,
            httpx.WriteTimeout,
            httpx.PoolTimeout,
        ),
    ):
        return True

    # Retry on HTTP errors that are retryable
    if isinstance(exc, httpx.HTTPStatusError):
        return bool(
            self.retry_on_status_codes
            and exc.response.status_code in self.retry_on_status_codes
        )

    return False

execute

execute(
    request_fn: Callable[[], Response],
    operation: str = "request",
) -> Response

Execute a sync HTTP request with retries.

Parameters:

Name Type Description Default
request_fn Callable[[], Response]

A callable that performs the HTTP request and returns an httpx.Response.

required
operation str

Name of the operation (used in error messages).

'request'

Returns:

Type Description
Response

The successful httpx.Response.

Raises:

Type Description
APIError

On non-retryable errors or after all retries exhausted.

Source code in src/honeyhive/utils/retry.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def execute(
    self,
    request_fn: Callable[[], httpx.Response],
    operation: str = "request",
) -> httpx.Response:
    """Execute a sync HTTP request with retries.

    Args:
        request_fn: A callable that performs the HTTP request and returns
            an httpx.Response.
        operation: Name of the operation (used in error messages).

    Returns:
        The successful httpx.Response.

    Raises:
        APIError: On non-retryable errors or after all retries exhausted.
    """
    last_exception: Optional[Exception] = None
    last_response: Optional[httpx.Response] = None

    for attempt in range(self.max_retries + 1):
        try:
            response = request_fn()

            if response.status_code == 200:
                return response

            # Check if we should retry this status code
            if self.should_retry(response):
                last_response = response
                if attempt < self.max_retries:
                    delay = self.backoff_strategy.get_delay(attempt + 1)
                    time.sleep(delay)
                    continue
                # Last attempt exhausted — break to _raise_for_failure
                break

            # Non-retryable error — raise immediately
            self._raise_non_retryable(operation, response)

        except httpx.HTTPError as e:
            if self.should_retry_exception(e):
                last_exception = e
                if attempt < self.max_retries:
                    delay = self.backoff_strategy.get_delay(attempt + 1)
                    time.sleep(delay)
                    continue
                # Last attempt exhausted — break to _raise_for_failure
                break
            raise

    # All retries exhausted (loop finished without return/raise)
    self._raise_for_failure(operation, last_response, last_exception)

execute_async async

execute_async(
    request_fn: Callable[[], Awaitable[Response]],
    operation: str = "request",
) -> Response

Execute an async HTTP request with retries.

Same contract as execute(), but awaits the request_fn and uses asyncio.sleep for backoff delays.

Parameters:

Name Type Description Default
request_fn Callable[[], Awaitable[Response]]

An async callable that performs the HTTP request and returns an httpx.Response.

required
operation str

Name of the operation (used in error messages).

'request'

Returns:

Type Description
Response

The successful httpx.Response.

Raises:

Type Description
APIError

On non-retryable errors or after all retries exhausted.

Source code in src/honeyhive/utils/retry.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
async def execute_async(
    self,
    request_fn: Callable[[], Awaitable[httpx.Response]],
    operation: str = "request",
) -> httpx.Response:
    """Execute an async HTTP request with retries.

    Same contract as execute(), but awaits the request_fn and uses
    asyncio.sleep for backoff delays.

    Args:
        request_fn: An async callable that performs the HTTP request and
            returns an httpx.Response.
        operation: Name of the operation (used in error messages).

    Returns:
        The successful httpx.Response.

    Raises:
        APIError: On non-retryable errors or after all retries exhausted.
    """
    last_exception: Optional[Exception] = None
    last_response: Optional[httpx.Response] = None

    for attempt in range(self.max_retries + 1):
        try:
            response = await request_fn()

            if response.status_code == 200:
                return response

            # Check if we should retry this status code
            if self.should_retry(response):
                last_response = response
                if attempt < self.max_retries:
                    delay = self.backoff_strategy.get_delay(attempt + 1)
                    await asyncio.sleep(delay)
                    continue
                # Last attempt exhausted — break to _raise_for_failure
                break

            # Non-retryable error — raise immediately
            self._raise_non_retryable(operation, response)

        except httpx.HTTPError as e:
            if self.should_retry_exception(e):
                last_exception = e
                if attempt < self.max_retries:
                    delay = self.backoff_strategy.get_delay(attempt + 1)
                    await asyncio.sleep(delay)
                    continue
                # Last attempt exhausted — break to _raise_for_failure
                break
            raise

    # All retries exhausted (loop finished without return/raise)
    self._raise_for_failure(operation, last_response, last_exception)

get_error_handler

get_error_handler() -> ErrorHandler

Get the default error handler instance.

Returns:

Type Description
ErrorHandler

Default error handler instance

Source code in src/honeyhive/utils/error_handler.py
390
391
392
393
394
395
396
def get_error_handler() -> ErrorHandler:
    """Get the default error handler instance.

    Returns:
        Default error handler instance
    """
    return _default_error_handler

handle_api_errors

handle_api_errors(
    operation: str,
    *,
    method: Optional[str] = None,
    url: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
    json_data: Optional[Dict[str, Any]] = None,
    client_name: Optional[str] = None,
    raise_on_error: bool = True,
    **additional_context: Any
) -> Generator[None, None, None]

Convenience context manager for API error handling.

Parameters:

Name Type Description Default
operation str

Name of the operation being performed

required
method Optional[str]

HTTP method (if applicable)

None
url Optional[str]

URL being accessed (if applicable)

None
params Optional[Dict[str, Any]]

Request parameters (if applicable)

None
json_data Optional[Dict[str, Any]]

JSON data being sent (if applicable)

None
client_name Optional[str]

Name of the client making the request

None
raise_on_error bool

Whether to raise exceptions or return error responses

True
**additional_context Any

Additional context information

{}

Yields:

Type Description
None

None

Example

with handle_api_errors("create_project", method="POST", url="/projects"): response = client.request("POST", "/projects", json=data)

Source code in src/honeyhive/utils/error_handler.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
@contextmanager
def handle_api_errors(  # pylint: disable=too-many-arguments
    operation: str,
    *,
    method: Optional[str] = None,
    url: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
    json_data: Optional[Dict[str, Any]] = None,
    client_name: Optional[str] = None,
    raise_on_error: bool = True,
    **additional_context: Any,
) -> Generator[None, None, None]:
    """Convenience context manager for API error handling.

    Args:
        operation: Name of the operation being performed
        method: HTTP method (if applicable)
        url: URL being accessed (if applicable)
        params: Request parameters (if applicable)
        json_data: JSON data being sent (if applicable)
        client_name: Name of the client making the request
        raise_on_error: Whether to raise exceptions or return error responses
        **additional_context: Additional context information

    Yields:
        None

    Example:
        with handle_api_errors("create_project", method="POST", url="/projects"):
            response = client.request("POST", "/projects", json=data)
    """
    # pylint: disable=duplicate-code
    # ErrorContext creation pattern is intentionally duplicated between
    # api.base and utils.error_handler as both modules need to create
    # error contexts with the same standard parameter structure
    context = ErrorContext(
        operation=operation,
        method=method,
        url=url,
        params=params,
        json_data=json_data,
        client_name=client_name,
        additional_context=additional_context,
    )

    with _default_error_handler.handle_operation(context, raise_on_error):
        yield

get_logger

get_logger(
    name: str,
    verbose: Optional[bool] = None,
    tracer_instance: Optional[Any] = None,
    **kwargs: Any
) -> HoneyHiveLogger

Get a HoneyHive logger instance with dynamic configuration.

Uses dynamic logic to determine logger configuration based on tracer instance settings or explicit parameters.

Parameters:

Name Type Description Default
name str

Logger name

required
verbose Optional[bool]

Explicit verbose setting

None
tracer_instance Optional[Any]

Tracer instance to extract verbose setting from

None
**kwargs Any

Additional logger parameters

{}

Returns:

Type Description
HoneyHiveLogger

Configured HoneyHive logger instance

Source code in src/honeyhive/utils/logger.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def get_logger(
    name: str,
    verbose: Optional[bool] = None,
    tracer_instance: Optional[Any] = None,
    **kwargs: Any,
) -> HoneyHiveLogger:
    """Get a HoneyHive logger instance with dynamic configuration.

    Uses dynamic logic to determine logger configuration based on
    tracer instance settings or explicit parameters.

    Args:
        name: Logger name
        verbose: Explicit verbose setting
        tracer_instance: Tracer instance to extract verbose setting from
        **kwargs: Additional logger parameters

    Returns:
        Configured HoneyHive logger instance
    """
    # Dynamic verbose detection from tracer instance
    if verbose is None and tracer_instance is not None:
        verbose = _extract_verbose_from_tracer_dynamically(tracer_instance)

    return HoneyHiveLogger(name, verbose=verbose, **kwargs)