"""Caching utilities for HoneyHive."""
import functools
import hashlib
import threading
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional
@dataclass
class CacheConfig:
    """Tunable settings for a ``Cache`` instance.

    Every field has a default, so ``CacheConfig()`` yields a usable
    configuration and individual settings can be overridden by keyword.
    """

    # Entry count at which ``Cache.set`` starts evicting before inserting.
    max_size: int = 1000
    # TTL in seconds applied when ``Cache.set`` is called without one (5 minutes).
    default_ttl: float = 300.0
    # Seconds between background sweeps of expired entries (1 minute).
    # ``Cache`` only starts its sweeper thread when this is greater than 0.
    cleanup_interval: float = 60.0
    # Statistics toggle.
    # NOTE(review): no code visible in this module reads this flag - confirm
    # whether counters are meant to be skipped when it is False.
    enable_stats: bool = True
class CacheEntry:
    """A cached value together with its bookkeeping metadata.

    All timestamps are wall-clock values taken from ``time.time()``.
    """

    def __init__(self, key: str, value: Any, ttl: float = 300.0):
        """Create an entry stamped with the current time.

        Args:
            key: Cache key
            value: Cached value
            ttl: Time to live in seconds
        """
        self.key = key
        self.value = value
        self.created_at = time.time()
        self.ttl = ttl
        self.access_count = 0
        # A fresh entry counts as "accessed" at creation time.
        self.last_accessed = self.created_at

    def __repr__(self) -> str:
        """Return a short debugging representation (the value is omitted)."""
        return (
            f"{type(self).__name__}(key={self.key!r}, ttl={self.ttl!r}, "
            f"access_count={self.access_count!r})"
        )

    def is_expired(self) -> bool:
        """Check if entry is expired.

        Returns:
            True once the entry's age strictly exceeds its TTL, False otherwise
        """
        return self.get_age() > self.ttl

    def access(self) -> None:
        """Record one access: bump the counter and refresh ``last_accessed``."""
        self.access_count += 1
        self.last_accessed = time.time()

    def get_age(self) -> float:
        """Get age of entry in seconds.

        Returns:
            Seconds elapsed since the entry was created
        """
        return time.time() - self.created_at

    def get_remaining_ttl(self) -> float:
        """Get remaining TTL in seconds.

        Returns:
            Seconds left before expiry, clamped to ``0.0`` once expired
        """
        # Clamp against a float literal so the result is always a float,
        # matching the annotation even for an expired entry.
        return max(0.0, self.ttl - self.get_age())

    @property
    def expiry(self) -> float:
        """Get expiry timestamp.

        Returns:
            ``time.time()``-style timestamp at which the entry expires
        """
        return self.created_at + self.ttl
