Source code for honeyhive.api.events

"""Events API module for HoneyHive."""

from typing import Any, Dict, List, Optional, Union

from ..models import CreateEventRequest, Event, EventFilter
from .base import BaseAPI


class CreateEventResponse:  # pylint: disable=too-few-public-methods
    """Response from creating an event.

    Contains the result of an event creation operation including
    the event ID and success status.
    """

    def __init__(self, event_id: str, success: bool):
        """Initialize the response.

        Args:
            event_id: Unique identifier for the created event
            success: Whether the event creation was successful
        """
        self.event_id = event_id
        self.success = success

    @property
    def id(self) -> str:
        """Alias for event_id for compatibility.

        Returns:
            The event ID
        """
        return self.event_id

    @property
    def _id(self) -> str:
        """Alias for event_id for compatibility.

        Returns:
            The event ID
        """
        return self.event_id


class UpdateEventRequest:  # pylint: disable=too-few-public-methods
    """Request for updating an event.

    Contains the fields that can be updated for an existing event.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        event_id: str,
        *,
        metadata: Optional[Dict[str, Any]] = None,
        feedback: Optional[Dict[str, Any]] = None,
        metrics: Optional[Dict[str, Any]] = None,
        outputs: Optional[Dict[str, Any]] = None,
        config: Optional[Dict[str, Any]] = None,
        user_properties: Optional[Dict[str, Any]] = None,
        duration: Optional[float] = None,
    ):
        """Initialize the update request.

        Args:
            event_id: ID of the event to update
            metadata: Additional metadata for the event
            feedback: User feedback for the event
            metrics: Computed metrics for the event
            outputs: Output data for the event
            config: Configuration data for the event
            user_properties: User-defined properties
            duration: Updated duration in milliseconds
        """
        self.event_id = event_id
        self.metadata = metadata
        self.feedback = feedback
        self.metrics = metrics
        self.outputs = outputs
        self.config = config
        self.user_properties = user_properties
        self.duration = duration


class BatchCreateEventRequest:  # pylint: disable=too-few-public-methods
    """Request for creating multiple events.

    Allows bulk creation of multiple events in a single API call.
    """

    def __init__(self, events: List[CreateEventRequest]):
        """Initialize the batch request.

        Args:
            events: List of events to create
        """
        self.events = events


class BatchCreateEventResponse:  # pylint: disable=too-few-public-methods
    """Response from creating multiple events.

    Contains the results of a bulk event creation operation.
    """

    def __init__(self, event_ids: List[str], success: bool):
        """Initialize the batch response.

        Args:
            event_ids: List of created event IDs
            success: Whether the batch operation was successful
        """
        self.event_ids = event_ids
        self.success = success


[docs] class EventsAPI(BaseAPI): """API for event operations."""
[docs] def create_event(self, event: CreateEventRequest) -> CreateEventResponse: """Create a new event using CreateEventRequest model.""" response = self.client.request( "POST", "/events", json={"event": event.model_dump(mode="json", exclude_none=True)}, ) data = response.json() return CreateEventResponse(event_id=data["event_id"], success=data["success"])
[docs] def create_event_from_dict(self, event_data: dict) -> CreateEventResponse: """Create a new event from event data dictionary (legacy method).""" # Handle both direct event data and nested event data if "event" in event_data: request_data = event_data else: request_data = {"event": event_data} response = self.client.request("POST", "/events", json=request_data) data = response.json() return CreateEventResponse(event_id=data["event_id"], success=data["success"])
[docs] def create_event_from_request( self, event: CreateEventRequest ) -> CreateEventResponse: """Create a new event from CreateEventRequest object.""" response = self.client.request( "POST", "/events", json={"event": event.model_dump(mode="json", exclude_none=True)}, ) data = response.json() return CreateEventResponse(event_id=data["event_id"], success=data["success"])
[docs] async def create_event_async( self, event: CreateEventRequest ) -> CreateEventResponse: """Create a new event asynchronously using CreateEventRequest model.""" response = await self.client.request_async( "POST", "/events", json={"event": event.model_dump(mode="json", exclude_none=True)}, ) data = response.json() return CreateEventResponse(event_id=data["event_id"], success=data["success"])
[docs] async def create_event_from_dict_async( self, event_data: dict ) -> CreateEventResponse: """Create a new event asynchronously from event data dictionary \ (legacy method).""" # Handle both direct event data and nested event data if "event" in event_data: request_data = event_data else: request_data = {"event": event_data} response = await self.client.request_async("POST", "/events", json=request_data) data = response.json() return CreateEventResponse(event_id=data["event_id"], success=data["success"])
[docs] async def create_event_from_request_async( self, event: CreateEventRequest ) -> CreateEventResponse: """Create a new event asynchronously.""" response = await self.client.request_async( "POST", "/events", json={"event": event.model_dump(mode="json", exclude_none=True)}, ) data = response.json() return CreateEventResponse(event_id=data["event_id"], success=data["success"])
[docs] def delete_event(self, event_id: str) -> bool: """Delete an event by ID.""" context = self._create_error_context( operation="delete_event", method="DELETE", path=f"/events/{event_id}", additional_context={"event_id": event_id}, ) with self.error_handler.handle_operation(context): response = self.client.request("DELETE", f"/events/{event_id}") return response.status_code == 200
[docs] async def delete_event_async(self, event_id: str) -> bool: """Delete an event by ID asynchronously.""" context = self._create_error_context( operation="delete_event_async", method="DELETE", path=f"/events/{event_id}", additional_context={"event_id": event_id}, ) with self.error_handler.handle_operation(context): response = await self.client.request_async("DELETE", f"/events/{event_id}") return response.status_code == 200
[docs] def update_event(self, request: UpdateEventRequest) -> None: """Update an event.""" request_data = { "event_id": request.event_id, "metadata": request.metadata, "feedback": request.feedback, "metrics": request.metrics, "outputs": request.outputs, "config": request.config, "user_properties": request.user_properties, "duration": request.duration, } # Remove None values request_data = {k: v for k, v in request_data.items() if v is not None} self.client.request("PUT", "/events", json=request_data)
[docs] async def update_event_async(self, request: UpdateEventRequest) -> None: """Update an event asynchronously.""" request_data = { "event_id": request.event_id, "metadata": request.metadata, "feedback": request.feedback, "metrics": request.metrics, "outputs": request.outputs, "config": request.config, "user_properties": request.user_properties, "duration": request.duration, } # Remove None values request_data = {k: v for k, v in request_data.items() if v is not None} await self.client.request_async("PUT", "/events", json=request_data)
[docs] def create_event_batch( self, request: BatchCreateEventRequest ) -> BatchCreateEventResponse: """Create multiple events using BatchCreateEventRequest model.""" events_data = [ event.model_dump(mode="json", exclude_none=True) for event in request.events ] response = self.client.request( "POST", "/events/batch", json={"events": events_data} ) data = response.json() return BatchCreateEventResponse( event_ids=data["event_ids"], success=data["success"] )
[docs] def create_event_batch_from_list( self, events: List[CreateEventRequest] ) -> BatchCreateEventResponse: """Create multiple events from a list of CreateEventRequest objects.""" events_data = [ event.model_dump(mode="json", exclude_none=True) for event in events ] response = self.client.request( "POST", "/events/batch", json={"events": events_data} ) data = response.json() return BatchCreateEventResponse( event_ids=data["event_ids"], success=data["success"] )
[docs] async def create_event_batch_async( self, request: BatchCreateEventRequest ) -> BatchCreateEventResponse: """Create multiple events asynchronously using BatchCreateEventRequest model.""" events_data = [ event.model_dump(mode="json", exclude_none=True) for event in request.events ] response = await self.client.request_async( "POST", "/events/batch", json={"events": events_data} ) data = response.json() return BatchCreateEventResponse( event_ids=data["event_ids"], success=data["success"] )
[docs] async def create_event_batch_from_list_async( self, events: List[CreateEventRequest] ) -> BatchCreateEventResponse: """Create multiple events asynchronously from a list of \ CreateEventRequest objects.""" events_data = [ event.model_dump(mode="json", exclude_none=True) for event in events ] response = await self.client.request_async( "POST", "/events/batch", json={"events": events_data} ) data = response.json() return BatchCreateEventResponse( event_ids=data["event_ids"], success=data["success"] )
[docs] def list_events( self, event_filters: Union[EventFilter, List[EventFilter]], limit: int = 100, project: Optional[str] = None, page: int = 1, ) -> List[Event]: """List events using EventFilter model with dynamic processing optimization. Uses the proper /events/export POST endpoint as specified in OpenAPI spec. Args: event_filters: EventFilter or list of EventFilter objects with filtering criteria limit: Maximum number of events to return (default: 100) project: Project name to filter by (required by API) page: Page number for pagination (default: 1) Returns: List of Event objects matching the filters Examples: Filter events by type and status:: filters = [ EventFilter( field="event_type", operator="is", value="model", type="string", ), EventFilter( field="error", operator="is not", value=None, type="string", ), ] events = client.events.list_events( event_filters=filters, project="My Project", limit=50 ) """ if not project: raise ValueError("project parameter is required for listing events") # Auto-convert single EventFilter to list if isinstance(event_filters, EventFilter): event_filters = [event_filters] # Build filters array as expected by /events/export endpoint filters = [] for event_filter in event_filters: if ( event_filter.field and event_filter.value is not None and event_filter.operator and event_filter.type ): filter_dict = { "field": str(event_filter.field), "value": str(event_filter.value), "operator": event_filter.operator.value, "type": event_filter.type.value, } filters.append(filter_dict) # Build request body according to OpenAPI spec request_body = { "project": project, "filters": filters, "limit": limit, "page": page, } response = self.client.request("POST", "/events/export", json=request_body) data = response.json() # Dynamic processing: Use universal dynamic processor return self._process_data_dynamically(data.get("events", []), Event, "events")
[docs] def list_events_from_dict( self, event_filter: dict, limit: int = 100 ) -> List[Event]: """List events from filter dictionary (legacy method).""" params = {"limit": limit} params.update(event_filter) response = self.client.request("GET", "/events", params=params) data = response.json() # Dynamic processing: Use universal dynamic processor return self._process_data_dynamically(data.get("events", []), Event, "events")
[docs] def get_events( # pylint: disable=too-many-arguments self, project: str, filters: List[EventFilter], *, date_range: Optional[Dict[str, str]] = None, limit: int = 1000, page: int = 1, ) -> Dict[str, Any]: """Get events using filters via /events/export endpoint. This is the proper way to filter events by session_id and other criteria. Args: project: Name of the project associated with the event filters: List of EventFilter objects to apply date_range: Optional date range filter with $gte and $lte ISO strings limit: Limit number of results (default 1000, max 7500) page: Page number of results (default 1) Returns: Dict containing 'events' list and 'totalEvents' count """ # Convert filters to proper format for API filters_data = [] for filter_obj in filters: filter_dict = filter_obj.model_dump(mode="json", exclude_none=True) # Convert enum values to strings for JSON serialization if "operator" in filter_dict and hasattr(filter_dict["operator"], "value"): filter_dict["operator"] = filter_dict["operator"].value if "type" in filter_dict and hasattr(filter_dict["type"], "value"): filter_dict["type"] = filter_dict["type"].value filters_data.append(filter_dict) request_data = { "project": project, "filters": filters_data, "limit": limit, "page": page, } if date_range: request_data["dateRange"] = date_range response = self.client.request("POST", "/events/export", json=request_data) data = response.json() # Parse events into Event objects events = [Event(**event_data) for event_data in data.get("events", [])] return {"events": events, "totalEvents": data.get("totalEvents", 0)}
[docs] async def list_events_async( self, event_filters: Union[EventFilter, List[EventFilter]], limit: int = 100, project: Optional[str] = None, page: int = 1, ) -> List[Event]: """List events asynchronously using EventFilter model. Uses the proper /events/export POST endpoint as specified in OpenAPI spec. Args: event_filters: EventFilter or list of EventFilter objects with filtering criteria limit: Maximum number of events to return (default: 100) project: Project name to filter by (required by API) page: Page number for pagination (default: 1) Returns: List of Event objects matching the filters Examples: Filter events by type and status:: filters = [ EventFilter( field="event_type", operator="is", value="model", type="string", ), EventFilter( field="error", operator="is not", value=None, type="string", ), ] events = await client.events.list_events_async( event_filters=filters, project="My Project", limit=50 ) """ if not project: raise ValueError("project parameter is required for listing events") # Auto-convert single EventFilter to list if isinstance(event_filters, EventFilter): event_filters = [event_filters] # Build filters array as expected by /events/export endpoint filters = [] for event_filter in event_filters: if ( event_filter.field and event_filter.value is not None and event_filter.operator and event_filter.type ): filter_dict = { "field": str(event_filter.field), "value": str(event_filter.value), "operator": event_filter.operator.value, "type": event_filter.type.value, } filters.append(filter_dict) # Build request body according to OpenAPI spec request_body = { "project": project, "filters": filters, "limit": limit, "page": page, } response = await self.client.request_async( "POST", "/events/export", json=request_body ) data = response.json() return self._process_data_dynamically(data.get("events", []), Event, "events")
[docs] async def list_events_from_dict_async( self, event_filter: dict, limit: int = 100 ) -> List[Event]: """List events asynchronously from filter dictionary (legacy method).""" params = {"limit": limit} params.update(event_filter) response = await self.client.request_async("GET", "/events", params=params) data = response.json() return self._process_data_dynamically(data.get("events", []), Event, "events")