Performance Testing & Benchmarking
Note
Problem-solving guide for performance testing the HoneyHive SDK
Comprehensive solutions for measuring, validating, and optimizing HoneyHive SDK performance across different environments and workloads.
Performance testing ensures that the HoneyHive SDK meets your application’s performance requirements and identifies potential bottlenecks before they impact production.
Quick Start
Problem: I need to quickly test whether the HoneyHive SDK adds acceptable overhead.
Solution:
import time
import statistics
from honeyhive import HoneyHiveTracer, trace
def quick_performance_test():
    """Quick performance impact assessment.

    Times an identical workload with and without the ``@trace`` decorator,
    prints the averages, and fails if tracing more than doubles runtime.
    """
    tracer = HoneyHiveTracer.init(
        api_key="test-key",  # Or set HH_API_KEY environment variable
        project="test-project",  # Or set HH_PROJECT environment variable
        test_mode=True  # Or set HH_TEST_MODE=true
    )

    def _time_runs(fn, runs=10):
        # One wall-clock sample per run, via the monotonic perf counter.
        samples = []
        for _ in range(runs):
            begin = time.perf_counter()
            fn()
            samples.append(time.perf_counter() - begin)
        return samples

    # The two workloads are identical except for the tracing decorator.
    def baseline_operation():
        return sum(range(1000))

    @trace(tracer=tracer)
    def traced_operation():
        return sum(range(1000))

    baseline_avg = statistics.mean(_time_runs(baseline_operation))
    traced_avg = statistics.mean(_time_runs(traced_operation))
    overhead_ratio = traced_avg / baseline_avg

    print(f"Baseline average: {baseline_avg * 1000:.2f}ms")
    print(f"Traced average: {traced_avg * 1000:.2f}ms")
    print(f"Overhead ratio: {overhead_ratio:.2f}x")

    # Acceptable overhead: < 2x for most applications
    assert overhead_ratio < 2.0, f"Overhead too high: {overhead_ratio:.2f}x"

    return {
        "baseline_ms": baseline_avg * 1000,
        "traced_ms": traced_avg * 1000,
        "overhead_ratio": overhead_ratio
    }

# Run the test
results = quick_performance_test()
print(f"✅ Performance test passed: {results['overhead_ratio']:.2f}x overhead")
Performance Testing Framework
Problem: Set up comprehensive performance testing infrastructure.
Solution - Performance Test Framework:
"""Comprehensive performance testing framework for HoneyHive SDK."""
import time
import statistics
import threading
import asyncio
import psutil
import os
from typing import Dict, List, Any, Callable
from dataclasses import dataclass
from honeyhive import HoneyHiveTracer, trace
@dataclass
class PerformanceMetrics:
    """Performance measurement results.

    All latency figures are in milliseconds; throughput is operations per
    second, and memory usage is the process RSS delta (in MB) observed
    across the measurement window.
    """
    avg_time_ms: float  # mean latency across all timed iterations
    std_dev_ms: float  # sample standard deviation of latencies (0 if a single sample)
    min_time_ms: float  # fastest observed iteration
    max_time_ms: float  # slowest observed iteration
    p95_time_ms: float  # 95th-percentile latency
    p99_time_ms: float  # 99th-percentile latency
    throughput_ops_per_sec: float  # iterations / total timed wall time
    memory_usage_mb: float  # RSS growth (final - initial) during measurement