class Cache:
    """In-memory cache with TTL and size limits.

    Public operations take a re-entrant lock around every read or write of
    the entry table and the statistics counters. When
    ``config.cleanup_interval`` is greater than 0, a daemon thread sweeps
    expired entries at that interval until :meth:`close` is called. The
    class is also a context manager that calls :meth:`close` on exit.
    """

    def __init__(self, config: Optional[CacheConfig] = None):
        """Initialize cache.

        Args:
            config: Cache configuration; a default ``CacheConfig`` is used
                when omitted
        """
        self.config = config if config is not None else CacheConfig()

        # Entry table and the lock guarding it (and the counters below).
        self._cache: Dict[str, CacheEntry] = {}
        self._lock = threading.RLock()

        # Statistics
        self._stats: Dict[str, int] = {
            "hits": 0,
            "misses": 0,
            "sets": 0,
            "deletes": 0,
            "expired": 0,
            "evictions": 0,
        }

        # Background sweeper and the event used to stop it.
        self._cleanup_thread: Optional[threading.Thread] = None
        self._stop_cleanup = threading.Event()
        self._start_cleanup_thread()

    @property
    def cache(self) -> Dict[str, CacheEntry]:
        """Get the underlying cache dictionary.

        The live dictionary is returned, not a copy; changes made through
        it are not guarded by the cache's lock.

        Returns:
            Cache dictionary
        """
        return self._cache

    @property
    def hits(self) -> int:
        """Get cache hit count.

        Returns:
            Number of cache hits
        """
        return self._stats["hits"]

    @property
    def misses(self) -> int:
        """Get cache miss count.

        Returns:
            Number of cache misses
        """
        return self._stats["misses"]

    def _start_cleanup_thread(self) -> None:
        """Start the daemon sweeper thread if a positive interval is set."""
        if self.config.cleanup_interval > 0:
            self._cleanup_thread = threading.Thread(
                target=self._cleanup_worker, daemon=True
            )
            self._cleanup_thread.start()

    def _cleanup_worker(self) -> None:
        """Sweep expired entries every ``cleanup_interval`` seconds."""
        # Event.wait() returns False on timeout (time for another sweep) and
        # True once close() has set the stop event, which ends the loop.
        while not self._stop_cleanup.wait(self.config.cleanup_interval):
            self.cleanup_expired()

    def _generate_key(self, *args: Any, **kwargs: Any) -> str:
        """Generate cache key from arguments.

        The key is the MD5 hex digest of the ``str()`` of each positional
        argument followed by ``name=value`` for each keyword argument
        (sorted by name), all joined with ``|``. Arguments whose ``str()``
        is identical (for example ``1`` and ``"1"``) map to the same key.

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Cache key string
        """
        key_parts = [str(arg) for arg in args]
        key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
        key_string = "|".join(key_parts)
        # Hash the key string for consistent length
        return hashlib.md5(key_string.encode()).hexdigest()

    def generate_key(self, *args: Any, **kwargs: Any) -> str:
        """Generate cache key from arguments (public method).

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Cache key string
        """
        return self._generate_key(*args, **kwargs)

    def get(self, key: str, default: Any = None) -> Any:
        """Get value from cache.

        A hit updates the entry's access metadata. An expired entry is
        removed and counted both as ``expired`` and as a miss.

        Args:
            key: Cache key
            default: Default value if key not found or expired

        Returns:
            Cached value or default
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                self._stats["misses"] += 1
                return default

            if entry.is_expired():
                del self._cache[key]
                self._stats["expired"] += 1
                self._stats["misses"] += 1
                return default

            entry.access()
            self._stats["hits"] += 1
            return entry.value

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        """Set value in cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (uses default if None)
        """
        if ttl is None:
            ttl = self.config.default_ttl

        with self._lock:
            # Only a brand-new key can grow the table. Overwriting an
            # existing key replaces its entry in place, so evicting in that
            # case would needlessly drop an unrelated entry.
            if key not in self._cache and len(self._cache) >= self.config.max_size:
                self._evict_entries()

            self._cache[key] = CacheEntry(key, value, ttl)
            self._stats["sets"] += 1

    def delete(self, key: str) -> bool:
        """Delete key from cache.

        Args:
            key: Cache key to delete

        Returns:
            True if key was deleted, False if not found
        """
        with self._lock:
            if key not in self._cache:
                return False
            del self._cache[key]
            self._stats["deletes"] += 1
            return True

    def exists(self, key: str) -> bool:
        """Check if key exists in cache.

        An expired entry is removed and counted as ``expired``; hit and
        miss counters are not touched.

        Args:
            key: Cache key to check

        Returns:
            True if key exists and not expired, False otherwise
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return False
            if entry.is_expired():
                del self._cache[key]
                self._stats["expired"] += 1
                return False
            return True

    def clear(self) -> None:
        """Clear all entries from cache and reset every statistics counter."""
        with self._lock:
            self._cache.clear()
            self._reset_stats()

    def cleanup_expired(self) -> int:
        """Clean up expired entries.

        Returns:
            Number of entries cleaned up
        """
        # One timestamp for the whole sweep so all entries are judged
        # against the same instant.
        now = time.time()
        with self._lock:
            expired_keys = [
                key
                for key, entry in self._cache.items()
                if now - entry.created_at > entry.ttl
            ]
            for key in expired_keys:
                del self._cache[key]
            self._stats["expired"] += len(expired_keys)
        return len(expired_keys)

    def _evict_entries(self, count: int = 1) -> None:
        """Evict entries based on LRU policy.

        Called by :meth:`set` while the lock is held. Nothing is evicted
        when the cache holds fewer than ``count`` entries.

        Args:
            count: Number of entries to evict
        """
        if len(self._cache) < count:
            return

        # Least recently accessed first; sorted() is stable, so ties keep
        # insertion order.
        lru_keys = sorted(
            self._cache, key=lambda name: self._cache[name].last_accessed
        )
        for key in lru_keys[:count]:
            del self._cache[key]
            self._stats["evictions"] += 1

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Copy of the raw counters plus ``size``, ``max_size`` and
            ``hit_rate`` as an integer percentage (0-100, truncated).
            Compare :meth:`stats`, which reports ``hit_rate`` as a
            fraction.
        """
        with self._lock:
            stats: Dict[str, Any] = dict(self._stats)
            lookups = self._stats["hits"] + self._stats["misses"]
            stats["size"] = len(self._cache)
            stats["max_size"] = self.config.max_size
            # max(1, ...) avoids dividing by zero before any lookup happened.
            stats["hit_rate"] = int(self._stats["hits"] / max(1, lookups) * 100)
            return stats

    def _reset_stats(self) -> None:
        """Reset every statistics counter to zero (in place)."""
        for name in self._stats:
            self._stats[name] = 0

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with the counters, ``size``, ``max_size``,
            ``total_requests`` (hits + misses) and ``hit_rate`` as a
            fraction between 0 and 1
        """
        with self._lock:
            total_requests = self._stats["hits"] + self._stats["misses"]
            return {
                "size": len(self._cache),
                "max_size": self.config.max_size,
                "hits": self._stats["hits"],
                "misses": self._stats["misses"],
                "total_requests": total_requests,
                "hit_rate": self._stats["hits"] / max(1, total_requests),
                "sets": self._stats["sets"],
                "deletes": self._stats["deletes"],
                "expired": self._stats["expired"],
                "evictions": self._stats["evictions"],
            }

    def cleanup(self) -> None:
        """Clean up expired entries and perform maintenance."""
        self.cleanup_expired()

    def close(self) -> None:
        """Stop the sweeper thread, then clear entries and statistics."""
        self._stop_cleanup.set()
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            # Bounded wait so close() cannot hang on a stuck sweeper.
            self._cleanup_thread.join(timeout=1.0)
        self.clear()

    def __enter__(self) -> "Cache":
        """Context manager entry."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[Any],
    ) -> None:
        """Context manager exit: close the cache; exceptions propagate."""
        self.close()
class FunctionCache:  # pylint: disable=too-few-public-methods
    """Function result cache decorator.

    Results are stored in a ``Cache`` under a key derived from the wrapped
    function's ``__name__`` and its call arguments, unless a custom
    ``key_func`` is supplied.

    Note: too-few-public-methods disabled - Decorator classes need __init__/__call__.
    """

    # Unique marker used as the ``default`` for cache lookups, so a cached
    # ``None`` result can be told apart from a miss.
    _MISSING = object()

    def __init__(
        self,
        cache: Optional["Cache"] = None,
        ttl: Optional[float] = None,
        key_func: Optional[Callable] = None,
    ):
        """Initialize function cache.

        Args:
            cache: Cache instance to use; a new ``Cache`` is created when
                omitted
            ttl: Time to live for cached results (the cache's default TTL
                applies when None)
            key_func: Custom key generation function, called as
                ``key_func(func, *args, **kwargs)``
        """
        self.cache = cache if cache is not None else Cache()
        self.ttl = ttl
        self.key_func = key_func

    def __call__(self, func: Callable) -> Callable:
        """Cache decorator.

        Args:
            func: Function to cache

        Returns:
            Wrapper that serves repeated calls from the cache
        """

        @functools.wraps(func)
        def cached_func(*args: Any, **kwargs: Any) -> Any:
            # Generate cache key
            if self.key_func is not None:
                key = self.key_func(func, *args, **kwargs)
            else:
                key = self.cache.generate_key(func.__name__, *args, **kwargs)

            # Look up with the sentinel as default so that a stored None
            # (or any other falsy result) still counts as a hit.
            result = self.cache.get(key, self._MISSING)
            if result is not self._MISSING:
                return result

            # Execute function and cache result
            result = func(*args, **kwargs)
            self.cache.set(key, result, self.ttl)
            return result

        return cached_func
class AsyncFunctionCache:  # pylint: disable=too-few-public-methods
    """Async function result cache decorator.

    Awaited results are stored in a ``Cache`` under a key derived from the
    wrapped coroutine function's ``__name__`` and its call arguments,
    unless a custom ``key_func`` is supplied.

    Note: too-few-public-methods disabled - Decorator classes need __init__/__call__.
    """

    # Unique marker used as the ``default`` for cache lookups, so a cached
    # ``None`` result can be told apart from a miss.
    _MISSING = object()

    def __init__(
        self,
        cache: Optional["Cache"] = None,
        ttl: Optional[float] = None,
        key_func: Optional[Callable] = None,
    ):
        """Initialize async function cache.

        Args:
            cache: Cache instance to use; a new ``Cache`` is created when
                omitted
            ttl: Time to live for cached results (the cache's default TTL
                applies when None)
            key_func: Custom key generation function, called synchronously
                as ``key_func(func, *args, **kwargs)``
        """
        self.cache = cache if cache is not None else Cache()
        self.ttl = ttl
        self.key_func = key_func

    def __call__(self, func: Callable) -> Callable:
        """Async cache decorator.

        Args:
            func: Async function to cache

        Returns:
            Coroutine function that serves repeated calls from the cache
        """

        @functools.wraps(func)
        async def cached_func(*args: Any, **kwargs: Any) -> Any:
            # Generate cache key
            if self.key_func is not None:
                key = self.key_func(func, *args, **kwargs)
            else:
                key = self.cache.generate_key(func.__name__, *args, **kwargs)

            # Look up with the sentinel as default so that a stored None
            # (or any other falsy result) still counts as a hit.
            result = self.cache.get(key, self._MISSING)
            if result is not self._MISSING:
                return result

            # Await the function and cache its result (not the coroutine).
            result = await func(*args, **kwargs)
            self.cache.set(key, result, self.ttl)
            return result

        return cached_func
# Multi-Instance Cache Management
# Note: Global cache functions maintained for CLI compatibility only
class CacheManager:
    """Multi-instance cache manager for tracer instances.

    This class provides per-instance cache management that aligns with
    the multi-instance tracer architecture. Each tracer instance can
    have its own isolated cache instances.
    """

    # Unique marker used as the ``default`` for cache lookups, so cached
    # falsy values (0, "", False, {}, None) are not mistaken for misses.
    _MISSING = object()

    def __init__(self, instance_id: str, config: Optional[CacheConfig] = None):
        """Initialize cache manager for a specific instance.

        Args:
            instance_id: Unique identifier for the instance (e.g., tracer ID)
            config: Cache configuration used for named caches that are
                created without their own configuration
        """
        self.instance_id = instance_id
        self.config = config if config is not None else CacheConfig()
        self._caches: Dict[str, Cache] = {}

    def get_cache(self, cache_name: str, config: Optional[CacheConfig] = None) -> Cache:
        """Get or create a named cache for this instance.

        Args:
            cache_name: Name of the cache (e.g., 'attributes', 'resources')
            config: Optional cache-specific configuration; only used when
                the named cache does not exist yet

        Returns:
            Cache instance for the specified name
        """
        if cache_name not in self._caches:
            cache_config = config if config is not None else self.config
            self._caches[cache_name] = Cache(cache_config)
        return self._caches[cache_name]

    def close_all(self) -> None:
        """Close all caches managed by this instance and forget them."""
        for cache in self._caches.values():
            cache.close()
        self._caches.clear()

    def get_stats(self) -> Dict[str, Dict[str, Any]]:
        """Get statistics for all caches in this instance.

        Returns:
            Dictionary mapping cache names to their statistics
        """
        return {name: cache.get_stats() for name, cache in self._caches.items()}

    def _get_or_compute(
        self,
        cache: Cache,
        key: str,
        producer: Callable[[], Any],
        fallback: Any,
    ) -> Any:
        """Return the cached value for ``key`` or produce, store and return it.

        Args:
            cache: Cache to read from and write to
            key: Cache key
            producer: Zero-argument callable that computes the value on a miss
            fallback: Value returned when producing or storing raises

        Returns:
            Cached value, freshly produced value, or ``fallback``
        """
        cached = cache.get(key, self._MISSING)
        if cached is not self._MISSING:
            return cached

        # Best effort by design: any failure while producing or storing the
        # value yields the fallback instead of propagating to the caller.
        try:
            value = producer()
            cache.set(key, value)
            return value
        except Exception:
            return fallback

    # Domain-specific cache methods for tracer functionality

    def get_config_value(
        self,
        config_hash: str,
        key: str,
        default: Any,
        resolver_func: Callable[[], Any],
    ) -> Any:
        """Get cached configuration value or resolve and cache it.

        Args:
            config_hash: Hash of the configuration object
            key: Configuration key
            default: Value returned when ``resolver_func`` raises; it is
                also part of the cache key
            resolver_func: Function to resolve the value if not cached

        Returns:
            Cached or resolved configuration value
        """
        cache = self.get_cache(
            "config",
            CacheConfig(
                max_size=100,
                default_ttl=900.0,  # 15-minute TTL for config stability
                cleanup_interval=180.0,
            ),
        )
        cache_key = f"config:{config_hash}:{key}:{hash(str(default))}"
        return self._get_or_compute(cache, cache_key, resolver_func, default)

    def get_cached_attributes(
        self,
        attr_key: str,
        normalizer_func: Callable[[], Any],
    ) -> Any:
        """Get cached normalized attributes or normalize and cache them.

        Args:
            attr_key: Attribute cache key
            normalizer_func: Function to normalize the attribute if not cached

        Returns:
            Cached or normalized attribute value, or None when
            ``normalizer_func`` raises
        """
        cache = self.get_cache(
            "attributes",
            CacheConfig(
                max_size=1000,  # High frequency operations
                default_ttl=300.0,  # 5-minute TTL
                cleanup_interval=60.0,
            ),
        )
        return self._get_or_compute(cache, attr_key, normalizer_func, None)

    def get_cached_resources(
        self,
        resource_key: str,
        detector_func: Callable[[], Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Get cached resource detection results or detect and cache them.

        Args:
            resource_key: Resource cache key
            detector_func: Function to detect resources if not cached

        Returns:
            Cached or detected resource information, or an empty dict when
            ``detector_func`` raises
        """
        cache = self.get_cache(
            "resources",
            CacheConfig(
                max_size=50,  # Lower frequency, stable data
                default_ttl=3600.0,  # 1-hour TTL for system info
                cleanup_interval=300.0,
            ),
        )
        return self._get_or_compute(  # type: ignore[no-any-return]
            cache, resource_key, detector_func, {}
        )
