-
-
Notifications
You must be signed in to change notification settings - Fork 770
Expand file tree
/
Copy pathparallelisation.py
More file actions
95 lines (81 loc) · 2.26 KB
/
parallelisation.py
File metadata and controls
95 lines (81 loc) · 2.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from praisonaiagents import Agent, Task, AgentTeam
from datetime import datetime
import asyncio
def process_time():
"""Simulate processing"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Processing at: {current_time}")
return f"Processed at {current_time}"
# Create parallel processing agents
agent1 = Agent(
name="Processor 1",
role="Time collector",
goal="Get the time and return it",
tools=[process_time]
)
agent2 = Agent(
name="Processor 2",
role="Time collector",
goal="Get the time and return it",
tools=[process_time]
)
agent3 = Agent(
name="Processor 3",
role="Time collector",
goal="Get the time and return it",
tools=[process_time]
)
aggregator = Agent(
name="Aggregator",
role="Result aggregator",
goal="Collect all the processed time from all tasks"
)
# Create parallel tasks with memory disabled
task1 = Task(
name="process_1",
description="Use process_time tool to get the time",
expected_output="processed time",
agent=agent1,
is_start=True,
async_execution=True
)
task2 = Task(
name="process_2",
description="Use process_time tool to get the time",
expected_output="processed time",
agent=agent2,
is_start=True,
async_execution=True
)
task3 = Task(
name="process_3",
description="Use process_time tool to get the time",
expected_output="processed time",
agent=agent3,
is_start=True,
async_execution=True
)
aggregate_task = Task(
name="aggregate",
description="Collect all the processed time from all tasks",
expected_output="Output all the processed time from all tasks and just the time",
agent=aggregator,
context=[task1, task2, task3]
)
async def main():
# Create workflow manager
workflow = AgentTeam(
agents=[agent1, agent2, agent3, aggregator],
tasks=[task1, task2, task3, aggregate_task],
process="workflow"
)
# Run parallel workflow
results = await workflow.astart()
# Print results
print("\nParallel Processing Results:")
for task_id, result in results["task_results"].items():
if result:
print(f"Task {task_id}: {result.raw}")
# Run the async main function
if __name__ == "__main__":
asyncio.run(main())