diff --git a/packages/uipath-llamaindex/samples/chat-agent/README.md b/packages/uipath-llamaindex/samples/chat-agent/README.md new file mode 100644 index 00000000..5427b2f1 --- /dev/null +++ b/packages/uipath-llamaindex/samples/chat-agent/README.md @@ -0,0 +1,30 @@ +# Literature Chat Agent + +An AI assistant using LlamaIndex and Tavily search for literature research and recommendations. + +## Requirements + +- Python 3.11+ +- OpenAI API key +- Tavily API key + +## Installation + +```bash +uv venv -p 3.11 .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate +uv sync +``` + +Set your API keys as environment variables in .env + +```bash +OPENAI_API_KEY=your_openai_api_key +TAVILY_API_KEY=your_tavily_api_key +``` + +## Usage + +```bash +uipath run agent '{"user_msg": "Tell me about 1984 by George Orwell"}' +``` diff --git a/packages/uipath-llamaindex/samples/chat-agent/agent.mermaid b/packages/uipath-llamaindex/samples/chat-agent/agent.mermaid new file mode 100644 index 00000000..b1c558ce --- /dev/null +++ b/packages/uipath-llamaindex/samples/chat-agent/agent.mermaid @@ -0,0 +1,6 @@ +flowchart TB + __start__(__start__) + chat(chat) + __end__(__end__) + __start__ --> |ChatInput|chat + chat --> |StopEvent|__end__ diff --git a/packages/uipath-llamaindex/samples/chat-agent/llama_index.json b/packages/uipath-llamaindex/samples/chat-agent/llama_index.json new file mode 100644 index 00000000..97899057 --- /dev/null +++ b/packages/uipath-llamaindex/samples/chat-agent/llama_index.json @@ -0,0 +1,5 @@ +{ + "workflows": { + "agent": "main.py:agent" + } +} diff --git a/packages/uipath-llamaindex/samples/chat-agent/main.py b/packages/uipath-llamaindex/samples/chat-agent/main.py new file mode 100644 index 00000000..ddbc0480 --- /dev/null +++ b/packages/uipath-llamaindex/samples/chat-agent/main.py @@ -0,0 +1,35 @@ +import os + +from llama_index.core.agent.workflow import FunctionAgent +from llama_index.llms.openai import OpenAI +from 
llama_index.tools.tavily_research import TavilyToolSpec + +llm = OpenAI(model="gpt-4o-mini") +tavily_tool = TavilyToolSpec(api_key=os.environ["TAVILY_API_KEY"]) + +SYSTEM_PROMPT = ( + "You are an advanced AI assistant specializing in book research and literature analysis. " + "Your primary functions are:\n\n" + "1. Book Information Research: Gather comprehensive information about books, including plot summaries, " + "themes, publishing details, sales performance, critical reception, and awards.\n" + "2. Author Research: Provide detailed information about authors, translators, editors, and other " + "publishing industry professionals.\n" + "3. Book Recommendations: Suggest books based on user preferences, genres, themes, or similar books " + "they have enjoyed.\n" + "4. Publishing Industry Analysis: Analyze trends, bestseller data, genre popularity, and insights " + "from the literary world.\n" + "5. Book Trivia and Facts: Share interesting facts, behind-the-scenes stories, and trivia about " + "books, authors, and the publishing industry.\n\n" + "Use the search tool for recent or factual information. " + "Remember previous messages and maintain context across the discussion." 
+) + +agent = FunctionAgent( + tools=tavily_tool.to_tool_list(), + llm=llm, + system_prompt=SYSTEM_PROMPT +) + +async def chat(user_input: str) -> str: + response = await agent.run(user_msg=user_input) + return str(response) diff --git a/packages/uipath-llamaindex/samples/chat-agent/pyproject.toml b/packages/uipath-llamaindex/samples/chat-agent/pyproject.toml new file mode 100644 index 00000000..a285e8ee --- /dev/null +++ b/packages/uipath-llamaindex/samples/chat-agent/pyproject.toml @@ -0,0 +1,17 @@ +[project] +name = "chat-agent" +version = "0.0.1" +description = "chat-agent" +authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] +dependencies = [ + "uipath-llamaindex>=0.5.0, <0.6.0", + "llama-index-llms-openai>=0.6.18", + "llama-index-tools-tavily-research>=0.4.2" +] +requires-python = ">=3.11" + + +[dependency-groups] +dev = [ + "uipath-dev>=0.0.19", +] diff --git a/packages/uipath-llamaindex/samples/chat-agent/uipath.json b/packages/uipath-llamaindex/samples/chat-agent/uipath.json new file mode 100644 index 00000000..43c01b61 --- /dev/null +++ b/packages/uipath-llamaindex/samples/chat-agent/uipath.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://cloud.uipath.com/draft/2024-12/uipath", + "runtimeOptions": { + "isConversational": false + }, + "packOptions": { + "fileExtensionsIncluded": [], + "filesIncluded": [], + "filesExcluded": [], + "directoriesExcluded": [], + "includeUvLock": true + }, + "functions": {} +} \ No newline at end of file diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/chat/__init__.py b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/chat/__init__.py new file mode 100644 index 00000000..c528e952 --- /dev/null +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/chat/__init__.py @@ -0,0 +1,3 @@ +from uipath_llamaindex.runtime.chat.messages import UiPathChatMessagesMapper + +__all__ = ["UiPathChatMessagesMapper"] diff --git 
a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/chat/messages.py b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/chat/messages.py new file mode 100644 index 00000000..cc5755ca --- /dev/null +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/chat/messages.py @@ -0,0 +1,261 @@ +import logging +from datetime import datetime, timezone +from typing import Any +from uuid import uuid4 + +from llama_index.core.agent.workflow.workflow_events import ( + AgentOutput, + AgentStream, + ToolCall, + ToolCallResult, +) +from uipath.core.chat import ( + UiPathConversationContentPartChunkEvent, + UiPathConversationContentPartEndEvent, + UiPathConversationContentPartEvent, + UiPathConversationContentPartStartEvent, + UiPathConversationMessageEndEvent, + UiPathConversationMessageEvent, + UiPathConversationMessageStartEvent, + UiPathConversationToolCallEndEvent, + UiPathConversationToolCallEvent, + UiPathConversationToolCallStartEvent, +) + +logger = logging.getLogger(__name__) + + +class UiPathChatMessagesMapper: + """Stateful mapper that converts LlamaIndex agent events to UiPath message events. + + Maintains state across events to properly track: + - The current AI message ID (generated per agent turn, since LlamaIndex doesn't provide one) + - Pending tool calls per message ID for correct message_end timing + """ + + def __init__(self, runtime_id: str) -> None: + self.runtime_id = runtime_id + self._current_message_id: str | None = None + # message_id -> set of tool_ids still pending completion + self._pending_tool_calls: dict[str, set[str]] = {} + # tool_id -> message_id for correlating ToolCallResult with its parent AI message + self._tool_id_to_message_id: dict[str, str] = {} + + @staticmethod + def map_input(input: dict[str, Any]) -> dict[str, Any]: + """Map UiPath chat message format to LlamaIndex expected format. + + If the input already has 'user_msg', return it unchanged. 
+ If the input has a 'messages' array (UiPath chat format), extract the + text content from the first message's contentParts and map it to + 'user_msg'. + """ + if "user_msg" in input: + return input + + messages = input.get("messages") + if not messages or not isinstance(messages, list): + return input + + first_message = messages[0] + content_parts = first_message.get("contentParts", []) + + text_parts = [ + part["data"]["inline"] + for part in content_parts + if part.get("mimeType") == "text/plain" + and isinstance(part.get("data"), dict) + and part["data"].get("inline") + ] + + if text_parts: + return {"user_msg": " ".join(text_parts)} + + return input + + def get_timestamp(self) -> str: + """Format current time as ISO 8601 UTC with milliseconds: 2025-01-04T10:30:00.123Z""" + return ( + datetime.now(timezone.utc) + .isoformat(timespec="milliseconds") + .replace("+00:00", "Z") + ) + + def get_content_part_id(self, message_id: str) -> str: + return f"chunk-{message_id}-0" + + async def map_event( + self, + event: AgentStream | AgentOutput | ToolCall | ToolCallResult, + ) -> list[UiPathConversationMessageEvent] | None: + """Convert a LlamaIndex agent event into UiPath conversation message events. + + Returns a list of events to emit, or None if the event should be skipped. 
+ """ + if isinstance(event, AgentStream): + return self._map_agent_stream(event) + + if isinstance(event, AgentOutput): + return self._map_agent_output(event) + + # ToolCall start is handled via AgentOutput to have the message_id available + if isinstance(event, ToolCall): + return None + + if isinstance(event, ToolCallResult): + return self._map_tool_call_result(event) + + return None + + def _map_agent_stream( + self, event: AgentStream + ) -> list[UiPathConversationMessageEvent] | None: + events: list[UiPathConversationMessageEvent] = [] + + # First stream chunk of a new agent turn: generate a fresh message ID + if self._current_message_id is None: + self._current_message_id = str(uuid4()) + events.append(self._create_message_start_event(self._current_message_id)) + + if event.delta: + events.append( + self._create_content_chunk_event(self._current_message_id, event.delta) + ) + + return events if events else None + + def _map_agent_output( + self, event: AgentOutput + ) -> list[UiPathConversationMessageEvent] | None: + message_id = self._current_message_id + # Reset for the next turn regardless of outcome + self._current_message_id = None + + if message_id is None: + return None + + events: list[UiPathConversationMessageEvent] = [] + + if event.tool_calls: + # Emit a tool_call_start event for each tool call and track them as pending + pending: set[str] = set() + for tool_call in event.tool_calls: + self._tool_id_to_message_id[tool_call.tool_id] = message_id + pending.add(tool_call.tool_id) + events.append( + self._create_tool_call_start_event( + message_id=message_id, + tool_call_id=tool_call.tool_id, + tool_name=tool_call.tool_name, + input=tool_call.tool_kwargs, + ) + ) + self._pending_tool_calls[message_id] = pending + # message_end will be emitted once the last ToolCallResult comes in + else: + # No tool calls: this is the final text response, close the message now + events.append(self._create_message_end_event(message_id)) + + return events if events else 
None + + def _map_tool_call_result( + self, event: ToolCallResult + ) -> list[UiPathConversationMessageEvent] | None: + message_id = self._tool_id_to_message_id.pop(event.tool_id, None) + if message_id is None: + logger.warning( + "ToolCallResult received for unknown tool_id '%s' — skipping.", + event.tool_id, + ) + return None + + output = event.tool_output.content if event.tool_output else None + + events: list[UiPathConversationMessageEvent] = [ + self._create_tool_call_end_event( + message_id=message_id, + tool_call_id=event.tool_id, + output=output, + ) + ] + + # Close the message once all tool calls for it have completed + pending = self._pending_tool_calls.get(message_id) + if pending is not None: + pending.discard(event.tool_id) + if not pending: + del self._pending_tool_calls[message_id] + events.append(self._create_message_end_event(message_id)) + + return events + + # ── Factory helpers ──────────────────────────────────────────────────────── + + def _create_message_start_event( + self, message_id: str + ) -> UiPathConversationMessageEvent: + return UiPathConversationMessageEvent( + message_id=message_id, + start=UiPathConversationMessageStartEvent( + role="assistant", timestamp=self.get_timestamp() + ), + content_part=UiPathConversationContentPartEvent( + content_part_id=self.get_content_part_id(message_id), + start=UiPathConversationContentPartStartEvent(mime_type="text/plain"), + ), + ) + + def _create_content_chunk_event( + self, message_id: str, text: str + ) -> UiPathConversationMessageEvent: + return UiPathConversationMessageEvent( + message_id=message_id, + content_part=UiPathConversationContentPartEvent( + content_part_id=self.get_content_part_id(message_id), + chunk=UiPathConversationContentPartChunkEvent(data=text), + ), + ) + + def _create_message_end_event( + self, message_id: str + ) -> UiPathConversationMessageEvent: + return UiPathConversationMessageEvent( + message_id=message_id, + end=UiPathConversationMessageEndEvent(), + 
content_part=UiPathConversationContentPartEvent( + content_part_id=self.get_content_part_id(message_id), + end=UiPathConversationContentPartEndEvent(), + ), + ) + + def _create_tool_call_start_event( + self, message_id: str, tool_call_id: str, tool_name: str, input: dict + ) -> UiPathConversationMessageEvent: + return UiPathConversationMessageEvent( + message_id=message_id, + tool_call=UiPathConversationToolCallEvent( + tool_call_id=tool_call_id, + start=UiPathConversationToolCallStartEvent( + tool_name=tool_name, + timestamp=self.get_timestamp(), + input=input, + ), + ), + ) + + def _create_tool_call_end_event( + self, message_id: str, tool_call_id: str, output: str | None + ) -> UiPathConversationMessageEvent: + return UiPathConversationMessageEvent( + message_id=message_id, + tool_call=UiPathConversationToolCallEvent( + tool_call_id=tool_call_id, + end=UiPathConversationToolCallEndEvent( + timestamp=self.get_timestamp(), + output=output, + ), + ), + ) + + +__all__ = ["UiPathChatMessagesMapper"] diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py index 9350281f..73583428 100644 --- a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py @@ -2,11 +2,12 @@ import asyncio import json +import logging +import os from typing import Any, AsyncGenerator, cast from uuid import uuid4 from llama_index.core.agent.workflow.workflow_events import ( - AgentInput, AgentOutput, AgentStream, ToolCall, @@ -47,9 +48,12 @@ UiPathLlamaIndexErrorCode, UiPathLlamaIndexRuntimeError, ) +from uipath_llamaindex.runtime.chat import UiPathChatMessagesMapper from uipath_llamaindex.runtime.schema import get_entrypoints_schema, get_workflow_schema from uipath_llamaindex.runtime.storage import SqliteResumableStorage +logger = logging.getLogger(__name__) + class UiPathLlamaIndexRuntime: """ @@ -136,13 +140,10 @@ 
async def _run_workflow( """ Core workflow execution logic used by both execute() and stream(). """ - workflow_input = input or {} + workflow_input = UiPathChatMessagesMapper.map_input(input or {}) is_resuming = bool(options and options.resume) - if is_resuming: - self._context = await self._load_context() - else: - self._context = Context(self.workflow) + self._context = await self._load_context() # Make the Context discoverable from inside steps if self.debug_mode and self._context is not None: @@ -168,28 +169,35 @@ async def _run_workflow( event_stream = handler.stream_events(expose_internal=True) suspended_event: InputRequiredEvent | None = None + chat = UiPathChatMessagesMapper(runtime_id=self.runtime_id) + raw_chunks: list[dict] = [] + mapped_chunks: list[dict] = [] is_resumed: bool = False async for event in event_stream: + if not isinstance(event, BreakpointEvent): + raw_chunks.append( + {"type": type(event).__name__, "data": json.loads(serialize_json(event))} + ) node_name = self._get_node_name(event) - if stream_events: - if isinstance( - event, - (AgentOutput, AgentInput, AgentStream, ToolCall, ToolCallResult), - ): - message_event = UiPathRuntimeMessageEvent( - payload=json.loads(serialize_json(event)), - node_name=node_name, - execution_id=self.runtime_id, - ) - yield message_event - elif not isinstance(event, BreakpointEvent): - state_event = UiPathRuntimeStateEvent( - payload=json.loads(serialize_json(event)), - node_name=node_name, - execution_id=self.runtime_id, - ) - yield state_event + if isinstance(event, (AgentOutput, AgentStream, ToolCall, ToolCallResult)): + try: + mapped_events = await chat.map_event(event) + except Exception as e: + logger.warning("Error mapping agent event: %s", e) + mapped_events = None + if mapped_events: + for mapped_event in mapped_events: + mapped_chunks.append(mapped_event.model_dump(by_alias=True, exclude_none=True)) + if stream_events: + yield UiPathRuntimeMessageEvent(payload=mapped_event) + elif stream_events and 
not isinstance(event, BreakpointEvent): + state_event = UiPathRuntimeStateEvent( + payload=json.loads(serialize_json(event)), + node_name=node_name, + execution_id=self.runtime_id, + ) + yield state_event if isinstance(event, BreakpointEvent): # Check if we should actually pause at this breakpoint @@ -215,6 +223,12 @@ async def _run_workflow( suspended_event = event break + for filename, data in [("raw_chunks.json", raw_chunks), ("mapped_chunks.json", mapped_chunks)]: + output_path = os.path.abspath(filename) + with open(output_path, "w") as f: + json.dump(data, f, indent=2) + print(f"{filename} written to: {output_path}") + if suspended_event is not None: await asyncio.sleep(0) # Yield control to event loop await self._save_context() @@ -224,7 +238,9 @@ async def _run_workflow( else: yield self._create_suspended_result(suspended_event) else: - yield self._create_success_result(await handler) + final_result = await handler + await self._save_context() + yield self._create_success_result(final_result) async def get_schema(self) -> UiPathRuntimeSchema: """Get schema for this LlamaIndex runtime."""