Performance Testing & Benchmarking

Note

Problem-solving guide for performance testing HoneyHive SDK

Comprehensive solutions for measuring, validating, and optimizing HoneyHive SDK performance across different environments and workloads.

Performance testing ensures that HoneyHive SDK meets your application’s performance requirements and identifies potential bottlenecks before they impact production.

Quick Start

Problem: I need to quickly test if HoneyHive SDK adds acceptable overhead.

Solution:

import time
import statistics
from honeyhive import HoneyHiveTracer, trace

def quick_performance_test():
    """Quick performance impact assessment."""
    tracer = HoneyHiveTracer.init(
        api_key="test-key",      # Or set HH_API_KEY environment variable
        project="test-project",  # Or set HH_PROJECT environment variable
        test_mode=True           # Or set HH_TEST_MODE=true
    )

    # Baseline measurement
    def baseline_operation():
        return sum(range(1000))

    baseline_times = []
    for _ in range(10):
        start = time.perf_counter()
        baseline_operation()
        end = time.perf_counter()
        baseline_times.append(end - start)

    # Traced measurement
    @trace(tracer=tracer)
    def traced_operation():
        return sum(range(1000))

    traced_times = []
    for _ in range(10):
        start = time.perf_counter()
        traced_operation()
        end = time.perf_counter()
        traced_times.append(end - start)

    # Calculate overhead
    baseline_avg = statistics.mean(baseline_times)
    traced_avg = statistics.mean(traced_times)
    overhead_ratio = traced_avg / baseline_avg

    print(f"Baseline average: {baseline_avg * 1000:.2f}ms")
    print(f"Traced average: {traced_avg * 1000:.2f}ms")
    print(f"Overhead ratio: {overhead_ratio:.2f}x")

    # Acceptable overhead: < 2x for most applications
    assert overhead_ratio < 2.0, f"Overhead too high: {overhead_ratio:.2f}x"

    return {
        "baseline_ms": baseline_avg * 1000,
        "traced_ms": traced_avg * 1000,
        "overhead_ratio": overhead_ratio
    }

# Run the test
results = quick_performance_test()
print(f"✅ Performance test passed: {results['overhead_ratio']:.2f}x overhead")

Performance Testing Framework

Problem: Set up comprehensive performance testing infrastructure.

Solution - Performance Test Framework:

"""Comprehensive performance testing framework for HoneyHive SDK."""

import time
import statistics
import threading
import asyncio
import psutil
import os
from typing import Dict, List, Any, Callable
from dataclasses import dataclass
from honeyhive import HoneyHiveTracer, trace

@dataclass
class PerformanceMetrics:
    """Performance measurement results."""
    avg_time_ms: float
    std_dev_ms: float
    min_time_ms: float
    max_time_ms: float
    p95_time_ms: float
    p99_time_ms: float
    throughput_ops_per_sec: float
    memory_usage_mb: float

