-
-
Notifications
You must be signed in to change notification settings - Fork 770
Expand file tree
/
Copy pathasync_example.py
More file actions
225 lines (204 loc) · 7.44 KB
/
async_example.py
File metadata and controls
225 lines (204 loc) · 7.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import asyncio
import time
from typing import List, Dict
from praisonaiagents import Agent, Task, AgentTeam, TaskOutput
from praisonaiagents.main import (
display_error,
display_interaction,
display_tool_call,
display_instruction,
error_logs,
Console
)
from duckduckgo_search import DDGS
from pydantic import BaseModel
# Single console instance shared by every display_* call in this file.
console = Console()
# 1. Define output model for structured results
class SearchResult(BaseModel):
    # Structured payload the tasks below request via ``output_pydantic``.
    # (Kept as comments rather than a docstring so the model itself is unchanged.)

    # The search string the result set belongs to.
    query: str
    # One string-to-string mapping per hit.
    results: List[Dict[str, str]]
    # Count of results reported for the query.
    total_results: int
# 2. Define both sync and async tools
def sync_search_tool(query: str) -> List[Dict]:
    """
    Blocking DuckDuckGo search.

    Args:
        query (str): The search query.

    Returns:
        list: One dict per hit with "title", "url" and "snippet" keys
            (``max_results=5`` is requested); an empty list if the search
            raised, in which case the error is displayed and recorded in
            ``error_logs``.
    """
    display_tool_call(f"Running sync search for: {query}", console)
    time.sleep(1)  # Simulate network delay
    try:
        client = DDGS()
        # Normalise each raw hit to the title/url/snippet shape; missing
        # fields fall back to an empty string.
        return [
            {
                "title": hit.get("title", ""),
                "url": hit.get("href", ""),
                "snippet": hit.get("body", ""),
            }
            for hit in client.text(keywords=query, max_results=5)
        ]
    except Exception as exc:
        failure = f"Error during sync search: {exc}"
        display_error(failure, console)
        error_logs.append(failure)
        return []
async def async_search_tool(query: str) -> List[Dict]:
    """
    Asynchronous search using DuckDuckGo.

    The DuckDuckGo client call used here is a plain (blocking) call, so it is
    executed on the event loop's default executor instead of directly inside
    the coroutine; other tasks on the loop keep running while the request is
    in flight.

    Args:
        query (str): The search query.

    Returns:
        list: One dict per hit with "title", "url" and "snippet" keys
            (``max_results=5`` is requested); an empty list if the search
            raised, in which case the error is displayed and recorded in
            ``error_logs``.
    """
    display_tool_call(f"Running async search for: {query}", console)
    await asyncio.sleep(1)  # Simulate network delay

    def _blocking_search() -> List[Dict]:
        # Runs in a worker thread: performs the request and normalises each
        # raw hit, defaulting missing fields to an empty string.
        return [
            {
                "title": result.get("title", ""),
                "url": result.get("href", ""),
                "snippet": result.get("body", ""),
            }
            for result in DDGS().text(keywords=query, max_results=5)
        ]

    try:
        loop = asyncio.get_running_loop()
        # Exceptions raised in the worker thread resurface at this await, so
        # the handler below still covers search failures.
        return await loop.run_in_executor(None, _blocking_search)
    except Exception as e:
        error_msg = f"Error during async search: {e}"
        display_error(error_msg, console)
        error_logs.append(error_msg)
        return []
# 3. Define both sync and async callbacks
def sync_callback(output: TaskOutput):
    """Blocking task callback: show a preview of the output, then its parsed form.

    Args:
        output (TaskOutput): Result handed to the callback. The first 100
            characters of ``output.raw`` are displayed; ``json_dict`` or
            ``pydantic`` is displayed when ``output_format`` is "JSON" or
            "Pydantic" respectively.
    """
    preview = output.raw[:100]
    display_interaction("Sync Callback", f"Processing output: {preview}...", markdown=True, console=console)
    time.sleep(1)  # Simulate processing

    fmt = output.output_format
    if fmt == "JSON":
        display_tool_call(f"Processed JSON result: {output.json_dict}", console)
    elif fmt == "Pydantic":
        display_tool_call(f"Processed Pydantic result: {output.pydantic}", console)
async def async_callback(output: TaskOutput):
    """Coroutine task callback: show a preview of the output, then its parsed form.

    Args:
        output (TaskOutput): Result handed to the callback. The first 100
            characters of ``output.raw`` are displayed; ``json_dict`` or
            ``pydantic`` is displayed when ``output_format`` is "JSON" or
            "Pydantic" respectively.
    """
    preview = output.raw[:100]
    display_interaction("Async Callback", f"Processing output: {preview}...", markdown=True, console=console)
    await asyncio.sleep(1)  # Simulate processing

    fmt = output.output_format
    if fmt == "JSON":
        display_tool_call(f"Processed JSON result: {output.json_dict}", console)
    elif fmt == "Pydantic":
        display_tool_call(f"Processed Pydantic result: {output.pydantic}", console)
