Source code for honeyhive.utils.connection_pool

"""Connection pool utilities for HTTP clients."""

# pylint: disable=protected-access
# Note: Protected access to _stats and _transport is required for connection
# pool health monitoring and statistics tracking. This is legitimate internal
# access for performance monitoring and connection management.

from __future__ import annotations

import os
import threading
import time
import urllib.parse
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union

# Import httpx defensively so the module still imports when the dependency is
# missing; deferred annotation evaluation (above) keeps httpx references in
# type hints lazy, and ConnectionPool.__init__ raises a clear ImportError.
try:
    import httpx

    HTTPX_AVAILABLE = True
except ImportError:  # pragma: no cover
    HTTPX_AVAILABLE = False

from ..utils.logger import get_logger


def _is_pytest_xdist_worker() -> bool:
    """Detect if running in pytest-xdist worker process.

    Returns:
        True if running in pytest-xdist worker, False otherwise
    """
    return os.environ.get("PYTEST_XDIST_WORKER") is not None


def _is_test_environment() -> bool:
    """Detect if running in any test environment.

    Returns:
        True if running in test environment, False otherwise
    """
    test_indicators = [
        "PYTEST_CURRENT_TEST",
        "PYTEST_XDIST_WORKER",
        "_PYTEST_RAISE",
        "HH_TEST_MODE",
    ]
    return any(os.environ.get(indicator) for indicator in test_indicators)


class _NoOpLock:
    """No-op lock for pytest-xdist workers where each process is isolated.

    This provides the same interface as threading.Lock() but without actual
    locking, since pytest-xdist workers are separate processes and don't
    share memory space.
    """

    def acquire(self, _blocking: bool = True, _timeout: float = -1) -> bool:
        """No-op acquire - always succeeds immediately."""
        return True

    def release(self) -> None:
        """No-op release."""

    def __enter__(self) -> bool:
        """Context manager entry - no-op."""
        return True

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit - no-op."""


@dataclass
class PoolConfig:
    """Configuration for connection pool."""

    max_connections: int = 100
    max_keepalive_connections: int = 20
    keepalive_expiry: float = 30.0
    retries: int = 3
    timeout: float = 30.0
    pool_timeout: float = 10.0


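# Illustrative sketch (not part of the original module): PoolConfig is a plain
# dataclass, so partial overrides keep the remaining defaults. The values below
# are arbitrary examples.
#
#     config = PoolConfig(max_connections=50, timeout=10.0)
#     assert config.max_keepalive_connections == 20  # untouched default
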
class ConnectionPool:
    """Connection pool for HTTP clients."""

    # Type annotations for instance attributes
    _lock: Union[threading.Lock, "_NoOpLock"]

    def __init__(
        self,
        config: Optional[PoolConfig] = None,
        *,
        # Backwards compatibility parameters
        max_connections: Optional[int] = None,
        max_keepalive: Optional[int] = None,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        retries: Optional[int] = None,
        timeout: Optional[float] = None,
        pool_timeout: Optional[float] = None,
    ):
        """Initialize connection pool with hybrid config approach.

        Args:
            config: Pool configuration object (recommended)
            max_connections: Maximum number of connections (backwards compatibility)
            max_keepalive: Alias for max_keepalive_connections (backwards compatibility)
            max_keepalive_connections: Maximum keepalive connections
            keepalive_expiry: Keepalive expiry time in seconds
            retries: Number of retries
            timeout: Request timeout in seconds
            pool_timeout: Pool acquisition timeout in seconds
        """
        if not HTTPX_AVAILABLE:
            raise ImportError("httpx is required for connection pooling")

        # Hybrid approach: merge config object with individual parameters
        if config is None:
            config = PoolConfig()

        # Override config with any explicitly provided parameters
        if max_connections is not None:
            config.max_connections = max_connections
        if max_keepalive is not None:
            config.max_keepalive_connections = max_keepalive
        if max_keepalive_connections is not None:
            config.max_keepalive_connections = max_keepalive_connections
        if keepalive_expiry is not None:
            config.keepalive_expiry = keepalive_expiry
        if retries is not None:
            config.retries = retries
        if timeout is not None:
            config.timeout = timeout
        if pool_timeout is not None:
            config.pool_timeout = pool_timeout

        self.config = config
        self.logger = get_logger(__name__)

        # Backwards compatibility attributes
        self.max_connections = self.config.max_connections
        self.max_keepalive = self.config.max_keepalive_connections
        self.max_keepalive_connections = self.config.max_keepalive_connections
        self.keepalive_expiry = self.config.keepalive_expiry
        self.retries = self.config.retries
        self.timeout = self.config.timeout
        self.pool_timeout = self.config.pool_timeout

        # Pool state
        self._clients: Dict[str, httpx.Client] = {}
        self._async_clients: Dict[str, httpx.AsyncClient] = {}

        # ENVIRONMENT-AWARE LOCKING: Use appropriate locking strategy.
        # Production: full threading.Lock() for thread safety.
        # pytest-xdist: simplified locking to prevent cross-process deadlocks.
        self._use_locking = not _is_pytest_xdist_worker()
        if self._use_locking:
            self._lock = threading.Lock()
        else:
            # In pytest-xdist, each worker is isolated, so a no-op lock suffices
            self._lock = _NoOpLock()

        self._last_used: Dict[str, float] = {}

        # Statistics
        self._stats = {
            "total_requests": 0,
            "pool_hits": 0,
            "pool_misses": 0,
            "connections_created": 0,
            "connections_reused": 0,
        }

    def get_client(
        self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
    ) -> httpx.Client:
        """Get or create an HTTP client from the pool.

        Args:
            base_url: Base URL for the client
            headers: Default headers
            **kwargs: Additional client configuration

        Returns:
            HTTP client instance
        """
        with self._lock:
            # Check if we have a client for this base URL
            if base_url in self._clients:
                client = self._clients[base_url]
                if self._is_client_healthy(client):
                    self._last_used[base_url] = time.time()
                    self._stats["pool_hits"] += 1
                    self._stats["connections_reused"] += 1
                    return client
                # Remove unhealthy client
                del self._clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

            # Create new client
            self._stats["pool_misses"] += 1
            self._stats["connections_created"] += 1
            self._stats["total_requests"] += 1

            # Drop any caller-supplied timeout so it does not duplicate the
            # pool-configured timeout passed below
            client_kwargs = kwargs.copy()
            client_kwargs.pop("timeout", None)

            client = httpx.Client(
                base_url=base_url,
                headers=headers,
                limits=httpx.Limits(
                    max_connections=self.config.max_connections,
                    max_keepalive_connections=self.config.max_keepalive_connections,
                    keepalive_expiry=self.config.keepalive_expiry,
                ),
                timeout=self.config.timeout,
                **client_kwargs,
            )
            self._clients[base_url] = client
            self._last_used[base_url] = time.time()

            self.logger.debug(f"Created new HTTP client for {base_url}")
            return client

    def get_async_client(
        self, base_url: str, headers: Optional[Dict[str, str]] = None, **kwargs: Any
    ) -> httpx.AsyncClient:
        """Get or create an async HTTP client from the pool.

        Args:
            base_url: Base URL for the client
            headers: Default headers
            **kwargs: Additional client configuration

        Returns:
            Async HTTP client instance
        """
        with self._lock:
            # Check if we have a client for this base URL
            if base_url in self._async_clients:
                client = self._async_clients[base_url]
                if self._is_async_client_healthy(client):
                    self._last_used[base_url] = time.time()
                    self._stats["pool_hits"] += 1
                    self._stats["connections_reused"] += 1
                    return client
                # Remove unhealthy client
                del self._async_clients[base_url]
                if base_url in self._last_used:
                    del self._last_used[base_url]

            # Create new client
            self._stats["pool_misses"] += 1
            self._stats["connections_created"] += 1
            self._stats["total_requests"] += 1

            # Drop any caller-supplied timeout so it does not duplicate the
            # pool-configured timeout passed below
            client_kwargs = kwargs.copy()
            client_kwargs.pop("timeout", None)

            client = httpx.AsyncClient(
                base_url=base_url,
                headers=headers,
                limits=httpx.Limits(
                    max_connections=self.config.max_connections,
                    max_keepalive_connections=self.config.max_keepalive_connections,
                    keepalive_expiry=self.config.keepalive_expiry,
                ),
                timeout=self.config.timeout,
                **client_kwargs,
            )
            self._async_clients[base_url] = client
            self._last_used[base_url] = time.time()

            self.logger.debug(f"Created new async HTTP client for {base_url}")
            return client

    def _is_client_healthy(self, client: httpx.Client) -> bool:
        """Check if a client is healthy and can be reused."""
        try:
            # Check if client is closed
            if hasattr(client, "is_closed") and client.is_closed:
                return False

            # Check if client has been idle too long
            if hasattr(client, "_transport"):
                transport = client._transport
                if hasattr(transport, "pool"):
                    pool = transport.pool
                    if hasattr(pool, "connections"):
                        # Check if pool has available connections
                        return len(pool.connections) > 0

            # If we can't determine health from transport, assume it's healthy.
            # This covers cases where the client is open but transport details
            # are not accessible.
            return True
        except Exception:
            return False

    def _is_async_client_healthy(self, client: httpx.AsyncClient) -> bool:
        """Check if an async client is healthy and can be reused."""
        try:
            # Check if client is closed
            if hasattr(client, "is_closed") and client.is_closed:
                return False
            # For async clients, we can't easily check transport state,
            # so we assume they're healthy if not explicitly closed.
            return True
        except Exception:
            return False

    def cleanup_idle_connections(self, max_idle_time: float = 300.0) -> None:
        """Clean up idle connections.

        Args:
            max_idle_time: Maximum idle time in seconds
        """
        current_time = time.time()
        to_remove = []

        with self._lock:
            for base_url, last_used in self._last_used.items():
                if current_time - last_used > max_idle_time:
                    to_remove.append(base_url)

            for base_url in to_remove:
                if base_url in self._clients:
                    try:
                        self._clients[base_url].close()
                    except Exception:
                        pass
                    del self._clients[base_url]

                if base_url in self._async_clients:
                    # AsyncClient must be closed with `await client.aclose()`,
                    # which this sync method cannot do; drop the reference and
                    # rely on aclose_all_clients() for a clean async shutdown.
                    del self._async_clients[base_url]

                if base_url in self._last_used:
                    del self._last_used[base_url]

                self.logger.debug(f"Cleaned up idle connection for {base_url}")

    def get_stats(self) -> Dict[str, Any]:
        """Get pool statistics.

        Returns:
            Dictionary with pool statistics
        """
        with self._lock:
            stats = self._stats.copy()
            stats.update(
                {
                    "active_connections": len(self._clients),
                    "active_async_connections": len(self._async_clients),
                    "total_connections": len(self._clients) + len(self._async_clients),
                }
            )
            return stats

    @property
    def active_connections(self) -> int:
        """Get number of active connections.

        Returns:
            Number of active connections
        """
        with self._lock:
            return len(self._clients)

    def get_connection(self, base_url: str) -> Optional[httpx.Client]:
        """Get a connection for a specific base URL.

        Args:
            base_url: Base URL for the connection

        Returns:
            HTTP client instance or None if not found
        """
        with self._lock:
            if base_url in self._clients:
                client = self._clients[base_url]
                if self._is_client_healthy(client):
                    return client
            return None

    def return_connection(self, base_url: str, client: httpx.Client) -> None:
        """Return a connection to the pool.

        Args:
            base_url: Base URL for the connection
            client: HTTP client to return
        """
        with self._lock:
            # Adopt the client only if none is registered for this base URL;
            # refresh the last-used timestamp either way
            if base_url not in self._clients:
                self._clients[base_url] = client
            self._last_used[base_url] = time.time()

    def get_async_connection(self, base_url: str) -> Optional[httpx.AsyncClient]:
        """Get an async connection for a specific base URL.

        Args:
            base_url: Base URL for the connection

        Returns:
            Async HTTP client instance or None if not found
        """
        with self._lock:
            if base_url in self._async_clients:
                client = self._async_clients[base_url]
                if self._is_async_client_healthy(client):
                    return client
            return None

    def return_async_connection(self, base_url: str, client: httpx.AsyncClient) -> None:
        """Return an async connection to the pool.

        Args:
            base_url: Base URL for the connection
            client: Async HTTP client to return
        """
        with self._lock:
            # Adopt the client only if none is registered for this base URL;
            # refresh the last-used timestamp either way
            if base_url not in self._async_clients:
                self._async_clients[base_url] = client
            self._last_used[base_url] = time.time()

    def close_connection(self, base_url: str) -> None:
        """Close a specific connection.

        Args:
            base_url: Base URL for the connection
        """
        with self._lock:
            if base_url in self._clients:
                try:
                    self._clients[base_url].close()
                except Exception as e:
                    self.logger.warning(f"Failed to close client: {e}")
                finally:
                    del self._clients[base_url]
                    if base_url in self._last_used:
                        del self._last_used[base_url]

    def cleanup(self) -> None:
        """Clean up expired connections."""
        current_time = time.time()

        # First, identify expired URLs while holding the lock
        with self._lock:
            expired_urls = []
            for base_url, last_used in self._last_used.items():
                if current_time - last_used > self.config.keepalive_expiry:
                    expired_urls.append(base_url)

        # Then close expired connections without holding the lock, since
        # close_connection() re-acquires it and threading.Lock is not reentrant
        for base_url in expired_urls:
            self.close_connection(base_url)

    def close_all(self) -> None:
        """Close all connections in the pool."""
        with self._lock:
            # Close sync clients
            for client in self._clients.values():
                try:
                    client.close()
                except Exception as e:
                    self.logger.warning(f"Failed to close client: {e}")

            # Async clients require `await aclose()`, which a sync method
            # cannot do; use aclose_all_clients() (or the async context
            # manager) for a clean async shutdown. Here we drop the references.
            self._clients.clear()
            self._async_clients.clear()
            self._last_used.clear()

            self.logger.info("Closed all connections in pool")

    def reset_stats(self) -> None:
        """Reset pool statistics."""
        with self._lock:
            self._stats = {
                "pool_hits": 0,
                "pool_misses": 0,
                "connections_created": 0,
                "connections_reused": 0,
                "total_requests": 0,
            }

    def close_all_clients(self) -> None:
        """Close all clients in the pool (alias for close_all)."""
        self.close_all()

    async def aclose_all_clients(self) -> None:
        """Close all async clients in the pool."""
        with self._lock:
            for client in self._async_clients.values():
                try:
                    await client.aclose()
                except Exception as e:
                    self.logger.warning(f"Error closing async client: {e}")

            # Remove async clients from last_used before clearing the registry;
            # doing it afterwards would leave stale timestamps behind, since the
            # membership check would never match an empty dict
            keys_to_remove = [k for k in self._last_used if k in self._async_clients]
            for key in keys_to_remove:
                del self._last_used[key]

            self._async_clients.clear()

    async def __aenter__(self) -> "ConnectionPool":
        """Async context manager entry."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Async context manager exit."""
        await self.aclose_all_clients()

    def __enter__(self) -> "ConnectionPool":
        """Context manager entry."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Context manager exit."""
        self.close_all()
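

# Usage sketch (illustrative; the URL is a placeholder): the sync context
# manager closes sync clients on exit via close_all(), and the async form
# awaits aclose_all_clients() instead.
#
#     with ConnectionPool(PoolConfig(max_connections=50), timeout=10.0) as pool:
#         client = pool.get_client("https://api.example.com")
#         response = client.get("/health")
#         print(pool.get_stats()["pool_hits"])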


class PooledHTTPClient:
    """HTTP client that uses connection pooling."""

    def __init__(self, pool: ConnectionPool, **kwargs: Any) -> None:
        """Initialize pooled HTTP client.

        Args:
            pool: Connection pool instance
            **kwargs: Client configuration
        """
        self.pool = pool
        self.config = kwargs
        self.logger = get_logger(__name__)

    def get(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make GET request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return client.get(url, **kwargs)
        except Exception as e:
            self.logger.error(f"HTTP GET request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def post(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make POST request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return client.post(url, **kwargs)
        except Exception as e:
            self.logger.error(f"HTTP POST request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def put(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make PUT request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return client.put(url, **kwargs)
        except Exception as e:
            self.logger.error(f"HTTP PUT request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def delete(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make DELETE request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return client.delete(url, **kwargs)
        except Exception as e:
            self.logger.error(f"HTTP DELETE request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)

    def patch(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make PATCH request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.Client(**self.config)
            self.logger.debug(f"Created new HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return client.patch(url, **kwargs)
        except Exception as e:
            self.logger.error(f"HTTP PATCH request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_connection(base_url, client)
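

# Usage sketch (illustrative; the URL is a placeholder): PooledHTTPClient keys
# pooling on "scheme://netloc", so repeated calls to the same origin reuse a
# single httpx.Client from the shared pool.
#
#     pool = ConnectionPool()
#     http = PooledHTTPClient(pool, timeout=10.0)
#     response = http.get("https://api.example.com/v1/status")
#     pool.close_all()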


class PooledAsyncHTTPClient:
    """Async HTTP client that uses connection pooling."""

    def __init__(self, pool: ConnectionPool, **kwargs: Any) -> None:
        """Initialize pooled async HTTP client.

        Args:
            pool: Connection pool instance
            **kwargs: Client configuration
        """
        self.pool = pool
        self.config = kwargs
        self.logger = get_logger(__name__)

    async def get(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async GET request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return await client.get(url, **kwargs)
        except Exception as e:
            self.logger.error(f"Async HTTP GET request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def post(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async POST request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return await client.post(url, **kwargs)
        except Exception as e:
            self.logger.error(f"Async HTTP POST request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def put(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async PUT request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return await client.put(url, **kwargs)
        except Exception as e:
            self.logger.error(f"Async HTTP PUT request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def delete(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async DELETE request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return await client.delete(url, **kwargs)
        except Exception as e:
            self.logger.error(f"Async HTTP DELETE request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)

    async def patch(self, url: str, **kwargs: Any) -> httpx.Response:
        """Make async PATCH request."""
        # Extract base URL for pooling
        if url.startswith("http"):
            parsed = urllib.parse.urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
        else:
            base_url = "http://localhost"

        # Get client from pool
        client = self.pool.get_async_connection(base_url)

        # If no client in pool, create a new one
        if client is None:
            client = httpx.AsyncClient(**self.config)
            self.logger.debug(f"Created new async HTTP client for {base_url}")

        # Make request
        self.pool._stats["total_requests"] += 1
        try:
            return await client.patch(url, **kwargs)
        except Exception as e:
            self.logger.error(f"Async HTTP PATCH request failed: {e}")
            raise
        finally:
            # Always return the connection to the pool
            self.pool.return_async_connection(base_url, client)
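

# Usage sketch (illustrative; the URL is a placeholder): the async variant has
# the same pooling behavior but must be driven from an event loop.
#
#     async def main() -> None:
#         pool = ConnectionPool()
#         http = PooledAsyncHTTPClient(pool, timeout=10.0)
#         response = await http.get("https://api.example.com/v1/status")
#         await pool.aclose_all_clients()
#
#     # asyncio.run(main())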


# DEPRECATED: Global connection pool removed in favor of multi-instance pattern.
# Each HoneyHive client now creates its own ConnectionPool instance to prevent
# pytest-xdist deadlocks and improve isolation between tracer instances.


def get_global_pool(config: Optional[PoolConfig] = None) -> ConnectionPool:
    """DEPRECATED: Create a new connection pool instance.

    This function is deprecated and maintained only for backward compatibility.
    New code should create ConnectionPool instances directly.

    MIGRATION: Replace get_global_pool() with ConnectionPool(config)

    Args:
        config: Pool configuration

    Returns:
        New ConnectionPool instance (not global)
    """
    # Return a new instance instead of a global singleton.
    # This maintains backward compatibility while preventing deadlocks.
    return ConnectionPool(config or PoolConfig())


def close_global_pool() -> None:
    """DEPRECATED: No-op function for backward compatibility.

    Since connection pools are now per-client instances, there is no global
    pool to close. Each ConnectionPool is closed when its parent client is
    garbage collected or explicitly closed.
    """
    # No-op for backward compatibility
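

# Migration sketch (illustrative): the deprecated global accessor maps directly
# onto per-instance construction.
#
#     pool = get_global_pool(PoolConfig(max_connections=50))   # before (deprecated)
#     pool = ConnectionPool(PoolConfig(max_connections=50))    # after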