# Legacy global cache support for CLI and backward compatibility.
# Process-wide singleton: created lazily by get_global_cache() and reset to
# None by close_global_cache().
_global_cache: Optional[Cache] = None
def get_global_cache(config: Optional[CacheConfig] = None) -> Cache:
    """Get or create global cache instance.

    Note: This function is maintained for CLI and backward compatibility.
    For tracer instances, use CacheManager for proper multi-instance isolation.

    Args:
        config: Cache configuration. Only used by the call that actually
            creates the instance; ignored once a global cache exists.

    Returns:
        Global cache instance
    """
    global _global_cache  # pylint: disable=global-statement
    if _global_cache is None:
        _global_cache = Cache(config)
    return _global_cache
def close_global_cache() -> None:
    """Close global cache instance.

    Closes the global cache if one exists and resets the module-level
    reference, so the next ``get_global_cache()`` call creates a fresh
    instance. Does nothing when no global cache exists.

    Note: This function is maintained for CLI and backward compatibility.
    """
    global _global_cache  # pylint: disable=global-statement
    if _global_cache is not None:
        _global_cache.close()
        _global_cache = None
def cache_function(
    ttl: Optional[float] = None,
    cache: Optional[Cache] = None,
    key_func: Optional[Callable] = None,
) -> FunctionCache:
    """Decorator to cache function results.

    Intended to be called with arguments, e.g. ``@cache_function(ttl=60)``.
    The returned ``FunctionCache`` is what decorates the function.

    Args:
        ttl: Time to live for cached results
        cache: Cache instance to use
        key_func: Custom key generation function

    Returns:
        Function cache decorator
    """
    return FunctionCache(cache=cache, ttl=ttl, key_func=key_func)
def cache_async_function(
    ttl: Optional[float] = None,
    cache: Optional[Cache] = None,
    key_func: Optional[Callable] = None,
) -> AsyncFunctionCache:
    """Decorator to cache async function results.

    Intended to be called with arguments, e.g.
    ``@cache_async_function(ttl=60)``. The returned ``AsyncFunctionCache``
    is what decorates the coroutine function.

    Args:
        ttl: Time to live for cached results
        cache: Cache instance to use
        key_func: Custom key generation function

    Returns:
        Async function cache decorator
    """
    return AsyncFunctionCache(cache=cache, ttl=ttl, key_func=key_func)