# 4. Create agents with different tools
# Agent equipped with the blocking search tool.
sync_agent = Agent(
    name="SyncAgent",
    role="Synchronous Search Specialist",
    tools=[sync_search_tool],
    goal="Perform synchronous searches and return structured results",
    backstory="Expert in sync operations and data organization",
    reflection=False,
)

# Agent equipped with the coroutine search tool.
async_agent = Agent(
    name="AsyncAgent",
    role="Asynchronous Search Specialist",
    tools=[async_search_tool],
    goal="Perform asynchronous searches and return structured results",
    backstory="Expert in async operations and data organization",
    reflection=False,
)
# 5. Create tasks with different configurations
# Standalone task for the sync agent: blocking execution, blocking callback.
sync_task = Task(
    name="sync_search",
    agent=sync_agent,
    description="Search for 'Python programming' using sync tool and return structured results",
    expected_output="SearchResult model with query details and results",
    output_pydantic=SearchResult,
    async_execution=False,
    callback=sync_callback,
)

# Standalone task for the async agent: async execution, coroutine callback.
async_task = Task(
    name="async_search",
    agent=async_agent,
    description="Search for 'Async programming' using async tool and return structured results",
    expected_output="SearchResult model with query details and results",
    output_pydantic=SearchResult,
    async_execution=True,
    callback=async_callback,
)
# 6. Create workflow tasks
# First workflow step: flagged as the start task and pointing at
# "workflow_async" as the task that follows it.
workflow_sync_task = Task(
    name="workflow_sync",
    agent=sync_agent,
    description="Workflow sync search for 'AI trends' with structured output",
    expected_output="SearchResult model with AI trends data",
    output_pydantic=SearchResult,
    async_execution=False,
    is_start=True,
    next_tasks=["workflow_async"],
)

# Second workflow step, referenced by name from workflow_sync_task.next_tasks.
workflow_async_task = Task(
    name="workflow_async",
    agent=async_agent,
    description="Workflow async search for 'Future of AI' with structured output",
    expected_output="SearchResult model with Future of AI data",
    output_pydantic=SearchResult,
    async_execution=True,
)
# 7. Example usage functions
def run_sync_example():
    """Run ``sync_task`` with ``sync_agent`` through the blocking ``start()`` call and display the result."""
    display_instruction("\nRunning Synchronous Example...", console)
    team = AgentTeam(agents=[sync_agent], tasks=[sync_task], process="sequential")
    outcome = team.start()
    display_interaction("Sync Example", f"Result: {outcome}", markdown=True, console=console)
async def run_async_example():
    """Run ``async_task`` with ``async_agent`` by awaiting ``astart()`` and display the result."""
    display_instruction("\nRunning Asynchronous Example...", console)
    team = AgentTeam(agents=[async_agent], tasks=[async_task], process="sequential")
    outcome = await team.astart()
    display_interaction("Async Example", f"Result: {outcome}", markdown=True, console=console)
async def run_mixed_example():
    """Run the sync and async tasks together in one sequential team and display the result."""
    display_instruction("\nRunning Mixed Sync/Async Example...", console)
    team = AgentTeam(
        agents=[sync_agent, async_agent],
        tasks=[sync_task, async_task],
        process="sequential",
    )
    outcome = await team.astart()
    display_interaction("Mixed Example", f"Result: {outcome}", markdown=True, console=console)
async def run_workflow_example():
    """Run the two workflow tasks with ``process="workflow"`` and display the result."""
    display_instruction("\nRunning Workflow Example...", console)
    team = AgentTeam(
        agents=[sync_agent, async_agent],
        tasks=[workflow_sync_task, workflow_async_task],
        process="workflow",
    )
    outcome = await team.astart()
    display_interaction("Workflow Example", f"Result: {outcome}", markdown=True, console=console)
# 8. Main execution
async def main():
    """Run every example in turn and report the errors collected on the way.

    The synchronous example is handed to the loop's default executor so that
    its blocking calls do not stall the event loop; the three async examples
    are then awaited one after another. When all of them finish without
    raising, every message accumulated in ``error_logs`` is displayed. If any
    example raises, the exception is displayed and appended to ``error_logs``
    instead of propagating (and the summary above is not shown).
    """
    display_instruction("Starting PraisonAI Agents Examples...", console)
    try:
        # Inside a coroutine the running loop is the one we want;
        # get_running_loop() is the accessor the asyncio docs recommend here.
        loop = asyncio.get_running_loop()
        # Run sync example in a separate thread to not block the event loop
        await loop.run_in_executor(None, run_sync_example)

        # Run async examples
        await run_async_example()
        await run_mixed_example()
        await run_workflow_example()

        if error_logs:
            display_error("\nErrors encountered during execution:", console)
            for error in error_logs:
                display_error(error, console)
    except Exception as e:
        display_error(f"Error in main execution: {e}", console)
        error_logs.append(str(e))
if __name__ == "__main__":
    # Script entry point: drive the main() coroutine to completion.
    asyncio.run(main())