From 429836e91fd714deed38ea586374cd8f8152b528 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Fri, 20 Feb 2026 17:01:09 +0200 Subject: [PATCH] feat: event bus, triggers --- pyproject.toml | 2 +- src/uipath/core/events/__init__.py | 5 + src/uipath/core/events/_event_bus.py | 157 ++++++++++++++++++++++ src/uipath/core/serialization/__init__.py | 4 +- src/uipath/core/serialization/json.py | 40 +++++- src/uipath/core/triggers/__init__.py | 15 +++ src/uipath/core/triggers/trigger.py | 69 ++++++++++ uv.lock | 2 +- 8 files changed, 289 insertions(+), 5 deletions(-) create mode 100644 src/uipath/core/events/__init__.py create mode 100644 src/uipath/core/events/_event_bus.py create mode 100644 src/uipath/core/triggers/__init__.py create mode 100644 src/uipath/core/triggers/trigger.py diff --git a/pyproject.toml b/pyproject.toml index aea629d..0521db1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-core" -version = "0.5.1" +version = "0.5.2" description = "UiPath Core abstractions" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/core/events/__init__.py b/src/uipath/core/events/__init__.py new file mode 100644 index 0000000..5c387ff --- /dev/null +++ b/src/uipath/core/events/__init__.py @@ -0,0 +1,5 @@ +"""This module contains the event bus implementation.""" + +from uipath.core.events._event_bus import EventBus + +__all__ = ["EventBus"] diff --git a/src/uipath/core/events/_event_bus.py b/src/uipath/core/events/_event_bus.py new file mode 100644 index 0000000..e99d6ff --- /dev/null +++ b/src/uipath/core/events/_event_bus.py @@ -0,0 +1,157 @@ +"""Event bus implementation for runtime events.""" + +import asyncio +import logging +from typing import Any, Callable, TypeVar + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class EventBus: + """Event bus for publishing and subscribing to events.""" + + def __init__(self) -> None: + """Initialize a new EventBus instance.""" + self._subscribers: dict[str, list[Callable[[Any], Any]]] = {} + self._running_tasks: set[asyncio.Task[Any]] = set() + + def subscribe(self, topic: str, handler: Callable[[Any], Any]) -> None: + """Subscribe a handler method/function to a topic. + + Args: + topic: The topic name to subscribe to. + handler: The async handler method/function that will handle events for this topic. + """ + if topic not in self._subscribers: + self._subscribers[topic] = [] + self._subscribers[topic].append(handler) + logger.debug(f"Handler registered for topic: {topic}") + + def unsubscribe(self, topic: str, handler: Callable[[Any], Any]) -> None: + """Unsubscribe a handler from a topic. + + Args: + topic: The topic name to unsubscribe from. + handler: The handler to remove. + """ + if topic in self._subscribers: + try: + self._subscribers[topic].remove(handler) + if not self._subscribers[topic]: + del self._subscribers[topic] + logger.debug(f"Handler unregistered from topic: {topic}") + except ValueError: + logger.warning(f"Handler not found for topic: {topic}") + + def _cleanup_completed_tasks(self) -> None: + completed_tasks = {task for task in self._running_tasks if task.done()} + self._running_tasks -= completed_tasks + + async def publish( + self, topic: str, payload: T, wait_for_completion: bool = True + ) -> None: + """Publish an event to all handlers of a topic. + + Args: + topic: The topic name to publish to. + payload: The event payload to publish. + wait_for_completion: Whether to wait for the event to be processed. + """ + if topic not in self._subscribers: + logger.debug(f"No handlers for topic: {topic}") + return + + self._cleanup_completed_tasks() + + tasks = [] + for subscriber in self._subscribers[topic]: + try: + task = asyncio.create_task(subscriber(payload)) + tasks.append(task) + self._running_tasks.add(task) + except Exception as e: + logger.error(f"Error creating task for subscriber {subscriber}: {e}") + + if tasks and wait_for_completion: + try: + await asyncio.gather(*tasks, return_exceptions=True) + except Exception as e: + logger.error(f"Error during event processing for topic {topic}: {e}") + finally: + # Clean up the tasks we just waited for + for task in tasks: + self._running_tasks.discard(task) + + def get_running_tasks_count(self) -> int: + """Get the number of currently running subscriber tasks. + + Returns: + Number of running tasks. + """ + self._cleanup_completed_tasks() + return len(self._running_tasks) + + async def wait_for_all(self, timeout: float | None = None) -> None: + """Wait for all currently running subscriber tasks to complete. + + Args: + timeout: Maximum time to wait in seconds. If None, waits indefinitely. + """ + self._cleanup_completed_tasks() + + if not self._running_tasks: + logger.debug("No running tasks to wait for") + return + + logger.debug( + f"Waiting for {len(self._running_tasks)} EventBus tasks to complete..." + ) + + try: + tasks_to_wait = list(self._running_tasks) + + if timeout: + await asyncio.wait_for( + asyncio.gather(*tasks_to_wait, return_exceptions=True), + timeout=timeout, + ) + else: + await asyncio.gather(*tasks_to_wait, return_exceptions=True) + + logger.debug("All EventBus tasks completed") + + except asyncio.TimeoutError: + logger.warning(f"Timeout waiting for EventBus tasks after {timeout}s") + for task in tasks_to_wait: + if not task.done(): + task.cancel() + except Exception as e: + logger.error(f"Error waiting for EventBus tasks: {e}") + finally: + self._cleanup_completed_tasks() + + def get_subscribers_count(self, topic: str) -> int: + """Get the number of subscribers for a topic. + + Args: + topic: The topic name. + + Returns: + Number of handlers for the topic. + """ + return len(self._subscribers.get(topic, [])) + + def clear_subscribers(self, topic: str | None = None) -> None: + """Clear subscribers for a topic or all topics. + + Args: + topic: The topic to clear. If None, clears all topics. + """ + if topic is None: + self._subscribers.clear() + logger.debug("All handlers cleared") + elif topic in self._subscribers: + del self._subscribers[topic] + logger.debug(f"Handlers cleared for topic: {topic}") diff --git a/src/uipath/core/serialization/__init__.py b/src/uipath/core/serialization/__init__.py index aa26e75..9296b10 100644 --- a/src/uipath/core/serialization/__init__.py +++ b/src/uipath/core/serialization/__init__.py @@ -1,5 +1,5 @@ """Serialization utilities for converting Python objects to various formats.""" -from .json import serialize_defaults, serialize_json +from .json import serialize_defaults, serialize_json, serialize_object -__all__ = ["serialize_defaults", "serialize_json"] +__all__ = ["serialize_defaults", "serialize_json", "serialize_object"] diff --git a/src/uipath/core/serialization/json.py b/src/uipath/core/serialization/json.py index a29c3b1..a3eebc4 100644 --- a/src/uipath/core/serialization/json.py +++ b/src/uipath/core/serialization/json.py @@ -1,8 +1,9 @@ """JSON serialization utilities for converting Python objects to JSON formats.""" import json +import uuid from dataclasses import asdict, is_dataclass -from datetime import datetime, timezone +from datetime import date, datetime, time, timezone from enum import Enum from typing import Any, cast from zoneinfo import ZoneInfo @@ -156,3 +157,40 @@ def serialize_json(obj: Any) -> str: '{"name": "Review PR", "created": "2024-01-15T10:30:00"}' """ return json.dumps(obj, default=serialize_defaults) + + +def serialize_object(obj): + """Recursively serializes an object and all its nested components.""" + # Handle Pydantic models + if hasattr(obj, "model_dump"): + return serialize_object(obj.model_dump(by_alias=True)) + elif hasattr(obj, "dict"): + return serialize_object(obj.dict()) + elif hasattr(obj, "to_dict"): + return serialize_object(obj.to_dict()) + # Special handling for UiPathBaseRuntimeErrors + elif hasattr(obj, "as_dict"): + return serialize_object(obj.as_dict) + elif isinstance(obj, (datetime, date, time)): + return obj.isoformat() + # Handle dictionaries + elif isinstance(obj, dict): + return {k: serialize_object(v) for k, v in obj.items()} + # Handle lists + elif isinstance(obj, list): + return [serialize_object(item) for item in obj] + # Handle exceptions + elif isinstance(obj, Exception): + return str(obj) + # Handle other iterable objects (convert to dict first) + elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes)): + try: + return serialize_object(dict(obj)) + except (TypeError, ValueError): + return obj + # UUIDs must be serialized explicitly + elif isinstance(obj, uuid.UUID): + return str(obj) + # Return primitive types as is + else: + return obj diff --git a/src/uipath/core/triggers/__init__.py b/src/uipath/core/triggers/__init__.py new file mode 100644 index 0000000..4004622 --- /dev/null +++ b/src/uipath/core/triggers/__init__.py @@ -0,0 +1,15 @@ +"""Module containing UiPath trigger definitions.""" + +__all__ = [ + "UiPathResumeTrigger", + "UiPathResumeTriggerType", + "UiPathApiTrigger", + "UiPathResumeTriggerName", +] + +from uipath.core.triggers.trigger import ( + UiPathApiTrigger, + UiPathResumeTrigger, + UiPathResumeTriggerName, + UiPathResumeTriggerType, +) diff --git a/src/uipath/core/triggers/trigger.py b/src/uipath/core/triggers/trigger.py new file mode 100644 index 0000000..05ed88a --- /dev/null +++ b/src/uipath/core/triggers/trigger.py @@ -0,0 +1,69 @@ +"""Module defining resume trigger types and data models.""" + +from enum import Enum +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class UiPathResumeTriggerType(str, Enum): + """Constants representing different types of resume job triggers in the system.""" + + NONE = "None" + QUEUE_ITEM = "QueueItem" + JOB = "Job" + TASK = "Task" + TIMER = "Timer" + INBOX = "Inbox" + API = "Api" + DEEP_RAG = "DeepRag" + BATCH_RAG = "BatchRag" + INDEX_INGESTION = "IndexIngestion" + IXP_EXTRACTION = "IxpExtraction" + IXP_VS_ESCALATION = "IxpVsEscalation" + + +class UiPathResumeTriggerName(str, Enum): + """Constants representing specific names for resume job triggers in the system.""" + + UNKNOWN = "Unknown" + QUEUE_ITEM = "QueueItem" + JOB = "Job" + TASK = "Task" + ESCALATION = "Escalation" + TIMER = "Timer" + INBOX = "Inbox" + API = "Api" + DEEP_RAG = "DeepRag" + BATCH_RAG = "BatchRag" + INDEX_INGESTION = "IndexIngestion" + EXTRACTION = "Extraction" + IXP_VS_ESCALATION = "IxpVsEscalation" + + +class UiPathApiTrigger(BaseModel): + """API resume trigger request.""" + + inbox_id: str | None = Field(default=None, alias="inboxId") + request: Any = None + + model_config = ConfigDict(validate_by_name=True) + + +class UiPathResumeTrigger(BaseModel): + """Information needed to resume execution.""" + + interrupt_id: str | None = Field(default=None, alias="interruptId") + trigger_type: UiPathResumeTriggerType = Field( + default=UiPathResumeTriggerType.API, alias="triggerType" + ) + trigger_name: UiPathResumeTriggerName = Field( + default=UiPathResumeTriggerName.UNKNOWN, alias="triggerName", exclude=True + ) + item_key: str | None = Field(default=None, alias="itemKey") + api_resume: UiPathApiTrigger | None = Field(default=None, alias="apiResume") + folder_path: str | None = Field(default=None, alias="folderPath") + folder_key: str | None = Field(default=None, alias="folderKey") + payload: Any | None = Field(default=None, alias="interruptObject", exclude=True) + + model_config = ConfigDict(validate_by_name=True) diff --git a/uv.lock b/uv.lock index c7b74e9..dba07b4 100644 --- a/uv.lock +++ b/uv.lock @@ -1007,7 +1007,7 @@ wheels = [ [[package]] name = "uipath-core" -version = "0.5.1" +version = "0.5.2" source = { editable = "." } dependencies = [ { name = "opentelemetry-instrumentation" },