Source code for honeyhive.experiments.evaluators

"""Evaluators framework for HoneyHive experiments.

This module provides the evaluator decorator and supporting infrastructure for
defining and running evaluators in experiments. Port from main branch with
updated imports for complete-refactor tracer architecture.

Key Features:
    - @evaluator and @aevaluator decorators for sync/async evaluators
    - Transformation, aggregation, and checker pipelines
    - Evaluator wrapping and composition
    - Integration with HoneyHive tracer
    - Configurable settings hierarchy

Example:
    >>> from honeyhive.experiments.evaluators import evaluator
    >>>
    >>> @evaluator
    >>> def accuracy(output, ground_truth):
    ...     return 1.0 if output == ground_truth else 0.0
    >>>
    >>> score = accuracy("hello", "hello")  # Returns 1.0

Pylint Suppressions:
    - eval-used: Evaluators use eval() for dynamic expression evaluation
    - too-many-lines: Large module due to comprehensive evaluator framework
    - too-few-public-methods: Helper classes intentionally minimal
"""

# pylint: disable=eval-used,too-many-lines,too-few-public-methods,line-too-long,fixme,used-before-assignment,consider-using-f-string,use-dict-literal,missing-function-docstring,unused-argument,f-string-without-interpolation,no-else-return,consider-merging-isinstance,unused-variable,inconsistent-return-statements,abstract-method,invalid-overridden-method

import asyncio
import concurrent.futures
import functools
import inspect
import json
from dataclasses import dataclass, field, fields
from typing import Any, Callable, Coroutine, Optional

from honeyhive.tracer import atrace, enrich_span, trace