class PerformanceTester:
    """Performance testing framework.

    Times arbitrary callables (with warmup), compares baseline vs. traced
    implementations, and measures behavior under concurrent load.  Every
    measurement is stored in ``self.results`` under its test name so that
    ``generate_report`` can render one consolidated report.
    """

    def __init__(self, tracer: HoneyHiveTracer):
        self.tracer = tracer
        # name -> PerformanceMetrics or result dict, filled by measure_* methods
        self.results = {}

    def measure_function_performance(
        self,
        func: Callable,
        iterations: int = 100,
        warmup_iterations: int = 10,
        name: str = None
    ) -> PerformanceMetrics:
        """Measure function performance with statistical analysis.

        Args:
            func: Zero-argument callable to benchmark.
            iterations: Number of timed calls.
            warmup_iterations: Unrecorded calls run first so one-time costs
                (imports, caches) do not skew the samples.
            name: Key under which the metrics are stored; defaults to
                ``func.__name__``.

        Returns:
            PerformanceMetrics for the timed calls only.
        """
        name = name or func.__name__
        # Warmup runs (not recorded)
        for _ in range(warmup_iterations):
            func()
        # Measurement runs
        times = []
        initial_memory = self._get_memory_usage()
        for _ in range(iterations):
            start = time.perf_counter()
            func()
            end = time.perf_counter()
            times.append(end - start)
        final_memory = self._get_memory_usage()
        memory_delta = final_memory - initial_memory
        # Calculate statistics in milliseconds
        times_ms = [t * 1000 for t in times]
        avg_time = statistics.mean(times_ms)
        std_dev = statistics.stdev(times_ms) if len(times_ms) > 1 else 0
        min_time = min(times_ms)
        max_time = max(times_ms)
        # Percentiles via a truncating index: int(0.95 * n) < n for any
        # n >= 1, so the index is always in range.
        sorted_times = sorted(times_ms)
        p95_time = sorted_times[int(0.95 * len(sorted_times))]
        p99_time = sorted_times[int(0.99 * len(sorted_times))]
        # Throughput over the timed (non-warmup) portion only
        total_time = sum(times)
        throughput = iterations / total_time if total_time > 0 else 0
        metrics = PerformanceMetrics(
            avg_time_ms=avg_time,
            std_dev_ms=std_dev,
            min_time_ms=min_time,
            max_time_ms=max_time,
            p95_time_ms=p95_time,
            p99_time_ms=p99_time,
            throughput_ops_per_sec=throughput,
            memory_usage_mb=memory_delta
        )
        self.results[name] = metrics
        return metrics

    def compare_performance(
        self,
        baseline_func: Callable,
        traced_func: Callable,
        iterations: int = 100,
        name: str = "comparison"
    ) -> Dict[str, Any]:
        """Compare performance between baseline and traced functions.

        Stores and returns a comparison dict holding both metric sets, the
        overhead/throughput ratios, and an acceptability flag.
        """
        baseline_metrics = self.measure_function_performance(
            baseline_func, iterations, name=f"{name}_baseline"
        )
        traced_metrics = self.measure_function_performance(
            traced_func, iterations, name=f"{name}_traced"
        )
        overhead_ratio = traced_metrics.avg_time_ms / baseline_metrics.avg_time_ms
        throughput_ratio = traced_metrics.throughput_ops_per_sec / baseline_metrics.throughput_ops_per_sec
        comparison = {
            "baseline": baseline_metrics,
            "traced": traced_metrics,
            "overhead_ratio": overhead_ratio,
            "throughput_ratio": throughput_ratio,
            "is_acceptable": overhead_ratio < 2.0,  # Configurable threshold
            "memory_overhead_mb": traced_metrics.memory_usage_mb - baseline_metrics.memory_usage_mb
        }
        self.results[f"{name}_comparison"] = comparison
        return comparison

    def measure_concurrent_performance(
        self,
        func: Callable,
        num_threads: int = 10,
        operations_per_thread: int = 50
    ) -> Dict[str, Any]:
        """Measure performance under concurrent load.

        Spawns ``num_threads`` workers, each calling ``func``
        ``operations_per_thread`` times; per-call latencies and errors are
        aggregated (list appends are atomic in CPython, so no lock is needed).
        """
        results = []
        errors = []

        def worker():
            """Worker thread: time each call and record any failure."""
            thread_results = []
            try:
                for _ in range(operations_per_thread):
                    start = time.perf_counter()
                    func()
                    end = time.perf_counter()
                    thread_results.append(end - start)
            except Exception as e:
                errors.append(e)
            finally:
                # Fix: publish samples gathered before a failure too; the
                # original discarded a worker's partial results on error.
                results.extend(thread_results)

        # Start concurrent workers
        start_time = time.perf_counter()
        threads = []
        for _ in range(num_threads):
            thread = threading.Thread(target=worker)
            threads.append(thread)
            thread.start()
        # Wait for completion
        for thread in threads:
            thread.join()
        end_time = time.perf_counter()
        total_time = end_time - start_time

        # Calculate concurrent metrics
        if results:
            times_ms = [t * 1000 for t in results]
            avg_time = statistics.mean(times_ms)
            total_operations = len(results)
            throughput = total_operations / total_time
            error_rate = len(errors) / (total_operations + len(errors))
        else:
            avg_time = 0
            throughput = 0
            error_rate = 1.0

        concurrent_metrics = {
            "num_threads": num_threads,
            "operations_per_thread": operations_per_thread,
            "total_operations": len(results),
            "avg_time_ms": avg_time,
            "total_time_s": total_time,
            "throughput_ops_per_sec": throughput,
            "error_count": len(errors),
            "error_rate": error_rate,
            "errors": [str(e) for e in errors[:5]]  # First 5 errors
        }
        self.results["concurrent_performance"] = concurrent_metrics
        return concurrent_metrics

    def _get_memory_usage(self) -> float:
        """Get current process RSS in MB (via psutil)."""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    def generate_report(self) -> str:
        """Generate a plain-text report covering every stored result."""
        report = ["Performance Test Report", "=" * 25, ""]
        for name, result in self.results.items():
            report.append(f"## {name}")
            if isinstance(result, PerformanceMetrics):
                report.extend([
                    f"Average Time: {result.avg_time_ms:.2f}ms",
                    f"Std Deviation: {result.std_dev_ms:.2f}ms",
                    f"P95: {result.p95_time_ms:.2f}ms",
                    f"P99: {result.p99_time_ms:.2f}ms",
                    f"Throughput: {result.throughput_ops_per_sec:.2f} ops/sec",
                    f"Memory Usage: {result.memory_usage_mb:.2f}MB",
                    ""
                ])
            elif "comparison" in name:
                report.extend([
                    f"Overhead Ratio: {result['overhead_ratio']:.2f}x",
                    f"Throughput Ratio: {result['throughput_ratio']:.2f}x",
                    f"Acceptable: {'✅' if result['is_acceptable'] else '❌'}",
                    f"Memory Overhead: {result['memory_overhead_mb']:.2f}MB",
                    ""
                ])
            elif isinstance(result, dict):
                # Fix: non-comparison dicts (e.g. the concurrent-load metrics)
                # were previously omitted from the report entirely.
                report.extend(
                    [f"{key}: {value}" for key, value in result.items() if key != "errors"]
                    + [""]
                )
        return "\n".join(report)
