Skip to content

honeyhive.experiments

Experiments module for HoneyHive SDK.

This module provides experiment management functionality including: - Experiment run creation and management - External dataset support with EXT- prefix - Result aggregation from backend - Run comparison and analysis - Evaluator framework integration

The experiments module replaces the legacy evaluation module while maintaining backward compatibility through deprecation aliases.

ExperimentRun module-attribute

ExperimentRun = ExperimentResultSummary

ExperimentContext

Lightweight experiment context for metadata linking.

NOTE: This is NOT a replacement for tracer config. This is just a convenience class for organizing experiment metadata that gets passed to the tracer.

The tracer handles actual metadata propagation when is_evaluation=True.

Attributes:

Name Type Description
run_id

Experiment run identifier

dataset_id

Dataset identifier (may have EXT- prefix)

source

Source identifier (default: "evaluation")

metadata

Additional metadata dictionary

Example

context = ExperimentContext( ... run_id="run-123", ... dataset_id="EXT-abc", ... ) tracer_config = context.to_tracer_config("dp-1") tracer_config["is_evaluation"] True

Source code in src/honeyhive/experiments/core.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class ExperimentContext:  # pylint: disable=too-few-public-methods
    """
    Lightweight experiment context for metadata linking.

    NOTE: This is NOT a replacement for tracer config. This is just
    a convenience class for organizing experiment metadata that gets
    passed to the tracer.

    The tracer handles actual metadata propagation when is_evaluation=True.

    Attributes:
        run_id: Experiment run identifier
        dataset_id: Dataset identifier (may have EXT- prefix)
        source: Source identifier (default: "evaluation")
        metadata: Additional metadata dictionary

    Example:
        >>> context = ExperimentContext(
        ...     run_id="run-123",
        ...     dataset_id="EXT-abc",
        ... )
        >>> tracer_config = context.to_tracer_config("dp-1")
        >>> tracer_config["is_evaluation"]
        True
    """

    def __init__(
        self,
        run_id: str,
        dataset_id: str,
        *,
        run_name: Optional[str] = None,
        source: str = "evaluation",
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize experiment context.

        Args:
            run_id: Experiment run identifier
            dataset_id: Dataset identifier
            run_name: Experiment run name (used for session naming)
            source: Source identifier (default: "evaluation")
            metadata: Additional metadata
        """
        self.run_id = run_id
        self.dataset_id = dataset_id
        self.run_name = run_name
        self.source = source
        self.metadata = metadata or {}

    def to_tracer_config(self, datapoint_id: str) -> Dict[str, Any]:
        """
        Convert to tracer initialization config.

        This returns kwargs for HoneyHiveTracer(...) initialization.
        The tracer will automatically propagate all metadata to spans
        when is_evaluation=True.

        Args:
            datapoint_id: Datapoint identifier for this execution

        Returns:
            Dictionary of tracer initialization kwargs

        Example:
            >>> config = context.to_tracer_config("dp-1")
            >>> config
            {
                'is_evaluation': True,
                'run_id': 'run-123',
                'dataset_id': 'EXT-abc',
                'datapoint_id': 'dp-1',
                'source': 'evaluation'
            }
        """
        return {
            "is_evaluation": True,
            "run_id": self.run_id,
            "dataset_id": self.dataset_id,
            "datapoint_id": datapoint_id,
            "source": self.source,
        }

run_id instance-attribute

run_id = run_id

dataset_id instance-attribute

dataset_id = dataset_id

run_name instance-attribute

run_name = run_name

source instance-attribute

source = source

metadata instance-attribute

metadata = metadata or {}

to_tracer_config

to_tracer_config(datapoint_id: str) -> Dict[str, Any]

Convert to tracer initialization config.

This returns kwargs for HoneyHiveTracer(...) initialization. The tracer will automatically propagate all metadata to spans when is_evaluation=True.

Parameters:

Name Type Description Default
datapoint_id str

Datapoint identifier for this execution

required

Returns:

Type Description
Dict[str, Any]

Dictionary of tracer initialization kwargs

Example

config = context.to_tracer_config("dp-1") config { 'is_evaluation': True, 'run_id': 'run-123', 'dataset_id': 'EXT-abc', 'datapoint_id': 'dp-1', 'source': 'evaluation' }

Source code in src/honeyhive/experiments/core.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def to_tracer_config(self, datapoint_id: str) -> Dict[str, Any]:
    """
    Convert to tracer initialization config.

    This returns kwargs for HoneyHiveTracer(...) initialization.
    The tracer will automatically propagate all metadata to spans
    when is_evaluation=True.

    Args:
        datapoint_id: Datapoint identifier for this execution

    Returns:
        Dictionary of tracer initialization kwargs

    Example:
        >>> config = context.to_tracer_config("dp-1")
        >>> config
        {
            'is_evaluation': True,
            'run_id': 'run-123',
            'dataset_id': 'EXT-abc',
            'datapoint_id': 'dp-1',
            'source': 'evaluation'
        }
    """
    return {
        "is_evaluation": True,
        "run_id": self.run_id,
        "dataset_id": self.dataset_id,
        "datapoint_id": datapoint_id,
        "source": self.source,
    }

EvalResult

Result container for evaluator execution.

Source code in src/honeyhive/experiments/evaluators.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
class EvalResult:
    """Result container for evaluator execution."""

    def __init__(self, score: Any, init_method: Optional[str] = None, **metadata):

        self.score: Any | EvalResult = score
        self.metadata: dict = metadata

        # determine the eval_type
        self.init_method = init_method or inspect.stack()[1].function

        self.eval_settings: Optional[EvalSettings] = EvalSettings(name=self.init_method)
        self.eval_kwargs: Optional[dict] = dict()

        # for easy access
        self.weight = self.eval_settings.weight

        self.func_impl: Callable = None
        self.func_args: tuple = None
        self.func_kwargs: dict = None

    def to_dict(self) -> dict:
        """Convert result to dictionary."""
        return {"score": self.score, "metadata": self.metadata}

    def copy(self) -> "EvalResult":
        copy_result = EvalResult(
            score=self.score, init_method=self.init_method, **self.metadata
        )
        return copy_result

score instance-attribute

score: Any | EvalResult = score

metadata instance-attribute

metadata: dict = metadata

init_method instance-attribute

init_method = init_method or function

eval_settings instance-attribute

eval_settings: Optional[EvalSettings] = EvalSettings(
    name=init_method
)

eval_kwargs instance-attribute

eval_kwargs: Optional[dict] = dict()

weight instance-attribute

weight = weight

func_impl instance-attribute

func_impl: Callable = None

func_args instance-attribute

func_args: tuple = None

func_kwargs instance-attribute

func_kwargs: dict = None

to_dict

to_dict() -> dict

Convert result to dictionary.

Source code in src/honeyhive/experiments/evaluators.py
263
264
265
def to_dict(self) -> dict:
    """Convert result to dictionary."""
    return {"score": self.score, "metadata": self.metadata}

copy

copy() -> EvalResult
Source code in src/honeyhive/experiments/evaluators.py
267
268
269
270
271
def copy(self) -> "EvalResult":
    copy_result = EvalResult(
        score=self.score, init_method=self.init_method, **self.metadata
    )
    return copy_result

EvalSettings dataclass

Configuration settings for evaluators.

Source code in src/honeyhive/experiments/evaluators.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@dataclass
class EvalSettings:
    """Configuration settings for evaluators."""

    name: str  # config key
    wraps: Optional[str | dict] = None
    weight: float = None
    asserts: bool = None
    repeat: Optional[int] = None
    transform: Optional[str] = None
    aggregate: Optional[str] = None
    checker: Optional[str] = None
    # ``target`` is the assertion/checker target (e.g. "score in range
    # [target]") consumed by run_asserts/pre_run_checker.
    target: Optional[str] = None
    evaluate: Optional[str] = None

    def copy(self) -> "EvalSettings":
        """Create a deep copy of the settings."""
        return EvalSettings(
            name=self.name,
            wraps=self.wraps,
            weight=self.weight,
            repeat=self.repeat,
            asserts=self.asserts,
            transform=self.transform,
            aggregate=self.aggregate,
            checker=self.checker,
            target=self.target,
            evaluate=self.evaluate,
        )

    def keys(self):
        """Return dictionary keys."""
        return self.__dict__.keys()

    # TODO: settings update should replace instead of merge
    def update(self, eval_settings: Any | None) -> None:
        """Update settings from dict or EvalSettings instance."""
        if eval_settings is None:
            return
        if isinstance(eval_settings, dict):
            update_dict = eval_settings
        elif isinstance(eval_settings, EvalSettings):
            update_dict = eval_settings.__dict__
        else:
            raise TypeError(
                "eval_settings must be either a dictionary or an EvalSettings instance. Got {}".format(
                    type(eval_settings)
                )
            )

        valid_fields = {f.name for f in fields(self)}

        for key, value in update_dict.items():
            if key not in valid_fields:
                raise ValueError(f"Invalid field name: {key}")
            if value is not None:
                setattr(self, key, value)

    @staticmethod
    def extract_eval_settings_and_kwargs(settings: dict[str, Any] | None):
        """Extract evaluator settings and kwargs from a combined dict."""

        eval_kwargs = dict()
        eval_settings = dict()

        if settings is not None:
            for key, value in settings.items():
                if key in EVALUATOR_SETTINGS_KEYS:
                    eval_settings[key] = value
                else:
                    eval_kwargs[key] = value

        return eval_settings, eval_kwargs

    def __str__(self) -> str:
        """Return string representation of settings."""
        dict_str = {k: str(v) for k, v in self.__dict__.items() if v is not None}
        return json.dumps(dict_str, indent=4).replace('"', "")

    def dict(self) -> dict:
        """Convert to dictionary, excluding name."""
        ret_dict = self.__dict__
        ret_dict.pop("name", None)
        return ret_dict

name instance-attribute

name: str

wraps class-attribute instance-attribute

wraps: Optional[str | dict] = None

weight class-attribute instance-attribute

weight: float = None

asserts class-attribute instance-attribute

asserts: bool = None

repeat class-attribute instance-attribute

repeat: Optional[int] = None

transform class-attribute instance-attribute

transform: Optional[str] = None

aggregate class-attribute instance-attribute

aggregate: Optional[str] = None

checker class-attribute instance-attribute

checker: Optional[str] = None

target class-attribute instance-attribute

target: Optional[str] = None

evaluate class-attribute instance-attribute

evaluate: Optional[str] = None

copy

copy() -> EvalSettings

Create a deep copy of the settings.

Source code in src/honeyhive/experiments/evaluators.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def copy(self) -> "EvalSettings":
    """Create a deep copy of the settings."""
    return EvalSettings(
        name=self.name,
        wraps=self.wraps,
        weight=self.weight,
        repeat=self.repeat,
        asserts=self.asserts,
        transform=self.transform,
        aggregate=self.aggregate,
        checker=self.checker,
        target=self.target,
        evaluate=self.evaluate,
    )

keys

keys()

Return dictionary keys.

Source code in src/honeyhive/experiments/evaluators.py
106
107
108
def keys(self):
    """Return dictionary keys."""
    return self.__dict__.keys()

update

update(eval_settings: Any | None) -> None

Update settings from dict or EvalSettings instance.

Source code in src/honeyhive/experiments/evaluators.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def update(self, eval_settings: Any | None) -> None:
    """Update settings from dict or EvalSettings instance."""
    if eval_settings is None:
        return
    if isinstance(eval_settings, dict):
        update_dict = eval_settings
    elif isinstance(eval_settings, EvalSettings):
        update_dict = eval_settings.__dict__
    else:
        raise TypeError(
            "eval_settings must be either a dictionary or an EvalSettings instance. Got {}".format(
                type(eval_settings)
            )
        )

    valid_fields = {f.name for f in fields(self)}

    for key, value in update_dict.items():
        if key not in valid_fields:
            raise ValueError(f"Invalid field name: {key}")
        if value is not None:
            setattr(self, key, value)

extract_eval_settings_and_kwargs staticmethod

extract_eval_settings_and_kwargs(
    settings: dict[str, Any] | None,
)

Extract evaluator settings and kwargs from a combined dict.

Source code in src/honeyhive/experiments/evaluators.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@staticmethod
def extract_eval_settings_and_kwargs(settings: dict[str, Any] | None):
    """Extract evaluator settings and kwargs from a combined dict."""

    eval_kwargs = dict()
    eval_settings = dict()

    if settings is not None:
        for key, value in settings.items():
            if key in EVALUATOR_SETTINGS_KEYS:
                eval_settings[key] = value
            else:
                eval_kwargs[key] = value

    return eval_settings, eval_kwargs

dict

dict() -> dict

Convert to dictionary, excluding name.

Source code in src/honeyhive/experiments/evaluators.py
155
156
157
158
159
def dict(self) -> dict:
    """Convert to dictionary, excluding name."""
    ret_dict = self.__dict__
    ret_dict.pop("name", None)
    return ret_dict

EvaluatorSettings dataclass

Hierarchical settings management for evaluators.

Source code in src/honeyhive/experiments/evaluators.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@dataclass
class EvaluatorSettings:
    """Hierarchical settings management for evaluators."""

    name: str

    # base default settings
    default_settings: EvalSettings = None
    default_kwargs: dict = field(default_factory=dict)

    # settings from defaults.yaml
    defaults_yaml_settings: EvalSettings = None
    defaults_yaml_kwargs: dict = field(default_factory=dict)

    # settings from evaluator init
    init_settings: EvalSettings = None
    init_kwargs: dict = field(default_factory=dict)

    # settings from decorator kwargs
    deco_settings: EvalSettings = None
    deco_kwargs: dict = field(default_factory=dict)

    # settings from config file
    config_settings: EvalSettings = None
    config_kwargs: dict = field(default_factory=dict)

    # settings from runtime
    explicit_settings: EvalSettings = None
    explicit_kwargs: dict = field(default_factory=dict)

    def __post_init__(self):
        self.default_settings = EvalSettings(name=self.name)
        self.defaults_yaml_settings = EvalSettings(name=self.name)
        self.init_settings = EvalSettings(name=self.name)
        self.deco_settings = EvalSettings(name=self.name)
        self.config_settings = EvalSettings(name=self.name)
        self.explicit_settings = None  # this must be set at runtime

        # set defaults
        self.default_settings.asserts = False
        self.default_settings.weight = 1.0

    def resolve_settings(self, settings: EvalSettings | None = None) -> EvalSettings:
        """Resolve settings from all sources in priority order."""
        if self.explicit_settings:
            return self.explicit_settings

        final_settings: EvalSettings = self.default_settings
        final_settings.update(self.defaults_yaml_settings)
        final_settings.update(self.init_settings)
        final_settings.update(self.config_settings)
        final_settings.update(self.deco_settings)

        if settings is not None:
            final_settings.update(settings)

        return final_settings

    def resolve_kwargs(self, kwargs: dict | None = None) -> dict:
        """Resolve kwargs from all sources in priority order."""
        if self.explicit_kwargs:
            return self.explicit_kwargs

        final_kwargs: dict = self.default_kwargs.copy()
        final_kwargs.update(self.defaults_yaml_kwargs)
        final_kwargs.update(self.init_kwargs)
        final_kwargs.update(self.config_kwargs)
        final_kwargs.update(self.deco_kwargs)

        if kwargs is not None:
            final_kwargs.update(kwargs)

        return final_kwargs

name instance-attribute

name: str

default_settings class-attribute instance-attribute

default_settings: EvalSettings = None

default_kwargs class-attribute instance-attribute

default_kwargs: dict = field(default_factory=dict)

defaults_yaml_settings class-attribute instance-attribute

defaults_yaml_settings: EvalSettings = None

defaults_yaml_kwargs class-attribute instance-attribute

defaults_yaml_kwargs: dict = field(default_factory=dict)

init_settings class-attribute instance-attribute

init_settings: EvalSettings = None

init_kwargs class-attribute instance-attribute

init_kwargs: dict = field(default_factory=dict)

deco_settings class-attribute instance-attribute

deco_settings: EvalSettings = None

deco_kwargs class-attribute instance-attribute

deco_kwargs: dict = field(default_factory=dict)

config_settings class-attribute instance-attribute

config_settings: EvalSettings = None

config_kwargs class-attribute instance-attribute

config_kwargs: dict = field(default_factory=dict)

explicit_settings class-attribute instance-attribute

explicit_settings: EvalSettings = None

explicit_kwargs class-attribute instance-attribute

explicit_kwargs: dict = field(default_factory=dict)

resolve_settings

resolve_settings(
    settings: EvalSettings | None = None,
) -> EvalSettings

Resolve settings from all sources in priority order.

Source code in src/honeyhive/experiments/evaluators.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def resolve_settings(self, settings: EvalSettings | None = None) -> EvalSettings:
    """Resolve settings from all sources in priority order."""
    if self.explicit_settings:
        return self.explicit_settings

    final_settings: EvalSettings = self.default_settings
    final_settings.update(self.defaults_yaml_settings)
    final_settings.update(self.init_settings)
    final_settings.update(self.config_settings)
    final_settings.update(self.deco_settings)

    if settings is not None:
        final_settings.update(settings)

    return final_settings

resolve_kwargs

resolve_kwargs(kwargs: dict | None = None) -> dict

Resolve kwargs from all sources in priority order.

Source code in src/honeyhive/experiments/evaluators.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def resolve_kwargs(self, kwargs: dict | None = None) -> dict:
    """Resolve kwargs from all sources in priority order."""
    if self.explicit_kwargs:
        return self.explicit_kwargs

    final_kwargs: dict = self.default_kwargs.copy()
    final_kwargs.update(self.defaults_yaml_kwargs)
    final_kwargs.update(self.init_kwargs)
    final_kwargs.update(self.config_kwargs)
    final_kwargs.update(self.deco_kwargs)

    if kwargs is not None:
        final_kwargs.update(kwargs)

    return final_kwargs

aevaluator

Bases: evaluator

Async evaluator decorator class.

Source code in src/honeyhive/experiments/evaluators.py
1168
1169
1170
1171
1172
1173
1174
1175
class aevaluator(evaluator):  # pylint: disable=invalid-name
    """Async evaluator decorator class."""

    async def __call__(self, *args, **kwargs):
        return await self.__acall__(*args, **kwargs)

    async def raw(self, *args, **kwargs):
        return await self.araw(*args, **kwargs)

raw async

raw(*args, **kwargs)
Source code in src/honeyhive/experiments/evaluators.py
1174
1175
async def raw(self, *args, **kwargs):
    return await self.araw(*args, **kwargs)

evaluator

Sync evaluator decorator class with pipeline support.

Source code in src/honeyhive/experiments/evaluators.py
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
class evaluator(metaclass=EvaluatorMeta):  # pylint: disable=invalid-name
    """Sync evaluator decorator class with pipeline support."""

    # ------------------------------------------------------------------------------
    # STATICS / INITIALIZE
    # ------------------------------------------------------------------------------

    # global registry of evaluator names to evaluator instances
    all_evaluators: dict[str, "evaluator" | Callable | Coroutine | "aevaluator"] = (
        dict()
    )

    # global registry of evaluator names to evaluator settings
    all_evaluator_settings: dict[str, EvaluatorSettings] = dict()

    def __unnamed__(self, *args, **kwargs):
        """Placeholder for unnamed evaluator."""
        raise NotImplementedError(f"Please decorate with an evaluator implementation.")

    def __new__(
        cls, func=None, eval_settings=None, eval_kwargs=None, **deco_settings_kwargs
    ) -> "evaluator":
        """Allows evaluator to be initialized in the decorator with kwargs"""
        if func is None:
            return lambda f: cls(f, eval_settings, eval_kwargs, **deco_settings_kwargs)
        return super().__new__(cls)

    def __init__(
        self,
        func: Callable = __unnamed__,
        eval_settings: EvalSettings | None = None,
        eval_kwargs: dict[str, Any] | None = None,
        **deco_settings_kwargs,
    ) -> None:

        # set the wrapped function implementation and its name
        self.func: Callable = func

        # set the evaluator name
        self.name: str = func.__name__ if hasattr(func, "__name__") else str(func)

        # register all_evaluators[func name] = this evaluator
        self.all_evaluators[self.name] = self

        # initialize the evaluator settings
        if self.name not in evaluator.all_evaluator_settings:
            evaluator.all_evaluator_settings[self.name] = EvaluatorSettings(self.name)

        # get the settings
        evaluator_settings = evaluator.all_evaluator_settings[self.name]

        # init settings
        if eval_settings:
            eval_settings.name = self.name
        else:
            eval_settings = EvalSettings(name=self.name)

        evaluator_settings.init_settings = eval_settings
        evaluator_settings.init_kwargs = eval_kwargs or {}

        # decorator kwargs
        kwarg_eval_settings, kwarg_eval_kwargs = (
            EvalSettings.extract_eval_settings_and_kwargs(deco_settings_kwargs)
        )
        kwarg_eval_settings["name"] = self.name
        evaluator_settings.deco_settings = EvalSettings(**kwarg_eval_settings)
        evaluator_settings.deco_kwargs = kwarg_eval_kwargs

        self.explicit_config = None

        # sets decorator metadata to that of wrapped function
        functools.update_wrapper(self, func)
        functools.update_wrapper(func, self)

    # ------------------------------------------------------------------------------
    # AGGREGATION
    def pre_apply_aggregation(
        self,
        eval_results: tuple[EvalResult] | list[EvalResult],
        eval_scores: tuple | list,
        final_settings: EvalSettings,
    ) -> tuple[EvalResult, Any] | Coroutine:

        locals_dict = {"values": eval_scores, "results": eval_results}

        # TODO: enable aggregate to be a function
        aggregation_expr = str(final_settings.aggregate)

        # apply aggregation
        aggregate_score = eval(aggregation_expr, evaluator.all_evaluators, locals_dict)

        return aggregate_score

    def post_apply_aggregation(
        self,
        eval_results: tuple[EvalResult] | list[EvalResult] | EvalResult,
        aggregate_score: Any,
        final_settings: EvalSettings,
    ):
        """Wrap aggregated score in EvalResult."""
        init_methods = set()

        # if no repetitions, we will only have one eval result
        if isinstance(eval_results, EvalResult):
            init_methods.add(eval_results.init_method)
        else:
            for eval_result in eval_results:
                if isinstance(eval_result, EvalResult):
                    init_methods.add(eval_result.init_method)

        init_method = "aggregate: "
        if len(init_methods) > 0:
            init_method += "-".join(init_methods)

        aggregate_result = EvalResult(
            aggregate_score, init_method=init_method, prev_results=eval_results
        )

        return aggregate_result

    def sync_apply_aggregation(
        self,
        eval_results: tuple[EvalResult] | list[EvalResult] | EvalResult,
        eval_scores: tuple | list | Any,
        final_settings: EvalSettings,
    ) -> tuple[EvalResult, Any]:
        """Synchronously apply aggregation to results."""

        if not final_settings.aggregate:
            return eval_results, eval_scores

        aggregate_score = self.pre_apply_aggregation(
            eval_results, eval_scores, final_settings
        )

        aggregate_score = evaluator.resolve_pipeline(aggregate_score, eval_scores)

        aggregate_result = self.post_apply_aggregation(
            eval_results, aggregate_score, final_settings
        )

        return aggregate_result, aggregate_score

    async def async_apply_aggregation(
        self,
        eval_results: tuple[EvalResult] | list[EvalResult],
        eval_scores: tuple | list,
        final_settings: EvalSettings,
    ) -> tuple[EvalResult, Any]:
        """Asynchronously apply aggregation to results."""

        if not final_settings.aggregate:
            return eval_results, eval_scores

        aggregate_score = self.pre_apply_aggregation(
            eval_results, eval_scores, final_settings
        )

        aggregate_score = await evaluator.aresolve_pipeline(
            aggregate_score, eval_scores
        )

        aggregate_result = self.post_apply_aggregation(
            eval_results, aggregate_score, final_settings
        )

        return aggregate_result, aggregate_score

    # ------------------------------------------------------------------------------

    # ------------------------------------------------------------------------------
    # TRANSFORMATION
    def pre_apply_transformation(
        self,
        eval_result: EvalResult,
        eval_score: Any,
        final_settings: EvalSettings,
    ):
        """Apply transformation expression to evaluation score."""

        transform_expr = str(final_settings.transform)

        locals_dict = {"value": eval_score, "result": eval_result}

        # apply transformation
        transformed_score = eval(transform_expr, evaluator.all_evaluators, locals_dict)

        return transformed_score

    def post_apply_transformation(
        self,
        eval_result: EvalResult,
        transformed_score: Any,
        final_settings: EvalSettings,
    ):
        """Wrap transformed score in EvalResult."""
        init_method = "transform: " + eval_result.init_method

        transformed_result = EvalResult(
            transformed_score, init_method=init_method, prev_result=eval_result
        )

        transformed_result.weight = final_settings.weight

        return transformed_result

    async def async_apply_transformation(
        self,
        eval_result: EvalResult,
        eval_score: Any,
        final_settings: EvalSettings,
    ) -> tuple[EvalResult, Any]:
        """Asynchronously apply transformation to result."""

        if not final_settings.transform:
            return eval_result, eval_score

        transformed_score = self.pre_apply_transformation(
            eval_result, eval_score, final_settings
        )

        transformed_score = await evaluator.aresolve_pipeline(
            transformed_score, eval_score
        )

        transformed_result = self.post_apply_transformation(
            eval_result, transformed_score, final_settings
        )

        return transformed_result, transformed_score

    def sync_apply_transformation(
        self,
        eval_result: EvalResult,
        eval_score: Any,
        final_settings: EvalSettings,
    ) -> tuple[EvalResult, Any]:
        """Synchronously apply transformation to result."""

        if not final_settings.transform:
            return eval_result, eval_score

        transformed_score = self.pre_apply_transformation(
            eval_result, eval_score, final_settings
        )

        transformed_score = evaluator.resolve_pipeline(transformed_score, eval_score)

        transformed_result = self.post_apply_transformation(
            eval_result, transformed_score, final_settings
        )

        return transformed_result, transformed_score

    # ------------------------------------------------------------------------------

    # ------------------------------------------------------------------------------
    # CHECKER
    def pre_run_checker(
        self,
        eval_result: EvalResult,
        eval_score: Any,
        final_settings: EvalSettings,
    ) -> bool:
        """Evaluate checker expression against score."""

        checker_expr = str(final_settings.checker)

        locals_dict = {
            "value": eval_score,
            "result": eval_result,
            "target": final_settings.target,
        }

        # evaluate checker
        checker_score = eval(checker_expr, evaluator.all_evaluators, locals_dict)

        return checker_score

    def post_run_checker(
        self,
        eval_result: EvalResult,
        eval_score: Any,
        final_settings: EvalSettings,
        checker_score: Any = None,
    ) -> bool:
        """Process checker result and optionally run assertions."""

        if final_settings.asserts:
            assert checker_score, (
                f"Assertion failed: score {eval_score} is not in range {final_settings.target}"
            )

        init_method = "checker: " + eval_result.init_method

        checker_result = EvalResult(
            checker_score, init_method=init_method, prev_result=eval_result
        )

        return checker_result

    def run_asserts(
        self,
        eval_score: Any,
        final_settings: EvalSettings,
    ) -> bool:

        if final_settings.target is None:
            failure_message = f"Assertion failed: score {eval_score}"
        else:
            failure_message = f"Assertion failed: score {eval_score} is not in range {final_settings.target}"

        assert eval_score, failure_message

    def run_checker(
        self,
        eval_result: EvalResult,
        eval_score: Any,
        final_settings: EvalSettings,
    ) -> bool:
        """Synchronously run checker logic on evaluation result."""

        if not final_settings.checker:
            if not final_settings.asserts:
                return eval_result, eval_score

            self.run_asserts(eval_score, final_settings)

            return eval_result, eval_score

        checker_score = self.pre_run_checker(eval_result, eval_score, final_settings)

        checker_score = evaluator.resolve_pipeline(
            checker_score, eval_score, final_settings.target
        )

        checker_result = self.post_run_checker(
            eval_result, eval_score, final_settings, checker_score
        )

        return checker_result, checker_score

    async def arun_checker(
        self,
        eval_result: EvalResult,
        eval_score: Any,
        final_settings: EvalSettings,
    ) -> bool:
        """Asynchronously run checker logic on evaluation result."""

        if not final_settings.checker:
            if not final_settings.asserts:
                return eval_result, eval_score

            self.run_asserts(eval_score, final_settings)

            return eval_result, eval_score

        checker_score = self.pre_run_checker(eval_result, eval_score, final_settings)

        checker_score = await evaluator.aresolve_pipeline(
            checker_score, eval_score, final_settings.target
        )

        checker_result = self.post_run_checker(
            eval_result, eval_score, final_settings, checker_score
        )

        return checker_result, checker_score

    # ------------------------------------------------------------------------------

    # ------------------------------------------------------------------------------
    # WRAPPING
    @staticmethod
    def parse_wraps(wraps: str | dict | None | Any):
        """Parse wraps parameter into evaluator name and settings."""
        if wraps is None:
            return None, {}

        if isinstance(wraps, str):
            return wraps, {}
        elif isinstance(wraps, dict):
            # assert that there is a single key of type str
            assert len(wraps) == 1 and isinstance(list(wraps.keys())[0], str), (
                "wraps must be a single key of type str"
            )

            wrapped_eval_name = list(wraps.keys())[0]
            wrapped_eval_settings_kwargs = wraps[wrapped_eval_name]
            return wrapped_eval_name, wrapped_eval_settings_kwargs
        else:
            raise ValueError(f"Invalid wraps type: {type(wraps)}")

    @staticmethod
    def create_wrapper(
        base_callable: "evaluator",
        wrapped_eval_settings: EvalSettings,
        wrapped_eval_kwargs: dict,
        wrapper_name: str,
    ) -> Callable:
        """
        Create a wrapper function for an evaluator, given the base evaluator,
        the wrapped evaluator settings, the wrapped evaluator kwargs, and the wrapper name.

        The wrapped_eval / base_callable's settings and kwargs update any previous settings and kwargs.
        The wrapper's settings do NOT update the wrapped evaluator's settings.
        The wrapper's kwargs DO update the wrapped evaluator's kwargs.

        The final settings and kwargs are passed into the wrapped evaluator during calltime.
        Due to the ordering of the dict unpacking, the wrapper's kwargs will update the
        wrapped evaluator's kwargs. The settings are also passed as kwargs into the base callable.

        Args:
            base_callable (evaluator): The base evaluator to be wrapped.
            wrapped_eval_settings (EvalSettings): Settings for the wrapped evaluator.
            wrapped_eval_kwargs (dict): Additional keyword arguments for the wrapped evaluator.
            wrapper_name (str): Name for the wrapper function.

        Returns:
            Callable: A wrapper function that calls the base evaluator with the provided settings and arguments.
        """

        base_callable_settings = wrapped_eval_settings.copy()
        base_callable_kwargs = wrapped_eval_kwargs.copy()

        if asyncio.iscoroutinefunction(base_callable.func):

            async def afunc(*args, **kwargs):
                return await base_callable(
                    *args,
                    **{
                        **base_callable_settings.dict(),
                        **base_callable_kwargs,
                        **kwargs,
                    },
                )

            afunc.__name__ = wrapper_name
            return afunc

        def func(*args, **kwargs):
            return base_callable(
                *args,
                **{**base_callable_settings.dict(), **base_callable_kwargs, **kwargs},
            )

        func.__name__ = wrapper_name
        return func

    @staticmethod
    def create_wrapped_evaluator(evaluator_settings: EvaluatorSettings) -> None:

        wrapper_name = evaluator_settings.name
        wrapper_settings = evaluator_settings.resolve_settings()
        wrapper_kwargs = evaluator_settings.resolve_kwargs()

        # parse the wrapped evaluator
        wrapped_eval_name, wrapped_eval_settings_kwargs = evaluator.parse_wraps(
            wrapper_settings.wraps
        )
        assert isinstance(wrapped_eval_name, str), (
            f"wrapped evaluator name must be a string but got: {type(wrapped_eval_name)}"
        )

        wrapped_eval_settings, wrapped_eval_kwargs = (
            EvalSettings.extract_eval_settings_and_kwargs(wrapped_eval_settings_kwargs)
        )

        # create the wrapped evaluator settings if not already created
        # this can happen if the wrapped evaluator is not defined in any config file
        if wrapped_eval_name not in evaluator.all_evaluator_settings:
            evaluator.all_evaluator_settings[wrapped_eval_name] = EvaluatorSettings(
                name=wrapped_eval_name
            )

        # get the wrapped evaluator. It must be a registered evaluator
        base_callable = eval(
            wrapped_eval_name,
            evaluator.all_evaluators,
        )

        # TODO: if the base callable is not a Callable but
        # not an evaluator, we need to wrap it as well
        assert isinstance(base_callable, evaluator)

        # update the wrapped evaluator settings with the wrapper settings
        final_wrapped_eval_settings = (
            evaluator.all_evaluator_settings[wrapped_eval_name]
            .resolve_settings()
            .copy()
        )
        final_wrapped_eval_settings.update(wrapped_eval_settings)

        # update the wrapped evaluator kwargs with the wrapper kwargs
        final_wrapped_eval_kwargs = (
            evaluator.all_evaluator_settings[wrapped_eval_name].resolve_kwargs().copy()
        )
        final_wrapped_eval_kwargs.update(wrapped_eval_kwargs)

        # the wrapper's kwargs merge with the wrapped evaluator's kwargs,
        # but the settings don't.
        final_wrapped_eval_kwargs.update(wrapper_kwargs)

        if asyncio.iscoroutinefunction(base_callable.func):
            afunc: Coroutine = evaluator.create_wrapper(
                base_callable,
                final_wrapped_eval_settings,
                final_wrapped_eval_kwargs,
                wrapper_name,
            )

            # initialize and register the wrapper eval
            aevaluator(func=afunc)

        else:
            # make the wrapper eval
            func: Callable = evaluator.create_wrapper(
                base_callable,
                final_wrapped_eval_settings,
                final_wrapped_eval_kwargs,
                wrapper_name,
            )

            # initialize and register the wrapper eval
            evaluator(func=func)

    # ------------------------------------------------------------------------------

    # ------------------------------------------------------------------------------
    # CALLING
    @staticmethod
    async def aresolve_pipeline(score, *args, **kwargs):
        """Asynchronously resolve pipeline of evaluators."""
        if asyncio.iscoroutinefunction(score) or isinstance(score, aevaluator):
            score = await score(*args, **kwargs)
        elif isinstance(score, Callable):
            score = score(*args, **kwargs)

        return score

    @staticmethod
    def resolve_pipeline(score, *args, **kwargs):
        """Synchronously resolve pipeline of evaluators."""
        # string evaluated to a function
        if isinstance(score, Callable) or isinstance(score, evaluator):
            score = score(*args, **kwargs)
        return score

    def get_final_settings_and_kwargs(self, call_kwargs):
        """Extract and merge final settings and kwargs for execution."""
        eval_settings, eval_kwargs = EvalSettings.extract_eval_settings_and_kwargs(
            call_kwargs
        )
        explicit_settings, explicit_kwargs = (
            EvalSettings.extract_eval_settings_and_kwargs(self.explicit_config)
        )

        # calltime copy
        final_settings = self.settings.copy()
        final_settings.update(eval_settings)
        final_settings.update(explicit_settings)

        final_kwargs = self.kwargs.copy()
        final_kwargs.update(eval_kwargs)
        final_kwargs.update(explicit_kwargs)

        return final_settings, final_kwargs

    @atrace(event_type="chain", event_name="Evaluation")
    async def async_call(self, *call_args, **call_kwargs):

        final_settings, final_kwargs = self.get_final_settings_and_kwargs(call_kwargs)

        async def asingle_evaluation() -> tuple[EvalResult, Any]:

            # run the evaluator
            score = await atrace(self.func)(*call_args, **final_kwargs)

            result = EvalResult(
                score=score,
                init_method=self.name,
                eval_settings=final_settings,
                eval_kwargs=final_kwargs,
                func_impl=f"{self.func.__module__}.{self.func.__name__}",
                func_args=call_args,
                func_kwargs=call_kwargs,
            )

            enrich_span(
                inputs={
                    "score": score,
                },
                outputs={
                    "result": result,
                },
                config={
                    "final_settings": str(final_settings),
                    "final_kwargs": final_kwargs,
                },
                metadata={
                    "func": self.name,
                    "func_impl": f"{self.func.__module__}.{self.func.__name__}",
                    "func_args": call_args,
                    "func_kwargs": call_kwargs,
                },
            )

            # transform
            (
                transformed_result,
                transformed_score,
            ) = await self.async_apply_transformation(result, score, final_settings)

            # check target on transform if aggregate not defined
            if not final_settings.aggregate:
                checker_result, checker_score = await self.arun_checker(
                    eval_result=transformed_result,
                    eval_score=transformed_score,
                    final_settings=final_settings,
                )

                return checker_result, checker_score

            return transformed_result, transformed_score

        # execute repetition
        if final_settings.repeat:
            # Parallel evaluation
            results_scores = await asyncio.gather(
                *(asingle_evaluation() for _ in range(final_settings.repeat))
            )
            results, scores = zip(*results_scores)
            results = tuple(results)
            scores = tuple(scores)
        else:
            result, score = await asingle_evaluation()
            results = (result,)
            scores = (score,)

        # apply aggregation
        aggregate_result, aggregate_score = await self.async_apply_aggregation(
            results, scores, final_settings
        )

        # check target on aggregate if aggregate defined
        if final_settings.aggregate:
            checker_result, checker_score = await self.arun_checker(
                eval_result=aggregate_result,
                eval_score=aggregate_score,
                final_settings=final_settings,
            )

            return checker_result, checker_score

        return aggregate_result, aggregate_score

    @trace(event_type="chain", event_name="Evaluation")
    def sync_call(self, *call_args, **call_kwargs):

        final_settings, final_kwargs = self.get_final_settings_and_kwargs(call_kwargs)

        def single_evaluation() -> tuple[EvalResult, Any]:

            # run the evaluator
            score = self.func(*call_args, **final_kwargs)

            result = EvalResult(
                score=score,
                init_method=self.name,
                eval_settings=final_settings,
                eval_kwargs=final_kwargs,
                func_impl=f"{self.func.__module__}.{self.func.__name__}",
                func_args=call_args,
                func_kwargs=call_kwargs,
            )

            enrich_span(
                inputs={
                    "score": score,
                },
                outputs={
                    "result": result,
                },
                config={
                    "final_settings": str(final_settings),
                    "final_kwargs": final_kwargs,
                },
                metadata={
                    "func": self.name,
                    "func_impl": f"{self.func.__module__}.{self.func.__name__}",
                    "func_args": call_args,
                    "func_kwargs": call_kwargs,
                },
            )

            # transform
            transformed_result, transformed_score = self.sync_apply_transformation(
                result, score, final_settings
            )

            # check target on transform if aggregate not defined
            if not final_settings.aggregate:
                checker_result, checker_score = self.run_checker(
                    eval_result=transformed_result,
                    eval_score=transformed_score,
                    final_settings=final_settings,
                )

                return checker_result, checker_score

            return transformed_result, transformed_score

        # execute repetition
        # TODO: add option for sequential evaluation since thread pools may not work for asyncio
        if final_settings.repeat:
            # Serial evaluation
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(single_evaluation)
                    for _ in range(final_settings.repeat)
                ]
                results, scores = zip(*[future.result() for future in futures])
            results = tuple(results)
            scores = tuple(scores)
        else:
            result, score = single_evaluation()
            results = (result,)
            scores = (score,)

        # apply aggregation
        aggregate_result, aggregate_score = self.sync_apply_aggregation(
            results, scores, final_settings
        )

        # check target on aggregate if aggregate defined
        if final_settings.aggregate:
            checker_result, checker_score = self.run_checker(
                eval_result=aggregate_result,
                eval_score=aggregate_score,
                final_settings=final_settings,
            )

            return checker_result, checker_score

        return aggregate_result, aggregate_score

    def __call__(self, *args, **kwargs) -> Any:

        # RUN EVALUATOR
        results, scores = None, None
        assert not asyncio.iscoroutinefunction(self.func), (
            "please use @aevaluator instead of @evaluator for this function"
        )
        results, scores = self.sync_call(*args, **kwargs)

        return scores

    async def __acall__(self, *args, **kwargs) -> Any:

        # RUN EVALUATOR
        results, scores = None, None
        if asyncio.iscoroutinefunction(self.func):
            results, scores = await self.async_call(*args, **kwargs)
        else:
            results, scores = self.sync_call(*args, **kwargs)
        return scores

    def raw(self, *args, **kwargs):
        """Execute wrapped function without evaluator pipeline."""
        return self.func(*args, **kwargs)

    async def araw(self, *args, **kwargs):
        """Asynchronously execute wrapped function without pipeline."""
        return await self.func(*args, **kwargs)

    # ------------------------------------------------------------------------------
    # PROPERTIES
    # ------------------------------------------------------------------------------

    @property
    def settings(self) -> EvalSettings:
        if self.name not in evaluator.all_evaluator_settings:
            evaluator.all_evaluator_settings[self.name] = EvaluatorSettings(
                name=self.name
            )
        return evaluator.all_evaluator_settings[self.name].resolve_settings()

    @settings.setter
    def settings(self, value: EvalSettings):
        assert isinstance(value, EvalSettings)
        if self.name not in evaluator.all_evaluator_settings:
            evaluator.all_evaluator_settings[self.name] = EvaluatorSettings(
                name=self.name
            )
        evaluator.all_evaluator_settings[self.name].explicit_settings = value

    @property
    def kwargs(self) -> dict[str, Any]:
        if self.name not in evaluator.all_evaluator_settings:
            evaluator.all_evaluator_settings[self.name] = EvaluatorSettings(
                name=self.name
            )
        return evaluator.all_evaluator_settings[self.name].resolve_kwargs()

    @kwargs.setter
    def kwargs(self, value: dict):
        assert isinstance(value, dict)
        if self.name not in evaluator.all_evaluator_settings:
            evaluator.all_evaluator_settings[self.name] = EvaluatorSettings(
                name=self.name
            )
        evaluator.all_evaluator_settings[self.name].explicit_kwargs = value

    @property
    def config(self) -> dict[str, Any]:
        if self.explicit_config is None:
            self.explicit_config = {**self.settings.dict(), **self.kwargs}
        return self.explicit_config

    @config.setter
    def config(self, value: Any):
        if value is None:
            self.explicit_config = None
        else:
            raise NotImplementedError

    # ------------------------------------------------------------------------------
    # ACCESSORS
    # ------------------------------------------------------------------------------

    @classmethod
    def _validate_key(cls, key: str | Callable | None) -> str:
        """Validate and normalize evaluator registry key."""
        if isinstance(key, str):
            return key
        elif isinstance(key, Callable):
            if hasattr(key, "__name__"):
                return key.__name__
            else:
                return str(key)
        else:
            raise KeyError(f"Invalid key type: {type(key)}")

    @classmethod
    def __class_getitem__(cls, keys):
        """
        Get an evaluator by name or Callable.
        """

        if isinstance(keys, (str, Callable)):
            # Ensure that key is a string
            key: str = cls._validate_key(keys)
            if key in cls.all_evaluators:
                return cls.all_evaluators[key]
            elif key in cls.all_evaluator_settings:
                # if the evaluator is wrapped, initialize and register the wrapper
                if (evaluator_settings := cls.all_evaluator_settings[key]) is not None:
                    # initialize and register the wrapper
                    evaluator.create_wrapped_evaluator(evaluator_settings)
                    return cls.all_evaluators[key]
            else:
                raise KeyError(f"Key '{key}' not found in evaluators or config.")
        elif isinstance(keys, tuple):
            # Multiple key access
            return [cls.__class_getitem__(key) for key in keys]
        else:
            raise KeyError(f"Invalid key type: {type(keys)}")

    @classmethod
    def __class_setitem__(cls, key, value):
        key = cls._validate_key(key)
        cls.all_evaluators[key] = value

    @classmethod
    def __class_delitem__(cls, key):
        key = cls._validate_key(key)
        del cls.all_evaluators[key]

    @property
    def __code__(self):
        return self.func.__code__

all_evaluators class-attribute instance-attribute

all_evaluators: dict[
    str, evaluator | Callable | Coroutine | aevaluator
] = dict()

all_evaluator_settings class-attribute instance-attribute

all_evaluator_settings: dict[str, EvaluatorSettings] = (
    dict()
)

func instance-attribute

func: Callable = func

name instance-attribute

name: str = (
    __name__ if hasattr(func, "__name__") else str(func)
)

explicit_config instance-attribute

explicit_config = None

settings property writable

settings: EvalSettings

kwargs property writable

kwargs: dict[str, Any]

config property writable

config: dict[str, Any]

pre_apply_aggregation

pre_apply_aggregation(
    eval_results: tuple[EvalResult] | list[EvalResult],
    eval_scores: tuple | list,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any] | Coroutine
Source code in src/honeyhive/experiments/evaluators.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def pre_apply_aggregation(
    self,
    eval_results: tuple[EvalResult] | list[EvalResult],
    eval_scores: tuple | list,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any] | Coroutine:

    locals_dict = {"values": eval_scores, "results": eval_results}

    # TODO: enable aggregate to be a function
    aggregation_expr = str(final_settings.aggregate)

    # apply aggregation
    aggregate_score = eval(aggregation_expr, evaluator.all_evaluators, locals_dict)

    return aggregate_score

post_apply_aggregation

post_apply_aggregation(
    eval_results: (
        tuple[EvalResult] | list[EvalResult] | EvalResult
    ),
    aggregate_score: Any,
    final_settings: EvalSettings,
)

Wrap aggregated score in EvalResult.

Source code in src/honeyhive/experiments/evaluators.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def post_apply_aggregation(
    self,
    eval_results: tuple[EvalResult] | list[EvalResult] | EvalResult,
    aggregate_score: Any,
    final_settings: EvalSettings,
):
    """Wrap aggregated score in EvalResult."""
    init_methods = set()

    # if no repetitions, we will only have one eval result
    if isinstance(eval_results, EvalResult):
        init_methods.add(eval_results.init_method)
    else:
        for eval_result in eval_results:
            if isinstance(eval_result, EvalResult):
                init_methods.add(eval_result.init_method)

    init_method = "aggregate: "
    if len(init_methods) > 0:
        init_method += "-".join(init_methods)

    aggregate_result = EvalResult(
        aggregate_score, init_method=init_method, prev_results=eval_results
    )

    return aggregate_result

sync_apply_aggregation

sync_apply_aggregation(
    eval_results: (
        tuple[EvalResult] | list[EvalResult] | EvalResult
    ),
    eval_scores: tuple | list | Any,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]

Synchronously apply aggregation to results.

Source code in src/honeyhive/experiments/evaluators.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def sync_apply_aggregation(
    self,
    eval_results: tuple[EvalResult] | list[EvalResult] | EvalResult,
    eval_scores: tuple | list | Any,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]:
    """Synchronously apply aggregation to results."""

    if not final_settings.aggregate:
        return eval_results, eval_scores

    aggregate_score = self.pre_apply_aggregation(
        eval_results, eval_scores, final_settings
    )

    aggregate_score = evaluator.resolve_pipeline(aggregate_score, eval_scores)

    aggregate_result = self.post_apply_aggregation(
        eval_results, aggregate_score, final_settings
    )

    return aggregate_result, aggregate_score

async_apply_aggregation async

async_apply_aggregation(
    eval_results: tuple[EvalResult] | list[EvalResult],
    eval_scores: tuple | list,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]

Asynchronously apply aggregation to results.

Source code in src/honeyhive/experiments/evaluators.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
async def async_apply_aggregation(
    self,
    eval_results: tuple[EvalResult] | list[EvalResult],
    eval_scores: tuple | list,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]:
    """Asynchronously apply aggregation to results."""

    if not final_settings.aggregate:
        return eval_results, eval_scores

    aggregate_score = self.pre_apply_aggregation(
        eval_results, eval_scores, final_settings
    )

    aggregate_score = await evaluator.aresolve_pipeline(
        aggregate_score, eval_scores
    )

    aggregate_result = self.post_apply_aggregation(
        eval_results, aggregate_score, final_settings
    )

    return aggregate_result, aggregate_score

pre_apply_transformation

pre_apply_transformation(
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
)

Apply transformation expression to evaluation score.

Source code in src/honeyhive/experiments/evaluators.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def pre_apply_transformation(
    self,
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
):
    """Apply transformation expression to evaluation score."""

    transform_expr = str(final_settings.transform)

    locals_dict = {"value": eval_score, "result": eval_result}

    # apply transformation
    transformed_score = eval(transform_expr, evaluator.all_evaluators, locals_dict)

    return transformed_score

post_apply_transformation

post_apply_transformation(
    eval_result: EvalResult,
    transformed_score: Any,
    final_settings: EvalSettings,
)

Wrap transformed score in EvalResult.

Source code in src/honeyhive/experiments/evaluators.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
def post_apply_transformation(
    self,
    eval_result: EvalResult,
    transformed_score: Any,
    final_settings: EvalSettings,
):
    """Wrap transformed score in EvalResult."""
    init_method = "transform: " + eval_result.init_method

    transformed_result = EvalResult(
        transformed_score, init_method=init_method, prev_result=eval_result
    )

    transformed_result.weight = final_settings.weight

    return transformed_result

async_apply_transformation async

async_apply_transformation(
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]

Asynchronously apply transformation to result.

Source code in src/honeyhive/experiments/evaluators.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
async def async_apply_transformation(
    self,
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]:
    """Asynchronously apply transformation to result."""

    if not final_settings.transform:
        return eval_result, eval_score

    transformed_score = self.pre_apply_transformation(
        eval_result, eval_score, final_settings
    )

    transformed_score = await evaluator.aresolve_pipeline(
        transformed_score, eval_score
    )

    transformed_result = self.post_apply_transformation(
        eval_result, transformed_score, final_settings
    )

    return transformed_result, transformed_score

sync_apply_transformation

sync_apply_transformation(
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]

Synchronously apply transformation to result.

Source code in src/honeyhive/experiments/evaluators.py
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def sync_apply_transformation(
    self,
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> tuple[EvalResult, Any]:
    """Synchronously apply transformation to result."""

    if not final_settings.transform:
        return eval_result, eval_score

    transformed_score = self.pre_apply_transformation(
        eval_result, eval_score, final_settings
    )

    transformed_score = evaluator.resolve_pipeline(transformed_score, eval_score)

    transformed_result = self.post_apply_transformation(
        eval_result, transformed_score, final_settings
    )

    return transformed_result, transformed_score

pre_run_checker

pre_run_checker(
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> bool

Evaluate checker expression against score.

Source code in src/honeyhive/experiments/evaluators.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def pre_run_checker(
    self,
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> bool:
    """Evaluate checker expression against score."""

    checker_expr = str(final_settings.checker)

    locals_dict = {
        "value": eval_score,
        "result": eval_result,
        "target": final_settings.target,
    }

    # evaluate checker
    checker_score = eval(checker_expr, evaluator.all_evaluators, locals_dict)

    return checker_score

post_run_checker

post_run_checker(
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
    checker_score: Any = None,
) -> bool

Process checker result and optionally run assertions.

Source code in src/honeyhive/experiments/evaluators.py
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
def post_run_checker(
    self,
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
    checker_score: Any = None,
) -> bool:
    """Process checker result and optionally run assertions."""

    if final_settings.asserts:
        assert checker_score, (
            f"Assertion failed: score {eval_score} is not in range {final_settings.target}"
        )

    init_method = "checker: " + eval_result.init_method

    checker_result = EvalResult(
        checker_score, init_method=init_method, prev_result=eval_result
    )

    return checker_result

run_asserts

run_asserts(
    eval_score: Any, final_settings: EvalSettings
) -> bool
Source code in src/honeyhive/experiments/evaluators.py
585
586
587
588
589
590
591
592
593
594
595
596
def run_asserts(
    self,
    eval_score: Any,
    final_settings: EvalSettings,
) -> bool:

    if final_settings.target is None:
        failure_message = f"Assertion failed: score {eval_score}"
    else:
        failure_message = f"Assertion failed: score {eval_score} is not in range {final_settings.target}"

    assert eval_score, failure_message

run_checker

run_checker(
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> bool

Synchronously run checker logic on evaluation result.

Source code in src/honeyhive/experiments/evaluators.py
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
def run_checker(
    self,
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> bool:
    """Synchronously run checker logic on evaluation result."""

    if not final_settings.checker:
        if not final_settings.asserts:
            return eval_result, eval_score

        self.run_asserts(eval_score, final_settings)

        return eval_result, eval_score

    checker_score = self.pre_run_checker(eval_result, eval_score, final_settings)

    checker_score = evaluator.resolve_pipeline(
        checker_score, eval_score, final_settings.target
    )

    checker_result = self.post_run_checker(
        eval_result, eval_score, final_settings, checker_score
    )

    return checker_result, checker_score

arun_checker async

arun_checker(
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> bool

Asynchronously run checker logic on evaluation result.

Source code in src/honeyhive/experiments/evaluators.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
async def arun_checker(
    self,
    eval_result: EvalResult,
    eval_score: Any,
    final_settings: EvalSettings,
) -> bool:
    """Asynchronously run checker logic on evaluation result."""

    if not final_settings.checker:
        if not final_settings.asserts:
            return eval_result, eval_score

        self.run_asserts(eval_score, final_settings)

        return eval_result, eval_score

    checker_score = self.pre_run_checker(eval_result, eval_score, final_settings)

    checker_score = await evaluator.aresolve_pipeline(
        checker_score, eval_score, final_settings.target
    )

    checker_result = self.post_run_checker(
        eval_result, eval_score, final_settings, checker_score
    )

    return checker_result, checker_score

parse_wraps staticmethod

parse_wraps(wraps: str | dict | None | Any)

Parse wraps parameter into evaluator name and settings.

Source code in src/honeyhive/experiments/evaluators.py
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
@staticmethod
def parse_wraps(wraps: str | dict | None | Any):
    """Parse wraps parameter into evaluator name and settings."""
    if wraps is None:
        return None, {}

    if isinstance(wraps, str):
        return wraps, {}
    elif isinstance(wraps, dict):
        # assert that there is a single key of type str
        assert len(wraps) == 1 and isinstance(list(wraps.keys())[0], str), (
            "wraps must be a single key of type str"
        )

        wrapped_eval_name = list(wraps.keys())[0]
        wrapped_eval_settings_kwargs = wraps[wrapped_eval_name]
        return wrapped_eval_name, wrapped_eval_settings_kwargs
    else:
        raise ValueError(f"Invalid wraps type: {type(wraps)}")

create_wrapper staticmethod

create_wrapper(
    base_callable: evaluator,
    wrapped_eval_settings: EvalSettings,
    wrapped_eval_kwargs: dict,
    wrapper_name: str,
) -> Callable

Create a wrapper function for an evaluator, given the base evaluator, the wrapped evaluator settings, the wrapped evaluator kwargs, and the wrapper name.

The wrapped_eval / base_callable's settings and kwargs update any previous settings and kwargs. The wrapper's settings do NOT update the wrapped evaluator's settings. The wrapper's kwargs DO update the wrapped evaluator's kwargs.

The final settings and kwargs are passed into the wrapped evaluator during calltime. Due to the ordering of the dict unpacking, the wrapper's kwargs will update the wrapped evaluator's kwargs. The settings are also passed as kwargs into the base callable.

Parameters:

Name Type Description Default
base_callable evaluator

The base evaluator to be wrapped.

required
wrapped_eval_settings EvalSettings

Settings for the wrapped evaluator.

required
wrapped_eval_kwargs dict

Additional keyword arguments for the wrapped evaluator.

required
wrapper_name str

Name for the wrapper function.

required

Returns:

Name Type Description
Callable Callable

A wrapper function that calls the base evaluator with the provided settings and arguments.

Source code in src/honeyhive/experiments/evaluators.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
@staticmethod
def create_wrapper(
    base_callable: "evaluator",
    wrapped_eval_settings: EvalSettings,
    wrapped_eval_kwargs: dict,
    wrapper_name: str,
) -> Callable:
    """
    Create a wrapper function for an evaluator, given the base evaluator,
    the wrapped evaluator settings, the wrapped evaluator kwargs, and the wrapper name.

    The wrapped_eval / base_callable's settings and kwargs update any previous settings and kwargs.
    The wrapper's settings do NOT update the wrapped evaluator's settings.
    The wrapper's kwargs DO update the wrapped evaluator's kwargs.

    The final settings and kwargs are passed into the wrapped evaluator during calltime.
    Due to the ordering of the dict unpacking, the wrapper's kwargs will update the
    wrapped evaluator's kwargs. The settings are also passed as kwargs into the base callable.

    Args:
        base_callable (evaluator): The base evaluator to be wrapped.
        wrapped_eval_settings (EvalSettings): Settings for the wrapped evaluator.
        wrapped_eval_kwargs (dict): Additional keyword arguments for the wrapped evaluator.
        wrapper_name (str): Name for the wrapper function.

    Returns:
        Callable: A wrapper function that calls the base evaluator with the provided settings and arguments.
    """

    base_callable_settings = wrapped_eval_settings.copy()
    base_callable_kwargs = wrapped_eval_kwargs.copy()

    if asyncio.iscoroutinefunction(base_callable.func):

        async def afunc(*args, **kwargs):
            return await base_callable(
                *args,
                **{
                    **base_callable_settings.dict(),
                    **base_callable_kwargs,
                    **kwargs,
                },
            )

        afunc.__name__ = wrapper_name
        return afunc

    def func(*args, **kwargs):
        return base_callable(
            *args,
            **{**base_callable_settings.dict(), **base_callable_kwargs, **kwargs},
        )

    func.__name__ = wrapper_name
    return func

create_wrapped_evaluator staticmethod

create_wrapped_evaluator(
    evaluator_settings: EvaluatorSettings,
) -> None
Source code in src/honeyhive/experiments/evaluators.py
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
@staticmethod
def create_wrapped_evaluator(evaluator_settings: EvaluatorSettings) -> None:

    wrapper_name = evaluator_settings.name
    wrapper_settings = evaluator_settings.resolve_settings()
    wrapper_kwargs = evaluator_settings.resolve_kwargs()

    # parse the wrapped evaluator
    wrapped_eval_name, wrapped_eval_settings_kwargs = evaluator.parse_wraps(
        wrapper_settings.wraps
    )
    assert isinstance(wrapped_eval_name, str), (
        f"wrapped evaluator name must be a string but got: {type(wrapped_eval_name)}"
    )

    wrapped_eval_settings, wrapped_eval_kwargs = (
        EvalSettings.extract_eval_settings_and_kwargs(wrapped_eval_settings_kwargs)
    )

    # create the wrapped evaluator settings if not already created
    # this can happen if the wrapped evaluator is not defined in any config file
    if wrapped_eval_name not in evaluator.all_evaluator_settings:
        evaluator.all_evaluator_settings[wrapped_eval_name] = EvaluatorSettings(
            name=wrapped_eval_name
        )

    # get the wrapped evaluator. It must be a registered evaluator
    base_callable = eval(
        wrapped_eval_name,
        evaluator.all_evaluators,
    )

    # TODO: if the base callable is not a Callable but
    # not an evaluator, we need to wrap it as well
    assert isinstance(base_callable, evaluator)

    # update the wrapped evaluator settings with the wrapper settings
    final_wrapped_eval_settings = (
        evaluator.all_evaluator_settings[wrapped_eval_name]
        .resolve_settings()
        .copy()
    )
    final_wrapped_eval_settings.update(wrapped_eval_settings)

    # update the wrapped evaluator kwargs with the wrapper kwargs
    final_wrapped_eval_kwargs = (
        evaluator.all_evaluator_settings[wrapped_eval_name].resolve_kwargs().copy()
    )
    final_wrapped_eval_kwargs.update(wrapped_eval_kwargs)

    # the wrapper's kwargs merge with the wrapped evaluator's kwargs,
    # but the settings don't.
    final_wrapped_eval_kwargs.update(wrapper_kwargs)

    if asyncio.iscoroutinefunction(base_callable.func):
        afunc: Coroutine = evaluator.create_wrapper(
            base_callable,
            final_wrapped_eval_settings,
            final_wrapped_eval_kwargs,
            wrapper_name,
        )

        # initialize and register the wrapper eval
        aevaluator(func=afunc)

    else:
        # make the wrapper eval
        func: Callable = evaluator.create_wrapper(
            base_callable,
            final_wrapped_eval_settings,
            final_wrapped_eval_kwargs,
            wrapper_name,
        )

        # initialize and register the wrapper eval
        evaluator(func=func)

aresolve_pipeline async staticmethod

aresolve_pipeline(score, *args, **kwargs)

Asynchronously resolve pipeline of evaluators.

Source code in src/honeyhive/experiments/evaluators.py
815
816
817
818
819
820
821
822
823
@staticmethod
async def aresolve_pipeline(score, *args, **kwargs):
    """Asynchronously resolve pipeline of evaluators."""
    if asyncio.iscoroutinefunction(score) or isinstance(score, aevaluator):
        score = await score(*args, **kwargs)
    elif isinstance(score, Callable):
        score = score(*args, **kwargs)

    return score

resolve_pipeline staticmethod

resolve_pipeline(score, *args, **kwargs)

Synchronously resolve pipeline of evaluators.

Source code in src/honeyhive/experiments/evaluators.py
825
826
827
828
829
830
831
@staticmethod
def resolve_pipeline(score, *args, **kwargs):
    """Synchronously resolve pipeline of evaluators."""
    # string evaluated to a function
    if isinstance(score, Callable) or isinstance(score, evaluator):
        score = score(*args, **kwargs)
    return score

get_final_settings_and_kwargs

get_final_settings_and_kwargs(call_kwargs)

Extract and merge final settings and kwargs for execution.

Source code in src/honeyhive/experiments/evaluators.py
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
def get_final_settings_and_kwargs(self, call_kwargs):
    """Extract and merge final settings and kwargs for execution."""
    eval_settings, eval_kwargs = EvalSettings.extract_eval_settings_and_kwargs(
        call_kwargs
    )
    explicit_settings, explicit_kwargs = (
        EvalSettings.extract_eval_settings_and_kwargs(self.explicit_config)
    )

    # calltime copy
    final_settings = self.settings.copy()
    final_settings.update(eval_settings)
    final_settings.update(explicit_settings)

    final_kwargs = self.kwargs.copy()
    final_kwargs.update(eval_kwargs)
    final_kwargs.update(explicit_kwargs)

    return final_settings, final_kwargs

async_call async

async_call(*call_args, **call_kwargs)
Source code in src/honeyhive/experiments/evaluators.py
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
@atrace(event_type="chain", event_name="Evaluation")
async def async_call(self, *call_args, **call_kwargs):

    final_settings, final_kwargs = self.get_final_settings_and_kwargs(call_kwargs)

    async def asingle_evaluation() -> tuple[EvalResult, Any]:

        # run the evaluator
        score = await atrace(self.func)(*call_args, **final_kwargs)

        result = EvalResult(
            score=score,
            init_method=self.name,
            eval_settings=final_settings,
            eval_kwargs=final_kwargs,
            func_impl=f"{self.func.__module__}.{self.func.__name__}",
            func_args=call_args,
            func_kwargs=call_kwargs,
        )

        enrich_span(
            inputs={
                "score": score,
            },
            outputs={
                "result": result,
            },
            config={
                "final_settings": str(final_settings),
                "final_kwargs": final_kwargs,
            },
            metadata={
                "func": self.name,
                "func_impl": f"{self.func.__module__}.{self.func.__name__}",
                "func_args": call_args,
                "func_kwargs": call_kwargs,
            },
        )

        # transform
        (
            transformed_result,
            transformed_score,
        ) = await self.async_apply_transformation(result, score, final_settings)

        # check target on transform if aggregate not defined
        if not final_settings.aggregate:
            checker_result, checker_score = await self.arun_checker(
                eval_result=transformed_result,
                eval_score=transformed_score,
                final_settings=final_settings,
            )

            return checker_result, checker_score

        return transformed_result, transformed_score

    # execute repetition
    if final_settings.repeat:
        # Parallel evaluation
        results_scores = await asyncio.gather(
            *(asingle_evaluation() for _ in range(final_settings.repeat))
        )
        results, scores = zip(*results_scores)
        results = tuple(results)
        scores = tuple(scores)
    else:
        result, score = await asingle_evaluation()
        results = (result,)
        scores = (score,)

    # apply aggregation
    aggregate_result, aggregate_score = await self.async_apply_aggregation(
        results, scores, final_settings
    )

    # check target on aggregate if aggregate defined
    if final_settings.aggregate:
        checker_result, checker_score = await self.arun_checker(
            eval_result=aggregate_result,
            eval_score=aggregate_score,
            final_settings=final_settings,
        )

        return checker_result, checker_score

    return aggregate_result, aggregate_score

sync_call

sync_call(*call_args, **call_kwargs)
Source code in src/honeyhive/experiments/evaluators.py
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
@trace(event_type="chain", event_name="Evaluation")
def sync_call(self, *call_args, **call_kwargs):

    final_settings, final_kwargs = self.get_final_settings_and_kwargs(call_kwargs)

    def single_evaluation() -> tuple[EvalResult, Any]:

        # run the evaluator
        score = self.func(*call_args, **final_kwargs)

        result = EvalResult(
            score=score,
            init_method=self.name,
            eval_settings=final_settings,
            eval_kwargs=final_kwargs,
            func_impl=f"{self.func.__module__}.{self.func.__name__}",
            func_args=call_args,
            func_kwargs=call_kwargs,
        )

        enrich_span(
            inputs={
                "score": score,
            },
            outputs={
                "result": result,
            },
            config={
                "final_settings": str(final_settings),
                "final_kwargs": final_kwargs,
            },
            metadata={
                "func": self.name,
                "func_impl": f"{self.func.__module__}.{self.func.__name__}",
                "func_args": call_args,
                "func_kwargs": call_kwargs,
            },
        )

        # transform
        transformed_result, transformed_score = self.sync_apply_transformation(
            result, score, final_settings
        )

        # check target on transform if aggregate not defined
        if not final_settings.aggregate:
            checker_result, checker_score = self.run_checker(
                eval_result=transformed_result,
                eval_score=transformed_score,
                final_settings=final_settings,
            )

            return checker_result, checker_score

        return transformed_result, transformed_score

    # execute repetition
    # TODO: add option for sequential evaluation since thread pools may not work for asyncio
    if final_settings.repeat:
        # Serial evaluation
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(single_evaluation)
                for _ in range(final_settings.repeat)
            ]
            results, scores = zip(*[future.result() for future in futures])
        results = tuple(results)
        scores = tuple(scores)
    else:
        result, score = single_evaluation()
        results = (result,)
        scores = (score,)

    # apply aggregation
    aggregate_result, aggregate_score = self.sync_apply_aggregation(
        results, scores, final_settings
    )

    # check target on aggregate if aggregate defined
    if final_settings.aggregate:
        checker_result, checker_score = self.run_checker(
            eval_result=aggregate_result,
            eval_score=aggregate_score,
            final_settings=final_settings,
        )

        return checker_result, checker_score

    return aggregate_result, aggregate_score

raw

raw(*args, **kwargs)

Execute wrapped function without evaluator pipeline.

Source code in src/honeyhive/experiments/evaluators.py
1052
1053
1054
def raw(self, *args, **kwargs):
    """Execute wrapped function without evaluator pipeline."""
    return self.func(*args, **kwargs)

araw async

araw(*args, **kwargs)

Asynchronously execute wrapped function without pipeline.

Source code in src/honeyhive/experiments/evaluators.py
1056
1057
1058
async def araw(self, *args, **kwargs):
    """Asynchronously execute wrapped function without pipeline."""
    return await self.func(*args, **kwargs)

AggregatedMetrics

Bases: BaseModel

Aggregated metrics model for experiment results.

Supports the backend response format with a 'details' array containing MetricDetail objects.

Backend Response Format (per OpenAPI spec): { "aggregation_function": "average", "details": [ { "metric_name": "accuracy", "metric_type": "numeric", "event_name": "llm_call", "event_type": "model", "aggregate": 0.85, "values": [0.8, 0.9, 0.85], "datapoints": {"passed": [...], "failed": [...]} }, ... ] }

Example

metrics = AggregatedMetrics( ... aggregation_function="average", ... details=[{"metric_name": "accuracy", "aggregate": 0.85}] ... ) metrics.get_metric("accuracy") MetricDetail(metric_name='accuracy', aggregate=0.85, ...) metrics.list_metrics() ['accuracy']

Source code in src/honeyhive/experiments/models.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
class AggregatedMetrics(BaseModel):
    """
    Aggregated metrics model for experiment results.

    Supports the backend response format with a 'details' array containing
    MetricDetail objects.

    Backend Response Format (per OpenAPI spec):
    {
      "aggregation_function": "average",
      "details": [
        {
          "metric_name": "accuracy",
          "metric_type": "numeric",
          "event_name": "llm_call",
          "event_type": "model",
          "aggregate": 0.85,
          "values": [0.8, 0.9, 0.85],
          "datapoints": {"passed": [...], "failed": [...]}
        },
        ...
      ]
    }

    Example:
        >>> metrics = AggregatedMetrics(
        ...     aggregation_function="average",
        ...     details=[{"metric_name": "accuracy", "aggregate": 0.85}]
        ... )
        >>> metrics.get_metric("accuracy")
        MetricDetail(metric_name='accuracy', aggregate=0.85, ...)
        >>> metrics.list_metrics()
        ['accuracy']
    """

    aggregation_function: Optional[str] = Field(
        None, description="Aggregation function used (average, sum, min, max)"
    )

    details: List[MetricDetail] = Field(
        default_factory=list,
        description="List of metric details from backend",
    )

    # Required for Pydantic 2.12+ when using extra="allow"
    __pydantic_extra__: Dict[str, Any] = None

    # Allow extra fields for backward compatibility with dynamic metric keys
    model_config = ConfigDict(extra="allow")

    def get_metric(
        self, metric_name: str
    ) -> Optional[Union[MetricDetail, Dict[str, Any]]]:
        """
        Get a specific metric by name.

        Supports both the new 'details' array format (returns MetricDetail)
        and the legacy model_extra format (returns dict) for backward compatibility.

        Args:
            metric_name: Name of the metric to retrieve

        Returns:
            MetricDetail object (new format), dict (legacy format), or None if not found

        Example:
            >>> metrics.get_metric("accuracy")
            MetricDetail(metric_name='accuracy', aggregate=0.85, ...)
        """
        # First check the details array (new format)
        for metric in self.details:
            if metric.metric_name == metric_name:
                return metric
        # Fall back to model_extra (legacy format for backward compatibility)
        extra = self.model_extra or {}
        return extra.get(metric_name)

    def list_metrics(self) -> List[str]:
        """
        List all metric names in this result.

        Supports both the new 'details' array format and the legacy model_extra
        format for backward compatibility.

        Returns:
            List of metric names from details array or model_extra keys

        Example:
            >>> metrics.list_metrics()
            ['accuracy', 'latency', 'cost']
        """
        # First check the details array (new format)
        if self.details:
            # pylint: disable=not-an-iterable
            return [metric.metric_name for metric in self.details]
        # Fall back to model_extra (legacy format for backward compatibility)
        extra = self.model_extra or {}
        # Exclude known fields that aren't metrics
        return [k for k in extra.keys() if k not in ("aggregation_function",)]

    def get_all_metrics(self) -> Dict[str, Union[MetricDetail, Dict[str, Any]]]:
        """
        Get all metrics as a dictionary.

        Supports both the new 'details' array format (returns MetricDetail values)
        and the legacy model_extra format (returns dict values) for backward
        compatibility.

        Returns:
            Dictionary mapping metric names to MetricDetail objects or dicts

        Example:
            >>> metrics.get_all_metrics()
            {
                'accuracy': MetricDetail(metric_name='accuracy', aggregate=0.85, ...),
                'latency': MetricDetail(metric_name='latency', aggregate=120.5, ...)
            }
        """
        # First check the details array (new format)
        if self.details:
            # pylint: disable=not-an-iterable
            return {metric.metric_name: metric for metric in self.details}
        # Fall back to model_extra (legacy format for backward compatibility)
        extra = self.model_extra or {}
        # Exclude known fields that aren't metrics
        return {k: v for k, v in extra.items() if k not in ("aggregation_function",)}

aggregation_function class-attribute instance-attribute

aggregation_function: Optional[str] = Field(
    None,
    description="Aggregation function used (average, sum, min, max)",
)

details class-attribute instance-attribute

details: List[MetricDetail] = Field(
    default_factory=list,
    description="List of metric details from backend",
)

get_metric

get_metric(
    metric_name: str,
) -> Optional[Union[MetricDetail, Dict[str, Any]]]

Get a specific metric by name.

Supports both the new 'details' array format (returns MetricDetail) and the legacy model_extra format (returns dict) for backward compatibility.

Parameters:

Name Type Description Default
metric_name str

Name of the metric to retrieve

required

Returns:

Type Description
Optional[Union[MetricDetail, Dict[str, Any]]]

MetricDetail object (new format), dict (legacy format), or None if not found

Example

metrics.get_metric("accuracy") MetricDetail(metric_name='accuracy', aggregate=0.85, ...)

Source code in src/honeyhive/experiments/models.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def get_metric(
    self, metric_name: str
) -> Optional[Union[MetricDetail, Dict[str, Any]]]:
    """
    Get a specific metric by name.

    Supports both the new 'details' array format (returns MetricDetail)
    and the legacy model_extra format (returns dict) for backward compatibility.

    Args:
        metric_name: Name of the metric to retrieve

    Returns:
        MetricDetail object (new format), dict (legacy format), or None if not found

    Example:
        >>> metrics.get_metric("accuracy")
        MetricDetail(metric_name='accuracy', aggregate=0.85, ...)
    """
    # First check the details array (new format)
    for metric in self.details:
        if metric.metric_name == metric_name:
            return metric
    # Fall back to model_extra (legacy format for backward compatibility)
    extra = self.model_extra or {}
    return extra.get(metric_name)

list_metrics

list_metrics() -> List[str]

List all metric names in this result.

Supports both the new 'details' array format and the legacy model_extra format for backward compatibility.

Returns:

Type Description
List[str]

List of metric names from details array or model_extra keys

Example

metrics.list_metrics() ['accuracy', 'latency', 'cost']

Source code in src/honeyhive/experiments/models.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def list_metrics(self) -> List[str]:
    """
    List all metric names in this result.

    Supports both the new 'details' array format and the legacy model_extra
    format for backward compatibility.

    Returns:
        List of metric names from details array or model_extra keys

    Example:
        >>> metrics.list_metrics()
        ['accuracy', 'latency', 'cost']
    """
    # First check the details array (new format)
    if self.details:
        # pylint: disable=not-an-iterable
        return [metric.metric_name for metric in self.details]
    # Fall back to model_extra (legacy format for backward compatibility)
    extra = self.model_extra or {}
    # Exclude known fields that aren't metrics
    return [k for k in extra.keys() if k not in ("aggregation_function",)]

get_all_metrics

get_all_metrics() -> (
    Dict[str, Union[MetricDetail, Dict[str, Any]]]
)

Get all metrics as a dictionary.

Supports both the new 'details' array format (returns MetricDetail values) and the legacy model_extra format (returns dict values) for backward compatibility.

Returns:

Type Description
Dict[str, Union[MetricDetail, Dict[str, Any]]]

Dictionary mapping metric names to MetricDetail objects or dicts

Example

metrics.get_all_metrics() { 'accuracy': MetricDetail(metric_name='accuracy', aggregate=0.85, ...), 'latency': MetricDetail(metric_name='latency', aggregate=120.5, ...) }

Source code in src/honeyhive/experiments/models.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def get_all_metrics(self) -> Dict[str, Union[MetricDetail, Dict[str, Any]]]:
    """
    Get all metrics as a dictionary.

    Supports both the new 'details' array format (returns MetricDetail values)
    and the legacy model_extra format (returns dict values) for backward
    compatibility.

    Returns:
        Dictionary mapping metric names to MetricDetail objects or dicts

    Example:
        >>> metrics.get_all_metrics()
        {
            'accuracy': MetricDetail(metric_name='accuracy', aggregate=0.85, ...),
            'latency': MetricDetail(metric_name='latency', aggregate=120.5, ...)
        }
    """
    # First check the details array (new format)
    if self.details:
        # pylint: disable=not-an-iterable
        return {metric.metric_name: metric for metric in self.details}
    # Fall back to model_extra (legacy format for backward compatibility)
    extra = self.model_extra or {}
    # Exclude known fields that aren't metrics
    return {k: v for k, v in extra.items() if k not in ("aggregation_function",)}

ExperimentResultSummary

Bases: BaseModel

Aggregated experiment result from backend.

This model represents the complete result of an experiment run, including pass/fail status, aggregated metrics, and datapoint results.

Retrieved from: GET /runs/:run_id/result

Source code in src/honeyhive/experiments/models.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
class ExperimentResultSummary(BaseModel):
    """
    Aggregated experiment result from backend.

    This model represents the complete result of an experiment run,
    including pass/fail status, aggregated metrics, and datapoint results.

    Retrieved from: GET /runs/:run_id/result
    """

    run_id: str = Field(..., description="Experiment run identifier")

    status: str = Field(
        ..., description="Run status (pending, completed, running, failed, cancelled)"
    )

    success: bool = Field(..., description="Overall success status of the run")

    passed: List[str] = Field(
        default_factory=list, description="List of datapoint IDs that passed"
    )

    failed: List[str] = Field(
        default_factory=list, description="List of datapoint IDs that failed"
    )

    metrics: AggregatedMetrics = Field(
        ..., description="Aggregated metrics from backend"
    )

    datapoints: List[DatapointResult] = Field(
        default_factory=list,
        description="List of datapoint results from backend",
    )

    def print_table(self, run_name: Optional[str] = None) -> None:
        """
        Print evaluation results in a formatted table.

        Displays:
        - Run summary (ID, status, pass/fail counts)
        - Aggregated metrics
        - Per-datapoint details (if available)

        Args:
            run_name: Optional run name to display in table title

        Example:
            >>> result = evaluate(...)
            >>> result.print_table(run_name="My Experiment")
        """
        console = Console()

        # Print header
        title = f"Evaluation Results: {run_name or self.run_id}"
        console.print(f"\n{'=' * 80}")
        console.print(f"[bold yellow]{title}[/bold yellow]")
        console.print(f"{'=' * 80}\n")

        # Print summary
        status_emoji = "✅" if self.success else "❌"
        status_color = "green" if self.success else "red"

        console.print(f"[bold]Run ID:[/bold] {self.run_id}")
        status_text = (
            f"[bold]Status:[/bold] [{status_color}]"
            f"{status_emoji} {self.status}[/{status_color}]"
        )
        console.print(status_text)
        console.print(f"[bold]Passed:[/bold] {len(self.passed)}")
        console.print(f"[bold]Failed:[/bold] {len(self.failed)}")
        console.print()

        # Print aggregated metrics table
        metric_names = self.metrics.list_metrics()  # pylint: disable=no-member

        if metric_names:
            metrics_table = Table(
                title="Aggregated Metrics",
                show_lines=False,
                title_style=Style(color="cyan", bold=True),
            )
            metrics_table.add_column(
                "Metric", justify="left", style="magenta", no_wrap=True
            )
            metrics_table.add_column("Value", justify="right", style="green")
            metrics_table.add_column("Type", justify="center", style="blue")

            for metric_name in sorted(metric_names):
                # pylint: disable=no-member
                metric_data = self.metrics.get_metric(metric_name)
                if metric_data is not None:
                    # Handle both MetricDetail objects (new format) and dicts (legacy)
                    if isinstance(metric_data, MetricDetail):
                        aggregate_value = metric_data.aggregate
                        metric_type = metric_data.metric_type or "unknown"
                    elif isinstance(metric_data, dict):
                        aggregate_value = metric_data.get("aggregate")
                        metric_type = metric_data.get("metric_type", "unknown")
                    else:
                        aggregate_value = None
                        metric_type = "unknown"

                    # Format value based on type
                    if aggregate_value is None:
                        value_str = "N/A"
                    elif isinstance(aggregate_value, float):
                        value_str = f"{aggregate_value:.4f}"
                    else:
                        value_str = str(aggregate_value)

                    metrics_table.add_row(metric_name, value_str, metric_type)

            console.print(metrics_table)
            console.print()

        # Print per-datapoint summary if available
        if self.datapoints:
            datapoints_table = Table(
                title=f"Datapoint Results ({len(self.datapoints)} total)",
                show_lines=False,
                title_style=Style(color="cyan", bold=True),
            )
            datapoints_table.add_column(
                "Datapoint ID", justify="left", style="blue", no_wrap=False
            )
            datapoints_table.add_column(
                "Session ID", justify="left", style="blue", no_wrap=False
            )
            datapoints_table.add_column("Status", justify="center", style="green")

            for datapoint in self.datapoints[:20]:  # Limit to first 20 for display
                dp_id = datapoint.datapoint_id or "N/A"
                session_id = datapoint.session_id or "N/A"
                passed = datapoint.passed

                if passed is True:
                    status = "[green]✅ Passed[/green]"
                elif passed is False:
                    status = "[red]❌ Failed[/red]"
                else:
                    status = "❓ Unknown"

                datapoints_table.add_row(dp_id, session_id, status)

            console.print(datapoints_table)

            if len(self.datapoints) > 20:
                msg = (
                    f"\n[dim](Showing first 20 of "
                    f"{len(self.datapoints)} datapoints)[/dim]"
                )
                console.print(msg)

            console.print()

        console.print(f"{'=' * 80}\n")

run_id class-attribute instance-attribute

run_id: str = Field(
    ..., description="Experiment run identifier"
)

status class-attribute instance-attribute

status: str = Field(
    ...,
    description="Run status (pending, completed, running, failed, cancelled)",
)

success class-attribute instance-attribute

success: bool = Field(
    ..., description="Overall success status of the run"
)

passed class-attribute instance-attribute

passed: List[str] = Field(
    default_factory=list,
    description="List of datapoint IDs that passed",
)

failed class-attribute instance-attribute

failed: List[str] = Field(
    default_factory=list,
    description="List of datapoint IDs that failed",
)

metrics class-attribute instance-attribute

metrics: AggregatedMetrics = Field(
    ..., description="Aggregated metrics from backend"
)

datapoints class-attribute instance-attribute

datapoints: List[DatapointResult] = Field(
    default_factory=list,
    description="List of datapoint results from backend",
)

print_table

print_table(run_name: Optional[str] = None) -> None

Print evaluation results in a formatted table.

Displays: - Run summary (ID, status, pass/fail counts) - Aggregated metrics - Per-datapoint details (if available)

Parameters:

Name Type Description Default
run_name Optional[str]

Optional run name to display in table title

None
Example

result = evaluate(...) result.print_table(run_name="My Experiment")

Source code in src/honeyhive/experiments/models.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def print_table(self, run_name: Optional[str] = None) -> None:
    """
    Print evaluation results in a formatted table.

    Displays:
    - Run summary (ID, status, pass/fail counts)
    - Aggregated metrics
    - Per-datapoint details (if available)

    Args:
        run_name: Optional run name to display in table title

    Example:
        >>> result = evaluate(...)
        >>> result.print_table(run_name="My Experiment")
    """
    console = Console()

    # Print header
    title = f"Evaluation Results: {run_name or self.run_id}"
    console.print(f"\n{'=' * 80}")
    console.print(f"[bold yellow]{title}[/bold yellow]")
    console.print(f"{'=' * 80}\n")

    # Print summary
    status_emoji = "✅" if self.success else "❌"
    status_color = "green" if self.success else "red"

    console.print(f"[bold]Run ID:[/bold] {self.run_id}")
    status_text = (
        f"[bold]Status:[/bold] [{status_color}]"
        f"{status_emoji} {self.status}[/{status_color}]"
    )
    console.print(status_text)
    console.print(f"[bold]Passed:[/bold] {len(self.passed)}")
    console.print(f"[bold]Failed:[/bold] {len(self.failed)}")
    console.print()

    # Print aggregated metrics table
    metric_names = self.metrics.list_metrics()  # pylint: disable=no-member

    if metric_names:
        metrics_table = Table(
            title="Aggregated Metrics",
            show_lines=False,
            title_style=Style(color="cyan", bold=True),
        )
        metrics_table.add_column(
            "Metric", justify="left", style="magenta", no_wrap=True
        )
        metrics_table.add_column("Value", justify="right", style="green")
        metrics_table.add_column("Type", justify="center", style="blue")

        for metric_name in sorted(metric_names):
            # pylint: disable=no-member
            metric_data = self.metrics.get_metric(metric_name)
            if metric_data is not None:
                # Handle both MetricDetail objects (new format) and dicts (legacy)
                if isinstance(metric_data, MetricDetail):
                    aggregate_value = metric_data.aggregate
                    metric_type = metric_data.metric_type or "unknown"
                elif isinstance(metric_data, dict):
                    aggregate_value = metric_data.get("aggregate")
                    metric_type = metric_data.get("metric_type", "unknown")
                else:
                    aggregate_value = None
                    metric_type = "unknown"

                # Format value based on type
                if aggregate_value is None:
                    value_str = "N/A"
                elif isinstance(aggregate_value, float):
                    value_str = f"{aggregate_value:.4f}"
                else:
                    value_str = str(aggregate_value)

                metrics_table.add_row(metric_name, value_str, metric_type)

        console.print(metrics_table)
        console.print()

    # Print per-datapoint summary if available
    if self.datapoints:
        datapoints_table = Table(
            title=f"Datapoint Results ({len(self.datapoints)} total)",
            show_lines=False,
            title_style=Style(color="cyan", bold=True),
        )
        datapoints_table.add_column(
            "Datapoint ID", justify="left", style="blue", no_wrap=False
        )
        datapoints_table.add_column(
            "Session ID", justify="left", style="blue", no_wrap=False
        )
        datapoints_table.add_column("Status", justify="center", style="green")

        for datapoint in self.datapoints[:20]:  # Limit to first 20 for display
            dp_id = datapoint.datapoint_id or "N/A"
            session_id = datapoint.session_id or "N/A"
            passed = datapoint.passed

            if passed is True:
                status = "[green]✅ Passed[/green]"
            elif passed is False:
                status = "[red]❌ Failed[/red]"
            else:
                status = "❓ Unknown"

            datapoints_table.add_row(dp_id, session_id, status)

        console.print(datapoints_table)

        if len(self.datapoints) > 20:
            msg = (
                f"\n[dim](Showing first 20 of "
                f"{len(self.datapoints)} datapoints)[/dim]"
            )
            console.print(msg)

        console.print()

    console.print(f"{'=' * 80}\n")

ExperimentRunStatus

Bases: str, Enum

Extended status enum with all backend values.

The generated Status enum only includes 'pending' and 'completed', but the backend supports additional states.

Source code in src/honeyhive/experiments/models.py
25
26
27
28
29
30
31
32
33
34
35
36
37
class ExperimentRunStatus(str, Enum):
    """
    Extended status enum with all backend values.

    The generated Status enum only includes 'pending' and 'completed',
    but the backend supports additional states.
    """

    PENDING = "pending"
    COMPLETED = "completed"
    RUNNING = "running"
    FAILED = "failed"
    CANCELLED = "cancelled"

PENDING class-attribute instance-attribute

PENDING = 'pending'

COMPLETED class-attribute instance-attribute

COMPLETED = 'completed'

RUNNING class-attribute instance-attribute

RUNNING = 'running'

FAILED class-attribute instance-attribute

FAILED = 'failed'

CANCELLED class-attribute instance-attribute

CANCELLED = 'cancelled'

RunComparisonResult

Bases: BaseModel

Comparison between two experiment runs.

This model represents the delta analysis between a new run and an old run, including metric changes and datapoint differences.

Retrieved from: GET /runs/:new_run_id/compare-with/:old_run_id

Source code in src/honeyhive/experiments/models.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
class RunComparisonResult(BaseModel):
    """
    Comparison between two experiment runs.

    This model represents the delta analysis between a new run and an old run,
    including metric changes and datapoint differences.

    Retrieved from: GET /runs/:new_run_id/compare-with/:old_run_id
    """

    new_run_id: str = Field(..., description="New experiment run identifier")

    old_run_id: str = Field(..., description="Old experiment run identifier")

    common_datapoints: int = Field(
        ..., description="Number of datapoints common to both runs"
    )

    new_only_datapoints: int = Field(
        default=0, description="Number of datapoints only in new run"
    )

    old_only_datapoints: int = Field(
        default=0, description="Number of datapoints only in old run"
    )

    metric_deltas: Dict[str, Any] = Field(
        default_factory=dict, description="Metric name to delta information mapping"
    )

    def get_metric_delta(self, metric_name: str) -> Optional[Dict[str, Any]]:
        """
        Get delta information for a specific metric.

        Args:
            metric_name: Name of the metric

        Returns:
            Delta information including new_value, old_value, delta, percent_change

        Example:
            >>> comparison.get_metric_delta("accuracy")
            {
                'new_value': 0.85,
                'old_value': 0.80,
                'delta': 0.05,
                'percent_change': 6.25
            }
        """
        return self.metric_deltas.get(metric_name)  # pylint: disable=no-member

    def list_improved_metrics(self) -> List[str]:
        """
        List metrics that improved in the new run.

        Returns:
            List of metric names where improved_count > 0
        """
        improved = []
        for (
            metric_name,
            delta_info,
        ) in self.metric_deltas.items():  # pylint: disable=no-member
            if isinstance(delta_info, dict) and delta_info.get("improved_count", 0) > 0:
                improved.append(metric_name)
        return improved

    def list_degraded_metrics(self) -> List[str]:
        """
        List metrics that degraded in the new run.

        Returns:
            List of metric names where degraded_count > 0
        """
        degraded = []
        for (
            metric_name,
            delta_info,
        ) in self.metric_deltas.items():  # pylint: disable=no-member
            if isinstance(delta_info, dict) and delta_info.get("degraded_count", 0) > 0:
                degraded.append(metric_name)
        return degraded

new_run_id class-attribute instance-attribute

new_run_id: str = Field(
    ..., description="New experiment run identifier"
)

old_run_id class-attribute instance-attribute

old_run_id: str = Field(
    ..., description="Old experiment run identifier"
)

common_datapoints class-attribute instance-attribute

common_datapoints: int = Field(
    ...,
    description="Number of datapoints common to both runs",
)

new_only_datapoints class-attribute instance-attribute

new_only_datapoints: int = Field(
    default=0,
    description="Number of datapoints only in new run",
)

old_only_datapoints class-attribute instance-attribute

old_only_datapoints: int = Field(
    default=0,
    description="Number of datapoints only in old run",
)

metric_deltas class-attribute instance-attribute

metric_deltas: Dict[str, Any] = Field(
    default_factory=dict,
    description="Metric name to delta information mapping",
)

get_metric_delta

get_metric_delta(
    metric_name: str,
) -> Optional[Dict[str, Any]]

Get delta information for a specific metric.

Parameters:

Name Type Description Default
metric_name str

Name of the metric

required

Returns:

Type Description
Optional[Dict[str, Any]]

Delta information including new_value, old_value, delta, percent_change

Example

comparison.get_metric_delta("accuracy") { 'new_value': 0.85, 'old_value': 0.80, 'delta': 0.05, 'percent_change': 6.25 }

Source code in src/honeyhive/experiments/models.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
def get_metric_delta(self, metric_name: str) -> Optional[Dict[str, Any]]:
    """
    Get delta information for a specific metric.

    Args:
        metric_name: Name of the metric

    Returns:
        Delta information including new_value, old_value, delta, percent_change

    Example:
        >>> comparison.get_metric_delta("accuracy")
        {
            'new_value': 0.85,
            'old_value': 0.80,
            'delta': 0.05,
            'percent_change': 6.25
        }
    """
    return self.metric_deltas.get(metric_name)  # pylint: disable=no-member

list_improved_metrics

list_improved_metrics() -> List[str]

List metrics that improved in the new run.

Returns:

Type Description
List[str]

List of metric names where improved_count > 0

Source code in src/honeyhive/experiments/models.py
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def list_improved_metrics(self) -> List[str]:
    """
    List metrics that improved in the new run.

    Returns:
        List of metric names where improved_count > 0
    """
    improved = []
    for (
        metric_name,
        delta_info,
    ) in self.metric_deltas.items():  # pylint: disable=no-member
        if isinstance(delta_info, dict) and delta_info.get("improved_count", 0) > 0:
            improved.append(metric_name)
    return improved

list_degraded_metrics

list_degraded_metrics() -> List[str]

List metrics that degraded in the new run.

Returns:

Type Description
List[str]

List of metric names where degraded_count > 0

Source code in src/honeyhive/experiments/models.py
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
def list_degraded_metrics(self) -> List[str]:
    """
    List metrics that degraded in the new run.

    Returns:
        List of metric names where degraded_count > 0
    """
    degraded = []
    for (
        metric_name,
        delta_info,
    ) in self.metric_deltas.items():  # pylint: disable=no-member
        if isinstance(delta_info, dict) and delta_info.get("degraded_count", 0) > 0:
            degraded.append(metric_name)
    return degraded

evaluate

evaluate(
    function: Callable,
    *,
    dataset: Optional[List[Dict[str, Any]]] = None,
    dataset_id: Optional[str] = None,
    evaluators: Optional[List[Callable]] = None,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    api_key: Optional[str] = None,
    server_url: Optional[str] = None,
    project: Optional[str] = None,
    name: Optional[str] = None,
    run_id: Optional[str] = None,
    max_workers: int = 10,
    aggregate_function: str = "average",
    verbose: bool = False,
    print_results: bool = True
) -> Any

Run experiment evaluation with backend aggregation.

This is the main user-facing API for running experiments. It: 1. Prepares dataset (external or HoneyHive) 2. Creates experiment run via API 3. Executes function against dataset with tracer multi-instance 4. Runs evaluators (if provided) 5. Retrieves aggregated results from backend

Parameters:

Name Type Description Default
function Callable

User function to execute against each datapoint. Can be either a synchronous function or an async function. Async functions are automatically detected and executed with asyncio.run().

required
dataset Optional[List[Dict[str, Any]]]

External dataset (list of dicts with 'inputs' and 'ground_truth')

None
dataset_id Optional[str]

HoneyHive dataset ID (alternative to external dataset)

None
evaluators Optional[List[Callable]]

List of evaluator functions (optional)

None
instrumentors Optional[List[Callable[[], Any]]]

List of instrumentor factory functions. Each factory should return a new instrumentor instance when called. This ensures each datapoint gets its own tracer and instrumentor instance for proper trace routing. Example: [lambda: OpenAIInstrumentor()]

None
api_key Optional[str]

HoneyHive API key (or set HONEYHIVE_API_KEY/HH_API_KEY env var)

None
server_url Optional[str]

HoneyHive server URL (or set HONEYHIVE_SERVER_URL/ HH_SERVER_URL/HH_API_URL env var)

None
project Optional[str]

Deprecated and ignored. Project scope is determined by the API key.

None
name Optional[str]

Experiment run name (auto-generated if not provided)

None
run_id Optional[str]

Experiment run ID to send to the backend (auto-generated UUID if not provided). The backend's returned run_id is always honored as the final ID.

None
max_workers int

ThreadPool size for concurrent execution (default: 10)

10
aggregate_function str

Backend aggregation function ("average", "sum", "min", "max")

'average'
verbose bool

Enable verbose logging

False
print_results bool

Print formatted results table after evaluation (default: True)

True

Returns:

Type Description
Any

ExperimentResultSummary with backend-computed aggregates

Raises:

Type Description
ValueError

If neither dataset nor dataset_id provided, or both provided

Examples:

>>> from honeyhive import HoneyHive
>>> from honeyhive.experiments import evaluate
>>>
>>> # Define function to test (sync)
>>> def my_function(inputs, ground_truth):
...     # Your LLM call or function logic
...     return {"output": "result"}
>>>
>>> # Async functions are also supported
>>> async def my_async_function(inputs, ground_truth):
...     result = await some_async_llm_call()
...     return {"output": result}
>>>
>>> # External dataset
>>> dataset = [
...     {"inputs": {"query": "test1"}, "ground_truth": {"answer": "a1"}},
...     {"inputs": {"query": "test2"}, "ground_truth": {"answer": "a2"}}
... ]
>>>
>>> result = evaluate(
...     function=my_function,  # or my_async_function
...     dataset=dataset,
...     api_key="hh_...",
...     name="My Experiment"
... )
>>>
>>> print(f"Success: {result.success}")
>>> print(f"Passed: {len(result.passed)}")
>>> print(f"Metrics: {result.metrics.list_metrics()}")
>>>
>>> # HoneyHive dataset
>>> result = evaluate(
...     function=my_function,
...     dataset_id="ds-123",
...     api_key="hh_..."
... )
>>>
>>> # With instrumentors for automatic LLM tracing
>>> from openinference.instrumentation.openai import OpenAIInstrumentor
>>> result = evaluate(
...     function=my_function,
...     dataset=dataset,
...     api_key="hh_...",
...     instrumentors=[lambda: OpenAIInstrumentor()]
... )
Source code in src/honeyhive/experiments/core.py
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
def evaluate(  # pylint: disable=too-many-locals,too-many-branches
    function: Callable,
    *,
    dataset: Optional[List[Dict[str, Any]]] = None,
    dataset_id: Optional[str] = None,
    evaluators: Optional[List[Callable]] = None,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    api_key: Optional[str] = None,
    server_url: Optional[str] = None,
    project: Optional[str] = None,
    name: Optional[str] = None,
    run_id: Optional[str] = None,
    max_workers: int = 10,
    aggregate_function: str = "average",
    verbose: bool = False,
    print_results: bool = True,
) -> Any:
    """
    Run experiment evaluation with backend aggregation.

    This is the main user-facing API for running experiments. It:
    1. Prepares dataset (external or HoneyHive)
    2. Creates experiment run via API
    3. Executes function against dataset with tracer multi-instance
    4. Runs evaluators (if provided)
    5. Retrieves aggregated results from backend

    Args:
        function: User function to execute against each datapoint. Can be either
            a synchronous function or an async function. Async functions are
            automatically detected and executed with asyncio.run().
        dataset: External dataset (list of dicts with 'inputs' and 'ground_truth')
        dataset_id: HoneyHive dataset ID (alternative to external dataset)
        evaluators: List of evaluator functions (optional)
        instrumentors: List of instrumentor factory functions. Each factory should
            return a new instrumentor instance when called. This ensures each
            datapoint gets its own tracer and instrumentor instance for proper
            trace routing. Example: [lambda: OpenAIInstrumentor()]
        api_key: HoneyHive API key (or set HONEYHIVE_API_KEY/HH_API_KEY env var)
        server_url: HoneyHive server URL (or set HONEYHIVE_SERVER_URL/
            HH_SERVER_URL/HH_API_URL env var)
        project: Deprecated and ignored. Project scope is determined by the API key.
        name: Experiment run name (auto-generated if not provided)
        run_id: Experiment run ID to send to the backend (auto-generated UUID if not
            provided). The backend's returned run_id is always honored as the final ID.
        max_workers: ThreadPool size for concurrent execution (default: 10)
        aggregate_function: Backend aggregation function
            ("average", "sum", "min", "max")
        verbose: Enable verbose logging
        print_results: Print formatted results table after evaluation
            (default: True)

    Returns:
        ExperimentResultSummary with backend-computed aggregates

    Raises:
        ValueError: If neither dataset nor dataset_id provided, or both provided

    Examples:
        >>> from honeyhive import HoneyHive
        >>> from honeyhive.experiments import evaluate
        >>>
        >>> # Define function to test (sync)
        >>> def my_function(inputs, ground_truth):
        ...     # Your LLM call or function logic
        ...     return {"output": "result"}
        >>>
        >>> # Async functions are also supported
        >>> async def my_async_function(inputs, ground_truth):
        ...     result = await some_async_llm_call()
        ...     return {"output": result}
        >>>
        >>> # External dataset
        >>> dataset = [
        ...     {"inputs": {"query": "test1"}, "ground_truth": {"answer": "a1"}},
        ...     {"inputs": {"query": "test2"}, "ground_truth": {"answer": "a2"}}
        ... ]
        >>>
        >>> result = evaluate(
        ...     function=my_function,  # or my_async_function
        ...     dataset=dataset,
        ...     api_key="hh_...",
        ...     name="My Experiment"
        ... )
        >>>
        >>> print(f"Success: {result.success}")
        >>> print(f"Passed: {len(result.passed)}")
        >>> print(f"Metrics: {result.metrics.list_metrics()}")
        >>>
        >>> # HoneyHive dataset
        >>> result = evaluate(
        ...     function=my_function,
        ...     dataset_id="ds-123",
        ...     api_key="hh_..."
        ... )
        >>>
        >>> # With instrumentors for automatic LLM tracing
        >>> from openinference.instrumentation.openai import OpenAIInstrumentor
        >>> result = evaluate(
        ...     function=my_function,
        ...     dataset=dataset,
        ...     api_key="hh_...",
        ...     instrumentors=[lambda: OpenAIInstrumentor()]
        ... )
    """
    # Validate inputs
    if dataset is None and dataset_id is None:
        raise ValueError("Must provide either 'dataset' or 'dataset_id'")
    if dataset is not None and dataset_id is not None:
        raise ValueError("Cannot provide both 'dataset' and 'dataset_id'")
    if project is not None:
        warnings.warn(
            "The 'project' argument to evaluate() is deprecated and ignored. "
            "Project scope is determined by the API key.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Load from environment variables if not provided
    # Support both HONEYHIVE_* and HH_* prefixes for convenience
    # Note: HoneyHive client's config only reads HH_* prefix, so we check
    # HONEYHIVE_* first for better UX, then pass explicitly to client
    if api_key is None:
        api_key = os.getenv("HONEYHIVE_API_KEY") or os.getenv("HH_API_KEY")

    if server_url is None:
        # Check multiple variations for maximum compatibility
        server_url = (
            os.getenv("HONEYHIVE_SERVER_URL")  # Most intuitive
            or os.getenv("HH_SERVER_URL")  # Alternative shorthand
            or os.getenv("HH_API_URL")  # Client config uses this
        )

    # Initialize client - passing explicit values ensures both HONEYHIVE_* and HH_*
    # environment variables work (client's config only checks HH_* prefix)
    client_params = {"api_key": api_key}
    if server_url:
        client_params["base_url"] = server_url
    client = HoneyHive(**client_params)

    # Step 1: Prepare dataset
    if dataset is not None:
        # External dataset - generate EXT- IDs
        if verbose:
            logger.info("Preparing external dataset with %d datapoints", len(dataset))

        external_dataset_id, datapoint_ids = prepare_external_dataset(dataset)
        dataset_list = dataset

        if verbose:
            logger.info("Generated external dataset ID: %s", external_dataset_id)
    else:
        # HoneyHive dataset - fetch from API
        # At this point dataset_id is guaranteed to be str (not None)
        assert dataset_id is not None, "dataset_id must be provided"

        if verbose:
            logger.info("Fetching HoneyHive dataset: %s", dataset_id)
            logger.info("DEBUG - Input dataset_id type: %s", type(dataset_id))
            logger.info("DEBUG - Is EXT- dataset: %s", dataset_id.startswith("EXT-"))

        # Get dataset metadata - list() returns GetDatasetsResponse with datasets list
        ds_response = client.datasets.list(dataset_id=dataset_id)
        dataset_list = []
        datapoint_ids = []

        # Extract the dataset from the response
        if not ds_response.datasets:
            raise ValueError(f"Dataset not found: {dataset_id}")
        dataset_obj = ds_response.datasets[0]

        # Dataset.datapoints is List[str] (IDs only), fetch each datapoint.
        # get_datapoint returns a typed GetDatapointResponse Pydantic model
        # whose `.datapoint` field is List[Datapoint] (also Pydantic).
        #
        # Catch ONLY the exception types that represent a fetch failure
        # we can reasonably skip and keep going on (HTTP errors from the
        # generated SDK + httpx transport-level errors). Anything else
        # — AttributeError, TypeError, KeyError, etc. — indicates a real
        # bug we want to surface immediately rather than silently produce
        # an empty datapoint list.
        if dataset_obj.datapoints:
            for dp_id in dataset_obj.datapoints:
                try:
                    dp_response = client.datapoints.get_datapoint(dp_id)
                except (HTTPException, httpx.HTTPError) as e:
                    logger.warning("Failed to fetch datapoint %s: %s", dp_id, str(e))
                    continue
                dp_list = getattr(dp_response, "datapoint", []) or []
                if dp_list:
                    dp = dp_list[0]
                    dataset_list.append(
                        {
                            "inputs": getattr(dp, "inputs", None) or {},
                            "ground_truth": getattr(dp, "ground_truth", None),
                            "id": getattr(dp, "id", None) or dp_id,
                        }
                    )
                    datapoint_ids.append(getattr(dp, "id", None) or dp_id)

            # Guard against the silent-data-loss shape that the narrow
            # except above doesn't cover: every fetch logged + skipped
            # (transient HTTP failure on every datapoint), or every
            # response had an empty `.datapoint` list. In either case
            # the dataset claimed N datapoints but we collected zero —
            # better to fail loudly than to proceed with an empty
            # dataset and report passed=0.
            if not dataset_list:
                raise ValueError(
                    f"Dataset {dataset_id} listed "
                    f"{len(dataset_obj.datapoints)} datapoint(s) but every "
                    f"fetch returned no usable datapoint. Check warnings "
                    f"above for per-datapoint errors."
                )

        external_dataset_id = dataset_id

        if verbose:
            logger.info(
                "Loaded %d datapoints from HoneyHive dataset", len(dataset_list)
            )
            logger.info("DEBUG - external_dataset_id set to: %s", external_dataset_id)
            logger.info("DEBUG - datapoint_ids collected: %s", datapoint_ids)

    # Step 2: Create experiment run
    # Generate a client-side UUID if no run_id was provided. The backend also
    # generates a UUID when run_id is omitted, but we do it here so the
    # default run name ("experiment-{short_id}") is derived from the same ID
    # that will be sent in the request.
    run_id = run_id or str(uuid.uuid4())
    run_name = name or f"experiment-{run_id[:8]}"

    if verbose:
        logger.info("Creating experiment run: %s", run_name)
        logger.info("DEBUG - Before prepare_run_request_data:")
        logger.info("  external_dataset_id: %s", external_dataset_id)
        logger.info("  datapoint_ids: %s", datapoint_ids)

    git_context = get_git_context()

    run_metadata: Dict[str, Any] = {}
    if git_context:
        run_metadata["git"] = git_context

    run_data = prepare_run_request_data(
        run_id=run_id,
        name=run_name,
        dataset_id=external_dataset_id,
        event_ids=[],  # Empty initially
        datapoint_ids=datapoint_ids,  # Link datapoints to run
        configuration={
            "function": function.__name__,
            "evaluators": [e.__name__ for e in (evaluators or [])],
            "max_workers": max_workers,
            "aggregate_function": aggregate_function,
        },
        metadata=run_metadata,
        status="pending",
    )

    if verbose:
        logger.info("DEBUG - After prepare_run_request_data:")
        logger.info("  run_data['dataset_id']: %s", run_data.get("dataset_id"))
        logger.info("  run_data['datapoint_ids']: %s", run_data.get("datapoint_ids"))
        logger.info("  run_data['metadata']: %s", run_data.get("metadata"))

    # Create run via API (experiments API handles runs)
    run_request = PostExperimentRunRequest(**run_data)
    run_response = client.experiments.create_run(run_request)

    # Use backend-generated run_id if available
    if hasattr(run_response, "run_id") and run_response.run_id:
        run_id = str(run_response.run_id)

    if verbose:
        logger.info("Created experiment run: %s", run_id)

    # Step 3: Create experiment context
    # external_dataset_id is guaranteed to be str at this point
    context = ExperimentContext(
        run_id=run_id,
        dataset_id=external_dataset_id or "",  # Type safety
        run_name=run_name,
        source="evaluation",
    )

    # Step 4: Execute experiment with tracer multi-instance
    if verbose:
        logger.info(
            "Executing function against %d datapoints with %d workers",
            len(dataset_list),
            max_workers,
        )

    execution_results = run_experiment(
        function=function,
        dataset=dataset_list,
        datapoint_ids=datapoint_ids,
        server_url=server_url,
        experiment_context=context,
        api_key=api_key,
        max_workers=max_workers,
        verbose=verbose,
        instrumentors=instrumentors,
        evaluators=evaluators,
    )

    if verbose:
        logger.info("Enriching sessions with outputs and ground_truth")

    for result in execution_results:
        session_id = result.get("session_id")
        if session_id:
            _enrich_session_with_results(
                session_id=session_id,
                outputs=result.get("outputs"),
                ground_truth=result.get("ground_truth"),
                client=client,
                verbose=verbose,
            )

    _update_run_with_results(
        run_id=run_id,
        run_name=run_name,
        execution_results=execution_results,
        external_dataset_id=external_dataset_id,
        client=client,
        verbose=verbose,
    )

    # Step 7: Retrieve aggregated results from backend
    if verbose:
        logger.info(
            "Retrieving aggregated results with %s aggregation", aggregate_function
        )

    result_summary = get_run_result(
        client=client,
        run_id=run_id,
        aggregate_function=aggregate_function,
    )

    if verbose:
        logger.info(
            "Experiment complete: %s (passed: %d, failed: %d)",
            "SUCCESS" if result_summary.success else "FAILED",
            len(result_summary.passed),
            len(result_summary.failed),
        )

    # Print formatted results table if requested
    if print_results:
        result_summary.print_table(run_name=run_name)

    return result_summary

run_experiment

run_experiment(
    function: Callable,
    dataset: List[Dict[str, Any]],
    datapoint_ids: List[str],
    *,
    server_url: Optional[str] = None,
    experiment_context: ExperimentContext,
    api_key: Optional[str] = None,
    max_workers: int = 10,
    verbose: bool = False,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    evaluators: Optional[List[Callable]] = None
) -> List[Dict[str, Any]]

Run experiment with tracer multi-instance pattern.

CRITICAL: Each datapoint gets its OWN tracer instance for isolation. This prevents: - Metadata contamination between datapoints - Race conditions in concurrent execution - Session ID collisions

Threading Model: - Uses ThreadPoolExecutor (not multiprocessing) - I/O-bound operations (LLM calls, API requests) - Each tracer instance is completely isolated - Python 3.11+ GIL improvements for I/O

Parameters:

Name Type Description Default
function Callable

User function to execute against each datapoint. Can be either a synchronous function or an async function. Async functions are automatically detected and executed with asyncio.run().

required
dataset List[Dict[str, Any]]

List of datapoint dictionaries

required
datapoint_ids List[str]

List of datapoint IDs (parallel to dataset)

required
experiment_context ExperimentContext

ExperimentContext with run metadata

required
api_key Optional[str]

HoneyHive API key for tracer (or set HONEYHIVE_API_KEY env var)

None
max_workers int

ThreadPool size (default: 10)

10
verbose bool

Enable verbose logging

False
instrumentors Optional[List[Callable[[], Any]]]

List of instrumentor factory functions. Each factory should return a new instrumentor instance when called. This ensures each datapoint gets its own instrumentor instance for proper trace routing. Example: [lambda: OpenAIInstrumentor(), lambda: AnthropicInstrumentor()]

None
evaluators Optional[List[Callable]]

Optional list of evaluator callables. When set, each evaluator runs inline on the user function's outputs inside the per-datapoint chain span; their normalized scores attach to the chain span via enrich_span before the span closes.

None

Returns:

Type Description
List[Dict[str, Any]]

List of execution results (one per datapoint)

Examples:

>>> def my_function(inputs, ground_truth):
...     return {"output": "test"}
>>>
>>> # Async functions are also supported
>>> async def my_async_function(inputs, ground_truth):
...     result = await some_async_call()
...     return {"output": result}
>>>
>>> context = ExperimentContext(
...     run_id="run-123",
...     dataset_id="ds-456",
... )
>>>
>>> results = run_experiment(
...     function=my_function,  # or my_async_function
...     dataset=[{"inputs": {}, "ground_truth": {}}],
...     datapoint_ids=["dp-1"],
...     experiment_context=context,
...     api_key="hh_...",
...     max_workers=10,
...     instrumentors=[lambda: OpenAIInstrumentor()]
... )
Source code in src/honeyhive/experiments/core.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
def run_experiment(
    function: Callable,
    dataset: List[Dict[str, Any]],
    datapoint_ids: List[str],
    *,
    server_url: Optional[str] = None,
    experiment_context: ExperimentContext,
    api_key: Optional[str] = None,
    max_workers: int = 10,
    verbose: bool = False,
    instrumentors: Optional[List[Callable[[], Any]]] = None,
    evaluators: Optional[List[Callable]] = None,
) -> List[Dict[str, Any]]:
    """
    Run experiment with tracer multi-instance pattern.

    CRITICAL: Each datapoint gets its OWN tracer instance for isolation.
    This prevents:
    - Metadata contamination between datapoints
    - Race conditions in concurrent execution
    - Session ID collisions

    Threading Model:
    - Uses ThreadPoolExecutor (not multiprocessing)
    - I/O-bound operations (LLM calls, API requests)
    - Each tracer instance is completely isolated
    - Python 3.11+ GIL improvements for I/O

    Args:
        function: User function to execute against each datapoint. Can be either
            a synchronous function or an async function. Async functions are
            automatically detected and executed with asyncio.run().
        dataset: List of datapoint dictionaries
        datapoint_ids: List of datapoint IDs (parallel to dataset)
        experiment_context: ExperimentContext with run metadata
        api_key: HoneyHive API key for tracer (or set HONEYHIVE_API_KEY env var)
        max_workers: ThreadPool size (default: 10)
        verbose: Enable verbose logging
        instrumentors: List of instrumentor factory functions. Each factory should
            return a new instrumentor instance when called. This ensures each
            datapoint gets its own instrumentor instance for proper trace routing.
            Example: [lambda: OpenAIInstrumentor(), lambda: AnthropicInstrumentor()]
        evaluators: Optional list of evaluator callables. When set, each
            evaluator runs inline on the user function's outputs inside
            the per-datapoint chain span; their normalized scores attach
            to the chain span via ``enrich_span`` before the span closes.

    Returns:
        List of execution results (one per datapoint)

    Examples:
        >>> def my_function(inputs, ground_truth):
        ...     return {"output": "test"}
        >>>
        >>> # Async functions are also supported
        >>> async def my_async_function(inputs, ground_truth):
        ...     result = await some_async_call()
        ...     return {"output": result}
        >>>
        >>> context = ExperimentContext(
        ...     run_id="run-123",
        ...     dataset_id="ds-456",
        ... )
        >>>
        >>> results = run_experiment(
        ...     function=my_function,  # or my_async_function
        ...     dataset=[{"inputs": {}, "ground_truth": {}}],
        ...     datapoint_ids=["dp-1"],
        ...     experiment_context=context,
        ...     api_key="hh_...",
        ...     max_workers=10,
        ...     instrumentors=[lambda: OpenAIInstrumentor()]
        ... )
    """
    is_async = asyncio.iscoroutinefunction(function)
    user_fn_accepts_tracer = "tracer" in inspect.signature(function).parameters

    # Whole-experiment instrumentor lifecycle. The first datapoint into the
    # pool acquires _INSTRUMENTOR_LIFECYCLE_LOCK, binds each instrumentor to
    # its tracer.provider, and records that tracer in binding_tracer. Later
    # datapoints find active_instrumentors populated and skip. Cleanup runs
    # once after the pool drains.
    #
    # binding_tracer is the transport path for every wrapped call across the
    # experiment — all such spans flow through its provider's
    # BatchSpanProcessor, so it gets one more force_flush at teardown to
    # catch anything emitted after its own datapoint's flush ran.
    active_instrumentors: List[Any] = []
    binding_tracer: List[Any] = []  # singleton container so the closure can mutate it

    def process_datapoint(
        datapoint: Dict[str, Any], datapoint_id: str
    ) -> Dict[str, Any]:
        """
        Process single datapoint with isolated tracer and instrumentors.

        This function:
        1. Creates a NEW tracer instance for this datapoint
        2. Creates NEW instrumentor instances and sets tracer provider on them
        3. Executes the user function with tracer active
        4. Uninstruments all instrumentors
        5. Flushes the tracer to ensure all spans sent
        6. Returns result with status
        """
        # Extract inputs and ground truths from datapoint
        inputs = datapoint.get("inputs", {})
        ground_truth = datapoint.get("ground_truth")

        # Create tracer config for this datapoint with inputs
        tracer_config = experiment_context.to_tracer_config(datapoint_id)
        tracer_config["inputs"] = inputs  # Set session inputs

        if experiment_context.run_name:
            tracer_config["session_name"] = experiment_context.run_name

        # Create NEW tracer instance for this datapoint
        # Each tracer is completely isolated (own API client, logger, state)
        tracer = HoneyHiveTracer(
            api_key=api_key, server_url=server_url, verbose=verbose, **tracer_config
        )

        # Instrument once for the whole experiment under the module lock.
        # An instrumentor that raises here stays uninstrumented for the rest
        # of the experiment — install failures are deterministic (missing
        # dep, version mismatch), not transient.
        if instrumentors:
            with _INSTRUMENTOR_LIFECYCLE_LOCK:
                if not active_instrumentors:
                    binding_tracer.append(tracer)
                    for instrumentor_factory in instrumentors:
                        try:
                            instrumentor = instrumentor_factory()
                            instrumentor.instrument(tracer_provider=tracer.provider)
                            active_instrumentors.append(instrumentor)
                            if verbose:
                                safe_log(
                                    tracer,
                                    "info",
                                    "Initialized instrumentor %s for experiment",
                                    type(instrumentor).__name__,
                                )
                        except Exception as e:
                            safe_log(
                                tracer,
                                "warning",
                                "Failed to initialize instrumentor: %s",
                                str(e),
                            )

        try:
            # Execute function with tracer active
            # Tracer automatically adds all experiment metadata to spans!
            if verbose:
                # Use safe_log with tracer instance (multi-instance safety)
                safe_log(
                    tracer,
                    "info",
                    "Processing datapoint %s (run: %s)",
                    datapoint_id,
                    experiment_context.run_id,
                )

            # Wrap the user function so evaluators run before the chain span
            # closes — their scores attach to the still-recording span via
            # enrich_span(metrics=…) and ride out on the OTLP export.
            #
            # Sync and async user fns take separate paths so async evaluators
            # under an async user fn can be awaited directly; spinning up a
            # nested loop in the same thread that's already running one (via
            # asyncio.run below) would raise.

            def function_with_inline_evals(dp: Dict[str, Any]) -> Any:
                fn_outputs = (
                    function(dp, tracer=tracer)
                    if user_fn_accepts_tracer
                    else function(dp)
                )
                if evaluators:
                    _apply_inline_evaluators(
                        evaluators,
                        inputs=dp.get("inputs", {}),
                        outputs=fn_outputs,
                        ground_truth=dp.get("ground_truth"),
                        tracer=tracer,
                        max_workers=max_workers,
                        verbose=verbose,
                    )
                return fn_outputs

            async def afunction_with_inline_evals(dp: Dict[str, Any]) -> Any:
                fn_outputs = await (
                    function(dp, tracer=tracer)
                    if user_fn_accepts_tracer
                    else function(dp)
                )
                if evaluators:
                    await _aapply_inline_evaluators(
                        evaluators,
                        inputs=dp.get("inputs", {}),
                        outputs=fn_outputs,
                        ground_truth=dp.get("ground_truth"),
                        tracer=tracer,
                        verbose=verbose,
                    )
                return fn_outputs

            wrapped_for_trace = (
                afunction_with_inline_evals if is_async else function_with_inline_evals
            )
            functools.update_wrapper(wrapped_for_trace, function)
            # Drop __wrapped__ so inspect.signature(..., follow_wrapped=True) —
            # used by trace's input-capture path — stops at the closure's
            # (dp,) signature instead of walking back to the user fn's
            # (dp, tracer) and failing sig.bind(datapoint).
            try:
                del wrapped_for_trace.__wrapped__
            except AttributeError:
                pass

            traced_function = trace(
                event_type="chain",
                event_name=function.__name__,
                tracer=tracer,
            )(wrapped_for_trace)

            if verbose:
                safe_log(
                    tracer,
                    "info",
                    "Calling function (async=%s, accepts_tracer=%s, evaluators=%d)",
                    is_async,
                    user_fn_accepts_tracer,
                    len(evaluators or []),
                )
            if is_async:
                outputs = asyncio.run(traced_function(datapoint))
            else:
                outputs = traced_function(datapoint)

            # Capture session ID from tracer for linking to run
            # Outputs will be enriched later via UpdateEventRequest after tracer flush
            session_id = getattr(tracer, "session_id", None)

            return {
                "datapoint_id": datapoint_id,
                "inputs": inputs,
                "outputs": outputs,
                "ground_truth": ground_truth,
                "status": "success",
                "error": None,
                "session_id": session_id,  # Include session ID for run linkage
            }

        except Exception as e:
            # Use safe_log with tracer instance for error logging
            safe_log(
                tracer,
                "error",
                "Function execution failed for datapoint %s: %s",
                datapoint_id,
                str(e),
            )

            # Capture session ID even on failure
            session_id = getattr(tracer, "session_id", None)

            return {
                "datapoint_id": datapoint_id,
                "inputs": datapoint.get("inputs", {}),
                "outputs": None,
                "ground_truth": datapoint.get("ground_truth"),
                "status": "failed",
                "error": str(e),
                "session_id": session_id,  # Include session ID for run linkage
            }

        finally:
            # CRITICAL: Flush tracer to ensure all spans sent. Instrumentor
            # teardown happens once after the pool drains (in run_experiment)
            # so an early-finishing datapoint doesn't unwrap the client out
            # from under a sibling that's still mid-call.
            try:
                force_flush_tracer(tracer)
            except Exception as e:
                # Use safe_log for flush errors (tracer may be shutting down)
                safe_log(
                    tracer,
                    "warning",
                    "Failed to flush tracer for datapoint %s: %s",
                    datapoint_id,
                    str(e),
                )

    # Validate inputs
    if len(dataset) != len(datapoint_ids):
        raise ValueError(
            f"Dataset length ({len(dataset)}) does not match datapoint_ids length ({len(datapoint_ids)})"
        )

    if verbose:
        # Module-level orchestration logging (no tracer instance)
        logger.info(
            "Executing function against %d datapoints with %d workers",
            len(dataset),
            max_workers,
        )

    # Use ThreadPoolExecutor for I/O-bound concurrent execution
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all datapoint executions
        future_to_datapoint = {}
        for datapoint, datapoint_id in zip(dataset, datapoint_ids):
            future = executor.submit(process_datapoint, datapoint, datapoint_id)
            future_to_datapoint[future] = datapoint_id

        # Collect results as they complete
        for future in as_completed(future_to_datapoint):
            datapoint_id = future_to_datapoint[future]
            try:
                result = future.result()
                results.append(result)

                if verbose:
                    status = result.get("status", "unknown")
                    # Module-level logging (tracer already flushed)
                    logger.info("Completed datapoint %s: %s", datapoint_id, status)

            except Exception as e:
                # Module-level error logging (tracer context lost)
                logger.error(
                    "Unexpected error processing datapoint %s: %s",
                    datapoint_id,
                    str(e),
                    exc_info=True,
                )
                results.append(
                    {
                        "datapoint_id": datapoint_id,
                        "status": "failed",
                        "error": str(e),
                    }
                )

    # Flush the binding tracer. Every wrapped span across the experiment
    # was emitted through its provider, and short scripts / container
    # exits can race the BatchSpanProcessor's 5 s tick and atexit hook.
    if binding_tracer:
        try:
            force_flush_tracer(binding_tracer[0])
        except Exception as e:
            logger.warning("Failed to flush binding tracer for experiment: %s", str(e))

    # Uninstrument once every datapoint has finished — unwrapping the
    # wrapped client while a sibling is still mid-call would silently drop
    # its spans.
    for instrumentor in active_instrumentors:
        try:
            instrumentor.uninstrument()
            if verbose:
                logger.info(
                    "Uninstrumented %s for experiment",
                    type(instrumentor).__name__,
                )
        except Exception as e:
            logger.warning(
                "Failed to uninstrument %s: %s",
                type(instrumentor).__name__,
                str(e),
            )

    # Log summary
    success_count = sum(1 for r in results if r.get("status") == "success")
    failed_count = sum(1 for r in results if r.get("status") == "failed")

    if verbose:
        # Module-level summary logging
        logger.info(
            "Experiment execution complete: %d succeeded, %d failed",
            success_count,
            failed_count,
        )

    return results

compare_runs

compare_runs(
    client: Any,
    new_run_id: str,
    old_run_id: str,
    project_id: Optional[str] = None,
    aggregate_function: str = "average",
) -> RunComparisonResult

Compare two experiment runs using backend aggregated comparison.

Backend Endpoint: GET /runs/:new_run_id/compare-with/:old_run_id

The backend computes aggregated metrics for both runs and then compares them: - Common datapoints between runs (by datapoint_id) - Per-metric improved/degraded/same classification - Old and new aggregate values for each metric - Statistical aggregation (average, sum, min, max)

❌ DO NOT compute these client-side! ✅ Use backend endpoint for all comparisons

Parameters:

Name Type Description Default
client Any

HoneyHive API client

required
new_run_id str

New experiment run ID

required
old_run_id str

Old experiment run ID

required
project_id Optional[str]

Deprecated and ignored. Project scope is determined by the API key.

None
aggregate_function str

Aggregation function ("average", "sum", "min", "max")

'average'

Returns:

Type Description
RunComparisonResult

RunComparisonResult with delta calculations

Examples:

>>> comparison = compare_runs(client, "run-new", "run-old")
>>> comparison.common_datapoints
3
>>> delta = comparison.get_metric_delta("accuracy")
>>> delta
{
    'old_aggregate': 0.80,
    'new_aggregate': 0.85,
    'found_count': 3,
    'improved_count': 1,
    'degraded_count': 0,
    'improved': ['EXT-abc123'],
    'degraded': []
}
>>> comparison.list_improved_metrics()
['accuracy', 'error_rate']
>>> comparison.list_degraded_metrics()
[]
Source code in src/honeyhive/experiments/results.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def compare_runs(
    client: Any,  # HoneyHive client
    new_run_id: str,
    old_run_id: str,
    project_id: Optional[str] = None,
    aggregate_function: str = "average",
) -> RunComparisonResult:
    """
    Compare two experiment runs using backend aggregated comparison.

    Backend Endpoint: GET /runs/:new_run_id/compare-with/:old_run_id

    The backend computes aggregated metrics for both runs and then compares them:
    - Common datapoints between runs (by datapoint_id)
    - Per-metric improved/degraded/same classification
    - Old and new aggregate values for each metric
    - Statistical aggregation (average, sum, min, max)

    ❌ DO NOT compute these client-side!
    ✅ Use backend endpoint for all comparisons

    Args:
        client: HoneyHive API client
        new_run_id: New experiment run ID
        old_run_id: Old experiment run ID
        project_id: Deprecated and ignored. Project scope is determined by the API key.
        aggregate_function: Aggregation function ("average", "sum", "min", "max")

    Returns:
        RunComparisonResult with delta calculations

    Examples:
        >>> comparison = compare_runs(client, "run-new", "run-old")
        >>> comparison.common_datapoints
        3
        >>> delta = comparison.get_metric_delta("accuracy")
        >>> delta
        {
            'old_aggregate': 0.80,
            'new_aggregate': 0.85,
            'found_count': 3,
            'improved_count': 1,
            'degraded_count': 0,
            'improved': ['EXT-abc123'],
            'degraded': []
        }
        >>> comparison.list_improved_metrics()
        ['accuracy', 'error_rate']
        >>> comparison.list_degraded_metrics()
        []
    """
    if project_id is not None:
        warnings.warn(
            "The 'project_id' argument is deprecated and ignored. "
            "Project scope is determined by the API key.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Use experiments API comparison endpoint
    # Note: project_id is no longer passed - backend uses auth scopes
    response = client.experiments.compare_runs(
        new_run_id=new_run_id,
        old_run_id=old_run_id,
        aggregate_function=aggregate_function,
    )

    # Parse commonDatapoints (list of IDs, not a count)
    common_datapoints_list = response.get("commonDatapoints", [])
    common_datapoints_count = len(common_datapoints_list)

    # Build metric_deltas from metrics array
    metric_deltas = {}
    for metric_data in response.get("metrics", []):
        metric_name = metric_data.get("metric_name")
        if metric_name:
            metric_deltas[metric_name] = {
                "old_aggregate": metric_data.get("old_aggregate"),
                "new_aggregate": metric_data.get("new_aggregate"),
                "found_count": metric_data.get("found_count", 0),
                "improved_count": metric_data.get("improved_count", 0),
                "degraded_count": metric_data.get("degraded_count", 0),
                "same_count": metric_data.get("same_count", 0),
                "improved": metric_data.get("improved", []),
                "degraded": metric_data.get("degraded", []),
                "same": metric_data.get("same", []),
                "old_values": metric_data.get("old_values", []),
                "new_values": metric_data.get("new_values", []),
            }

        # Extract new/old run data if needed (for future use)
        _old_run = response.get("old_run", {})
        _new_run = response.get("new_run", {})

    # Calculate new_only and old_only datapoints
    # (For now, we don't have this data from the backend response)
    new_only_count = 0
    old_only_count = 0

    return RunComparisonResult(
        new_run_id=new_run_id,
        old_run_id=old_run_id,
        common_datapoints=common_datapoints_count,
        new_only_datapoints=new_only_count,
        old_only_datapoints=old_only_count,
        metric_deltas=metric_deltas,
    )

get_run_metrics

get_run_metrics(
    client: Any,
    run_id: str,
    project_id: Optional[str] = None,
) -> Dict[str, Any]

Get raw metrics for a run (without aggregation).

Backend Endpoint: GET /runs/:run_id/result (returns metrics in response)

This returns raw metric data without aggregation, useful for: - Debugging individual datapoint metrics - Custom aggregation logic (if needed) - Detailed metric analysis

Parameters:

Name Type Description Default
client Any

HoneyHive API client

required
run_id str

Experiment run ID

required
project_id Optional[str]

Deprecated and ignored. Project scope is determined by the API key.

None

Returns:

Type Description
Dict[str, Any]

Raw metrics data from backend

Examples:

>>> metrics = get_run_metrics(client, "run-123")
>>> metrics["events"]
[{'event_id': '...', 'metrics': {...}}, ...]
Source code in src/honeyhive/experiments/results.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def get_run_metrics(
    client: Any,
    run_id: str,
    project_id: Optional[str] = None,
) -> Dict[str, Any]:  # HoneyHive client
    """
    Get raw metrics for a run (without aggregation).

    Backend Endpoint: GET /runs/:run_id/result (returns metrics in response)

    This returns raw metric data without aggregation, useful for:
    - Debugging individual datapoint metrics
    - Custom aggregation logic (if needed)
    - Detailed metric analysis

    Args:
        client: HoneyHive API client
        run_id: Experiment run ID
        project_id: Deprecated and ignored. Project scope is determined by the API key.

    Returns:
        Raw metrics data from backend

    Examples:
        >>> metrics = get_run_metrics(client, "run-123")
        >>> metrics["events"]
        [{'event_id': '...', 'metrics': {...}}, ...]
    """
    if project_id is not None:
        warnings.warn(
            "The 'project_id' argument is deprecated and ignored. "
            "Project scope is determined by the API key.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Use experiments API for run results (includes metrics)
    # Note: project_id is no longer passed - backend uses auth scopes
    return cast(
        Dict[str, Any],
        client.experiments.get_result(run_id=run_id),
    )

get_run_result

get_run_result(
    client: Any,
    run_id: str,
    project_id: Optional[str] = None,
    aggregate_function: str = "average",
) -> ExperimentResultSummary

Get aggregated experiment result from backend.

Backend Endpoint: GET /runs/:run_id/result?aggregate_function=

The backend computes: - Pass/fail status for each datapoint - Metric aggregations (average, sum, min, max) - Composite metrics - Overall run status

❌ DO NOT compute these client-side! ✅ Use backend endpoint for all aggregations

Parameters:

Name Type Description Default
client Any

HoneyHive API client

required
run_id str

Experiment run ID

required
project_id Optional[str]

Deprecated and ignored. Project scope is determined by the API key.

None
aggregate_function str

Aggregation function ("average", "sum", "min", "max")

'average'

Returns:

Type Description
ExperimentResultSummary

ExperimentResultSummary with all aggregated metrics

Raises:

Type Description
HTTPError

If backend request fails

ValueError

If response format is invalid

Examples:

>>> from honeyhive import HoneyHive
>>> client = HoneyHive(api_key="...")
>>> result = get_run_result(client, "run-123", None, "average")
>>> result.success
True
>>> result.metrics.get_metric("accuracy")
{'aggregate': 0.85, 'values': [0.8, 0.9, 0.85]}
Source code in src/honeyhive/experiments/results.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_run_result(
    client: Any,  # HoneyHive client
    run_id: str,
    project_id: Optional[str] = None,
    aggregate_function: str = "average",
) -> ExperimentResultSummary:
    """
    Get aggregated experiment result from backend.

    Backend Endpoint: GET /runs/:run_id/result?aggregate_function=<function>

    The backend computes:
    - Pass/fail status for each datapoint
    - Metric aggregations (average, sum, min, max)
    - Composite metrics
    - Overall run status

    ❌ DO NOT compute these client-side!
    ✅ Use backend endpoint for all aggregations

    Args:
        client: HoneyHive API client
        run_id: Experiment run ID
        project_id: Deprecated and ignored. Project scope is determined by the API key.
        aggregate_function: Aggregation function ("average", "sum", "min", "max")

    Returns:
        ExperimentResultSummary with all aggregated metrics

    Raises:
        HTTPError: If backend request fails
        ValueError: If response format is invalid

    Examples:
        >>> from honeyhive import HoneyHive
        >>> client = HoneyHive(api_key="...")
        >>> result = get_run_result(client, "run-123", None, "average")
        >>> result.success
        True
        >>> result.metrics.get_metric("accuracy")
        {'aggregate': 0.85, 'values': [0.8, 0.9, 0.85]}
    """
    if project_id is not None:
        warnings.warn(
            "The 'project_id' argument is deprecated and ignored. "
            "Project scope is determined by the API key.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Use experiments API for run results
    # Note: project_id is no longer passed - backend uses auth scopes
    response = client.experiments.get_result(
        run_id=run_id, aggregate_function=aggregate_function
    )

    # Parse datapoints into DatapointResult objects
    raw_datapoints: List[Dict[str, Any]] = response.get("datapoints", [])
    datapoints: List[DatapointResult] = [DatapointResult(**dp) for dp in raw_datapoints]

    # Parse response into ExperimentResultSummary
    return ExperimentResultSummary(
        run_id=run_id,
        status=response.get("status", "unknown"),
        success=response.get("success", False),
        passed=response.get("passed", []),
        failed=response.get("failed", []),
        metrics=AggregatedMetrics(**response.get("metrics", {})),
        datapoints=datapoints,
    )

generate_external_datapoint_id

generate_external_datapoint_id(
    datapoint: Dict[str, Any],
    index: int,
    custom_id: Optional[str] = None,
) -> str

Generate EXT- prefixed datapoint ID for external datapoints.

Parameters:

Name Type Description Default
datapoint Dict[str, Any]

Datapoint dictionary

required
index int

Index in dataset (for stable ordering)

required
custom_id Optional[str]

Optional custom ID (will be prefixed with EXT-)

None

Returns:

Type Description
str

Datapoint ID with EXT- prefix

Examples:

>>> datapoint = {"inputs": {"query": "test"}}
>>> generate_external_datapoint_id(datapoint, 0)
'EXT-f1e2d3c4b5a6'
>>> generate_external_datapoint_id(datapoint, 0, custom_id="dp-1")
'EXT-dp-1'
Source code in src/honeyhive/experiments/utils.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def generate_external_datapoint_id(
    datapoint: Dict[str, Any], index: int, custom_id: Optional[str] = None
) -> str:
    """
    Generate EXT- prefixed datapoint ID for external datapoints.

    Args:
        datapoint: Datapoint dictionary
        index: Index in dataset (for stable ordering)
        custom_id: Optional custom ID (will be prefixed with EXT-)

    Returns:
        Datapoint ID with EXT- prefix

    Examples:
        >>> datapoint = {"inputs": {"query": "test"}}
        >>> generate_external_datapoint_id(datapoint, 0)
        'EXT-f1e2d3c4b5a6'

        >>> generate_external_datapoint_id(datapoint, 0, custom_id="dp-1")
        'EXT-dp-1'
    """
    if custom_id:
        if not custom_id.startswith("EXT-"):
            return f"EXT-{custom_id}"
        return custom_id

    # Generate hash-based ID with index for uniqueness
    content = json.dumps(datapoint, sort_keys=True)
    hash_value = hashlib.sha256(f"{content}{index}".encode()).hexdigest()[:16]
    return f"EXT-{hash_value}"

generate_external_dataset_id

generate_external_dataset_id(
    datapoints: List[Dict[str, Any]],
    custom_id: Optional[str] = None,
) -> str

Generate EXT- prefixed dataset ID for external datasets.

External datasets are managed by the user (not stored in HoneyHive). They require an EXT- prefix to distinguish them from HoneyHive datasets.

Parameters:

Name Type Description Default
datapoints List[Dict[str, Any]]

List of datapoint dictionaries

required
custom_id Optional[str]

Optional custom ID (will be prefixed with EXT-)

None

Returns:

Type Description
str

Dataset ID with EXT- prefix

Examples:

>>> datapoints = [{"inputs": {"query": "test"}}]
>>> generate_external_dataset_id(datapoints)
'EXT-a1b2c3d4e5f6'
>>> generate_external_dataset_id(datapoints, custom_id="my-dataset")
'EXT-my-dataset'
Source code in src/honeyhive/experiments/utils.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def generate_external_dataset_id(
    datapoints: List[Dict[str, Any]], custom_id: Optional[str] = None
) -> str:
    """
    Generate EXT- prefixed dataset ID for external datasets.

    External datasets are managed by the user (not stored in HoneyHive).
    They require an EXT- prefix to distinguish them from HoneyHive datasets.

    Args:
        datapoints: List of datapoint dictionaries
        custom_id: Optional custom ID (will be prefixed with EXT-)

    Returns:
        Dataset ID with EXT- prefix

    Examples:
        >>> datapoints = [{"inputs": {"query": "test"}}]
        >>> generate_external_dataset_id(datapoints)
        'EXT-a1b2c3d4e5f6'

        >>> generate_external_dataset_id(datapoints, custom_id="my-dataset")
        'EXT-my-dataset'
    """
    if custom_id:
        # Ensure custom ID has EXT- prefix
        if not custom_id.startswith("EXT-"):
            return f"EXT-{custom_id}"
        return custom_id

    # Generate hash-based ID for deterministic identification
    content = json.dumps(datapoints, sort_keys=True)
    hash_value = hashlib.sha256(content.encode()).hexdigest()[:16]
    return f"EXT-{hash_value}"

prepare_external_dataset

prepare_external_dataset(
    datapoints: List[Dict[str, Any]],
    custom_dataset_id: Optional[str] = None,
) -> Tuple[str, List[str]]

Prepare external dataset with EXT- IDs.

This function generates a dataset ID and datapoint IDs for an external dataset, ensuring all IDs have the EXT- prefix.

Parameters:

Name Type Description Default
datapoints List[Dict[str, Any]]

List of datapoint dictionaries

required
custom_dataset_id Optional[str]

Optional custom dataset ID

None

Returns:

Type Description
Tuple[str, List[str]]

Tuple of (dataset_id, datapoint_ids)

Examples:

>>> datapoints = [
...     {"inputs": {"query": "test1"}},
...     {"inputs": {"query": "test2"}}
... ]
>>> dataset_id, datapoint_ids = prepare_external_dataset(datapoints)
>>> dataset_id.startswith("EXT-")
True
>>> all(dp_id.startswith("EXT-") for dp_id in datapoint_ids)
True
Source code in src/honeyhive/experiments/utils.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def prepare_external_dataset(
    datapoints: List[Dict[str, Any]], custom_dataset_id: Optional[str] = None
) -> Tuple[str, List[str]]:
    """
    Prepare external dataset with EXT- IDs.

    This function generates a dataset ID and datapoint IDs for an external
    dataset, ensuring all IDs have the EXT- prefix.

    Args:
        datapoints: List of datapoint dictionaries
        custom_dataset_id: Optional custom dataset ID

    Returns:
        Tuple of (dataset_id, datapoint_ids)

    Examples:
        >>> datapoints = [
        ...     {"inputs": {"query": "test1"}},
        ...     {"inputs": {"query": "test2"}}
        ... ]
        >>> dataset_id, datapoint_ids = prepare_external_dataset(datapoints)
        >>> dataset_id.startswith("EXT-")
        True
        >>> all(dp_id.startswith("EXT-") for dp_id in datapoint_ids)
        True
    """
    # Generate dataset ID
    dataset_id = generate_external_dataset_id(datapoints, custom_dataset_id)

    # Generate datapoint IDs
    datapoint_ids = []
    for idx, dp in enumerate(datapoints):
        # Check if datapoint already has an ID
        custom_dp_id = dp.get("id") or dp.get("datapoint_id")
        dp_id = generate_external_datapoint_id(dp, idx, custom_dp_id)
        datapoint_ids.append(dp_id)

    return dataset_id, datapoint_ids

prepare_run_request_data

prepare_run_request_data(
    run_id: str,
    name: str,
    *,
    dataset_id: Optional[str],
    event_ids: Optional[List[str]] = None,
    datapoint_ids: Optional[List[str]] = None,
    configuration: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    description: Optional[str] = None,
    results: Optional[Dict[str, Any]] = None,
    status: str = "pending"
) -> Dict[str, Any]

Prepare run request data with EXT- transformation.

CRITICAL: Backend requires special handling for external datasets: - If dataset_id starts with "EXT-": - Move to metadata.offline_dataset_id - Set dataset_id = None (prevents FK constraint error) - Otherwise, use dataset_id normally

Backend Logic (from backend_service/app/services/experiment_run.service.ts):

if (dataset_id && dataset_id.startsWith('EXT-')) {
    metadata = { ...metadata, offline_dataset_id: dataset_id };
    dataset_id = undefined; // Avoid FK constraint
}

Parameters:

Name Type Description Default
run_id str

Experiment run identifier

required
name str

Run name

required
dataset_id Optional[str]

Dataset identifier (may have EXT- prefix)

required
event_ids Optional[List[str]]

List of event/session IDs (optional)

None
configuration Optional[Dict[str, Any]]

Run configuration (optional)

None
metadata Optional[Dict[str, Any]]

Additional metadata (optional)

None
description Optional[str]

Run description (optional)

None
results Optional[Dict[str, Any]]

Run results (optional)

None
status str

Run status (default: "pending")

'pending'

Returns:

Type Description
Dict[str, Any]

Request data dictionary ready for backend API

Examples:

>>> # External dataset
>>> data = prepare_run_request_data(
...     run_id="run-123",
...     name="My Experiment",
...     dataset_id="EXT-abc123"
... )
>>> data["dataset_id"]  # None (moved to metadata)
>>> data["metadata"]["offline_dataset_id"]
'EXT-abc123'
>>> # HoneyHive dataset
>>> data = prepare_run_request_data(
...     run_id="run-123",
...     name="My Experiment",
...     dataset_id="ds-789"
... )
>>> data["dataset_id"]
'ds-789'
Source code in src/honeyhive/experiments/utils.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def prepare_run_request_data(
    run_id: str,
    name: str,
    *,
    dataset_id: Optional[str],
    event_ids: Optional[List[str]] = None,
    datapoint_ids: Optional[List[str]] = None,
    configuration: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    description: Optional[str] = None,
    results: Optional[Dict[str, Any]] = None,
    status: str = "pending",
) -> Dict[str, Any]:
    """
    Prepare run request data with EXT- transformation.

    CRITICAL: Backend requires special handling for external datasets:
    - If dataset_id starts with "EXT-":
      - Move to metadata.offline_dataset_id
      - Set dataset_id = None (prevents FK constraint error)
    - Otherwise, use dataset_id normally

    Backend Logic (from backend_service/app/services/experiment_run.service.ts):
    ```typescript
    if (dataset_id && dataset_id.startsWith('EXT-')) {
        metadata = { ...metadata, offline_dataset_id: dataset_id };
        dataset_id = undefined; // Avoid FK constraint
    }
    ```

    Args:
        run_id: Experiment run identifier
        name: Run name
        dataset_id: Dataset identifier (may have EXT- prefix)
        event_ids: List of event/session IDs (optional)
        configuration: Run configuration (optional)
        metadata: Additional metadata (optional)
        description: Run description (optional)
        results: Run results (optional)
        status: Run status (default: "pending")

    Returns:
        Request data dictionary ready for backend API

    Examples:
        >>> # External dataset
        >>> data = prepare_run_request_data(
        ...     run_id="run-123",
        ...     name="My Experiment",
        ...     dataset_id="EXT-abc123"
        ... )
        >>> data["dataset_id"]  # None (moved to metadata)
        >>> data["metadata"]["offline_dataset_id"]
        'EXT-abc123'

        >>> # HoneyHive dataset
        >>> data = prepare_run_request_data(
        ...     run_id="run-123",
        ...     name="My Experiment",
        ...     dataset_id="ds-789"
        ... )
        >>> data["dataset_id"]
        'ds-789'
    """
    # Initialize request data
    request_data: Dict[str, Any] = {
        "name": name,
        "run_id": run_id,
        "event_ids": event_ids or [],
        "datapoint_ids": datapoint_ids or [],
        "configuration": configuration or {},
        "metadata": metadata or {},
        "status": status,
    }

    # Add optional fields if provided
    if description:
        request_data["description"] = description
    if results:
        request_data["results"] = results

    # Handle EXT- prefix transformation
    if dataset_id and dataset_id.startswith("EXT-"):
        # Store external dataset ID in metadata
        request_data["metadata"]["offline_dataset_id"] = dataset_id
        # Clear dataset_id to avoid FK constraint
        request_data["dataset_id"] = None
    else:
        request_data["dataset_id"] = dataset_id

    return request_data