[docs] class TerminalColors: # pylint: disable=too-few-public-methods """ANSI terminal color codes for output formatting.""" HEADER = "\033[95m" OKBLUE = "\033[94m" OKCYAN = "\033[96m" OKGREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m"
# ------------------------------------------------------------------------------ # EVALUATOR SETTINGS # ------------------------------------------------------------------------------ EVALUATOR_SETTINGS_KEYS = [ "wraps", "weight", "asserts", "repeat", "transform", "aggregate", "checker", "target", "evaluate", ] @dataclass class EvalSettings: """Configuration settings for evaluators.""" name: str # config key wraps: Optional[str | dict] = None weight: float = None asserts: bool = None repeat: Optional[int] = None transform: Optional[str] = None aggregate: Optional[str] = None checker: Optional[str] = None target: Optional[str] = None evaluate: Optional[str] = None def copy(self) -> "EvalSettings": """Create a deep copy of the settings.""" return EvalSettings( name=self.name, wraps=self.wraps, weight=self.weight, repeat=self.repeat, asserts=self.asserts, transform=self.transform, aggregate=self.aggregate, checker=self.checker, target=self.target, evaluate=self.evaluate, ) def keys(self): """Return dictionary keys.""" return self.__dict__.keys() # TODO: settings update should replace instead of merge def update(self, eval_settings: Any | None) -> None: """Update settings from dict or EvalSettings instance.""" if eval_settings is None: return if isinstance(eval_settings, dict): update_dict = eval_settings elif isinstance(eval_settings, EvalSettings): update_dict = eval_settings.__dict__ else: raise TypeError( "eval_settings must be either a dictionary or an EvalSettings instance. Got {}".format( type(eval_settings) ) ) valid_fields = {f.name for f in fields(self)} for key, value in update_dict.items(): if key not in valid_fields: raise ValueError(f"Invalid field name: {key}") if value is not None: setattr(self, key, value) @staticmethod def extract_eval_settings_and_kwargs(settings: dict[str, Any] | None): """Extract evaluator settings and kwargs from a combined dict.""" eval_kwargs = dict() eval_settings = dict() if settings is not None: for key, value in settings.items(): if key in EVALUATOR_SETTINGS_KEYS: eval_settings[key] = value else: eval_kwargs[key] = value return eval_settings, eval_kwargs def __str__(self) -> str: """Return string representation of settings.""" dict_str = {k: str(v) for k, v in self.__dict__.items() if v is not None} return json.dumps(dict_str, indent=4).replace('"', "") def dict(self) -> dict: """Convert to dictionary, excluding name.""" ret_dict = self.__dict__ ret_dict.pop("name", None) return ret_dict @dataclass class EvaluatorSettings: """Hierarchical settings management for evaluators.""" name: str # base default settings default_settings: EvalSettings = None default_kwargs: dict = field(default_factory=dict) # settings from defaults.yaml defaults_yaml_settings: EvalSettings = None defaults_yaml_kwargs: dict = field(default_factory=dict) # settings from evaluator init init_settings: EvalSettings = None init_kwargs: dict = field(default_factory=dict) # settings from decorator kwargs deco_settings: EvalSettings = None deco_kwargs: dict = field(default_factory=dict) # settings from config file config_settings: EvalSettings = None config_kwargs: dict = field(default_factory=dict) # settings from runtime explicit_settings: EvalSettings = None explicit_kwargs: dict = field(default_factory=dict) def __post_init__(self): self.default_settings = EvalSettings(name=self.name) self.defaults_yaml_settings = EvalSettings(name=self.name) self.init_settings = EvalSettings(name=self.name) self.deco_settings = EvalSettings(name=self.name) self.config_settings = EvalSettings(name=self.name) self.explicit_settings = None # this must be set at runtime # set defaults self.default_settings.asserts = False self.default_settings.weight = 1.0 def resolve_settings(self, settings: EvalSettings | None = None) -> EvalSettings: """Resolve settings from all sources in priority order.""" if self.explicit_settings: return self.explicit_settings final_settings: EvalSettings = self.default_settings final_settings.update(self.defaults_yaml_settings) final_settings.update(self.init_settings) final_settings.update(self.config_settings) final_settings.update(self.deco_settings) if settings is not None: final_settings.update(settings) return final_settings def resolve_kwargs(self, kwargs: dict | None = None) -> dict: """Resolve kwargs from all sources in priority order.""" if self.explicit_kwargs: return self.explicit_kwargs final_kwargs: dict = self.default_kwargs.copy() final_kwargs.update(self.defaults_yaml_kwargs) final_kwargs.update(self.init_kwargs) final_kwargs.update(self.config_kwargs) final_kwargs.update(self.deco_kwargs) if kwargs is not None: final_kwargs.update(kwargs) return final_kwargs # ------------------------------------------------------------------------------ # EVALUATOR RESULT # ------------------------------------------------------------------------------ class EvalResult: """Result container for evaluator execution.""" def __init__(self, score: Any, init_method: Optional[str] = None, **metadata): self.score: Any | EvalResult = score self.metadata: dict = metadata # determine the eval_type self.init_method = init_method or inspect.stack()[1].function self.eval_settings: Optional[EvalSettings] = EvalSettings(name=self.init_method) self.eval_kwargs: Optional[dict] = dict() # for easy access self.weight = self.eval_settings.weight self.func_impl: Callable = None self.func_args: tuple = None self.func_kwargs: dict = None def to_dict(self) -> dict: """Convert result to dictionary.""" return {"score": self.score, "metadata": self.metadata} def copy(self) -> "EvalResult": copy_result = EvalResult( score=self.score, init_method=self.init_method, **self.metadata ) return copy_result
[docs] class EvaluatorMeta(type): """Metaclass for evaluator accessor pattern.""" def __getattribute__(cls, name): try: return super().__getattribute__(name) except AttributeError: return cls.__class_getitem__(name)
class evaluator(metaclass=EvaluatorMeta): # pylint: disable=invalid-name """Sync evaluator decorator class with pipeline support.""" # ------------------------------------------------------------------------------ # STATICS / INITIALIZE # ------------------------------------------------------------------------------ # global registry of evaluator names to evaluator instances all_evaluators: dict[str, "evaluator" | Callable | Coroutine | "aevaluator"] = ( dict() ) # global registry of evaluator names to evaluator settings all_evaluator_settings: dict[str, EvaluatorSettings] = dict() def __unnamed__(self, *args, **kwargs): """Placeholder for unnamed evaluator.""" raise NotImplementedError(f"Please decorate with an evaluator implementation.") def __new__( cls, func=None, eval_settings=None, eval_kwargs=None, **deco_settings_kwargs ) -> "evaluator": """Allows evaluator to be initialized in the decorator with kwargs""" if func is None: return lambda f: cls(f, eval_settings, eval_kwargs, **deco_settings_kwargs) return super().__new__(cls) def __init__( self, func: Callable = __unnamed__, eval_settings: EvalSettings | None = None, eval_kwargs: dict[str, Any] | None = None, **deco_settings_kwargs, ) -> None: # set the wrapped function implementation and its name self.func: Callable = func # set the evaluator name self.name: str = func.__name__ if hasattr(func, "__name__") else str(func) # register all_evaluators[func name] = this evaluator self.all_evaluators[self.name] = self # initialize the evaluator settings if self.name not in evaluator.all_evaluator_settings: evaluator.all_evaluator_settings[self.name] = EvaluatorSettings(self.name) # get the settings evaluator_settings = evaluator.all_evaluator_settings[self.name] # init settings if eval_settings: eval_settings.name = self.name else: eval_settings = EvalSettings(name=self.name) evaluator_settings.init_settings = eval_settings evaluator_settings.init_kwargs = eval_kwargs or {} # decorator kwargs kwarg_eval_settings, kwarg_eval_kwargs = ( EvalSettings.extract_eval_settings_and_kwargs(deco_settings_kwargs) ) kwarg_eval_settings["name"] = self.name evaluator_settings.deco_settings = EvalSettings(**kwarg_eval_settings) evaluator_settings.deco_kwargs = kwarg_eval_kwargs self.explicit_config = None # sets decorator metadata to that of wrapped function functools.update_wrapper(self, func) functools.update_wrapper(func, self) # ------------------------------------------------------------------------------ # AGGREGATION def pre_apply_aggregation( self, eval_results: tuple[EvalResult] | list[EvalResult], eval_scores: tuple | list, final_settings: EvalSettings, ) -> tuple[EvalResult, Any] | Coroutine: locals_dict = {"values": eval_scores, "results": eval_results} # TODO: enable aggregate to be a function aggregation_expr = str(final_settings.aggregate) # apply aggregation aggregate_score = eval(aggregation_expr, evaluator.all_evaluators, locals_dict) return aggregate_score def post_apply_aggregation( self, eval_results: tuple[EvalResult] | list[EvalResult] | EvalResult, aggregate_score: Any, final_settings: EvalSettings, ): """Wrap aggregated score in EvalResult.""" init_methods = set() # if no repetitions, we will only have one eval result if isinstance(eval_results, EvalResult): init_methods.add(eval_results.init_method) else: for eval_result in eval_results: if isinstance(eval_result, EvalResult): init_methods.add(eval_result.init_method) init_method = "aggregate: " if len(init_methods) > 0: init_method += "-".join(init_methods) aggregate_result = EvalResult( aggregate_score, init_method=init_method, prev_results=eval_results ) return aggregate_result def sync_apply_aggregation( self, eval_results: tuple[EvalResult] | list[EvalResult] | EvalResult, eval_scores: tuple | list | Any, final_settings: EvalSettings, ) -> tuple[EvalResult, Any]: """Synchronously apply aggregation to results.""" if not final_settings.aggregate: return eval_results, eval_scores aggregate_score = self.pre_apply_aggregation( eval_results, eval_scores, final_settings ) aggregate_score = evaluator.resolve_pipeline(aggregate_score, eval_scores) aggregate_result = self.post_apply_aggregation( eval_results, aggregate_score, final_settings ) return aggregate_result, aggregate_score async def async_apply_aggregation( self, eval_results: tuple[EvalResult] | list[EvalResult], eval_scores: tuple | list, final_settings: EvalSettings, ) -> tuple[EvalResult, Any]: """Asynchronously apply aggregation to results.""" if not final_settings.aggregate: return eval_results, eval_scores aggregate_score = self.pre_apply_aggregation( eval_results, eval_scores, final_settings ) aggregate_score = await evaluator.aresolve_pipeline( aggregate_score, eval_scores ) aggregate_result = self.post_apply_aggregation( eval_results, aggregate_score, final_settings ) return aggregate_result, aggregate_score # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------ # TRANSFORMATION def pre_apply_transformation( self, eval_result: EvalResult, eval_score: Any, final_settings: EvalSettings, ): """Apply transformation expression to evaluation score.""" transform_expr = str(final_settings.transform) locals_dict = {"value": eval_score, "result": eval_result} # apply transformation transformed_score = eval(transform_expr, evaluator.all_evaluators, locals_dict) return transformed_score def post_apply_transformation( self, eval_result: EvalResult, transformed_score: Any, final_settings: EvalSettings, ): """Wrap transformed score in EvalResult.""" init_method = "transform: " + eval_result.init_method transformed_result = EvalResult( transformed_score, init_method=init_method, prev_result=eval_result ) transformed_result.weight = final_settings.weight return transformed_result async def async_apply_transformation( self, eval_result: EvalResult, eval_score: Any, final_settings: EvalSettings, ) -> tuple[EvalResult, Any]: """Asynchronously apply transformation to result.""" if not final_settings.transform: return eval_result, eval_score transformed_score = self.pre_apply_transformation( eval_result, eval_score, final_settings ) transformed_score = await evaluator.aresolve_pipeline( transformed_score, eval_score ) transformed_result = self.post_apply_transformation( eval_result, transformed_score, final_settings ) return transformed_result, transformed_score def sync_apply_transformation( self, eval_result: EvalResult, eval_score: Any, final_settings: EvalSettings, ) -> tuple[EvalResult, Any]: """Synchronously apply transformation to result.""" if not final_settings.transform: return eval_result, eval_score transformed_score = self.pre_apply_transformation( eval_result, eval_score, final_settings ) transformed_score = evaluator.resolve_pipeline(transformed_score, eval_score) transformed_result = self.post_apply_transformation( eval_result, transformed_score, final_settings ) return transformed_result, transformed_score # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------ # CHECKER def pre_run_checker( self, eval_result: EvalResult, eval_score: Any, final_settings: EvalSettings, ) -> bool: """Evaluate checker expression against score.""" checker_expr = str(final_settings.checker) locals_dict = { "value": eval_score, "result": eval_result, "target": final_settings.target, } # evaluate checker checker_score = eval(checker_expr, evaluator.all_evaluators, locals_dict) return checker_score def post_run_checker( self, eval_result: EvalResult, eval_score: Any, final_settings: EvalSettings, checker_score: Any = None, ) -> bool: """Process checker result and optionally run assertions.""" if final_settings.asserts: assert ( checker_score ), f"Assertion failed: score {eval_score} is not in range {final_settings.target}" init_method = "checker: " + eval_result.init_method checker_result = EvalResult( checker_score, init_method=init_method, prev_result=eval_result ) return checker_result def run_asserts( self, eval_score: Any, final_settings: EvalSettings, ) -> bool: if final_settings.target is None: failure_message = f"Assertion failed: score {eval_score}" else: failure_message = f"Assertion failed: score {eval_score} is not in range {final_settings.target}" assert eval_score, failure_message def run_checker( self, eval_result: EvalResult, eval_score: Any, final_settings: EvalSettings, ) -> bool: """Synchronously run checker logic on evaluation result.""" if not final_settings.checker: if not final_settings.asserts: return eval_result, eval_score self.run_asserts(eval_score, final_settings) return eval_result, eval_score checker_score = self.pre_run_checker(eval_result, eval_score, final_settings) checker_score = evaluator.resolve_pipeline( checker_score, eval_score, final_settings.target ) checker_result = self.post_run_checker( eval_result, eval_score, final_settings, checker_score ) return checker_result, checker_score async def arun_checker( self, eval_result: EvalResult, eval_score: Any, final_settings: EvalSettings, ) -> bool: """Asynchronously run checker logic on evaluation result.""" if not final_settings.checker: if not final_settings.asserts: return eval_result, eval_score self.run_asserts(eval_score, final_settings) return eval_result, eval_score checker_score = self.pre_run_checker(eval_result, eval_score, final_settings) checker_score = await evaluator.aresolve_pipeline( checker_score, eval_score, final_settings.target ) checker_result = self.post_run_checker( eval_result, eval_score, final_settings, checker_score ) return checker_result, checker_score # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------ # WRAPPING @staticmethod def parse_wraps(wraps: str | dict | None | Any): """Parse wraps parameter into evaluator name and settings.""" if wraps is None: return None, {} if isinstance(wraps, str): return wraps, {} elif isinstance(wraps, dict): # assert that there is a single key of type str assert len(wraps) == 1 and isinstance( list(wraps.keys())[0], str ), "wraps must be a single key of type str" wrapped_eval_name = list(wraps.keys())[0] wrapped_eval_settings_kwargs = wraps[wrapped_eval_name] return wrapped_eval_name, wrapped_eval_settings_kwargs else: raise ValueError(f"Invalid wraps type: {type(wraps)}") @staticmethod def create_wrapper( base_callable: "evaluator", wrapped_eval_settings: EvalSettings, wrapped_eval_kwargs: dict, wrapper_name: str, ) -> Callable: """ Create a wrapper function for an evaluator, given the base evaluator, the wrapped evaluator settings, the wrapped evaluator kwargs, and the wrapper name. The wrapped_eval / base_callable's settings and kwargs update any previous settings and kwargs. The wrapper's settings do NOT update the wrapped evaluator's settings. The wrapper's kwargs DO update the wrapped evaluator's kwargs. The final settings and kwargs are passed into the wrapped evaluator during calltime. Due to the ordering of the dict unpacking, the wrapper's kwargs will update the wrapped evaluator's kwargs. The settings are also passed as kwargs into the base callable. Args: base_callable (evaluator): The base evaluator to be wrapped. wrapped_eval_settings (EvalSettings): Settings for the wrapped evaluator. wrapped_eval_kwargs (dict): Additional keyword arguments for the wrapped evaluator. wrapper_name (str): Name for the wrapper function. Returns: Callable: A wrapper function that calls the base evaluator with the provided settings and arguments. """ base_callable_settings = wrapped_eval_settings.copy() base_callable_kwargs = wrapped_eval_kwargs.copy() if asyncio.iscoroutinefunction(base_callable.func): async def afunc(*args, **kwargs): return await base_callable( *args, **{ **base_callable_settings.dict(), **base_callable_kwargs, **kwargs, }, ) afunc.__name__ = wrapper_name return afunc def func(*args, **kwargs): return base_callable( *args, **{**base_callable_settings.dict(), **base_callable_kwargs, **kwargs}, ) func.__name__ = wrapper_name return func @staticmethod def create_wrapped_evaluator(evaluator_settings: EvaluatorSettings) -> None: wrapper_name = evaluator_settings.name wrapper_settings = evaluator_settings.resolve_settings() wrapper_kwargs = evaluator_settings.resolve_kwargs() # parse the wrapped evaluator wrapped_eval_name, wrapped_eval_settings_kwargs = evaluator.parse_wraps( wrapper_settings.wraps ) assert isinstance( wrapped_eval_name, str ), f"wrapped evaluator name must be a string but got: {type(wrapped_eval_name)}" wrapped_eval_settings, wrapped_eval_kwargs = ( EvalSettings.extract_eval_settings_and_kwargs(wrapped_eval_settings_kwargs) ) # create the wrapped evaluator settings if not already created # this can happen if the wrapped evaluator is not defined in any config file if wrapped_eval_name not in evaluator.all_evaluator_settings: evaluator.all_evaluator_settings[wrapped_eval_name] = EvaluatorSettings( name=wrapped_eval_name ) # get the wrapped evaluator. It must be a registered evaluator base_callable = eval( wrapped_eval_name, evaluator.all_evaluators, ) # TODO: if the base callable is not a Callable but # not an evaluator, we need to wrap it as well assert isinstance(base_callable, evaluator) # update the wrapped evaluator settings with the wrapper settings final_wrapped_eval_settings = ( evaluator.all_evaluator_settings[wrapped_eval_name] .resolve_settings() .copy() ) final_wrapped_eval_settings.update(wrapped_eval_settings) # update the wrapped evaluator kwargs with the wrapper kwargs final_wrapped_eval_kwargs = ( evaluator.all_evaluator_settings[wrapped_eval_name].resolve_kwargs().copy() ) final_wrapped_eval_kwargs.update(wrapped_eval_kwargs) # the wrapper's kwargs merge with the wrapped evaluator's kwargs, # but the settings don't. final_wrapped_eval_kwargs.update(wrapper_kwargs) if asyncio.iscoroutinefunction(base_callable.func): afunc: Coroutine = evaluator.create_wrapper( base_callable, final_wrapped_eval_settings, final_wrapped_eval_kwargs, wrapper_name, ) # initialize and register the wrapper eval aevaluator(func=afunc) else: # make the wrapper eval func: Callable = evaluator.create_wrapper( base_callable, final_wrapped_eval_settings, final_wrapped_eval_kwargs, wrapper_name, ) # initialize and register the wrapper eval evaluator(func=func) # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------ # CALLING @staticmethod async def aresolve_pipeline(score, *args, **kwargs): """Asynchronously resolve pipeline of evaluators.""" if asyncio.iscoroutinefunction(score) or isinstance(score, aevaluator): score = await score(*args, **kwargs) elif isinstance(score, Callable): score = score(*args, **kwargs) return score @staticmethod def resolve_pipeline(score, *args, **kwargs): """Synchronously resolve pipeline of evaluators.""" # string evaluated to a function if isinstance(score, Callable) or isinstance(score, evaluator): score = score(*args, **kwargs) return score def get_final_settings_and_kwargs(self, call_kwargs): """Extract and merge final settings and kwargs for execution.""" eval_settings, eval_kwargs = EvalSettings.extract_eval_settings_and_kwargs( call_kwargs ) explicit_settings, explicit_kwargs = ( EvalSettings.extract_eval_settings_and_kwargs(self.explicit_config) ) # calltime copy final_settings = self.settings.copy() final_settings.update(eval_settings) final_settings.update(explicit_settings) final_kwargs = self.kwargs.copy() final_kwargs.update(eval_kwargs) final_kwargs.update(explicit_kwargs) return final_settings, final_kwargs @atrace(event_type="chain", event_name="Evaluation") async def async_call(self, *call_args, **call_kwargs): final_settings, final_kwargs = self.get_final_settings_and_kwargs(call_kwargs) async def asingle_evaluation() -> tuple[EvalResult, Any]: # run the evaluator score = await atrace(self.func)(*call_args, **final_kwargs) result = EvalResult( score=score, init_method=self.name, eval_settings=final_settings, eval_kwargs=final_kwargs, func_impl=f"{self.func.__module__}.{self.func.__name__}", func_args=call_args, func_kwargs=call_kwargs, ) enrich_span( inputs={ "score": score, }, outputs={ "result": result, }, config={ "final_settings": str(final_settings), "final_kwargs": final_kwargs, }, metadata={ "func": self.name, "func_impl": f"{self.func.__module__}.{self.func.__name__}", "func_args": call_args, "func_kwargs": call_kwargs, }, ) # transform transformed_result, transformed_score = ( await self.async_apply_transformation(result, score, final_settings) ) # check target on transform if aggregate not defined if not final_settings.aggregate: checker_result, checker_score = await self.arun_checker( eval_result=transformed_result, eval_score=transformed_score, final_settings=final_settings, ) return checker_result, checker_score return transformed_result, transformed_score # execute repetition if final_settings.repeat: # Parallel evaluation results_scores = await asyncio.gather( *(asingle_evaluation() for _ in range(final_settings.repeat)) ) results, scores = zip(*results_scores) results = tuple(results) scores = tuple(scores) else: _, scores = await asingle_evaluation() # apply aggregation aggregate_result, aggregate_score = await self.async_apply_aggregation( results, scores, final_settings ) # check target on aggregate if aggregate defined if final_settings.aggregate: checker_result, checker_score = await self.arun_checker( eval_result=aggregate_result, eval_score=aggregate_score, final_settings=final_settings, ) return checker_result, checker_score return aggregate_result, aggregate_score @trace(event_type="chain", event_name="Evaluation") def sync_call(self, *call_args, **call_kwargs): final_settings, final_kwargs = self.get_final_settings_and_kwargs(call_kwargs) def single_evaluation() -> tuple[EvalResult, Any]: # run the evaluator score = self.func(*call_args, **final_kwargs) result = EvalResult( score=score, init_method=self.name, eval_settings=final_settings, eval_kwargs=final_kwargs, func_impl=f"{self.func.__module__}.{self.func.__name__}", func_args=call_args, func_kwargs=call_kwargs, ) enrich_span( inputs={ "score": score, }, outputs={ "result": result, }, config={ "final_settings": str(final_settings), "final_kwargs": final_kwargs, }, metadata={ "func": self.name, "func_impl": f"{self.func.__module__}.{self.func.__name__}", "func_args": call_args, "func_kwargs": call_kwargs, }, ) # transform transformed_result, transformed_score = self.sync_apply_transformation( result, score, final_settings ) # check target on transform if aggregate not defined if not final_settings.aggregate: checker_result, checker_score = self.run_checker( eval_result=transformed_result, eval_score=transformed_score, final_settings=final_settings, ) return checker_result, checker_score return transformed_result, transformed_score # execute repetition # TODO: add option for sequential evaluation since thread pools may not work for asyncio if final_settings.repeat: # Serial evaluation with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit(single_evaluation) for _ in range(final_settings.repeat) ] results, scores = zip(*[future.result() for future in futures]) results = tuple(results) scores = tuple(scores) else: _, scores = single_evaluation() # apply aggregation aggregate_result, aggregate_score = self.sync_apply_aggregation( results, scores, final_settings ) # check target on aggregate if aggregate defined if final_settings.aggregate: checker_result, checker_score = self.run_checker( eval_result=aggregate_result, eval_score=aggregate_score, final_settings=final_settings, ) return checker_result, checker_score return aggregate_result, aggregate_score def __call__(self, *args, **kwargs) -> Any: # RUN EVALUATOR results, scores = None, None assert not asyncio.iscoroutinefunction( self.func ), "please use @aevaluator instead of @evaluator for this function" results, scores = self.sync_call(*args, **kwargs) return scores async def __acall__(self, *args, **kwargs) -> Any: # RUN EVALUATOR results, scores = None, None if asyncio.iscoroutinefunction(self.func): results, scores = await self.async_call(*args, **kwargs) else: results, scores = self.sync_call(*args, **kwargs) return scores def raw(self, *args, **kwargs): """Execute wrapped function without evaluator pipeline.""" return self.func(*args, **kwargs) async def araw(self, *args, **kwargs): """Asynchronously execute wrapped function without pipeline.""" return await self.func(*args, **kwargs) # ------------------------------------------------------------------------------ # PROPERTIES # ------------------------------------------------------------------------------ @property def settings(self) -> EvalSettings: if self.name not in evaluator.all_evaluator_settings: evaluator.all_evaluator_settings[self.name] = EvaluatorSettings( name=self.name ) return evaluator.all_evaluator_settings[self.name].resolve_settings() @settings.setter def settings(self, value: EvalSettings): assert isinstance(value, EvalSettings) if self.name not in evaluator.all_evaluator_settings: evaluator.all_evaluator_settings[self.name] = EvaluatorSettings( name=self.name ) evaluator.all_evaluator_settings[self.name].explicit_settings = value @property def kwargs(self) -> dict[str, Any]: if self.name not in evaluator.all_evaluator_settings: evaluator.all_evaluator_settings[self.name] = EvaluatorSettings( name=self.name ) return evaluator.all_evaluator_settings[self.name].resolve_kwargs() @kwargs.setter def kwargs(self, value: dict): assert isinstance(value, dict) if self.name not in evaluator.all_evaluator_settings: evaluator.all_evaluator_settings[self.name] = EvaluatorSettings( name=self.name ) evaluator.all_evaluator_settings[self.name].explicit_kwargs = value @property def config(self) -> dict[str, Any]: if self.explicit_config is None: self.explicit_config = {**self.settings.dict(), **self.kwargs} return self.explicit_config @config.setter def config(self, value: Any): if value is None: self.explicit_config = None else: raise NotImplementedError # ------------------------------------------------------------------------------ # ACCESSORS # ------------------------------------------------------------------------------ @classmethod def _validate_key(cls, key: str | Callable | None) -> str: """Validate and normalize evaluator registry key.""" if isinstance(key, str): return key elif isinstance(key, Callable): if hasattr(key, "__name__"): return key.__name__ else: return str(key) else: raise KeyError(f"Invalid key type: {type(key)}") @classmethod def __class_getitem__(cls, keys): """ Get an evaluator by name or Callable. """ if isinstance(keys, (str, Callable)): # Ensure that key is a string key: str = cls._validate_key(keys) if key in cls.all_evaluators: return cls.all_evaluators[key] elif key in cls.all_evaluator_settings: # if the evaluator is wrapped, initialize and register the wrapper if (evaluator_settings := cls.all_evaluator_settings[key]) is not None: # initialize and register the wrapper evaluator.create_wrapped_evaluator(evaluator_settings) return cls.all_evaluators[key] else: raise KeyError(f"Key '{key}' not found in evaluators or config.") elif isinstance(keys, tuple): # Multiple key access return [cls.__class_getitem__(key) for key in keys] else: raise KeyError(f"Invalid key type: {type(keys)}") @classmethod def __class_setitem__(cls, key, value): key = cls._validate_key(key) cls.all_evaluators[key] = value @classmethod def __class_delitem__(cls, key): key = cls._validate_key(key) del cls.all_evaluators[key] @property def __code__(self): return self.func.__code__ class aevaluator(evaluator): # pylint: disable=invalid-name """Async evaluator decorator class.""" async def __call__(self, *args, **kwargs): return await self.__acall__(*args, **kwargs) async def raw(self, *args, **kwargs): return await self.araw(*args, **kwargs) # ------------------------------------------------------------------------------ # EVALUATORS # ------------------------------------------------------------------------------ @evaluator def mean(scores: list[float]) -> float: """Calculate mean of scores.""" return sum(scores) / len(scores) @evaluator def median(scores: list[float]) -> float: """Calculate median of scores.""" return sorted(scores)[len(scores) // 2] @evaluator def mode(scores: list[float]) -> float: """Calculate mode of scores.""" return max(set(scores), key=scores.count)