Skip to content

honeyhive.experiments.core

Core experiment functionality.

This module provides the core experiment execution functionality, including:

- ExperimentContext for organizing experiment metadata
- run_experiment() with tracer multi-instance pattern
- Integration with backend result endpoints

logger module-attribute

logger = get_logger('honeyhive.experiments.core')

ScalarScore module-attribute

ScalarScore = Union[bool, int, float, str]

EvaluatorMetricResult dataclass

One evaluator's verdict for one datapoint, normalized.

Score is the bare metrics[eval_name] value. explanation becomes metrics[f"{eval_name}_explanation"]. Each extras entry becomes metrics[f"{eval_name}_{key}"].

Source code in src/honeyhive/experiments/core.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@dataclass
class EvaluatorMetricResult:
    """Normalized outcome of a single evaluator on a single datapoint.

    When flattened, ``score`` lands under the bare ``metrics[eval_name]``
    key, ``explanation`` under ``metrics[f"{eval_name}_explanation"]``, and
    every ``extras`` item under ``metrics[f"{eval_name}_{key}"]``.
    """

    eval_name: str
    score: Optional[ScalarScore] = None
    explanation: Optional[str] = None
    extras: Dict[str, ScalarScore] = field(default_factory=dict)

    def to_metric_attrs(self) -> Dict[str, Any]:
        """Return the flat mapping handed to ``enrich_span(metrics=…)``.

        A ``None`` score or explanation is omitted entirely; extras are
        always emitted, each prefixed with the evaluator name.
        """
        prefix = self.eval_name
        flat: Dict[str, Any] = {}
        if self.score is not None:
            flat[prefix] = self.score
        if self.explanation is not None:
            flat[f"{prefix}_explanation"] = self.explanation
        flat.update(
            (f"{prefix}_{extra_key}", extra_value)
            for extra_key, extra_value in self.extras.items()
        )
        return flat

    @classmethod
    def from_raw(cls, eval_name: str, raw: Any) -> "EvaluatorMetricResult":
        """Build a result from whatever an evaluator returned.

        Supported shapes:
          * scalar (as judged by ``_is_scalar_metric_value``) → ``score``
          * list/tuple holding exactly one element → that element is
            unwrapped first and then handled like any other value (legacy)
          * dict → delegated to ``_from_dict``
          * None → a result with no score

        Any other shape is logged as unsupported and produces a result with
        no score, explanation, or extras.
        """
        candidate = raw
        # Legacy coercion: a single-element list/tuple stands for its element.
        if isinstance(candidate, (list, tuple)) and len(candidate) == 1:
            candidate = candidate[0]

        if candidate is not None:
            if _is_scalar_metric_value(candidate):
                return cls(eval_name=eval_name, score=candidate)

            if isinstance(candidate, dict):
                return cls._from_dict(eval_name, candidate)

            # Unrecognized shape: report it instead of guessing a meaning.
            logger.warning(
                "Evaluator %s returned an unsupported value type %s; dropping its metric for this datapoint.",
                eval_name,
                type(candidate).__name__,
            )

        # Reached for None and for unsupported shapes alike.
        return cls(eval_name=eval_name)

    @classmethod
    def _from_dict(cls, eval_name: str, raw: Dict[str, Any]) -> "EvaluatorMetricResult":
        """Handle the dict-shaped evaluator return for ``from_raw``.

        ``raw["score"]`` is kept only when scalar; a missing or non-scalar
        score is logged and left as None. ``raw["explanation"]`` is kept as
        is when it is a string, stringified when it is any other non-None
        value. Every remaining entry becomes an extra when scalar and is
        logged and dropped otherwise.
        """
        reserved_keys = ("score", "explanation")

        parsed_score: Optional[ScalarScore] = None
        if "score" not in raw:
            logger.warning(
                "Evaluator %s returned a dict missing 'score' key; the bare "
                "metrics[%s] entry won't be written, but scalar dict "
                "entries will surface as %s_<key> for diagnostics.",
                eval_name,
                eval_name,
                eval_name,
            )
        else:
            score_candidate = raw["score"]
            if not _is_scalar_metric_value(score_candidate):
                logger.warning(
                    "Evaluator %s returned a non-scalar 'score' value (%s); "
                    "dropping the score (compareRunMetrics only diffs scalar "
                    "metrics, so a nested value would be silently lost "
                    "during comparison).",
                    eval_name,
                    type(score_candidate).__name__,
                )
            else:
                parsed_score = score_candidate

        # Explanations are informational, so non-string values are
        # stringified rather than discarded; None stays None.
        note = raw["explanation"] if "explanation" in raw else None
        parsed_explanation: Optional[str]
        if note is None or isinstance(note, str):
            parsed_explanation = note
        else:
            parsed_explanation = str(note)

        kept_extras: Dict[str, ScalarScore] = {}
        for entry_key, entry_value in raw.items():
            if entry_key in reserved_keys:
                continue
            if not _is_scalar_metric_value(entry_value):
                logger.warning(
                    "Evaluator %s returned a non-scalar extra '%s' (%s); dropping it from event.metrics.",
                    eval_name,
                    entry_key,
                    type(entry_value).__name__,
                )
                continue
            kept_extras[entry_key] = entry_value

        return cls(
            eval_name=eval_name,
            score=parsed_score,
            explanation=parsed_explanation,
            extras=kept_extras,
        )

eval_name instance-attribute

eval_name: str

score class-attribute instance-attribute

score: Optional[ScalarScore] = None

explanation class-attribute instance-attribute

explanation: Optional[str] = None

extras class-attribute instance-attribute

extras: Dict[str, ScalarScore] = field(default_factory=dict)

to_metric_attrs

to_metric_attrs() -> Dict[str, Any]

Flatten into the dict shape expected by enrich_span(metrics=…).

Source code in src/honeyhive/experiments/core.py
89
90
91
92
93
94
95
96
97
98
def to_metric_attrs(self) -> Dict[str, Any]:
    """Return the flat mapping handed to ``enrich_span(metrics=…)``.

    A ``None`` score or explanation is omitted entirely; extras are always
    emitted, each prefixed with the evaluator name.
    """
    prefix = self.eval_name
    flat: Dict[str, Any] = {}
    if self.score is not None:
        flat[prefix] = self.score
    if self.explanation is not None:
        flat[f"{prefix}_explanation"] = self.explanation
    flat.update(
        (f"{prefix}_{extra_key}", extra_value)
        for extra_key, extra_value in self.extras.items()
    )
    return flat

from_raw classmethod

from_raw(eval_name: str, raw: Any) -> EvaluatorMetricResult

Parse an evaluator's raw return value into the canonical metrics shape.

Accepts
  • scalar (bool/int/float/str) → score
  • 1-element list/tuple → element coerced to scalar (legacy behavior)
  • dict → score + optional explanation + scalar extras flattened
  • None → score stays None (failed evaluator path)

Non-scalar score values are rejected with a warning; non-scalar extras are dropped with a warning. Score-less dicts log a warning but still surface their scalar entries as extras so the data isn't lost outright.

Source code in src/honeyhive/experiments/core.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@classmethod
def from_raw(cls, eval_name: str, raw: Any) -> "EvaluatorMetricResult":
    """Build a result from whatever an evaluator returned.

    Supported shapes:
      * scalar (as judged by ``_is_scalar_metric_value``) → ``score``
      * list/tuple holding exactly one element → that element is unwrapped
        first and then handled like any other value (legacy behavior)
      * dict → delegated to ``_from_dict``
      * None → a result with no score

    Any other shape is logged as unsupported and produces a result with no
    score, explanation, or extras.
    """
    candidate = raw
    # Legacy coercion: a single-element list/tuple stands for its element.
    if isinstance(candidate, (list, tuple)) and len(candidate) == 1:
        candidate = candidate[0]

    if candidate is not None:
        if _is_scalar_metric_value(candidate):
            return cls(eval_name=eval_name, score=candidate)

        if isinstance(candidate, dict):
            return cls._from_dict(eval_name, candidate)

        # Unrecognized shape: report it instead of guessing a meaning.
        logger.warning(
            "Evaluator %s returned an unsupported value type %s; dropping its metric for this datapoint.",
            eval_name,
            type(candidate).__name__,
        )

    # Reached for None and for unsupported shapes alike.
    return cls(eval_name=eval_name)

ExperimentContext

Lightweight experiment context for metadata linking.

NOTE: This is NOT a replacement for tracer config. This is just a convenience class for organizing experiment metadata that gets passed to the tracer.

The tracer handles actual metadata propagation when is_evaluation=True.

Attributes:

Name Type Description
run_id

Experiment run identifier

dataset_id

Dataset identifier (may have EXT- prefix)

source

Source identifier (default: "evaluation")

metadata

Additional metadata dictionary

Example

>>> context = ExperimentContext(
...     run_id="run-123",
...     dataset_id="EXT-abc",
... )
>>> tracer_config = context.to_tracer_config("dp-1")
>>> tracer_config["is_evaluation"]
True

Source code in src/honeyhive/experiments/core.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class ExperimentContext:  # pylint: disable=too-few-public-methods
    """
    Small holder for the identifiers that tie traces to an experiment run.

    NOTE: This does not replace the tracer's own configuration. It only
    groups experiment metadata so it can be handed to the tracer, which is
    what performs metadata propagation when is_evaluation=True.

    Attributes:
        run_id: Experiment run identifier
        dataset_id: Dataset identifier (may have EXT- prefix)
        run_name: Optional experiment run name (used for session naming)
        source: Source identifier (default: "evaluation")
        metadata: Additional metadata dictionary (empty dict when omitted)

    Example:
        >>> context = ExperimentContext(
        ...     run_id="run-123",
        ...     dataset_id="EXT-abc",
        ... )
        >>> tracer_config = context.to_tracer_config("dp-1")
        >>> tracer_config["is_evaluation"]
        True
    """

    def __init__(
        self,
        run_id: str,
        dataset_id: str,
        *,
        run_name: Optional[str] = None,
        source: str = "evaluation",
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """
        Store the experiment identifiers on the instance.

        Args:
            run_id: Experiment run identifier
            dataset_id: Dataset identifier
            run_name: Experiment run name (used for session naming)
            source: Source identifier (default: "evaluation")
            metadata: Additional metadata; a falsy value (None or an empty
                dict) is replaced by a new empty dict
        """
        self.run_id, self.dataset_id = run_id, dataset_id
        self.run_name = run_name
        self.source = source
        self.metadata = metadata if metadata else {}

    def to_tracer_config(self, datapoint_id: str) -> Dict[str, Any]:
        """
        Build the keyword arguments used to initialize a tracer.

        The result is meant to be splatted into HoneyHiveTracer(...).
        Only the run, dataset, datapoint and source identifiers are
        included; ``run_name`` and ``metadata`` are not part of the
        returned mapping.

        Args:
            datapoint_id: Datapoint identifier for this execution

        Returns:
            Dictionary of tracer initialization kwargs

        Example:
            >>> config = context.to_tracer_config("dp-1")
            >>> config
            {
                'is_evaluation': True,
                'run_id': 'run-123',
                'dataset_id': 'EXT-abc',
                'datapoint_id': 'dp-1',
                'source': 'evaluation'
            }
        """
        return dict(
            is_evaluation=True,
            run_id=self.run_id,
            dataset_id=self.dataset_id,
            datapoint_id=datapoint_id,
            source=self.source,
        )

run_id instance-attribute

run_id = run_id

dataset_id instance-attribute

dataset_id = dataset_id

run_name instance-attribute

run_name = run_name

source instance-attribute

source = source

metadata instance-attribute

metadata = metadata or {}

to_tracer_config

to_tracer_config(datapoint_id: str) -> Dict[str, Any]

Convert to tracer initialization config.

This returns kwargs for HoneyHiveTracer(...) initialization. The tracer will automatically propagate all metadata to spans when is_evaluation=True.

Parameters:

Name Type Description Default
datapoint_id str

Datapoint identifier for this execution

required

Returns:

Type Description
Dict[str, Any]

Dictionary of tracer initialization kwargs

Example

>>> config = context.to_tracer_config("dp-1")
>>> config
{
    'is_evaluation': True,
    'run_id': 'run-123',
    'dataset_id': 'EXT-abc',
    'datapoint_id': 'dp-1',
    'source': 'evaluation'
}

Source code in src/honeyhive/experiments/core.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def to_tracer_config(self, datapoint_id: str) -> Dict[str, Any]:
    """
    Build the keyword arguments used to initialize a tracer.

    The result is meant to be splatted into HoneyHiveTracer(...). Only the
    run, dataset, datapoint and source identifiers are included, together
    with a fixed ``is_evaluation=True`` flag.

    Args:
        datapoint_id: Datapoint identifier for this execution

    Returns:
        Dictionary of tracer initialization kwargs

    Example:
        >>> config = context.to_tracer_config("dp-1")
        >>> config
        {
            'is_evaluation': True,
            'run_id': 'run-123',
            'dataset_id': 'EXT-abc',
            'datapoint_id': 'dp-1',
            'source': 'evaluation'
        }
    """
    return dict(
        is_evaluation=True,
        run_id=self.run_id,
        dataset_id=self.dataset_id,
        datapoint_id=datapoint_id,
        source=self.source,
    )

run_experiment

run_experiment(
    function: Callable,
    dataset: List[Dict[str, Any]],
    datapoint_ids: List[str],
    *,
    server_url: Optional[str] = None,
    experiment_context: ExperimentContext,
    api_key: Optional[str] = None,
    max_workers: int = 10,
    verbose: bool = False,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    evaluators: Optional[List[Callable]] = None
) -> List[Dict[str, Any]]

Run experiment with tracer multi-instance pattern.

CRITICAL: Each datapoint gets its OWN tracer instance for isolation. This prevents:

- Metadata contamination between datapoints
- Race conditions in concurrent execution
- Session ID collisions

Threading Model:

- Uses ThreadPoolExecutor (not multiprocessing)
- I/O-bound operations (LLM calls, API requests)
- Each tracer instance is completely isolated
- Python 3.11+ GIL improvements for I/O

Parameters:

Name Type Description Default
function Callable

User function to execute against each datapoint. Can be either a synchronous function or an async function. Async functions are automatically detected and executed with asyncio.run().

required
dataset List[Dict[str, Any]]

List of datapoint dictionaries

required
datapoint_ids List[str]

List of datapoint IDs (parallel to dataset)

required
experiment_context ExperimentContext

ExperimentContext with run metadata

required
api_key Optional[str]

HoneyHive API key for tracer (or set HONEYHIVE_API_KEY env var)

None
max_workers int

ThreadPool size (default: 10)

10
verbose bool

Enable verbose logging

False
instrumentors Optional[List[Callable[[], Any]]]

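List of instrumentor factory functions. Each factory should return a new instrumentor instance when called. The factories are invoked once per experiment: the first datapoint to reach the instrumentation step binds each instrumentor to its tracer's provider, and every instrumentor is uninstrumented once all datapoints have finished. Example: [lambda: OpenAIInstrumentor(), lambda: AnthropicInstrumentor()]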

None
evaluators Optional[List[Callable]]

Optional list of evaluator callables. When set, each evaluator runs inline on the user function's outputs inside the per-datapoint chain span; their normalized scores attach to the chain span via enrich_span before the span closes.

None

Returns:

Type Description
List[Dict[str, Any]]

List of execution results (one per datapoint)

Examples:

>>> def my_function(inputs, ground_truth):
...     return {"output": "test"}
>>>
>>> # Async functions are also supported
>>> async def my_async_function(inputs, ground_truth):
...     result = await some_async_call()
...     return {"output": result}
>>>
>>> context = ExperimentContext(
...     run_id="run-123",
...     dataset_id="ds-456",
... )
>>>
>>> results = run_experiment(
...     function=my_function,  # or my_async_function
...     dataset=[{"inputs": {}, "ground_truth": {}}],
...     datapoint_ids=["dp-1"],
...     experiment_context=context,
...     api_key="hh_...",
...     max_workers=10,
...     instrumentors=[lambda: OpenAIInstrumentor()]
... )
Source code in src/honeyhive/experiments/core.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
def run_experiment(
    function: Callable,
    dataset: List[Dict[str, Any]],
    datapoint_ids: List[str],
    *,
    server_url: Optional[str] = None,
    experiment_context: ExperimentContext,
    api_key: Optional[str] = None,
    max_workers: int = 10,
    verbose: bool = False,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    evaluators: Optional[List[Callable]] = None,
) -> List[Dict[str, Any]]:
    """
    Run experiment with tracer multi-instance pattern.

    CRITICAL: Each datapoint gets its OWN tracer instance for isolation.
    This prevents:
    - Metadata contamination between datapoints
    - Race conditions in concurrent execution
    - Session ID collisions

    Threading Model:
    - Uses ThreadPoolExecutor (not multiprocessing)
    - I/O-bound operations (LLM calls, API requests)
    - Each tracer instance is completely isolated
    - Python 3.11+ GIL improvements for I/O

    Args:
        function: User function to execute against each datapoint. Can be either
            a synchronous function or an async function. Async functions are
            automatically detected and executed with asyncio.run(). If the
            function declares a ``tracer`` parameter, the per-datapoint tracer
            is passed to it as a keyword argument.
        dataset: List of datapoint dictionaries
        datapoint_ids: List of datapoint IDs (parallel to dataset)
        server_url: HoneyHive server URL forwarded to each tracer
        experiment_context: ExperimentContext with run metadata
        api_key: HoneyHive API key for tracer (or set HONEYHIVE_API_KEY env var)
        max_workers: ThreadPool size (default: 10)
        verbose: Enable verbose logging
        instrumentors: List of instrumentor factory functions. Each factory should
            return a new instrumentor instance when called. The factories are
            invoked once per experiment (by the first datapoint to reach the
            instrumentation step) and the resulting instrumentors are
            uninstrumented after every datapoint has finished.
            Example: [lambda: OpenAIInstrumentor(), lambda: AnthropicInstrumentor()]
        evaluators: Optional list of evaluator callables. When set, each
            evaluator runs inline on the user function's outputs inside
            the per-datapoint chain span; their normalized scores attach
            to the chain span via ``enrich_span`` before the span closes.

    Returns:
        List of execution results (one per datapoint), appended in completion
        order rather than dataset order. Every entry carries the keys
        ``datapoint_id``, ``inputs``, ``outputs``, ``ground_truth``,
        ``status``, ``error`` and ``session_id``.

    Raises:
        ValueError: If ``dataset`` and ``datapoint_ids`` differ in length.

    Examples:
        >>> def my_function(inputs, ground_truth):
        ...     return {"output": "test"}
        >>>
        >>> # Async functions are also supported
        >>> async def my_async_function(inputs, ground_truth):
        ...     result = await some_async_call()
        ...     return {"output": result}
        >>>
        >>> context = ExperimentContext(
        ...     run_id="run-123",
        ...     dataset_id="ds-456",
        ... )
        >>>
        >>> results = run_experiment(
        ...     function=my_function,  # or my_async_function
        ...     dataset=[{"inputs": {}, "ground_truth": {}}],
        ...     datapoint_ids=["dp-1"],
        ...     experiment_context=context,
        ...     api_key="hh_...",
        ...     max_workers=10,
        ...     instrumentors=[lambda: OpenAIInstrumentor()]
        ... )
    """
    # Validate inputs up front so a mismatch fails before any work starts.
    if len(dataset) != len(datapoint_ids):
        raise ValueError(
            f"Dataset length ({len(dataset)}) does not match datapoint_ids length ({len(datapoint_ids)})"
        )

    is_async = asyncio.iscoroutinefunction(function)
    user_fn_accepts_tracer = "tracer" in inspect.signature(function).parameters
    # Callables such as functools.partial objects have no __name__; fall back
    # to the type name so span naming cannot fail every datapoint.
    event_name = getattr(function, "__name__", None) or type(function).__name__

    # Whole-experiment instrumentor lifecycle. The first datapoint into the
    # pool acquires _INSTRUMENTOR_LIFECYCLE_LOCK, records its tracer in
    # binding_tracer, and binds each instrumentor to that tracer's provider.
    # Later datapoints find binding_tracer populated and skip. Cleanup runs
    # once after the pool drains.
    #
    # binding_tracer is the transport path for every wrapped call across the
    # experiment — all such spans flow through its provider's
    # BatchSpanProcessor, so it gets one more force_flush at teardown to
    # catch anything emitted after its own datapoint's flush ran.
    active_instrumentors: List[Any] = []
    binding_tracer: List[Any] = []  # singleton container so the closure can mutate it

    def process_datapoint(
        datapoint: Dict[str, Any], datapoint_id: str
    ) -> Dict[str, Any]:
        """
        Process single datapoint with its own isolated tracer.

        This function:
        1. Creates a NEW tracer instance for this datapoint
        2. On the first datapoint only, instruments the experiment's
           instrumentors against this tracer's provider
        3. Executes the user function (and any inline evaluators) inside a
           traced chain span
        4. Flushes the tracer to ensure all spans sent
        5. Returns result with status

        Exceptions raised by the user function or evaluators are converted
        into a ``status="failed"`` result instead of propagating.
        """
        # Extract inputs and ground truths from datapoint
        inputs = datapoint.get("inputs", {})
        ground_truth = datapoint.get("ground_truth")

        # Create tracer config for this datapoint with inputs
        tracer_config = experiment_context.to_tracer_config(datapoint_id)
        tracer_config["inputs"] = inputs  # Set session inputs

        if experiment_context.run_name:
            tracer_config["session_name"] = experiment_context.run_name

        # Create NEW tracer instance for this datapoint
        # Each tracer is completely isolated (own API client, logger, state)
        tracer = HoneyHiveTracer(
            api_key=api_key, server_url=server_url, verbose=verbose, **tracer_config
        )

        # Instrument once for the whole experiment under the module lock.
        # An instrumentor that raises here stays uninstrumented for the rest
        # of the experiment — install failures are deterministic (missing
        # dep, version mismatch), not transient. The guard is binding_tracer
        # (set before any factory runs) rather than active_instrumentors, so
        # the attempt is made exactly once even when every factory fails.
        if instrumentors:
            with _INSTRUMENTOR_LIFECYCLE_LOCK:
                if not binding_tracer:
                    binding_tracer.append(tracer)
                    for instrumentor_factory in instrumentors:
                        try:
                            instrumentor = instrumentor_factory()
                            instrumentor.instrument(tracer_provider=tracer.provider)
                            active_instrumentors.append(instrumentor)
                            if verbose:
                                safe_log(
                                    tracer,
                                    "info",
                                    "Initialized instrumentor %s for experiment",
                                    type(instrumentor).__name__,
                                )
                        except Exception as e:
                            safe_log(
                                tracer,
                                "warning",
                                "Failed to initialize instrumentor: %s",
                                str(e),
                            )

        try:
            # Execute function with tracer active
            # Tracer automatically adds all experiment metadata to spans!
            if verbose:
                # Use safe_log with tracer instance (multi-instance safety)
                safe_log(
                    tracer,
                    "info",
                    "Processing datapoint %s (run: %s)",
                    datapoint_id,
                    experiment_context.run_id,
                )

            # Wrap the user function so evaluators run before the chain span
            # closes — their scores attach to the still-recording span via
            # enrich_span(metrics=…) and ride out on the OTLP export.
            #
            # Sync and async user fns take separate paths so async evaluators
            # under an async user fn can be awaited directly; spinning up a
            # nested loop in the same thread that's already running one (via
            # asyncio.run below) would raise.

            def function_with_inline_evals(dp: Dict[str, Any]) -> Any:
                fn_outputs = (
                    function(dp, tracer=tracer)
                    if user_fn_accepts_tracer
                    else function(dp)
                )
                if evaluators:
                    _apply_inline_evaluators(
                        evaluators,
                        inputs=dp.get("inputs", {}),
                        outputs=fn_outputs,
                        ground_truth=dp.get("ground_truth"),
                        tracer=tracer,
                        max_workers=max_workers,
                        verbose=verbose,
                    )
                return fn_outputs

            async def afunction_with_inline_evals(dp: Dict[str, Any]) -> Any:
                fn_outputs = await (
                    function(dp, tracer=tracer)
                    if user_fn_accepts_tracer
                    else function(dp)
                )
                if evaluators:
                    await _aapply_inline_evaluators(
                        evaluators,
                        inputs=dp.get("inputs", {}),
                        outputs=fn_outputs,
                        ground_truth=dp.get("ground_truth"),
                        tracer=tracer,
                        verbose=verbose,
                    )
                return fn_outputs

            wrapped_for_trace = (
                afunction_with_inline_evals if is_async else function_with_inline_evals
            )
            functools.update_wrapper(wrapped_for_trace, function)
            # Drop __wrapped__ so inspect.signature(..., follow_wrapped=True) —
            # used by trace's input-capture path — stops at the closure's
            # (dp,) signature instead of walking back to the user fn's
            # (dp, tracer) and failing sig.bind(datapoint).
            try:
                del wrapped_for_trace.__wrapped__
            except AttributeError:
                pass

            traced_function = trace(
                event_type="chain",
                event_name=event_name,
                tracer=tracer,
            )(wrapped_for_trace)

            if verbose:
                safe_log(
                    tracer,
                    "info",
                    "Calling function (async=%s, accepts_tracer=%s, evaluators=%d)",
                    is_async,
                    user_fn_accepts_tracer,
                    len(evaluators or []),
                )
            if is_async:
                outputs = asyncio.run(traced_function(datapoint))
            else:
                outputs = traced_function(datapoint)

            # Capture session ID from tracer for linking to run
            # Outputs will be enriched later via UpdateEventRequest after tracer flush
            session_id = getattr(tracer, "session_id", None)

            return {
                "datapoint_id": datapoint_id,
                "inputs": inputs,
                "outputs": outputs,
                "ground_truth": ground_truth,
                "status": "success",
                "error": None,
                "session_id": session_id,  # Include session ID for run linkage
            }

        except Exception as e:
            # Use safe_log with tracer instance for error logging
            safe_log(
                tracer,
                "error",
                "Function execution failed for datapoint %s: %s",
                datapoint_id,
                str(e),
            )

            # Capture session ID even on failure
            session_id = getattr(tracer, "session_id", None)

            return {
                "datapoint_id": datapoint_id,
                "inputs": datapoint.get("inputs", {}),
                "outputs": None,
                "ground_truth": datapoint.get("ground_truth"),
                "status": "failed",
                "error": str(e),
                "session_id": session_id,  # Include session ID for run linkage
            }

        finally:
            # CRITICAL: Flush tracer to ensure all spans sent. Instrumentor
            # teardown happens once after the pool drains (in run_experiment)
            # so an early-finishing datapoint doesn't unwrap the client out
            # from under a sibling that's still mid-call.
            try:
                force_flush_tracer(tracer)
            except Exception as e:
                # Use safe_log for flush errors (tracer may be shutting down)
                safe_log(
                    tracer,
                    "warning",
                    "Failed to flush tracer for datapoint %s: %s",
                    datapoint_id,
                    str(e),
                )

    if verbose:
        # Module-level orchestration logging (no tracer instance)
        logger.info(
            "Executing function against %d datapoints with %d workers",
            len(dataset),
            max_workers,
        )

    results: List[Dict[str, Any]] = []
    try:
        # Use ThreadPoolExecutor for I/O-bound concurrent execution
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all datapoint executions. The datapoint itself is kept
            # alongside its ID so a failure outside process_datapoint's own
            # try block can still be reported in the standard result shape.
            future_to_datapoint = {}
            for datapoint, datapoint_id in zip(dataset, datapoint_ids):
                future = executor.submit(process_datapoint, datapoint, datapoint_id)
                future_to_datapoint[future] = (datapoint, datapoint_id)

            # Collect results as they complete
            for future in as_completed(future_to_datapoint):
                datapoint, datapoint_id = future_to_datapoint[future]
                try:
                    result = future.result()
                    results.append(result)

                    if verbose:
                        status = result.get("status", "unknown")
                        # Module-level logging (tracer already flushed)
                        logger.info("Completed datapoint %s: %s", datapoint_id, status)

                except Exception as e:
                    # Module-level error logging (tracer context lost)
                    logger.error(
                        "Unexpected error processing datapoint %s: %s",
                        datapoint_id,
                        str(e),
                        exc_info=True,
                    )
                    # The failure may be caused by a malformed (non-dict)
                    # datapoint, so only read from it when that is safe.
                    is_mapping = isinstance(datapoint, dict)
                    results.append(
                        {
                            "datapoint_id": datapoint_id,
                            "inputs": datapoint.get("inputs", {}) if is_mapping else {},
                            "outputs": None,
                            "ground_truth": (
                                datapoint.get("ground_truth") if is_mapping else None
                            ),
                            "status": "failed",
                            "error": str(e),
                            "session_id": None,
                        }
                    )
    finally:
        # Teardown runs even if something escapes the pool (for example a
        # KeyboardInterrupt), so wrapped clients are not left instrumented.
        # Leaving the ``with`` block above waits for submitted work first.

        # Flush the binding tracer. Every wrapped span across the experiment
        # was emitted through its provider, and short scripts / container
        # exits can race the BatchSpanProcessor's 5 s tick and atexit hook.
        if binding_tracer:
            try:
                force_flush_tracer(binding_tracer[0])
            except Exception as e:
                logger.warning(
                    "Failed to flush binding tracer for experiment: %s", str(e)
                )

        # Uninstrument once every datapoint has finished — unwrapping the
        # wrapped client while a sibling is still mid-call would silently drop
        # its spans.
        for instrumentor in active_instrumentors:
            try:
                instrumentor.uninstrument()
                if verbose:
                    logger.info(
                        "Uninstrumented %s for experiment",
                        type(instrumentor).__name__,
                    )
            except Exception as e:
                logger.warning(
                    "Failed to uninstrument %s: %s",
                    type(instrumentor).__name__,
                    str(e),
                )

    if verbose:
        # Module-level summary logging
        success_count = sum(1 for r in results if r.get("status") == "success")
        failed_count = sum(1 for r in results if r.get("status") == "failed")
        logger.info(
            "Experiment execution complete: %d succeeded, %d failed",
            success_count,
            failed_count,
        )

    return results

evaluate

evaluate(
    function: Callable,
    *,
    dataset: Optional[List[Dict[str, Any]]] = None,
    dataset_id: Optional[str] = None,
    evaluators: Optional[List[Callable]] = None,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    api_key: Optional[str] = None,
    server_url: Optional[str] = None,
    project: Optional[str] = None,
    name: Optional[str] = None,
    run_id: Optional[str] = None,
    max_workers: int = 10,
    aggregate_function: str = "average",
    verbose: bool = False,
    print_results: bool = True
) -> Any

Run experiment evaluation with backend aggregation.

This is the main user-facing API for running experiments. It:

1. Prepares dataset (external or HoneyHive)
2. Creates experiment run via API
3. Executes function against dataset with tracer multi-instance
4. Runs evaluators (if provided)
5. Retrieves aggregated results from backend

Parameters:

Name Type Description Default
function Callable

User function to execute against each datapoint. Can be either a synchronous function or an async function. Async functions are automatically detected and executed with asyncio.run().

required
dataset Optional[List[Dict[str, Any]]]

External dataset (list of dicts with 'inputs' and 'ground_truth')

None
dataset_id Optional[str]

HoneyHive dataset ID (alternative to external dataset)

None
evaluators Optional[List[Callable]]

List of evaluator functions (optional)

None
instrumentors Optional[List[Callable[[], Any]]]

List of instrumentor factory functions. Each factory should return a new instrumentor instance when called. This ensures each datapoint gets its own tracer and instrumentor instance for proper trace routing. Example: [lambda: OpenAIInstrumentor()]

None
api_key Optional[str]

HoneyHive API key (or set HONEYHIVE_API_KEY/HH_API_KEY env var)

None
server_url Optional[str]

HoneyHive server URL (or set HONEYHIVE_SERVER_URL/HH_SERVER_URL/HH_API_URL env var)

None
project Optional[str]

Deprecated and ignored. Project scope is determined by the API key.

None
name Optional[str]

Experiment run name (auto-generated if not provided)

None
run_id Optional[str]

Experiment run ID to send to the backend (auto-generated UUID if not provided). The backend's returned run_id is always honored as the final ID.

None
max_workers int

ThreadPool size for concurrent execution (default: 10)

10
aggregate_function str

Backend aggregation function ("average", "sum", "min", "max")

'average'
verbose bool

Enable verbose logging

False
print_results bool

Print formatted results table after evaluation (default: True)

True

Returns:

Type Description
Any

ExperimentResultSummary with backend-computed aggregates

Raises:

Type Description
ValueError

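If neither dataset nor dataset_id is provided, or if both are provided. Also raised when the HoneyHive dataset cannot be found, or when it lists datapoints but none of them could be fetched.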

Examples:

>>> from honeyhive import HoneyHive
>>> from honeyhive.experiments import evaluate
>>>
>>> # Define function to test (sync)
>>> def my_function(inputs, ground_truth):
...     # Your LLM call or function logic
...     return {"output": "result"}
>>>
>>> # Async functions are also supported
>>> async def my_async_function(inputs, ground_truth):
...     result = await some_async_llm_call()
...     return {"output": result}
>>>
>>> # External dataset
>>> dataset = [
...     {"inputs": {"query": "test1"}, "ground_truth": {"answer": "a1"}},
...     {"inputs": {"query": "test2"}, "ground_truth": {"answer": "a2"}}
... ]
>>>
>>> result = evaluate(
...     function=my_function,  # or my_async_function
...     dataset=dataset,
...     api_key="hh_...",
...     name="My Experiment"
... )
>>>
>>> print(f"Success: {result.success}")
>>> print(f"Passed: {len(result.passed)}")
>>> print(f"Metrics: {result.metrics.list_metrics()}")
>>>
>>> # HoneyHive dataset
>>> result = evaluate(
...     function=my_function,
...     dataset_id="ds-123",
...     api_key="hh_..."
... )
>>>
>>> # With instrumentors for automatic LLM tracing
>>> from openinference.instrumentation.openai import OpenAIInstrumentor
>>> result = evaluate(
...     function=my_function,
...     dataset=dataset,
...     api_key="hh_...",
...     instrumentors=[lambda: OpenAIInstrumentor()]
... )
Source code in src/honeyhive/experiments/core.py
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
def evaluate(  # pylint: disable=too-many-locals,too-many-branches
    function: Callable,
    *,
    dataset: Optional[List[Dict[str, Any]]] = None,
    dataset_id: Optional[str] = None,
    evaluators: Optional[List[Callable]] = None,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    api_key: Optional[str] = None,
    server_url: Optional[str] = None,
    project: Optional[str] = None,
    name: Optional[str] = None,
    run_id: Optional[str] = None,
    max_workers: int = 10,
    aggregate_function: str = "average",
    verbose: bool = False,
    print_results: bool = True,
) -> Any:
    """
    Run experiment evaluation with backend aggregation.

    This is the main user-facing API for running experiments. It:
    1. Prepares dataset (external or HoneyHive)
    2. Creates experiment run via API
    3. Executes function against dataset with tracer multi-instance
    4. Runs evaluators (if provided)
    5. Retrieves aggregated results from backend

    Args:
        function: User function to execute against each datapoint. Can be either
            a synchronous function or an async function. Async functions are
            automatically detected and executed with asyncio.run(). Callables
            without a ``__name__`` (e.g. ``functools.partial`` objects or
            callable instances) are accepted; a fallback name is recorded in
            the run configuration.
        dataset: External dataset (list of dicts with 'inputs' and 'ground_truth')
        dataset_id: HoneyHive dataset ID (alternative to external dataset)
        evaluators: List of evaluator functions (optional)
        instrumentors: List of instrumentor factory functions. Each factory should
            return a new instrumentor instance when called. This ensures each
            datapoint gets its own tracer and instrumentor instance for proper
            trace routing. Example: [lambda: OpenAIInstrumentor()]
        api_key: HoneyHive API key (or set HONEYHIVE_API_KEY/HH_API_KEY env var)
        server_url: HoneyHive server URL (or set HONEYHIVE_SERVER_URL/
            HH_SERVER_URL/HH_API_URL env var)
        project: Deprecated and ignored. Project scope is determined by the API key.
        name: Experiment run name (auto-generated if not provided)
        run_id: Experiment run ID to send to the backend (auto-generated UUID if not
            provided). The backend's returned run_id is always honored as the final ID.
        max_workers: ThreadPool size for concurrent execution (default: 10)
        aggregate_function: Backend aggregation function
            ("average", "sum", "min", "max")
        verbose: Enable verbose logging
        print_results: Print formatted results table after evaluation
            (default: True)

    Returns:
        ExperimentResultSummary with backend-computed aggregates

    Raises:
        ValueError: If neither dataset nor dataset_id provided, or both provided.
            Also raised when the HoneyHive dataset is not found, or when it
            lists datapoints but no usable datapoint could be fetched.

    Examples:
        >>> from honeyhive import HoneyHive
        >>> from honeyhive.experiments import evaluate
        >>>
        >>> # Define function to test (sync)
        >>> def my_function(inputs, ground_truth):
        ...     # Your LLM call or function logic
        ...     return {"output": "result"}
        >>>
        >>> # Async functions are also supported
        >>> async def my_async_function(inputs, ground_truth):
        ...     result = await some_async_llm_call()
        ...     return {"output": result}
        >>>
        >>> # External dataset
        >>> dataset = [
        ...     {"inputs": {"query": "test1"}, "ground_truth": {"answer": "a1"}},
        ...     {"inputs": {"query": "test2"}, "ground_truth": {"answer": "a2"}}
        ... ]
        >>>
        >>> result = evaluate(
        ...     function=my_function,  # or my_async_function
        ...     dataset=dataset,
        ...     api_key="hh_...",
        ...     name="My Experiment"
        ... )
        >>>
        >>> print(f"Success: {result.success}")
        >>> print(f"Passed: {len(result.passed)}")
        >>> print(f"Metrics: {result.metrics.list_metrics()}")
        >>>
        >>> # HoneyHive dataset
        >>> result = evaluate(
        ...     function=my_function,
        ...     dataset_id="ds-123",
        ...     api_key="hh_..."
        ... )
        >>>
        >>> # With instrumentors for automatic LLM tracing
        >>> from openinference.instrumentation.openai import OpenAIInstrumentor
        >>> result = evaluate(
        ...     function=my_function,
        ...     dataset=dataset,
        ...     api_key="hh_...",
        ...     instrumentors=[lambda: OpenAIInstrumentor()]
        ... )
    """

    def _callable_label(func: Any) -> Any:
        """Return a name for ``func`` to record in the run configuration."""
        # functools.partial objects and callable instances have no __name__;
        # try the wrapped ``.func`` next, then fall back to the type name so
        # building the configuration never raises AttributeError.
        for candidate in (func, getattr(func, "func", None)):
            label = getattr(candidate, "__name__", None)
            if label is not None:
                return label
        return type(func).__name__

    # Validate inputs
    if dataset is None and dataset_id is None:
        raise ValueError("Must provide either 'dataset' or 'dataset_id'")
    if dataset is not None and dataset_id is not None:
        raise ValueError("Cannot provide both 'dataset' and 'dataset_id'")
    if project is not None:
        warnings.warn(
            "The 'project' argument to evaluate() is deprecated and ignored. "
            "Project scope is determined by the API key.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Load from environment variables if not provided
    # Support both HONEYHIVE_* and HH_* prefixes for convenience
    # Note: HoneyHive client's config only reads HH_* prefix, so we check
    # HONEYHIVE_* first for better UX, then pass explicitly to client
    if api_key is None:
        api_key = os.getenv("HONEYHIVE_API_KEY") or os.getenv("HH_API_KEY")

    if server_url is None:
        # Check multiple variations for maximum compatibility
        server_url = (
            os.getenv("HONEYHIVE_SERVER_URL")  # Most intuitive
            or os.getenv("HH_SERVER_URL")  # Alternative shorthand
            or os.getenv("HH_API_URL")  # Client config uses this
        )

    # Initialize client - passing explicit values ensures both HONEYHIVE_* and HH_*
    # environment variables work (client's config only checks HH_* prefix)
    client_params = {"api_key": api_key}
    if server_url:
        client_params["base_url"] = server_url
    client = HoneyHive(**client_params)

    # Step 1: Prepare dataset
    if dataset is not None:
        # External dataset - generate EXT- IDs
        if verbose:
            logger.info("Preparing external dataset with %d datapoints", len(dataset))

        external_dataset_id, datapoint_ids = prepare_external_dataset(dataset)
        dataset_list = dataset

        if verbose:
            logger.info("Generated external dataset ID: %s", external_dataset_id)
    else:
        # HoneyHive dataset - fetch from API
        # At this point dataset_id is guaranteed to be str (not None)
        assert dataset_id is not None, "dataset_id must be provided"

        if verbose:
            logger.info("Fetching HoneyHive dataset: %s", dataset_id)
            logger.info("DEBUG - Input dataset_id type: %s", type(dataset_id))
            logger.info("DEBUG - Is EXT- dataset: %s", dataset_id.startswith("EXT-"))

        # Get dataset metadata - list() returns GetDatasetsResponse with datasets list
        ds_response = client.datasets.list(dataset_id=dataset_id)
        dataset_list = []
        datapoint_ids = []

        # Extract the dataset from the response
        if not ds_response.datasets:
            raise ValueError(f"Dataset not found: {dataset_id}")
        dataset_obj = ds_response.datasets[0]

        # Dataset.datapoints is List[str] (IDs only), fetch each datapoint.
        # get_datapoint returns a typed GetDatapointResponse Pydantic model
        # whose `.datapoint` field is List[Datapoint] (also Pydantic).
        #
        # Catch ONLY the exception types that represent a fetch failure
        # we can reasonably skip and keep going on (HTTP errors from the
        # generated SDK + httpx transport-level errors). Anything else
        # — AttributeError, TypeError, KeyError, etc. — indicates a real
        # bug we want to surface immediately rather than silently produce
        # an empty datapoint list.
        if dataset_obj.datapoints:
            for dp_id in dataset_obj.datapoints:
                try:
                    dp_response = client.datapoints.get_datapoint(dp_id)
                except (HTTPException, httpx.HTTPError) as e:
                    logger.warning("Failed to fetch datapoint %s: %s", dp_id, str(e))
                    continue
                dp_list = getattr(dp_response, "datapoint", []) or []
                if dp_list:
                    dp = dp_list[0]
                    dataset_list.append(
                        {
                            "inputs": getattr(dp, "inputs", None) or {},
                            "ground_truth": getattr(dp, "ground_truth", None),
                            "id": getattr(dp, "id", None) or dp_id,
                        }
                    )
                    datapoint_ids.append(getattr(dp, "id", None) or dp_id)

            # Guard against the silent-data-loss shape that the narrow
            # except above doesn't cover: every fetch logged + skipped
            # (transient HTTP failure on every datapoint), or every
            # response had an empty `.datapoint` list. In either case
            # the dataset claimed N datapoints but we collected zero —
            # better to fail loudly than to proceed with an empty
            # dataset and report passed=0.
            if not dataset_list:
                raise ValueError(
                    f"Dataset {dataset_id} listed "
                    f"{len(dataset_obj.datapoints)} datapoint(s) but every "
                    f"fetch returned no usable datapoint. Check warnings "
                    f"above for per-datapoint errors."
                )

        external_dataset_id = dataset_id

        if verbose:
            logger.info(
                "Loaded %d datapoints from HoneyHive dataset", len(dataset_list)
            )
            logger.info("DEBUG - external_dataset_id set to: %s", external_dataset_id)
            logger.info("DEBUG - datapoint_ids collected: %s", datapoint_ids)

    # Step 2: Create experiment run
    # Generate a client-side UUID if no run_id was provided. The backend also
    # generates a UUID when run_id is omitted, but we do it here so the
    # default run name ("experiment-{short_id}") is derived from the same ID
    # that will be sent in the request.
    run_id = run_id or str(uuid.uuid4())
    run_name = name or f"experiment-{run_id[:8]}"

    if verbose:
        logger.info("Creating experiment run: %s", run_name)
        logger.info("DEBUG - Before prepare_run_request_data:")
        logger.info("  external_dataset_id: %s", external_dataset_id)
        logger.info("  datapoint_ids: %s", datapoint_ids)

    git_context = get_git_context()

    run_metadata: Dict[str, Any] = {}
    if git_context:
        run_metadata["git"] = git_context

    run_data = prepare_run_request_data(
        run_id=run_id,
        name=run_name,
        dataset_id=external_dataset_id,
        event_ids=[],  # Empty initially
        datapoint_ids=datapoint_ids,  # Link datapoints to run
        configuration={
            # Use the tolerant lookup: not every Callable exposes __name__.
            "function": _callable_label(function),
            "evaluators": [_callable_label(e) for e in (evaluators or [])],
            "max_workers": max_workers,
            "aggregate_function": aggregate_function,
        },
        metadata=run_metadata,
        status="pending",
    )

    if verbose:
        logger.info("DEBUG - After prepare_run_request_data:")
        logger.info("  run_data['dataset_id']: %s", run_data.get("dataset_id"))
        logger.info("  run_data['datapoint_ids']: %s", run_data.get("datapoint_ids"))
        logger.info("  run_data['metadata']: %s", run_data.get("metadata"))

    # Create run via API (experiments API handles runs)
    run_request = PostExperimentRunRequest(**run_data)
    run_response = client.experiments.create_run(run_request)

    # Use backend-generated run_id if available
    if hasattr(run_response, "run_id") and run_response.run_id:
        run_id = str(run_response.run_id)

    if verbose:
        logger.info("Created experiment run: %s", run_id)

    # Step 3: Create experiment context
    # external_dataset_id is guaranteed to be str at this point
    context = ExperimentContext(
        run_id=run_id,
        dataset_id=external_dataset_id or "",  # Type safety
        run_name=run_name,
        source="evaluation",
    )

    # Step 4: Execute experiment with tracer multi-instance
    if verbose:
        logger.info(
            "Executing function against %d datapoints with %d workers",
            len(dataset_list),
            max_workers,
        )

    execution_results = run_experiment(
        function=function,
        dataset=dataset_list,
        datapoint_ids=datapoint_ids,
        server_url=server_url,
        experiment_context=context,
        api_key=api_key,
        max_workers=max_workers,
        verbose=verbose,
        instrumentors=instrumentors,
        evaluators=evaluators,
    )

    # Step 5: Enrich each session with its outputs and ground truth
    if verbose:
        logger.info("Enriching sessions with outputs and ground_truth")

    for result in execution_results:
        session_id = result.get("session_id")
        # Results without a session_id have no session to enrich; skip them.
        if session_id:
            _enrich_session_with_results(
                session_id=session_id,
                outputs=result.get("outputs"),
                ground_truth=result.get("ground_truth"),
                client=client,
                verbose=verbose,
            )

    # Step 6: Update the run with the execution results
    _update_run_with_results(
        run_id=run_id,
        run_name=run_name,
        execution_results=execution_results,
        external_dataset_id=external_dataset_id,
        client=client,
        verbose=verbose,
    )

    # Step 7: Retrieve aggregated results from backend
    if verbose:
        logger.info(
            "Retrieving aggregated results with %s aggregation", aggregate_function
        )

    result_summary = get_run_result(
        client=client,
        run_id=run_id,
        aggregate_function=aggregate_function,
    )

    if verbose:
        logger.info(
            "Experiment complete: %s (passed: %d, failed: %d)",
            "SUCCESS" if result_summary.success else "FAILED",
            len(result_summary.passed),
            len(result_summary.failed),
        )

    # Print formatted results table if requested
    if print_results:
        result_summary.print_table(run_name=run_name)

    return result_summary