class PerformanceTester:
    """Performance testing framework."""

    def __init__(self, tracer: HoneyHiveTracer):
        self.tracer = tracer
        self.results = {}

    def measure_function_performance(
        self,
        func: Callable,
        iterations: int = 100,
        warmup_iterations: int = 10,
        name: str = None
    ) -> PerformanceMetrics:
        """Measure function performance with statistical analysis."""

        name = name or func.__name__

        # Warmup runs
        for _ in range(warmup_iterations):
            func()

        # Measurement runs
        times = []
        initial_memory = self._get_memory_usage()

        for _ in range(iterations):
            start = time.perf_counter()
            func()
            end = time.perf_counter()
            times.append(end - start)

        final_memory = self._get_memory_usage()
        memory_delta = final_memory - initial_memory

        # Calculate statistics
        times_ms = [t * 1000 for t in times]
        avg_time = statistics.mean(times_ms)
        std_dev = statistics.stdev(times_ms) if len(times_ms) > 1 else 0
        min_time = min(times_ms)
        max_time = max(times_ms)

        # Calculate percentiles
        sorted_times = sorted(times_ms)
        p95_index = int(0.95 * len(sorted_times))
        p99_index = int(0.99 * len(sorted_times))
        p95_time = sorted_times[p95_index]
        p99_time = sorted_times[p99_index]

        # Calculate throughput
        total_time = sum(times)
        throughput = iterations / total_time if total_time > 0 else 0

        metrics = PerformanceMetrics(
            avg_time_ms=avg_time,
            std_dev_ms=std_dev,
            min_time_ms=min_time,
            max_time_ms=max_time,
            p95_time_ms=p95_time,
            p99_time_ms=p99_time,
            throughput_ops_per_sec=throughput,
            memory_usage_mb=memory_delta
        )

        self.results[name] = metrics
        return metrics

    def compare_performance(
        self,
        baseline_func: Callable,
        traced_func: Callable,
        iterations: int = 100,
        name: str = "comparison"
    ) -> Dict[str, Any]:
        """Compare performance between baseline and traced functions."""

        baseline_metrics = self.measure_function_performance(
            baseline_func, iterations, name=f"{name}_baseline"
        )

        traced_metrics = self.measure_function_performance(
            traced_func, iterations, name=f"{name}_traced"
        )

        overhead_ratio = traced_metrics.avg_time_ms / baseline_metrics.avg_time_ms
        throughput_ratio = traced_metrics.throughput_ops_per_sec / baseline_metrics.throughput_ops_per_sec

        comparison = {
            "baseline": baseline_metrics,
            "traced": traced_metrics,
            "overhead_ratio": overhead_ratio,
            "throughput_ratio": throughput_ratio,
            "is_acceptable": overhead_ratio < 2.0,  # Configurable threshold
            "memory_overhead_mb": traced_metrics.memory_usage_mb - baseline_metrics.memory_usage_mb
        }

        self.results[f"{name}_comparison"] = comparison
        return comparison

    def measure_concurrent_performance(
        self,
        func: Callable,
        num_threads: int = 10,
        operations_per_thread: int = 50
    ) -> Dict[str, Any]:
        """Measure performance under concurrent load."""

        results = []
        errors = []

        def worker():
            """Worker thread function."""
            thread_results = []
            try:
                for _ in range(operations_per_thread):
                    start = time.perf_counter()
                    func()
                    end = time.perf_counter()
                    thread_results.append(end - start)
                results.extend(thread_results)
            except Exception as e:
                errors.append(e)

        # Start concurrent workers
        start_time = time.perf_counter()
        threads = []

        for _ in range(num_threads):
            thread = threading.Thread(target=worker)
            threads.append(thread)
            thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()

        end_time = time.perf_counter()
        total_time = end_time - start_time

        # Calculate concurrent metrics
        if results:
            times_ms = [t * 1000 for t in results]
            avg_time = statistics.mean(times_ms)
            total_operations = len(results)
            throughput = total_operations / total_time
            error_rate = len(errors) / (total_operations + len(errors))
        else:
            avg_time = 0
            throughput = 0
            error_rate = 1.0

        concurrent_metrics = {
            "num_threads": num_threads,
            "operations_per_thread": operations_per_thread,
            "total_operations": len(results),
            "avg_time_ms": avg_time,
            "total_time_s": total_time,
            "throughput_ops_per_sec": throughput,
            "error_count": len(errors),
            "error_rate": error_rate,
            "errors": [str(e) for e in errors[:5]]  # First 5 errors
        }

        self.results["concurrent_performance"] = concurrent_metrics
        return concurrent_metrics

    def _get_memory_usage(self) -> float:
        """Get current memory usage in MB."""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    def generate_report(self) -> str:
        """Generate performance test report."""
        report = ["Performance Test Report", "=" * 25, ""]

        for name, result in self.results.items():
            report.append(f"## {name}")
            if isinstance(result, PerformanceMetrics):
                report.extend([
                    f"Average Time: {result.avg_time_ms:.2f}ms",
                    f"Std Deviation: {result.std_dev_ms:.2f}ms",
                    f"P95: {result.p95_time_ms:.2f}ms",
                    f"P99: {result.p99_time_ms:.2f}ms",
                    f"Throughput: {result.throughput_ops_per_sec:.2f} ops/sec",
                    f"Memory Usage: {result.memory_usage_mb:.2f}MB",
                    ""
                ])
            elif "comparison" in name:
                report.extend([
                    f"Overhead Ratio: {result['overhead_ratio']:.2f}x",
                    f"Throughput Ratio: {result['throughput_ratio']:.2f}x",
                    f"Acceptable: {'✅' if result['is_acceptable'] else '❌'}",
                    f"Memory Overhead: {result['memory_overhead_mb']:.2f}MB",
                    ""
                ])

        return "\n".join(report)

Using the Performance Framework:

