Skip to content

honeyhive.utils.connection_pool

Connection pool utilities for HTTP clients.

HTTPX_AVAILABLE module-attribute

HTTPX_AVAILABLE = True

PoolConfig dataclass

Configuration for connection pool.

Source code in src/honeyhive/utils/connection_pool.py
69
70
71
72
73
74
75
76
77
78
@dataclass
class PoolConfig:
    """Configuration for connection pool."""

    max_connections: int = 100
    max_keepalive_connections: int = 20
    keepalive_expiry: float = 30.0
    retries: int = 3
    timeout: float = 30.0
    pool_timeout: float = 10.0

max_connections class-attribute instance-attribute

max_connections: int = 100

max_keepalive_connections class-attribute instance-attribute

max_keepalive_connections: int = 20

keepalive_expiry class-attribute instance-attribute

keepalive_expiry: float = 30.0

retries class-attribute instance-attribute

retries: int = 3

timeout class-attribute instance-attribute

timeout: float = 30.0

pool_timeout class-attribute instance-attribute

pool_timeout: float = 10.0

ConnectionPool

Connection pool for HTTP clients.

Source code in src/honeyhive/utils/connection_pool.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
class ConnectionPool:
    """Connection pool for HTTP clients."""

    # Type annotations for instance attributes
    _lock: Union[threading.Lock, "_NoOpLock"]

    def __init__(
        self,
        config: Optional[PoolConfig] = None,
        *,
        # Backwards compatibility parameters
        max_connections: Optional[int] = None,
        max_keepalive: Optional[int] = None,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        retries: Optional[int] = None,
        timeout: Optional[float] = None,
        pool_timeout: Optional[float] = None,
    ):
        """Initialize connection pool with hybrid config approach.

        Args:
            config: Pool configuration object (recommended)
            max_connections: Maximum number of connections (backwards compatibility)
            max_keepalive: Alias for max_keepalive_connections (backwards compatibility)
            max_keepalive_connections: Maximum keepalive connections
            keepalive_expiry: Keepalive expiry time in seconds
            retries: Number of retries
            timeout: Request timeout in seconds
            pool_timeout: Pool acquisition timeout in seconds
        """
        if not HTTPX_AVAILABLE:
            raise ImportError("httpx is required for connection pooling")

        # Hybrid approach: merge config object with individual parameters
        if config is None:
            config = PoolConfig()

        # Override config with any explicitly provided parameters
        if max_connections is not None:
            config.max_connections = max_connections
        if max_keepalive is not None:
            config.max_keepalive_connections = max_keepalive
        if max_keepalive_connections is not None:
            config.max_keepalive_connections = max_keepalive_connections
        if keepalive_expiry is not None:
            config.keepalive_expiry = keepalive_expiry
        if retries is not None:
            config.retries = retries
        if timeout is not None:
            config.timeout = timeout
        if pool_timeout is not None:
            config.pool_timeout = pool_timeout

        self.config = config
        self.logger = get_logger(__name__)

        # Backwards compatibility attributes
        self.max_connections = self.config.max_connections
        self.max_keepalive = self.config.max_keepalive_connections
        self.max_keepalive_connections = self.config.max_keepalive_connections
        self.keepalive_expiry = self.config.keepalive_expiry
        self.retries = self.config.retries
        self.timeout = self.config.timeout
        self.pool_timeout = self.config.pool_timeout

        # Pool state
        self._clients: Dict[str, httpx.Client] = {}
        self._async_clients: Dict[str, httpx.AsyncClient] = {}

        # ENVIRONMENT-AWARE LOCKING: Use appropriate locking strategy
        # Production: Full threading.Lock() for thread safety
        # pytest-xdist: Simplified locking to prevent cross-process deadlocks
        self._use_locking = not _is_pytest_xdist_worker()
        if self._use_locking:
            self._lock = threading.Lock()
        else:
            # In pytest-xdist, each worker is isolated, so we can use a no-op lock
            self._lock = _NoOpLock()

        self._last_used: Dict[str, float] = {}

        # Statistics
        self._stats = {
            "total_requests": 0,
            "pool_hits": 0,
            "pool_misses": 0,
            "connections_created": 0,
            "connections_reused": 0,
        }

    def get_client(
        self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
    ) -> httpx.Client:
        """Get or create an HTTP client from the pool.

        Args:
            base_url: Base URL for the client
            headers: Default headers
            **kwargs: Additional client configuration

        Returns:
            HTTP client instance
        """
        with self._lock:
            # Check if we have a client for this base URL
            if base_url in self._clients:
                client = self._clients[base_url]
                if self._is_client_healthy(client):
                    self._last_used[base_url] = time.time()
                    self._stats["pool_hits"] += 1
                    self._stats["connections_reused"] += 1
                    return client

                # Remove unhealthy client
                del self._clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

            # Create new client
            self._stats["pool_misses"] += 1
            self._stats["connections_created"] += 1
            self._stats["total_requests"] += 1

            # Remove timeout from kwargs if it exists to avoid duplicate
            client_kwargs = kwargs.copy()
            if "timeout" in client_kwargs:
                del client_kwargs["timeout"]

            client = httpx.Client(
                base_url=base_url,
                headers=headers,
                limits=httpx.Limits(
                    max_connections=self.config.max_connections,
                    max_keepalive_connections=self.config.max_keepalive_connections,
                    keepalive_expiry=self.config.keepalive_expiry,
                ),
                timeout=self.config.timeout,
                **client_kwargs,
            )

            self._clients[base_url] = client
            self._last_used[base_url] = time.time()

            self.logger.debug(f"Created new HTTP client for {base_url}")
            return client

    def get_async_client(
        self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
    ) -> httpx.AsyncClient:
        """Get or create an async HTTP client from the pool.

        Args:
            base_url: Base URL for the client
            headers: Default headers
            **kwargs: Additional client configuration

        Returns:
            Async HTTP client instance
        """
        with self._lock:
            # Check if we have a client for this base URL
            if base_url in self._async_clients:
                client = self._async_clients[base_url]
                if self._is_async_client_healthy(client):
                    self._last_used[base_url] = time.time()
                    self._stats["pool_hits"] += 1
                    self._stats["connections_reused"] += 1
                    return client

                # Remove unhealthy client
                del self._async_clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

            # Create new client
            self._stats["pool_misses"] += 1
            self._stats["connections_created"] += 1
            self._stats["total_requests"] += 1

            # Remove timeout from kwargs if it exists to avoid duplicate
            client_kwargs = kwargs.copy()
            if "timeout" in client_kwargs:
                del client_kwargs["timeout"]

            client = httpx.AsyncClient(
                base_url=base_url,
                headers=headers,
                limits=httpx.Limits(
                    max_connections=self.config.max_connections,
                    max_keepalive_connections=self.config.max_keepalive_connections,
                    keepalive_expiry=self.config.keepalive_expiry,
                ),
                timeout=self.config.timeout,
                **client_kwargs,
            )

            self._async_clients[base_url] = client
            self._last_used[base_url] = time.time()

            self.logger.debug(f"Created new async HTTP client for {base_url}")
            return client

    def _is_client_healthy(self, client: httpx.Client) -> bool:
        """Check if a client is healthy and can be reused."""
        try:
            # Check if client is closed
            if hasattr(client, "is_closed") and client.is_closed:
                return False

            # Check if client has been idle too long
            if hasattr(client, "_transport"):
                transport = client._transport
                if hasattr(transport, "pool"):
                    pool = transport.pool
                    if hasattr(pool, "connections"):
                        # Check if pool has available connections
                        return len(pool.connections) > 0

            # If we can't determine health from transport, assume it's healthy
            # This covers cases where the client is open but transport details
            # are not accessible
            return True
        except Exception:
            return False

    def _is_async_client_healthy(self, client: httpx.AsyncClient) -> bool:
        """Check if an async client is healthy and can be reused."""
        try:
            # Check if client is closed
            if hasattr(client, "is_closed") and client.is_closed:
                return False

            # For async clients, we can't easily check transport state
            # So we assume they're healthy if not explicitly closed
            return True
        except Exception:
            return False

    def cleanup_idle_connections(self, max_idle_time: float = 300.0) -> None:
        """Clean up idle connections.

        Args:
            max_idle_time: Maximum idle time in seconds
        """
        current_time = time.time()
        to_remove = []

        with self._lock:
            for base_url, last_used in self._last_used.items():
                if current_time - last_used > max_idle_time:
                    to_remove.append(base_url)

            for base_url in to_remove:
                if base_url in self._clients:
                    try:
                        self._clients[base_url].close()
                    except Exception:
                        pass
                    del self._clients[base_url]

                if base_url in self._async_clients:
                    try:
                        # Note: AsyncClient doesn't have close() method
                        pass
                    except Exception:
                        pass
                    del self._async_clients[base_url]

                if base_url in self._last_used:
                    del self._last_used[base_url]

                self.logger.debug(f"Cleaned up idle connection for {base_url}")

    def get_stats(self) -> Dict[str, Any]:
        """Get pool statistics.

        Returns:
            Dictionary with pool statistics
        """
        with self._lock:
            stats = self._stats.copy()
            stats.update(
                {
                    "active_connections": len(self._clients),
                    "active_async_connections": len(self._async_clients),
                    "total_connections": len(self._clients) + len(self._async_clients),
                }
            )
            return stats

    @property
    def active_connections(self) -> int:
        """Get number of active connections.

        Returns:
            Number of active connections
        """
        with self._lock:
            return len(self._clients)

    def get_connection(self, base_url: str) -> Optional[httpx.Client]:
        """Get a connection for a specific base URL.

        Args:
            base_url: Base URL for the connection

        Returns:
            HTTP client instance or None if not found
        """
        with self._lock:
            if base_url in self._clients:
                client = self._clients[base_url]
                if self._is_client_healthy(client):
                    return client
        return None

    def return_connection(self, base_url: str, client: httpx.Client) -> None:
        """Return a connection to the pool.

        Args:
            base_url: Base URL for the connection
            client: HTTP client to return
        """
        with self._lock:
            if base_url not in self._clients:
                self._clients[base_url] = client
                self._last_used[base_url] = time.time()

    def get_async_connection(self, base_url: str) -> Optional[httpx.AsyncClient]:
        """Get an async connection for a specific base URL.

        Args:
            base_url: Base URL for the connection

        Returns:
            Async HTTP client instance or None if not found
        """
        with self._lock:
            if base_url in self._async_clients:
                client = self._async_clients[base_url]
                if self._is_async_client_healthy(client):
                    return client
        return None

    def return_async_connection(self, base_url: str, client: httpx.AsyncClient) -> None:
        """Return an async connection to the pool.

        Args:
            base_url: Base URL for the connection
            client: Async HTTP client to return
        """
        with self._lock:
            if base_url not in self._async_clients:
                self._async_clients[base_url] = client
                self._last_used[base_url] = time.time()

    def close_connection(self, base_url: str) -> None:
        """Close a specific connection.

        Args:
            base_url: Base URL for the connection
        """
        with self._lock:
            if base_url in self._clients:
                try:
                    self._clients[base_url].close()
                except Exception as e:
                    self.logger.warning(f"Failed to close client: {e}")
                finally:
                    del self._clients[base_url]
                    if base_url in self._last_used:
                        del self._last_used[base_url]

    def cleanup(self) -> None:
        """Clean up expired connections."""
        current_time = time.time()

        # First, identify expired URLs while holding the lock
        with self._lock:
            expired_urls = []
            for base_url, last_used in self._last_used.items():
                if current_time - last_used > self.config.keepalive_expiry:
                    expired_urls.append(base_url)

        # Then close expired connections without holding the lock
        for base_url in expired_urls:
            self.close_connection(base_url)

    def close_all(self) -> None:
        """Close all connections in the pool."""
        with self._lock:
            # Close sync clients
            for client in self._clients.values():
                try:
                    client.close()
                except Exception as e:
                    self.logger.warning(f"Failed to close client: {e}")

            # Note: AsyncClient doesn't have close() method
            # They should be closed by the user when done

            self._clients.clear()
            self._async_clients.clear()
            self._last_used.clear()

            self.logger.info("Closed all connections in pool")

    def reset_stats(self) -> None:
        """Reset pool statistics."""
        with self._lock:
            self._stats = {
                "pool_hits": 0,
                "pool_misses": 0,
                "connections_created": 0,
                "connections_reused": 0,
                "total_requests": 0,
            }

    def close_all_clients(self) -> None:
        """Close all clients in the pool (alias for close_all)."""
        self.close_all()

    async def aclose_all_clients(self) -> None:
        """Close all async clients in the pool."""
        with self._lock:
            for client in self._async_clients.values():
                try:
                    await client.aclose()
                except Exception as e:
                    self.logger.warning(f"Error closing async client: {e}")

            self._async_clients.clear()
            # Remove async clients from last_used
            keys_to_remove = [
                k for k, v in self._last_used.items() if k in self._async_clients
            ]
            for key in keys_to_remove:
                del self._last_used[key]

    async def __aenter__(self) -> "ConnectionPool":
        """Async context manager entry."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Async context manager exit."""
        await self.aclose_all_clients()

    def __enter__(self) -> "ConnectionPool":
        """Context manager entry."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Context manager exit."""
        self.close_all()