Using the Performance Framework:
def test_comprehensive_performance():
    """Comprehensive performance test using the framework.

    Compares an identical workload with and without tracing, measures the
    traced version under concurrent load, prints the framework report, and
    asserts the documented performance requirements.
    """
    tracer = HoneyHiveTracer.init(
        api_key="perf-test-key",  # Or set HH_API_KEY environment variable
        project="perf-project",  # Or set HH_PROJECT environment variable
        test_mode=True  # Or set HH_TEST_MODE=true
    )
    tester = PerformanceTester(tracer)

    # The two workloads are identical except for the tracing decorator.
    def baseline_computation():
        return sum(i * i for i in range(100))

    @trace(tracer=tracer)
    def traced_computation():
        return sum(i * i for i in range(100))

    head_to_head = tester.compare_performance(
        baseline_computation,
        traced_computation,
        iterations=200,
        name="computation_test",
    )
    under_load = tester.measure_concurrent_performance(
        traced_computation,
        num_threads=5,
        operations_per_thread=20,
    )

    # Generate and print report
    print(tester.generate_report())

    # Requirements: <2x overhead, <1% errors, >100 ops/sec under load.
    assert head_to_head["overhead_ratio"] < 2.0
    assert under_load["error_rate"] < 0.01
    assert under_load["throughput_ops_per_sec"] > 100