def test_comprehensive_performance():
    """Comprehensive performance test using the framework."""
    tracer = HoneyHiveTracer.init(
        api_key="perf-test-key", # Or set HH_API_KEY environment variable
        project="perf-project",  # Or set HH_PROJECT environment variable
        test_mode=True           # Or set HH_TEST_MODE=true
    )

    tester = PerformanceTester(tracer)

    # Define test functions
    def baseline_computation():
        return sum(i * i for i in range(100))

    @trace(tracer=tracer)
    def traced_computation():
        return sum(i * i for i in range(100))

    # Run performance comparisons
    comparison = tester.compare_performance(
        baseline_computation,
        traced_computation,
        iterations=200,
        name="computation_test"
    )

    # Test concurrent performance
    concurrent_results = tester.measure_concurrent_performance(
        traced_computation,
        num_threads=5,
        operations_per_thread=20
    )

    # Generate and print report
    report = tester.generate_report()
    print(report)

    # Assert performance requirements
    assert comparison["overhead_ratio"] < 2.0
    assert concurrent_results["error_rate"] < 0.01
    assert concurrent_results["throughput_ops_per_sec"] > 100

Memory Performance Testing

Problem: Test memory usage and detect memory leaks.

Solution - Memory Testing Framework:

"""Memory performance testing for HoneyHive SDK."""

import gc
import psutil
import os
import time
from typing import List, Dict
from honeyhive import HoneyHiveTracer