config instance-attribute

config = config

logger instance-attribute

logger = get_logger(__name__)

max_connections instance-attribute

max_connections = max_connections

max_keepalive instance-attribute

max_keepalive = max_keepalive_connections

max_keepalive_connections instance-attribute

max_keepalive_connections = max_keepalive_connections

keepalive_expiry instance-attribute

keepalive_expiry = keepalive_expiry

retries instance-attribute

retries = retries

timeout instance-attribute

timeout = timeout

pool_timeout instance-attribute

pool_timeout = pool_timeout

active_connections property

active_connections: int

Get number of active connections.

Returns:

Type Description
int

Number of active connections

get_client

get_client(
    base_url: str,
    headers: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> Client

Get or create an HTTP client from the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the client

required
headers Optional[Dict[str, str]]

Default headers

None
**kwargs Any

Additional client configuration

{}

Returns:

Type Description
Client

HTTP client instance

Source code in src/honeyhive/utils/connection_pool.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def get_client(
    self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
) -> httpx.Client:
    """Get or create an HTTP client from the pool.

    Args:
        base_url: Base URL for the client
        headers: Default headers
        **kwargs: Additional client configuration

    Returns:
        HTTP client instance
    """
    with self._lock:
        # Check if we have a client for this base URL
        if base_url in self._clients:
            client = self._clients[base_url]
            if self._is_client_healthy(client):
                self._last_used[base_url] = time.time()
                self._stats["pool_hits"] += 1
                self._stats["connections_reused"] += 1
                return client

            # Remove unhealthy client
            del self._clients[base_url]
            if base_url in self._last_used:
                del self._last_used[base_url]

        # Create new client
        self._stats["pool_misses"] += 1
        self._stats["connections_created"] += 1
        self._stats["total_requests"] += 1

        # Remove timeout from kwargs if it exists to avoid duplicate
        client_kwargs = kwargs.copy()
        if "timeout" in client_kwargs:
            del client_kwargs["timeout"]

        client = httpx.Client(
            base_url=base_url,
            headers=headers,
            limits=httpx.Limits(
                max_connections=self.config.max_connections,
                max_keepalive_connections=self.config.max_keepalive_connections,
                keepalive_expiry=self.config.keepalive_expiry,
            ),
            timeout=self.config.timeout,
            **client_kwargs,
        )

        self._clients[base_url] = client
        self._last_used[base_url] = time.time()

        self.logger.debug(f"Created new HTTP client for {base_url}")
        return client

get_async_client

get_async_client(
    base_url: str,
    headers: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> AsyncClient

Get or create an async HTTP client from the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the client

required
headers Optional[Dict[str, str]]

Default headers

None
**kwargs Any

Additional client configuration

{}

Returns:

Type Description
AsyncClient

Async HTTP client instance

Source code in src/honeyhive/utils/connection_pool.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def get_async_client(
    self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
) -> httpx.AsyncClient:
    """Get or create an async HTTP client from the pool.

    Args:
        base_url: Base URL for the client
        headers: Default headers
        **kwargs: Additional client configuration

    Returns:
        Async HTTP client instance
    """
    with self._lock:
        # Check if we have a client for this base URL
        if base_url in self._async_clients:
            client = self._async_clients[base_url]
            if self._is_async_client_healthy(client):
                self._last_used[base_url] = time.time()
                self._stats["pool_hits"] += 1
                self._stats["connections_reused"] += 1
                return client

            # Remove unhealthy client
            del self._async_clients[base_url]
            if base_url in self._last_used:
                del self._last_used[base_url]

        # Create new client
        self._stats["pool_misses"] += 1
        self._stats["connections_created"] += 1
        self._stats["total_requests"] += 1

        # Remove timeout from kwargs if it exists to avoid duplicate
        client_kwargs = kwargs.copy()
        if "timeout" in client_kwargs:
            del client_kwargs["timeout"]

        client = httpx.AsyncClient(
            base_url=base_url,
            headers=headers,
            limits=httpx.Limits(
                max_connections=self.config.max_connections,
                max_keepalive_connections=self.config.max_keepalive_connections,
                keepalive_expiry=self.config.keepalive_expiry,
            ),
            timeout=self.config.timeout,
            **client_kwargs,
        )

        self._async_clients[base_url] = client
        self._last_used[base_url] = time.time()

        self.logger.debug(f"Created new async HTTP client for {base_url}")
        return client

cleanup_idle_connections

cleanup_idle_connections(
    max_idle_time: float = 300.0,
) -> None

Clean up idle connections.

Parameters:

Name Type Description Default
max_idle_time float

Maximum idle time in seconds

300.0
Source code in src/honeyhive/utils/connection_pool.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def cleanup_idle_connections(self, max_idle_time: float = 300.0) -> None:
    """Clean up idle connections.

    Args:
        max_idle_time: Maximum idle time in seconds
    """
    current_time = time.time()
    to_remove = []

    with self._lock:
        for base_url, last_used in self._last_used.items():
            if current_time - last_used > max_idle_time:
                to_remove.append(base_url)

        for base_url in to_remove:
            if base_url in self._clients:
                try:
                    self._clients[base_url].close()
                except Exception:
                    pass
                del self._clients[base_url]

            if base_url in self._async_clients:
                try:
                    # Note: AsyncClient doesn't have close() method
                    pass
                except Exception:
                    pass
                del self._async_clients[base_url]

            if base_url in self._last_used:
                del self._last_used[base_url]

            self.logger.debug(f"Cleaned up idle connection for {base_url}")

get_stats

get_stats() -> Dict[str, Any]

Get pool statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with pool statistics

Source code in src/honeyhive/utils/connection_pool.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def get_stats(self) -> Dict[str, Any]:
    """Get pool statistics.

    Returns:
        Dictionary with pool statistics
    """
    with self._lock:
        stats = self._stats.copy()
        stats.update(
            {
                "active_connections": len(self._clients),
                "active_async_connections": len(self._async_clients),
                "total_connections": len(self._clients) + len(self._async_clients),
            }
        )
        return stats

get_connection

get_connection(base_url: str) -> Optional[Client]

Get a connection for a specific base URL.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required

Returns:

Type Description
Optional[Client]

HTTP client instance or None if not found

Source code in src/honeyhive/utils/connection_pool.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def get_connection(self, base_url: str) -> Optional[httpx.Client]:
    """Get a connection for a specific base URL.

    Args:
        base_url: Base URL for the connection

    Returns:
        HTTP client instance or None if not found
    """
    with self._lock:
        if base_url in self._clients:
            client = self._clients[base_url]
            if self._is_client_healthy(client):
                return client
    return None

return_connection

return_connection(base_url: str, client: Client) -> None

Return a connection to the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required
client Client

HTTP client to return

required
Source code in src/honeyhive/utils/connection_pool.py
398
399
400
401
402
403
404
405
406
407
408
def return_connection(self, base_url: str, client: httpx.Client) -> None:
    """Return a connection to the pool.

    Args:
        base_url: Base URL for the connection
        client: HTTP client to return
    """
    with self._lock:
        if base_url not in self._clients:
            self._clients[base_url] = client
            self._last_used[base_url] = time.time()

get_async_connection

get_async_connection(
    base_url: str,
) -> Optional[AsyncClient]

Get an async connection for a specific base URL.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required

Returns:

Type Description
Optional[AsyncClient]

Async HTTP client instance or None if not found

Source code in src/honeyhive/utils/connection_pool.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
def get_async_connection(self, base_url: str) -> Optional[httpx.AsyncClient]:
    """Get an async connection for a specific base URL.

    Args:
        base_url: Base URL for the connection

    Returns:
        Async HTTP client instance or None if not found
    """
    with self._lock:
        if base_url in self._async_clients:
            client = self._async_clients[base_url]
            if self._is_async_client_healthy(client):
                return client
    return None

return_async_connection

return_async_connection(
    base_url: str, client: AsyncClient
) -> None

Return an async connection to the pool.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required
client AsyncClient

Async HTTP client to return

required
Source code in src/honeyhive/utils/connection_pool.py
426
427
428
429
430
431
432
433
434
435
436
def return_async_connection(self, base_url: str, client: httpx.AsyncClient) -> None:
    """Return an async connection to the pool.

    Args:
        base_url: Base URL for the connection
        client: Async HTTP client to return
    """
    with self._lock:
        if base_url not in self._async_clients:
            self._async_clients[base_url] = client
            self._last_used[base_url] = time.time()

close_connection

close_connection(base_url: str) -> None

Close a specific connection.

Parameters:

Name Type Description Default
base_url str

Base URL for the connection

required
Source code in src/honeyhive/utils/connection_pool.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def close_connection(self, base_url: str) -> None:
    """Close a specific connection.

    Args:
        base_url: Base URL for the connection
    """
    with self._lock:
        if base_url in self._clients:
            try:
                self._clients[base_url].close()
            except Exception as e:
                self.logger.warning(f"Failed to close client: {e}")
            finally:
                del self._clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

cleanup

cleanup() -> None

Clean up expired connections.

Source code in src/honeyhive/utils/connection_pool.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def cleanup(self) -> None:
    """Clean up expired connections."""
    current_time = time.time()

    # First, identify expired URLs while holding the lock
    with self._lock:
        expired_urls = []
        for base_url, last_used in self._last_used.items():
            if current_time - last_used > self.config.keepalive_expiry:
                expired_urls.append(base_url)

    # Then close expired connections without holding the lock
    for base_url in expired_urls:
        self.close_connection(base_url)

close_all

close_all() -> None

Close all connections in the pool.

Source code in src/honeyhive/utils/connection_pool.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def close_all(self) -> None:
    """Close all connections in the pool."""
    with self._lock:
        # Close sync clients
        for client in self._clients.values():
            try:
                client.close()
            except Exception as e:
                self.logger.warning(f"Failed to close client: {e}")

        # Note: AsyncClient doesn't have close() method
        # They should be closed by the user when done

        self._clients.clear()
        self._async_clients.clear()
        self._last_used.clear()

        self.logger.info("Closed all connections in pool")

reset_stats

reset_stats() -> None

Reset pool statistics.

Source code in src/honeyhive/utils/connection_pool.py
489
490
491
492
493
494
495
496
497
498
def reset_stats(self) -> None:
    """Reset pool statistics."""
    with self._lock:
        self._stats = {
            "pool_hits": 0,
            "pool_misses": 0,
            "connections_created": 0,
            "connections_reused": 0,
            "total_requests": 0,
        }

close_all_clients

close_all_clients() -> None

Close all clients in the pool (alias for close_all).

Source code in src/honeyhive/utils/connection_pool.py
500
501
502
def close_all_clients(self) -> None:
    """Close all clients in the pool (alias for close_all)."""
    self.close_all()

aclose_all_clients async

aclose_all_clients() -> None

Close all async clients in the pool.

Source code in src/honeyhive/utils/connection_pool.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
async def aclose_all_clients(self) -> None:
    """Close all async clients in the pool."""
    with self._lock:
        for client in self._async_clients.values():
            try:
                await client.aclose()
            except Exception as e:
                self.logger.warning(f"Error closing async client: {e}")

        self._async_clients.clear()
        # Remove async clients from last_used
        keys_to_remove = [
            k for k, v in self._last_used.items() if k in self._async_clients
        ]
        for key in keys_to_remove:
            del self._last_used[key]

PooledHTTPClient

HTTP client that uses connection pooling.

Source code in src/honeyhive/utils/connection_pool.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
class PooledHTTPClient:
    """HTTP client that uses connection pooling."""

    def __init__(self, pool: ConnectionPool, **kwargs: Any) -> None:
        """Initialize pooled HTTP client.

        Args:
            pool: Connection pool instance
            **kwargs: Client configuration
        """
        self.pool = pool
        self.config = kwargs
        self.logger = get_logger(__name__)

    def get(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make GET request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = client.get(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"HTTP GET request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def post(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make POST request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = client.post(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"HTTP POST request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def put(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make PUT request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = client.put(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"HTTP PUT request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def delete(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make DELETE request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = client.delete(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"HTTP DELETE request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def patch(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make PATCH request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = client.patch(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"HTTP PATCH request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

pool instance-attribute

pool = pool

config instance-attribute

config = kwargs

logger instance-attribute

logger = get_logger(__name__)

get

get(url: str, **kwargs: Any) -> Response

Make GET request.

Source code in src/honeyhive/utils/connection_pool.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
def get(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make GET request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.Client(**self.config)
        self.logger.debug(f"Created new HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = client.get(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"HTTP GET request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_connection(base_url, client)

post

post(url: str, **kwargs: Any) -> Response

Make POST request.

Source code in src/honeyhive/utils/connection_pool.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
def post(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make POST request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.Client(**self.config)
        self.logger.debug(f"Created new HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = client.post(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"HTTP POST request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_connection(base_url, client)

put

put(url: str, **kwargs: Any) -> Response

Make PUT request.

Source code in src/honeyhive/utils/connection_pool.py
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
def put(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make PUT request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.Client(**self.config)
        self.logger.debug(f"Created new HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = client.put(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"HTTP PUT request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_connection(base_url, client)

delete

delete(url: str, **kwargs: Any) -> Response

Make DELETE request.

Source code in src/honeyhive/utils/connection_pool.py
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
def delete(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make DELETE request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.Client(**self.config)
        self.logger.debug(f"Created new HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = client.delete(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"HTTP DELETE request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_connection(base_url, client)

patch

patch(url: str, **kwargs: Any) -> Response

Make PATCH request.

Source code in src/honeyhive/utils/connection_pool.py
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
def patch(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make PATCH request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.Client(**self.config)
        self.logger.debug(f"Created new HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = client.patch(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"HTTP PATCH request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_connection(base_url, client)

PooledAsyncHTTPClient

Async HTTP client that uses connection pooling.

Source code in src/honeyhive/utils/connection_pool.py
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
class PooledAsyncHTTPClient:
    """Async HTTP client that uses connection pooling."""

    def __init__(self, pool: ConnectionPool, **kwargs: Any) -> None:
        """Initialize pooled async HTTP client.

        Args:
            pool: Connection pool instance
            **kwargs: Client configuration
        """
        self.pool = pool
        self.config = kwargs
        self.logger = get_logger(__name__)

    async def get(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async GET request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = await client.get(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"Async HTTP GET request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def post(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async POST request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = await client.post(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"HTTP POST request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def put(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async PUT request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = await client.put(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"Async HTTP PUT request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def delete(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async DELETE request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = await client.delete(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"Async HTTP DELETE request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def patch(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async PATCH request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1

        try:
            response = await client.patch(url, **kwargs)
            return response
        except Exception as e:
            self.logger.error(f"Async HTTP PATCH request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

pool instance-attribute

pool = pool

config instance-attribute

config = kwargs

logger instance-attribute

logger = get_logger(__name__)

get async

get(url: str, **kwargs: Any) -> Response

Make async GET request.

Source code in src/honeyhive/utils/connection_pool.py
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
async def get(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make async GET request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_async_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.AsyncClient(**self.config)
        self.logger.debug(f"Created new async HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = await client.get(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"Async HTTP GET request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_async_connection(base_url, client)

post async

post(url: str, **kwargs: Any) -> Response

Make async POST request.

Source code in src/honeyhive/utils/connection_pool.py
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
async def post(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make async POST request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_async_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.AsyncClient(**self.config)
        self.logger.debug(f"Created new async HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = await client.post(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"HTTP POST request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_async_connection(base_url, client)

put async

put(url: str, **kwargs: Any) -> Response

Make async PUT request.

Source code in src/honeyhive/utils/connection_pool.py
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
async def put(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make async PUT request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_async_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.AsyncClient(**self.config)
        self.logger.debug(f"Created new async HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = await client.put(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"Async HTTP PUT request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_async_connection(base_url, client)

delete async

delete(url: str, **kwargs: Any) -> Response

Make async DELETE request.

Source code in src/honeyhive/utils/connection_pool.py
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
async def delete(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make async DELETE request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_async_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.AsyncClient(**self.config)
        self.logger.debug(f"Created new async HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = await client.delete(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"Async HTTP DELETE request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_async_connection(base_url, client)

patch async

patch(url: str, **kwargs: Any) -> Response

Make async PATCH request.

Source code in src/honeyhive/utils/connection_pool.py
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
async def patch(self, url: str, **kwargs: Any) -> httpx.Response:
    """Make async PATCH request."""
    # Extract base URL for pooling
    if url.startswith("http"):
        parsed = urllib.parse.urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = "http://localhost"

    # Get client from pool
    client = self.pool.get_async_connection(base_url)

    # If no client in pool, create a new one
    if client is None:
        client = httpx.AsyncClient(**self.config)
        self.logger.debug(f"Created new async HTTP client for {base_url}")

    # Make request
    self.pool._stats["total_requests"] += 1

    try:
        response = await client.patch(url, **kwargs)
        return response
    except Exception as e:
        self.logger.error(f"Async HTTP PATCH request failed: {e}")
        raise
    finally:
        # Always return the connection to the pool
        self.pool.return_async_connection(base_url, client)

get_global_pool

get_global_pool(
    config: Optional[PoolConfig] = None,
) -> ConnectionPool

DEPRECATED: Create a new connection pool instance.

This function is deprecated and maintained only for backward compatibility. New code should create ConnectionPool instances directly.

MIGRATION: Replace get_global_pool() with ConnectionPool(config)

Parameters:

Name Type Description Default
config Optional[PoolConfig]

Pool configuration

None

Returns:

Type Description
ConnectionPool

New ConnectionPool instance (not global)

Source code in src/honeyhive/utils/connection_pool.py
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
def get_global_pool(config: Optional[PoolConfig] = None) -> ConnectionPool:
    """DEPRECATED: Create a new connection pool instance.

    This function is deprecated and maintained only for backward compatibility.
    New code should create ConnectionPool instances directly.

    MIGRATION: Replace get_global_pool() with ConnectionPool(config)

    Args:
        config: Pool configuration

    Returns:
        New ConnectionPool instance (not global)
    """
    # Return a new instance instead of a global singleton
    # This maintains backward compatibility while preventing deadlocks
    return ConnectionPool(config or PoolConfig())

close_global_pool

close_global_pool() -> None

DEPRECATED: No-op function for backward compatibility.

Since connection pools are now per-client instance, there's no global pool to close. Each ConnectionPool is closed when its parent client is garbage collected or explicitly closed.

Source code in src/honeyhive/utils/connection_pool.py
902
903
904
905
906
907
908
def close_global_pool() -> None:
    """DEPRECATED: No-op function for backward compatibility.

    Since connection pools are now per-client instance, there's no global
    pool to close. Each ConnectionPool is closed when its parent client
    is garbage collected or explicitly closed.
    """