Memory Performance Testing
Problem: Test memory usage and detect memory leaks.
Solution - Memory Testing Framework:
"""Memory performance testing for HoneyHive SDK."""
import gc
import os
import time
from typing import Any, Dict, List
import psutil
from honeyhive import HoneyHiveTracer
class MemoryTester:
    """Memory usage testing framework.

    Measures the process RSS (via psutil) to estimate tracer/span memory
    cost and to detect leaks after cleanup.
    """

    def __init__(self):
        self.process = psutil.Process(os.getpid())
        # RSS snapshot taken by start_monitoring(); None until then.
        self.baseline_memory = None

    def start_monitoring(self):
        """Record a baseline RSS after forcing garbage collection."""
        gc.collect()  # Force garbage collection
        time.sleep(0.1)  # Allow GC to complete
        self.baseline_memory = self.process.memory_info().rss / 1024 / 1024

    def measure_memory_usage(self) -> float:
        """Get current memory usage (RSS) in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    def test_tracer_memory_usage(self, num_tracers: int = 10) -> Dict[str, float]:
        """Test memory usage with multiple tracers.

        Creates ``num_tracers`` independent tracers (10 spans each), then
        closes and releases them; the post-cleanup RSS delta approximates
        leaked memory.
        """
        self.start_monitoring()
        initial_memory = self.measure_memory_usage()
        tracers = []
        for i in range(num_tracers):
            tracer = HoneyHiveTracer.init(
                api_key=f"memory-test-key-{i}",  # Unique API key for each tracer instance
                project=f"memory-project-{i}",  # Unique project for each tracer instance
                test_mode=True  # Or set HH_TEST_MODE=true
            )
            tracers.append(tracer)
            # Create some spans
            for j in range(10):
                with tracer.trace(f"memory-span-{j}") as span:
                    span.set_attribute("iteration", j)
                    span.set_attribute("tracer_id", i)
        after_creation_memory = self.measure_memory_usage()
        # Clean up tracers and let GC reclaim them before re-measuring
        for tracer in tracers:
            tracer.close()
        del tracers
        gc.collect()
        time.sleep(0.1)
        after_cleanup_memory = self.measure_memory_usage()
        return {
            "initial_mb": initial_memory,
            "after_creation_mb": after_creation_memory,
            "after_cleanup_mb": after_cleanup_memory,
            "peak_usage_mb": after_creation_memory - initial_memory,
            "memory_leak_mb": after_cleanup_memory - initial_memory,
            "memory_per_tracer_mb": (after_creation_memory - initial_memory) / num_tracers
        }

    def test_span_memory_growth(self, num_spans: int = 1000) -> Dict[str, Any]:
        """Test memory growth while creating many spans on one tracer.

        Samples RSS roughly 10 times across the run.  Annotated
        ``Dict[str, Any]`` (fix: previously ``Dict[str, float]``) because
        ``memory_samples`` is a list.
        """
        tracer = HoneyHiveTracer.init(
            api_key="span-memory-test",  # Or set HH_API_KEY environment variable
            project="span-memory-project",  # Or set HH_PROJECT environment variable
            test_mode=True  # Or set HH_TEST_MODE=true
        )
        self.start_monitoring()
        initial_memory = self.measure_memory_usage()
        memory_samples = []
        sample_interval = max(1, num_spans // 10)  # Sample 10 times
        for i in range(num_spans):
            with tracer.trace(f"memory-test-span-{i}") as span:
                span.set_attribute("span.index", i)
                span.set_attribute("span.data", f"data-{i}" * 10)  # Some data
            if i % sample_interval == 0:
                memory_samples.append(self.measure_memory_usage())
        final_memory = self.measure_memory_usage()
        # Average RSS growth per sample (MB) between first and last samples
        if len(memory_samples) > 1:
            memory_growth_rate = (memory_samples[-1] - memory_samples[0]) / len(memory_samples)
        else:
            memory_growth_rate = 0
        tracer.close()
        return {
            "initial_mb": initial_memory,
            "final_mb": final_memory,
            "total_growth_mb": final_memory - initial_memory,
            "memory_per_span_kb": (final_memory - initial_memory) * 1024 / num_spans,
            "memory_growth_rate_mb": memory_growth_rate,
            "memory_samples": memory_samples
        }

    def test_long_running_memory_stability(self, duration_seconds: int = 60) -> Dict[str, Any]:
        """Test memory stability over time.

        Emits ~10 spans/second for ``duration_seconds``, sampling RSS every
        10 spans; the max-min spread indicates (in)stability.
        """
        tracer = HoneyHiveTracer.init(
            api_key="stability-test",  # Or set HH_API_KEY environment variable
            project="stability-project",  # Or set HH_PROJECT environment variable
            test_mode=True  # Or set HH_TEST_MODE=true
        )
        self.start_monitoring()
        start_time = time.time()
        memory_samples = []
        span_count = 0
        while time.time() - start_time < duration_seconds:
            with tracer.trace(f"stability-span-{span_count}") as span:
                span.set_attribute("timestamp", time.time())
            span_count += 1
            # Sample memory every second
            if span_count % 10 == 0:  # Assuming ~10 spans per second
                memory_samples.append({
                    "time": time.time() - start_time,
                    "memory_mb": self.measure_memory_usage(),
                    "span_count": span_count
                })
            time.sleep(0.1)  # ~10 spans per second
        tracer.close()
        # Analyze memory stability
        memories = [sample["memory_mb"] for sample in memory_samples]
        if memories:
            avg_memory = sum(memories) / len(memories)
            max_memory = max(memories)
            min_memory = min(memories)
            memory_variance = max_memory - min_memory
        else:
            avg_memory = max_memory = min_memory = memory_variance = 0
        return {
            "duration_seconds": duration_seconds,
            "span_count": span_count,
            "memory_samples": memory_samples,
            "avg_memory_mb": avg_memory,
            "max_memory_mb": max_memory,
            "min_memory_mb": min_memory,
            "memory_variance_mb": memory_variance,
            "spans_per_second": span_count / duration_seconds
        }
Running Memory Tests:
def test_memory_performance():
    """Run comprehensive memory performance tests.

    Exercises tracer creation/cleanup, per-span growth, and long-running
    stability, then checks each against the documented memory budgets.
    """
    tester = MemoryTester()

    # Run all three measurements up front.
    tracer_memory = tester.test_tracer_memory_usage(num_tracers=5)
    span_memory = tester.test_span_memory_growth(num_spans=500)
    stability = tester.test_long_running_memory_stability(duration_seconds=30)

    print(f"Memory per tracer: {tracer_memory['memory_per_tracer_mb']:.2f}MB")
    print(f"Memory leak: {tracer_memory['memory_leak_mb']:.2f}MB")
    print(f"Memory per span: {span_memory['memory_per_span_kb']:.2f}KB")
    print(f"Memory variance: {stability['memory_variance_mb']:.2f}MB")

    # Budgets: 10MB per tracer, 1MB leak, 5KB per span, 50MB variance.
    assert tracer_memory['memory_per_tracer_mb'] < 10.0  # < 10MB per tracer
    assert tracer_memory['memory_leak_mb'] < 1.0  # < 1MB leak
    assert span_memory['memory_per_span_kb'] < 5.0  # < 5KB per span
    assert stability['memory_variance_mb'] < 50.0  # < 50MB variance
Async Performance Testing
Problem: Test performance of async operations with HoneyHive.
Solution - Async Performance Framework:
"""Async performance testing for HoneyHive SDK."""
import asyncio
import time
import statistics
from typing import List, Callable, Awaitable
from honeyhive import HoneyHiveTracer, atrace
class AsyncPerformanceTester:
    """Async performance testing framework.

    Measures awaitable callables in configurable concurrent batches and
    compares baseline vs. traced coroutines.

    Fix: return annotations previously used ``Dict[...]``/``Any``, which this
    listing never imports (NameError at class-definition time); the builtin
    ``dict`` is used instead.
    """

    def __init__(self, tracer: HoneyHiveTracer):
        # Kept for parity with traced test coroutines; the measurement
        # helpers themselves only await the supplied callables.
        self.tracer = tracer

    async def measure_async_function(
        self,
        async_func: Callable[[], Awaitable],
        iterations: int = 100,
        concurrent_tasks: int = 1
    ) -> dict:
        """Measure async function performance.

        Executes ``iterations`` calls in batches of up to ``concurrent_tasks``
        concurrent awaits.

        Returns:
            Dict of float stats: latency (avg/std/min/max/p95, in ms),
            total time (s), and throughput (ops/sec).
        """
        async def timed_execution():
            # Per-call wall time via the monotonic perf counter.
            start = time.perf_counter()
            await async_func()
            return time.perf_counter() - start

        # Run iterations with specified concurrency
        all_times = []
        for batch in range(0, iterations, concurrent_tasks):
            batch_size = min(concurrent_tasks, iterations - batch)
            # Create and execute a batch of concurrent tasks
            tasks = [timed_execution() for _ in range(batch_size)]
            batch_times = await asyncio.gather(*tasks)
            all_times.extend(batch_times)

        # Calculate statistics (milliseconds)
        times_ms = [t * 1000 for t in all_times]
        return {
            "avg_time_ms": statistics.mean(times_ms),
            "std_dev_ms": statistics.stdev(times_ms) if len(times_ms) > 1 else 0,
            "min_time_ms": min(times_ms),
            "max_time_ms": max(times_ms),
            "p95_time_ms": sorted(times_ms)[int(0.95 * len(times_ms))],
            "total_time_s": sum(all_times),
            "throughput_ops_per_sec": len(all_times) / sum(all_times) if sum(all_times) > 0 else 0
        }

    async def compare_async_performance(
        self,
        baseline_func: Callable[[], Awaitable],
        traced_func: Callable[[], Awaitable],
        iterations: int = 50,
        concurrent_tasks: int = 5
    ) -> dict:
        """Compare async performance between baseline and traced functions.

        Returns:
            Dict with both metric sets, the avg-latency overhead ratio, and
            an acceptability flag (< 2x overhead).
        """
        baseline_metrics = await self.measure_async_function(
            baseline_func, iterations, concurrent_tasks
        )
        traced_metrics = await self.measure_async_function(
            traced_func, iterations, concurrent_tasks
        )
        overhead_ratio = traced_metrics["avg_time_ms"] / baseline_metrics["avg_time_ms"]
        return {
            "baseline": baseline_metrics,
            "traced": traced_metrics,
            "overhead_ratio": overhead_ratio,
            "is_acceptable": overhead_ratio < 2.0
        }
Async Performance Test Example:
from honeyhive.models import EventType
async def test_async_performance():
    """Test async performance with HoneyHive tracing.

    Runs the same coroutine with and without ``@atrace`` under concurrency
    and checks the measured overhead and throughput requirements.
    """
    tracer = HoneyHiveTracer.init(
        api_key="async-test-key",  # Or set HH_API_KEY environment variable
        project="async-test-project",  # Or set HH_PROJECT environment variable
        test_mode=True  # Or set HH_TEST_MODE=true
    )
    tester = AsyncPerformanceTester(tracer)

    # Both coroutines do identical simulated async work.
    async def baseline_async_operation():
        await asyncio.sleep(0.01)  # Simulate async work
        return sum(range(100))

    @atrace(tracer=tracer, event_type=EventType.tool)
    async def traced_async_operation():
        await asyncio.sleep(0.01)  # Simulate async work
        return sum(range(100))

    comparison = await tester.compare_async_performance(
        baseline_async_operation,
        traced_async_operation,
        iterations=30,
        concurrent_tasks=10,
    )

    print(f"Async overhead: {comparison['overhead_ratio']:.2f}x")
    print(f"Baseline throughput: {comparison['baseline']['throughput_ops_per_sec']:.2f} ops/sec")
    print(f"Traced throughput: {comparison['traced']['throughput_ops_per_sec']:.2f} ops/sec")

    # Async requirements are stricter than sync: tracing mostly overlaps I/O.
    assert comparison["overhead_ratio"] < 1.5  # < 1.5x overhead for async
    assert comparison["traced"]["throughput_ops_per_sec"] > 50  # > 50 ops/sec
Load Testing
Problem: Test performance under high load conditions.
Solution - Load Testing Framework:
"""Load testing framework for HoneyHive SDK."""
import time
import threading
import queue
import statistics
from typing import Dict, List, Any
from honeyhive import HoneyHiveTracer, trace
class LoadTester:
    """Load testing framework.

    Drives a target callable from many threads with a gradual ramp-up and
    collects per-operation latencies and errors via thread-safe queues.
    """
    def __init__(self, tracer: HoneyHiveTracer):
        self.tracer = tracer
        # Thread-safe sinks shared by all worker threads.
        self.results = queue.Queue()
        self.errors = queue.Queue()
    def run_load_test(
        self,
        target_function: callable,
        num_threads: int = 10,
        duration_seconds: int = 60,
        ramp_up_seconds: int = 10
    ) -> Dict[str, Any]:
        """Run load test with gradual ramp-up.

        Worker starts are staggered evenly across ``ramp_up_seconds``; every
        worker stops ``duration_seconds`` after the test start, so workers
        that start later run for less wall time.

        Returns:
            Dict with the test config, aggregate latency/throughput/error
            stats, and raw per-operation data (errors truncated to 10).
        """
        start_time = time.time()
        end_time = start_time + duration_seconds
        # Evenly spaced start delays across the ramp-up window.
        ramp_up_interval = ramp_up_seconds / num_threads if num_threads > 0 else 0
        threads = []
        def worker(worker_id: int, start_delay: float):
            """Worker thread for load testing."""
            time.sleep(start_delay)  # Ramp-up delay
            while time.time() < end_time:
                try:
                    operation_start = time.perf_counter()
                    target_function()
                    operation_end = time.perf_counter()
                    self.results.put({
                        "worker_id": worker_id,
                        "timestamp": time.time(),
                        "duration_ms": (operation_end - operation_start) * 1000
                    })
                except Exception as e:
                    # Record the failure but keep this worker running.
                    self.errors.put({
                        "worker_id": worker_id,
                        "timestamp": time.time(),
                        "error": str(e)
                    })
                # Small delay to prevent overwhelming
                time.sleep(0.001)
        # Start workers with ramp-up
        for i in range(num_threads):
            start_delay = i * ramp_up_interval
            thread = threading.Thread(
                target=worker,
                args=(i, start_delay)
            )
            threads.append(thread)
            thread.start()
        # Wait for test completion
        for thread in threads:
            thread.join()
        # Collect results; safe to drain without blocking because every
        # producer thread has already been joined.
        results = []
        while not self.results.empty():
            results.append(self.results.get())
        errors = []
        while not self.errors.empty():
            errors.append(self.errors.get())
        # Analyze results
        if results:
            durations = [r["duration_ms"] for r in results]
            avg_duration = statistics.mean(durations)
            # int() truncation keeps both percentile indexes in range
            # (0.95 * len < len for any len >= 1).
            p95_duration = sorted(durations)[int(0.95 * len(durations))]
            p99_duration = sorted(durations)[int(0.99 * len(durations))]
            total_operations = len(results)
            throughput = total_operations / duration_seconds
            error_rate = len(errors) / (total_operations + len(errors))
        else:
            # No successful operations at all: report a 100% error rate.
            avg_duration = p95_duration = p99_duration = 0
            total_operations = 0
            throughput = 0
            error_rate = 1.0
        return {
            "test_config": {
                "num_threads": num_threads,
                "duration_seconds": duration_seconds,
                "ramp_up_seconds": ramp_up_seconds
            },
            "results": {
                "total_operations": total_operations,
                "total_errors": len(errors),
                "error_rate": error_rate,
                "avg_duration_ms": avg_duration,
                "p95_duration_ms": p95_duration,
                "p99_duration_ms": p99_duration,
                "throughput_ops_per_sec": throughput
            },
            "raw_data": {
                "operations": results,
                "errors": errors[:10]  # First 10 errors
            }
        }
Load Test Example:
def test_high_load_performance():
    """Test performance under high load.

    Runs a traced operation across 20 workers for 30 seconds (5s ramp-up)
    and checks throughput, error rate, and P95 latency targets.
    """
    # Fix: EventType is used by the decorator below but was never imported
    # in this listing.
    from honeyhive.models import EventType

    tracer = HoneyHiveTracer.init(
        api_key="load-test-key",  # Or set HH_API_KEY environment variable
        project="load-test-project",  # Or set HH_PROJECT environment variable
        test_mode=True  # Or set HH_TEST_MODE=true
    )
    tester = LoadTester(tracer)

    @trace(tracer=tracer, event_type=EventType.tool)
    def load_test_operation():
        """Operation to test under load."""
        # Simulate realistic work
        data = list(range(50))
        result = sum(x * x for x in data)
        return result

    # Run load test
    load_results = tester.run_load_test(
        target_function=load_test_operation,
        num_threads=20,
        duration_seconds=30,
        ramp_up_seconds=5
    )
    print(f"Throughput: {load_results['results']['throughput_ops_per_sec']:.2f} ops/sec")
    print(f"Error Rate: {load_results['results']['error_rate']:.2%}")
    print(f"P95 Duration: {load_results['results']['p95_duration_ms']:.2f}ms")
    # Assert load test requirements
    assert load_results["results"]["error_rate"] < 0.01  # < 1% error rate
    assert load_results["results"]["throughput_ops_per_sec"] > 100  # > 100 ops/sec
    assert load_results["results"]["p95_duration_ms"] < 100  # P95 < 100ms
Lambda Performance Testing
Problem: Test Lambda-specific performance characteristics.
Solution - Lambda Performance Framework (extracted from comprehensive testing):
"""Lambda-specific performance testing."""
import docker
import json
import time
import requests
import statistics
from typing import Dict, List
class LambdaPerformanceTester:
    """Lambda performance testing framework.

    Runs the SDK inside a local Lambda container (Docker) and measures
    cold-start and warm-start latency through the Runtime Interface
    Emulator's HTTP invoke endpoint on port 9000.
    """

    def __init__(self, container_image: str = "honeyhive-lambda:bundle-native"):
        self.container_image = container_image
        self.container = None  # Set while a test container is running

    def start_lambda_container(self, memory_size: int = 256):
        """Start Lambda container for testing."""
        client = docker.from_env()
        self.container = client.containers.run(
            self.container_image,
            ports={"8080/tcp": 9000},
            environment={
                "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": str(memory_size),
                "HH_API_KEY": "test-key",
                "HH_PROJECT": "lambda-perf-test",
                "HH_TEST_MODE": "true"
            },
            detach=True,
            remove=True
        )
        # Wait for container startup
        time.sleep(3)

    def stop_lambda_container(self):
        """Stop Lambda container (best-effort; shutdown errors are ignored)."""
        if self.container:
            try:
                self.container.stop()
            except Exception:
                # Fix: was a bare `except:`, which also swallowed SystemExit
                # and KeyboardInterrupt.
                pass
            self.container = None

    def invoke_lambda(self, payload: Dict) -> Dict:
        """Invoke Lambda function and measure response time.

        The measured round-trip wall time is attached to the response
        under the ``_total_time_ms`` key.
        """
        url = "http://localhost:9000/2015-03-31/functions/function/invocations"
        start_time = time.perf_counter()
        response = requests.post(
            url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=30
        )
        end_time = time.perf_counter()
        result = response.json()
        result["_total_time_ms"] = (end_time - start_time) * 1000
        return result

    def test_cold_start_performance(self, iterations: int = 5) -> Dict:
        """Test cold start performance.

        Restarts the container before each invocation to force a cold start.
        (Annotated plain ``Dict``; ``typing.Any`` is not imported in this
        listing, so the original ``Dict[str, Any]`` raised NameError.)
        """
        cold_start_times = []
        for i in range(iterations):
            # Stop and start container to simulate cold start
            self.stop_lambda_container()
            time.sleep(1)
            self.start_lambda_container()
            # Invoke and measure
            result = self.invoke_lambda({"test": f"cold_start_{i}"})
            if result.get("statusCode") == 200:
                body = json.loads(result["body"])
                timings = body.get("timings", {})
                cold_start_times.append({
                    "total_time_ms": result["_total_time_ms"],
                    "sdk_import_ms": timings.get("sdk_import_ms", 0),
                    "tracer_init_ms": timings.get("tracer_init_ms", 0),
                    "handler_total_ms": timings.get("handler_total_ms", 0)
                })
        # Calculate cold start statistics
        if cold_start_times:
            total_times = [t["total_time_ms"] for t in cold_start_times]
            avg_cold_start = statistics.mean(total_times)
            p95_cold_start = sorted(total_times)[int(0.95 * len(total_times))]
        else:
            avg_cold_start = p95_cold_start = 0
        return {
            "iterations": iterations,
            "avg_cold_start_ms": avg_cold_start,
            "p95_cold_start_ms": p95_cold_start,
            "raw_measurements": cold_start_times,
            "meets_target": avg_cold_start < 500  # Target: < 500ms
        }

    def test_warm_start_performance(self, iterations: int = 10) -> Dict:
        """Test warm start performance against an already-running container."""
        # Ensure container is warm
        self.invoke_lambda({"test": "warmup"})
        warm_start_times = []
        for i in range(iterations):
            result = self.invoke_lambda({"test": f"warm_start_{i}"})
            if result.get("statusCode") == 200:
                body = json.loads(result["body"])
                warm_start_times.append({
                    "total_time_ms": result["_total_time_ms"],
                    "handler_total_ms": body.get("timings", {}).get("handler_total_ms", 0)
                })
        # Calculate warm start statistics
        if warm_start_times:
            total_times = [t["total_time_ms"] for t in warm_start_times]
            avg_warm_start = statistics.mean(total_times)
            std_dev = statistics.stdev(total_times) if len(total_times) > 1 else 0
        else:
            avg_warm_start = std_dev = 0
        return {
            "iterations": iterations,
            "avg_warm_start_ms": avg_warm_start,
            "std_dev_ms": std_dev,
            "raw_measurements": warm_start_times,
            "meets_target": avg_warm_start < 100  # Target: < 100ms
        }
Lambda Performance Test Usage:
def test_lambda_performance_comprehensive():
    """Comprehensive Lambda performance test.

    Exercises both the cold-start and warm-start paths, printing each
    average and checking it against its documented latency target; the
    container is always torn down afterwards.
    """
    tester = LambdaPerformanceTester()
    try:
        cold = tester.test_cold_start_performance(iterations=3)
        print(f"Cold start average: {cold['avg_cold_start_ms']:.2f}ms")

        warm = tester.test_warm_start_performance(iterations=10)
        print(f"Warm start average: {warm['avg_warm_start_ms']:.2f}ms")

        # Targets come from the tester: < 500ms cold, < 100ms warm.
        assert cold["meets_target"], "Cold start target not met"
        assert warm["meets_target"], "Warm start target not met"
    finally:
        # Stop the Docker container even if an assertion fails.
        tester.stop_lambda_container()
Performance Testing Commands
Running Performance Tests:
# Run all performance tests
pytest tests/performance/ -v
# Run specific performance test categories
pytest tests/performance/ -m "benchmark" -v
pytest tests/performance/ -m "memory" -v
pytest tests/performance/ -m "load" -v
pytest tests/performance/ -m "lambda" -v
# Run performance tests with reporting
pytest tests/performance/ --benchmark-json=performance_results.json
# Run Lambda performance tests
cd tests/lambda
make test-performance
# Run memory tests
pytest tests/performance/test_memory.py -v -s
# Run load tests
pytest tests/performance/test_load.py -v --duration=30
Performance Test Organization:
tests/performance/
├── test_basic_performance.py # Basic overhead testing
├── test_memory_performance.py # Memory usage testing
├── test_async_performance.py # Async operation testing
├── test_load_performance.py # High load testing
├── test_lambda_performance.py # Lambda-specific testing
├── conftest.py # Performance test fixtures
└── performance_utils.py # Performance testing utilities
Performance Benchmarking
Problem: Establish performance baselines and track regression.
Solution - Benchmarking Framework:
"""Performance benchmarking and regression tracking."""
import json
import time
from pathlib import Path
from typing import Dict, Any, Optional
class PerformanceBenchmark:
    """Performance benchmarking and regression tracking.

    Baselines are persisted as JSON keyed by test name; each entry stores
    the metric values, a timestamp, and a version tag.

    NOTE(review): ``check_regression`` treats every metric as
    lower-is-better, so an *increase* in a throughput-style metric is
    reported as a regression — pass only latency-style metrics, or invert
    throughput before comparing.
    """

    def __init__(self, benchmark_file: str = "performance_baselines.json"):
        self.benchmark_file = Path(benchmark_file)
        self.baselines = self._load_baselines()

    def _load_baselines(self) -> Dict[str, Any]:
        """Load existing performance baselines (empty dict if none on disk)."""
        if self.benchmark_file.exists():
            # Fix: pin the encoding instead of relying on the platform default.
            with open(self.benchmark_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def save_baselines(self):
        """Persist the in-memory baselines to the benchmark file."""
        with open(self.benchmark_file, 'w', encoding='utf-8') as f:
            json.dump(self.baselines, f, indent=2)

    def record_baseline(self, test_name: str, metrics: Dict[str, float]):
        """Record performance baseline for a test (in memory only —
        call ``save_baselines`` to persist)."""
        self.baselines[test_name] = {
            "metrics": metrics,
            "timestamp": time.time(),
            "version": "current"  # Could be git commit hash
        }

    def check_regression(
        self,
        test_name: str,
        current_metrics: Dict[str, float],
        threshold_percent: float = 20.0,
        improvement_threshold_percent: float = 5.0
    ) -> Dict[str, Any]:
        """Check current metrics against the stored baseline.

        Args:
            test_name: Baseline key; if absent, the current metrics are
                recorded as the new baseline.
            current_metrics: Metric name -> value for this run.
            threshold_percent: Increase beyond which a metric is flagged
                as a regression.
            improvement_threshold_percent: Decrease beyond which a metric
                is flagged as an improvement (was a hard-coded 5%).

        Returns:
            Dict with ``status`` ("baseline_recorded", "pass",
            "regression", or "improvement") plus per-metric details.
        """
        if test_name not in self.baselines:
            # No baseline, record current as baseline
            self.record_baseline(test_name, current_metrics)
            return {
                "status": "baseline_recorded",
                "message": f"Baseline recorded for {test_name}"
            }
        baseline = self.baselines[test_name]["metrics"]
        regressions = []
        improvements = []
        for metric, current_value in current_metrics.items():
            if metric in baseline:
                baseline_value = baseline[metric]
                # Skip zero/negative baselines to avoid dividing by zero.
                if baseline_value > 0:
                    change_percent = ((current_value - baseline_value) / baseline_value) * 100
                    if change_percent > threshold_percent:
                        regressions.append({
                            "metric": metric,
                            "baseline": baseline_value,
                            "current": current_value,
                            "change_percent": change_percent
                        })
                    elif change_percent < -improvement_threshold_percent:
                        improvements.append({
                            "metric": metric,
                            "baseline": baseline_value,
                            "current": current_value,
                            "change_percent": change_percent
                        })
        # Any regression wins; otherwise pure improvements are reported.
        status = "regression" if regressions else "pass"
        if improvements and not regressions:
            status = "improvement"
        return {
            "status": status,
            "regressions": regressions,
            "improvements": improvements,
            "baseline": baseline,
            "current": current_metrics
        }
Benchmark Usage Example:
def test_with_benchmarking():
    """Performance test with regression checking.

    Measures a simple workload, compares the key metrics against the stored
    baseline, persists updated baselines, and fails on a regression.
    """
    benchmark = PerformanceBenchmark()
    tracer = HoneyHiveTracer.init(
        api_key="test",  # Or set HH_API_KEY environment variable
        project="test-project",  # Or set HH_PROJECT environment variable
        test_mode=True  # Or set HH_TEST_MODE=true
    )
    tester = PerformanceTester(tracer)

    # Measure the reference workload.
    metrics = tester.measure_function_performance(
        lambda: sum(range(1000)),
        iterations=100,
    )

    # Only the headline metrics participate in regression checking.
    current = {
        "avg_time_ms": metrics.avg_time_ms,
        "p95_time_ms": metrics.p95_time_ms,
        "throughput_ops_per_sec": metrics.throughput_ops_per_sec,
    }
    regression_check = benchmark.check_regression(
        "basic_computation_test",
        current,
        threshold_percent=15.0,  # 15% regression threshold
    )

    # Save updated baselines
    benchmark.save_baselines()

    if regression_check["status"] == "regression":
        regression_details = regression_check["regressions"]
        raise AssertionError(f"Performance regression detected: {regression_details}")
    print(f"Performance check: {regression_check['status']}")
Performance Monitoring Integration
Problem: Integrate performance testing with monitoring systems.
Solution - Monitoring Integration:
"""Integration with monitoring systems for performance tracking."""
import requests
import time
from typing import Dict, Any
class PerformanceMonitor:
    """Performance monitoring integration.

    Pushes metric payloads and regression alerts to an optional HTTP
    monitoring endpoint; every send is best-effort and never raises.
    """

    def __init__(self, monitoring_endpoint: str = None):
        self.monitoring_endpoint = monitoring_endpoint

    def send_metrics(self, metrics: Dict[str, Any], tags: Dict[str, str] = None):
        """POST performance metrics to the monitoring system (no-op when
        no endpoint is configured)."""
        if not self.monitoring_endpoint:
            return
        body = {
            "timestamp": time.time(),
            "metrics": metrics,
            "tags": tags or {},
            "source": "honeyhive_performance_tests"
        }
        try:
            reply = requests.post(
                self.monitoring_endpoint,
                json=body,
                timeout=5
            )
            reply.raise_for_status()
        except Exception as e:
            # Best effort: monitoring must never break the test run.
            print(f"Failed to send metrics: {e}")

    def create_alert(self, test_name: str, regression_info: Dict[str, Any]):
        """Create a performance-regression alert (best effort)."""
        alert = {
            "alert_type": "performance_regression",
            "test_name": test_name,
            "severity": "warning",
            "details": regression_info,
            "timestamp": time.time()
        }
        if not self.monitoring_endpoint:
            return
        try:
            requests.post(
                f"{self.monitoring_endpoint}/alerts",
                json=alert,
                timeout=5
            )
        except Exception as e:
            print(f"Failed to create alert: {e}")
See Also
AWS Lambda Testing Guide - AWS Lambda performance testing
Integration Testing Strategies - Integration performance testing
GitHub Actions CI/CD Testing - Automated performance testing
Advanced Configuration Guide - Performance optimization configuration
Environment Variables Reference - Performance-related settings