class MemoryTester:
    """Memory usage testing framework."""

    def __init__(self):
        self.process = psutil.Process(os.getpid())
        self.baseline_memory = None

    def start_monitoring(self):
        """Start memory monitoring baseline."""
        gc.collect()  # Force garbage collection
        time.sleep(0.1)  # Allow GC to complete
        self.baseline_memory = self.process.memory_info().rss / 1024 / 1024

    def measure_memory_usage(self) -> float:
        """Get current memory usage in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    def test_tracer_memory_usage(self, num_tracers: int = 10) -> Dict[str, float]:
        """Test memory usage with multiple tracers."""
        self.start_monitoring()
        initial_memory = self.measure_memory_usage()

        tracers = []
        for i in range(num_tracers):
            tracer = HoneyHiveTracer.init(
                api_key=f"memory-test-key-{i}",  # Unique API key for each tracer instance
                project=f"memory-project-{i}",   # Unique project for each tracer instance
                test_mode=True                    # Or set HH_TEST_MODE=true
            )
            tracers.append(tracer)

            # Create some spans
            for j in range(10):
                with tracer.trace(f"memory-span-{j}") as span:
                    span.set_attribute("iteration", j)
                    span.set_attribute("tracer_id", i)

        after_creation_memory = self.measure_memory_usage()

        # Clean up tracers
        for tracer in tracers:
            tracer.close()

        del tracers
        gc.collect()
        time.sleep(0.1)

        after_cleanup_memory = self.measure_memory_usage()

        return {
            "initial_mb": initial_memory,
            "after_creation_mb": after_creation_memory,
            "after_cleanup_mb": after_cleanup_memory,
            "peak_usage_mb": after_creation_memory - initial_memory,
            "memory_leak_mb": after_cleanup_memory - initial_memory,
            "memory_per_tracer_mb": (after_creation_memory - initial_memory) / num_tracers
        }

    def test_span_memory_growth(self, num_spans: int = 1000) -> Dict[str, float]:
        """Test memory growth with many spans."""
        tracer = HoneyHiveTracer.init(
            api_key="span-memory-test",  # Or set HH_API_KEY environment variable
            project="span-memory-project", # Or set HH_PROJECT environment variable
            test_mode=True               # Or set HH_TEST_MODE=true
        )

        self.start_monitoring()
        initial_memory = self.measure_memory_usage()

        memory_samples = []
        sample_interval = max(1, num_spans // 10)  # Sample 10 times

        for i in range(num_spans):
            with tracer.trace(f"memory-test-span-{i}") as span:
                span.set_attribute("span.index", i)
                span.set_attribute("span.data", f"data-{i}" * 10)  # Some data

            if i % sample_interval == 0:
                memory_samples.append(self.measure_memory_usage())

        final_memory = self.measure_memory_usage()

        # Calculate memory growth
        if len(memory_samples) > 1:
            memory_growth_rate = (memory_samples[-1] - memory_samples[0]) / len(memory_samples)
        else:
            memory_growth_rate = 0

        tracer.close()

        return {
            "initial_mb": initial_memory,
            "final_mb": final_memory,
            "total_growth_mb": final_memory - initial_memory,
            "memory_per_span_kb": (final_memory - initial_memory) * 1024 / num_spans,
            "memory_growth_rate_mb": memory_growth_rate,
            "memory_samples": memory_samples
        }

    def test_long_running_memory_stability(self, duration_seconds: int = 60) -> Dict[str, Any]:
        """Test memory stability over time."""
        tracer = HoneyHiveTracer.init(
            api_key="stability-test",    # Or set HH_API_KEY environment variable
            project="stability-project", # Or set HH_PROJECT environment variable
            test_mode=True               # Or set HH_TEST_MODE=true
        )

        self.start_monitoring()
        start_time = time.time()
        memory_samples = []

        span_count = 0
        while time.time() - start_time < duration_seconds:
            with tracer.trace(f"stability-span-{span_count}") as span:
                span.set_attribute("timestamp", time.time())
                span_count += 1

            # Sample memory every second
            if span_count % 10 == 0:  # Assuming ~10 spans per second
                memory_samples.append({
                    "time": time.time() - start_time,
                    "memory_mb": self.measure_memory_usage(),
                    "span_count": span_count
                })

            time.sleep(0.1)  # ~10 spans per second

        tracer.close()

        # Analyze memory stability
        memories = [sample["memory_mb"] for sample in memory_samples]
        if memories:
            avg_memory = sum(memories) / len(memories)
            max_memory = max(memories)
            min_memory = min(memories)
            memory_variance = max_memory - min_memory
        else:
            avg_memory = max_memory = min_memory = memory_variance = 0

        return {
            "duration_seconds": duration_seconds,
            "span_count": span_count,
            "memory_samples": memory_samples,
            "avg_memory_mb": avg_memory,
            "max_memory_mb": max_memory,
            "min_memory_mb": min_memory,
            "memory_variance_mb": memory_variance,
            "spans_per_second": span_count / duration_seconds
        }

Running Memory Tests:

def test_memory_performance():
    """Run comprehensive memory performance tests."""
    tester = MemoryTester()

    # Test multiple tracers
    tracer_memory = tester.test_tracer_memory_usage(num_tracers=5)
    print(f"Memory per tracer: {tracer_memory['memory_per_tracer_mb']:.2f}MB")
    print(f"Memory leak: {tracer_memory['memory_leak_mb']:.2f}MB")

    # Test span memory growth
    span_memory = tester.test_span_memory_growth(num_spans=500)
    print(f"Memory per span: {span_memory['memory_per_span_kb']:.2f}KB")

    # Test long-running stability
    stability = tester.test_long_running_memory_stability(duration_seconds=30)
    print(f"Memory variance: {stability['memory_variance_mb']:.2f}MB")

    # Assert memory requirements
    assert tracer_memory['memory_per_tracer_mb'] < 10.0  # < 10MB per tracer
    assert tracer_memory['memory_leak_mb'] < 1.0  # < 1MB leak
    assert span_memory['memory_per_span_kb'] < 5.0  # < 5KB per span
    assert stability['memory_variance_mb'] < 50.0  # < 50MB variance

Async Performance Testing

Problem: Test performance of async operations with HoneyHive.

Solution - Async Performance Framework:

"""Async performance testing for HoneyHive SDK."""

import asyncio
import time
import statistics
from typing import List, Callable, Awaitable
from honeyhive import HoneyHiveTracer, atrace

class AsyncPerformanceTester:
    """Async performance testing framework."""

    def __init__(self, tracer: HoneyHiveTracer):
        self.tracer = tracer

    async def measure_async_function(
        self,
        async_func: Callable[[], Awaitable],
        iterations: int = 100,
        concurrent_tasks: int = 1
    ) -> Dict[str, float]:
        """Measure async function performance."""

        async def timed_execution():
            start = time.perf_counter()
            await async_func()
            return time.perf_counter() - start

        # Run iterations with specified concurrency
        all_times = []

        for batch in range(0, iterations, concurrent_tasks):
            batch_size = min(concurrent_tasks, iterations - batch)

            # Create concurrent tasks
            tasks = [timed_execution() for _ in range(batch_size)]

            # Execute concurrently
            batch_times = await asyncio.gather(*tasks)
            all_times.extend(batch_times)

        # Calculate statistics
        times_ms = [t * 1000 for t in all_times]

        return {
            "avg_time_ms": statistics.mean(times_ms),
            "std_dev_ms": statistics.stdev(times_ms) if len(times_ms) > 1 else 0,
            "min_time_ms": min(times_ms),
            "max_time_ms": max(times_ms),
            "p95_time_ms": sorted(times_ms)[int(0.95 * len(times_ms))],
            "total_time_s": sum(all_times),
            "throughput_ops_per_sec": len(all_times) / sum(all_times) if sum(all_times) > 0 else 0
        }

    async def compare_async_performance(
        self,
        baseline_func: Callable[[], Awaitable],
        traced_func: Callable[[], Awaitable],
        iterations: int = 50,
        concurrent_tasks: int = 5
    ) -> Dict[str, Any]:
        """Compare async performance between baseline and traced functions."""

        baseline_metrics = await self.measure_async_function(
            baseline_func, iterations, concurrent_tasks
        )

        traced_metrics = await self.measure_async_function(
            traced_func, iterations, concurrent_tasks
        )

        overhead_ratio = traced_metrics["avg_time_ms"] / baseline_metrics["avg_time_ms"]

        return {
            "baseline": baseline_metrics,
            "traced": traced_metrics,
            "overhead_ratio": overhead_ratio,
            "is_acceptable": overhead_ratio < 2.0
        }

Async Performance Test Example:

from honeyhive.models import EventType

async def test_async_performance():
    """Test async performance with HoneyHive tracing."""
    tracer = HoneyHiveTracer.init(
        api_key="async-test-key",    # Or set HH_API_KEY environment variable
        project="async-test-project", # Or set HH_PROJECT environment variable
        test_mode=True               # Or set HH_TEST_MODE=true
    )

    tester = AsyncPerformanceTester(tracer)

    # Define async test functions
    async def baseline_async_operation():
        await asyncio.sleep(0.01)  # Simulate async work
        return sum(range(100))

    @atrace(tracer=tracer, event_type=EventType.tool)
    async def traced_async_operation():
        await asyncio.sleep(0.01)  # Simulate async work
        return sum(range(100))

    # Compare performance
    comparison = await tester.compare_async_performance(
        baseline_async_operation,
        traced_async_operation,
        iterations=30,
        concurrent_tasks=10
    )

    print(f"Async overhead: {comparison['overhead_ratio']:.2f}x")
    print(f"Baseline throughput: {comparison['baseline']['throughput_ops_per_sec']:.2f} ops/sec")
    print(f"Traced throughput: {comparison['traced']['throughput_ops_per_sec']:.2f} ops/sec")

    # Assert performance requirements
    assert comparison["overhead_ratio"] < 1.5  # < 1.5x overhead for async
    assert comparison["traced"]["throughput_ops_per_sec"] > 50  # > 50 ops/sec

Load Testing

Problem: Test performance under high load conditions.

Solution - Load Testing Framework:

"""Load testing framework for HoneyHive SDK."""

import time
import threading
import queue
import statistics
from typing import Dict, List, Any
from honeyhive import HoneyHiveTracer, trace

class LoadTester:
    """Load testing framework."""

    def __init__(self, tracer: HoneyHiveTracer):
        self.tracer = tracer
        self.results = queue.Queue()
        self.errors = queue.Queue()

    def run_load_test(
        self,
        target_function: callable,
        num_threads: int = 10,
        duration_seconds: int = 60,
        ramp_up_seconds: int = 10
    ) -> Dict[str, Any]:
        """Run load test with gradual ramp-up."""

        start_time = time.time()
        end_time = start_time + duration_seconds
        ramp_up_interval = ramp_up_seconds / num_threads if num_threads > 0 else 0

        threads = []

        def worker(worker_id: int, start_delay: float):
            """Worker thread for load testing."""
            time.sleep(start_delay)  # Ramp-up delay

            while time.time() < end_time:
                try:
                    operation_start = time.perf_counter()
                    target_function()
                    operation_end = time.perf_counter()

                    self.results.put({
                        "worker_id": worker_id,
                        "timestamp": time.time(),
                        "duration_ms": (operation_end - operation_start) * 1000
                    })

                except Exception as e:
                    self.errors.put({
                        "worker_id": worker_id,
                        "timestamp": time.time(),
                        "error": str(e)
                    })

                # Small delay to prevent overwhelming
                time.sleep(0.001)

        # Start workers with ramp-up
        for i in range(num_threads):
            start_delay = i * ramp_up_interval
            thread = threading.Thread(
                target=worker,
                args=(i, start_delay)
            )
            threads.append(thread)
            thread.start()

        # Wait for test completion
        for thread in threads:
            thread.join()

        # Collect results
        results = []
        while not self.results.empty():
            results.append(self.results.get())

        errors = []
        while not self.errors.empty():
            errors.append(self.errors.get())

        # Analyze results
        if results:
            durations = [r["duration_ms"] for r in results]
            avg_duration = statistics.mean(durations)
            p95_duration = sorted(durations)[int(0.95 * len(durations))]
            p99_duration = sorted(durations)[int(0.99 * len(durations))]

            total_operations = len(results)
            throughput = total_operations / duration_seconds
            error_rate = len(errors) / (total_operations + len(errors))
        else:
            avg_duration = p95_duration = p99_duration = 0
            total_operations = 0
            throughput = 0
            error_rate = 1.0

        return {
            "test_config": {
                "num_threads": num_threads,
                "duration_seconds": duration_seconds,
                "ramp_up_seconds": ramp_up_seconds
            },
            "results": {
                "total_operations": total_operations,
                "total_errors": len(errors),
                "error_rate": error_rate,
                "avg_duration_ms": avg_duration,
                "p95_duration_ms": p95_duration,
                "p99_duration_ms": p99_duration,
                "throughput_ops_per_sec": throughput
            },
            "raw_data": {
                "operations": results,
                "errors": errors[:10]  # First 10 errors
            }
        }

Load Test Example:

def test_high_load_performance():
    """Test performance under high load."""
    tracer = HoneyHiveTracer.init(
        api_key="load-test-key",     # Or set HH_API_KEY environment variable
        project="load-test-project", # Or set HH_PROJECT environment variable
        test_mode=True               # Or set HH_TEST_MODE=true
    )

    tester = LoadTester(tracer)

    @trace(tracer=tracer, event_type=EventType.tool)
    def load_test_operation():
        """Operation to test under load."""
        # Simulate realistic work
        data = list(range(50))
        result = sum(x * x for x in data)
        return result

    # Run load test
    load_results = tester.run_load_test(
        target_function=load_test_operation,
        num_threads=20,
        duration_seconds=30,
        ramp_up_seconds=5
    )

    print(f"Throughput: {load_results['results']['throughput_ops_per_sec']:.2f} ops/sec")
    print(f"Error Rate: {load_results['results']['error_rate']:.2%}")
    print(f"P95 Duration: {load_results['results']['p95_duration_ms']:.2f}ms")

    # Assert load test requirements
    assert load_results["results"]["error_rate"] < 0.01  # < 1% error rate
    assert load_results["results"]["throughput_ops_per_sec"] > 100  # > 100 ops/sec
    assert load_results["results"]["p95_duration_ms"] < 100  # P95 < 100ms

Lambda Performance Testing

Problem: Test Lambda-specific performance characteristics.

Solution - Lambda Performance Framework (extracted from comprehensive testing):

"""Lambda-specific performance testing."""

import docker
import json
import time
import requests
import statistics
from typing import Dict, List

class LambdaPerformanceTester:
    """Lambda performance testing framework."""

    def __init__(self, container_image: str = "honeyhive-lambda:bundle-native"):
        self.container_image = container_image
        self.container = None

    def start_lambda_container(self, memory_size: int = 256):
        """Start Lambda container for testing."""
        client = docker.from_env()

        self.container = client.containers.run(
            self.container_image,
            ports={"8080/tcp": 9000},
            environment={
                "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": str(memory_size),
                "HH_API_KEY": "test-key",
                "HH_PROJECT": "lambda-perf-test",
                "HH_TEST_MODE": "true"
            },
            detach=True,
            remove=True
        )

        # Wait for container startup
        time.sleep(3)

    def stop_lambda_container(self):
        """Stop Lambda container."""
        if self.container:
            try:
                self.container.stop()
            except:
                pass
            self.container = None

    def invoke_lambda(self, payload: Dict) -> Dict:
        """Invoke Lambda function and measure response time."""
        url = "http://localhost:9000/2015-03-31/functions/function/invocations"

        start_time = time.perf_counter()
        response = requests.post(
            url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=30
        )
        end_time = time.perf_counter()

        result = response.json()
        result["_total_time_ms"] = (end_time - start_time) * 1000

        return result

    def test_cold_start_performance(self, iterations: int = 5) -> Dict[str, Any]:
        """Test cold start performance."""
        cold_start_times = []

        for i in range(iterations):
            # Stop and start container to simulate cold start
            self.stop_lambda_container()
            time.sleep(1)
            self.start_lambda_container()

            # Invoke and measure
            result = self.invoke_lambda({"test": f"cold_start_{i}"})

            if result.get("statusCode") == 200:
                body = json.loads(result["body"])
                timings = body.get("timings", {})
                cold_start_times.append({
                    "total_time_ms": result["_total_time_ms"],
                    "sdk_import_ms": timings.get("sdk_import_ms", 0),
                    "tracer_init_ms": timings.get("tracer_init_ms", 0),
                    "handler_total_ms": timings.get("handler_total_ms", 0)
                })

        # Calculate cold start statistics
        if cold_start_times:
            total_times = [t["total_time_ms"] for t in cold_start_times]
            avg_cold_start = statistics.mean(total_times)
            p95_cold_start = sorted(total_times)[int(0.95 * len(total_times))]
        else:
            avg_cold_start = p95_cold_start = 0

        return {
            "iterations": iterations,
            "avg_cold_start_ms": avg_cold_start,
            "p95_cold_start_ms": p95_cold_start,
            "raw_measurements": cold_start_times,
            "meets_target": avg_cold_start < 500  # Target: < 500ms
        }

    def test_warm_start_performance(self, iterations: int = 10) -> Dict[str, Any]:
        """Test warm start performance."""
        # Ensure container is warm
        self.invoke_lambda({"test": "warmup"})

        warm_start_times = []
        for i in range(iterations):
            result = self.invoke_lambda({"test": f"warm_start_{i}"})

            if result.get("statusCode") == 200:
                body = json.loads(result["body"])
                warm_start_times.append({
                    "total_time_ms": result["_total_time_ms"],
                    "handler_total_ms": body.get("timings", {}).get("handler_total_ms", 0)
                })

        # Calculate warm start statistics
        if warm_start_times:
            total_times = [t["total_time_ms"] for t in warm_start_times]
            avg_warm_start = statistics.mean(total_times)
            std_dev = statistics.stdev(total_times) if len(total_times) > 1 else 0
        else:
            avg_warm_start = std_dev = 0

        return {
            "iterations": iterations,
            "avg_warm_start_ms": avg_warm_start,
            "std_dev_ms": std_dev,
            "raw_measurements": warm_start_times,
            "meets_target": avg_warm_start < 100  # Target: < 100ms
        }

Lambda Performance Test Usage:

def test_lambda_performance_comprehensive():
    """Comprehensive Lambda performance test."""
    tester = LambdaPerformanceTester()

    try:
        # Test cold start performance
        cold_start_results = tester.test_cold_start_performance(iterations=3)
        print(f"Cold start average: {cold_start_results['avg_cold_start_ms']:.2f}ms")

        # Test warm start performance
        warm_start_results = tester.test_warm_start_performance(iterations=10)
        print(f"Warm start average: {warm_start_results['avg_warm_start_ms']:.2f}ms")

        # Assert performance targets
        assert cold_start_results["meets_target"], "Cold start target not met"
        assert warm_start_results["meets_target"], "Warm start target not met"

    finally:
        tester.stop_lambda_container()

Performance Testing Commands

Running Performance Tests:

# Run all performance tests
pytest tests/performance/ -v

# Run specific performance test categories
pytest tests/performance/ -m "benchmark" -v
pytest tests/performance/ -m "memory" -v
pytest tests/performance/ -m "load" -v
pytest tests/performance/ -m "lambda" -v

# Run performance tests with reporting
pytest tests/performance/ --benchmark-json=performance_results.json

# Run Lambda performance tests
cd tests/lambda
make test-performance

# Run memory tests
pytest tests/performance/test_memory.py -v -s

# Run load tests
pytest tests/performance/test_load.py -v --duration=30

Performance Test Organization:

tests/performance/
├── test_basic_performance.py      # Basic overhead testing
├── test_memory_performance.py     # Memory usage testing
├── test_async_performance.py      # Async operation testing
├── test_load_performance.py       # High load testing
├── test_lambda_performance.py     # Lambda-specific testing
├── conftest.py                    # Performance test fixtures
└── performance_utils.py           # Performance testing utilities

Performance Benchmarking

Problem: Establish performance baselines and track regression.

Solution - Benchmarking Framework:

"""Performance benchmarking and regression tracking."""

import json
import time
from pathlib import Path
from typing import Dict, Any, Optional

class PerformanceBenchmark:
    """Performance benchmarking and regression tracking."""

    def __init__(self, benchmark_file: str = "performance_baselines.json"):
        self.benchmark_file = Path(benchmark_file)
        self.baselines = self._load_baselines()

    def _load_baselines(self) -> Dict[str, Any]:
        """Load existing performance baselines."""
        if self.benchmark_file.exists():
            with open(self.benchmark_file, 'r') as f:
                return json.load(f)
        return {}

    def save_baselines(self):
        """Save performance baselines to file."""
        with open(self.benchmark_file, 'w') as f:
            json.dump(self.baselines, f, indent=2)

    def record_baseline(self, test_name: str, metrics: Dict[str, float]):
        """Record performance baseline for a test."""
        self.baselines[test_name] = {
            "metrics": metrics,
            "timestamp": time.time(),
            "version": "current"  # Could be git commit hash
        }

    def check_regression(
        self,
        test_name: str,
        current_metrics: Dict[str, float],
        threshold_percent: float = 20.0
    ) -> Dict[str, Any]:
        """Check for performance regression."""
        if test_name not in self.baselines:
            # No baseline, record current as baseline
            self.record_baseline(test_name, current_metrics)
            return {
                "status": "baseline_recorded",
                "message": f"Baseline recorded for {test_name}"
            }

        baseline = self.baselines[test_name]["metrics"]
        regressions = []
        improvements = []

        for metric, current_value in current_metrics.items():
            if metric in baseline:
                baseline_value = baseline[metric]
                if baseline_value > 0:
                    change_percent = ((current_value - baseline_value) / baseline_value) * 100

                    if change_percent > threshold_percent:
                        regressions.append({
                            "metric": metric,
                            "baseline": baseline_value,
                            "current": current_value,
                            "change_percent": change_percent
                        })
                    elif change_percent < -5:  # Improvement threshold
                        improvements.append({
                            "metric": metric,
                            "baseline": baseline_value,
                            "current": current_value,
                            "change_percent": change_percent
                        })

        status = "regression" if regressions else "pass"
        if improvements and not regressions:
            status = "improvement"

        return {
            "status": status,
            "regressions": regressions,
            "improvements": improvements,
            "baseline": baseline,
            "current": current_metrics
        }

Benchmark Usage Example:

def test_with_benchmarking():
    """Performance test with regression checking."""
    benchmark = PerformanceBenchmark()

    # Run performance test
    tracer = HoneyHiveTracer.init(
        api_key="test",          # Or set HH_API_KEY environment variable
        project="test-project",  # Or set HH_PROJECT environment variable
        test_mode=True           # Or set HH_TEST_MODE=true
    )
    tester = PerformanceTester(tracer)

    # Measure performance
    metrics = tester.measure_function_performance(
        lambda: sum(range(1000)),
        iterations=100
    )

    # Check for regression
    regression_check = benchmark.check_regression(
        "basic_computation_test",
        {
            "avg_time_ms": metrics.avg_time_ms,
            "p95_time_ms": metrics.p95_time_ms,
            "throughput_ops_per_sec": metrics.throughput_ops_per_sec
        },
        threshold_percent=15.0  # 15% regression threshold
    )

    # Save updated baselines
    benchmark.save_baselines()

    # Assert no significant regression
    if regression_check["status"] == "regression":
        regression_details = regression_check["regressions"]
        raise AssertionError(f"Performance regression detected: {regression_details}")

    print(f"Performance check: {regression_check['status']}")

Performance Monitoring Integration

Problem: Integrate performance testing with monitoring systems.

Solution - Monitoring Integration:

"""Integration with monitoring systems for performance tracking."""

import requests
import time
from typing import Dict, Any

class PerformanceMonitor:
    """Performance monitoring integration."""

    def __init__(self, monitoring_endpoint: str = None):
        self.monitoring_endpoint = monitoring_endpoint

    def send_metrics(self, metrics: Dict[str, Any], tags: Dict[str, str] = None):
        """Send performance metrics to monitoring system."""
        if not self.monitoring_endpoint:
            return

        payload = {
            "timestamp": time.time(),
            "metrics": metrics,
            "tags": tags or {},
            "source": "honeyhive_performance_tests"
        }

        try:
            response = requests.post(
                self.monitoring_endpoint,
                json=payload,
                timeout=5
            )
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to send metrics: {e}")

    def create_alert(self, test_name: str, regression_info: Dict[str, Any]):
        """Create alert for performance regression."""
        alert_payload = {
            "alert_type": "performance_regression",
            "test_name": test_name,
            "severity": "warning",
            "details": regression_info,
            "timestamp": time.time()
        }

        if self.monitoring_endpoint:
            try:
                requests.post(
                    f"{self.monitoring_endpoint}/alerts",
                    json=alert_payload,
                    timeout=5
                )
            except Exception as e:
                print(f"Failed to create alert: {e}")

See Also