-
Notifications
You must be signed in to change notification settings - Fork 697
feat(hooks): add resume flag to AfterInvocationEvent #1767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1a56e79
181708e
f8601d3
ae38264
f31c8b9
9584b7d
d149546
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -777,49 +777,66 @@ async def _run_loop( | |
| Yields: | ||
| Events from the event loop cycle. | ||
| """ | ||
| before_invocation_event, _interrupts = await self.hooks.invoke_callbacks_async( | ||
| BeforeInvocationEvent(agent=self, invocation_state=invocation_state, messages=messages) | ||
| ) | ||
| messages = before_invocation_event.messages if before_invocation_event.messages is not None else messages | ||
| current_messages: Messages | None = messages | ||
|
|
||
| agent_result: AgentResult | None = None | ||
| try: | ||
| yield InitEventLoopEvent() | ||
| while current_messages is not None: | ||
| before_invocation_event, _interrupts = await self.hooks.invoke_callbacks_async( | ||
| BeforeInvocationEvent(agent=self, invocation_state=invocation_state, messages=current_messages) | ||
| ) | ||
| current_messages = ( | ||
| before_invocation_event.messages if before_invocation_event.messages is not None else current_messages | ||
| ) | ||
|
|
||
| await self._append_messages(*messages) | ||
| agent_result: AgentResult | None = None | ||
| try: | ||
| yield InitEventLoopEvent() | ||
|
|
||
| structured_output_context = StructuredOutputContext( | ||
| structured_output_model or self._default_structured_output_model, | ||
| structured_output_prompt=structured_output_prompt or self._structured_output_prompt, | ||
| ) | ||
| await self._append_messages(*current_messages) | ||
|
|
||
| # Execute the event loop cycle with retry logic for context limits | ||
| events = self._execute_event_loop_cycle(invocation_state, structured_output_context) | ||
| async for event in events: | ||
| # Signal from the model provider that the message sent by the user should be redacted, | ||
| # likely due to a guardrail. | ||
| if ( | ||
| isinstance(event, ModelStreamChunkEvent) | ||
| and event.chunk | ||
| and event.chunk.get("redactContent") | ||
| and event.chunk["redactContent"].get("redactUserContentMessage") | ||
| ): | ||
| self.messages[-1]["content"] = self._redact_user_content( | ||
| self.messages[-1]["content"], str(event.chunk["redactContent"]["redactUserContentMessage"]) | ||
| ) | ||
| if self._session_manager: | ||
| self._session_manager.redact_latest_message(self.messages[-1], self) | ||
| yield event | ||
| structured_output_context = StructuredOutputContext( | ||
| structured_output_model or self._default_structured_output_model, | ||
| structured_output_prompt=structured_output_prompt or self._structured_output_prompt, | ||
| ) | ||
|
|
||
| # Capture the result from the final event if available | ||
| if isinstance(event, EventLoopStopEvent): | ||
| agent_result = AgentResult(*event["stop"]) | ||
| # Execute the event loop cycle with retry logic for context limits | ||
| events = self._execute_event_loop_cycle(invocation_state, structured_output_context) | ||
| async for event in events: | ||
| # Signal from the model provider that the message sent by the user should be redacted, | ||
| # likely due to a guardrail. | ||
| if ( | ||
| isinstance(event, ModelStreamChunkEvent) | ||
| and event.chunk | ||
| and event.chunk.get("redactContent") | ||
| and event.chunk["redactContent"].get("redactUserContentMessage") | ||
| ): | ||
| self.messages[-1]["content"] = self._redact_user_content( | ||
| self.messages[-1]["content"], | ||
| str(event.chunk["redactContent"]["redactUserContentMessage"]), | ||
| ) | ||
| if self._session_manager: | ||
| self._session_manager.redact_latest_message(self.messages[-1], self) | ||
| yield event | ||
|
|
||
| # Capture the result from the final event if available | ||
| if isinstance(event, EventLoopStopEvent): | ||
| agent_result = AgentResult(*event["stop"]) | ||
|
|
||
| finally: | ||
| self.conversation_manager.apply_management(self) | ||
| await self.hooks.invoke_callbacks_async( | ||
| AfterInvocationEvent(agent=self, invocation_state=invocation_state, result=agent_result) | ||
| ) | ||
| finally: | ||
| self.conversation_manager.apply_management(self) | ||
| after_invocation_event, _interrupts = await self.hooks.invoke_callbacks_async( | ||
| AfterInvocationEvent(agent=self, invocation_state=invocation_state, result=agent_result) | ||
| ) | ||
|
|
||
| # Convert resume input to messages for next iteration, or None to stop | ||
| if after_invocation_event.resume is not None: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: No safety mechanism to prevent infinite resume loops A hook that always sets Suggestion: Consider adding a configurable
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the pattern that we have with retries for tools and model providers. I wanted to keep the same. |
||
| logger.debug("resume=<True> | hook requested agent resume with new input") | ||
| # If in interrupt state, process interrupt responses before continuing. | ||
| # This mirrors the _interrupt_state.resume() call in stream_async and will | ||
| # raise TypeError if the resume input is not valid interrupt responses. | ||
| self._interrupt_state.resume(after_invocation_event.resume) | ||
| current_messages = await self._convert_prompt_to_messages(after_invocation_event.resume) | ||
| else: | ||
| current_messages = None | ||
|
|
||
| async def _execute_event_loop_cycle( | ||
| self, invocation_state: dict[str, Any], structured_output_context: StructuredOutputContext | None = None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| if TYPE_CHECKING: | ||
| from ..agent.agent_result import AgentResult | ||
|
|
||
| from ..types.agent import AgentInput | ||
| from ..types.content import Message, Messages | ||
| from ..types.interrupt import _Interruptible | ||
| from ..types.streaming import StopReason | ||
|
|
@@ -78,17 +79,31 @@ class AfterInvocationEvent(HookEvent): | |
| - Agent.stream_async | ||
| - Agent.structured_output | ||
|
|
||
| Resume: | ||
| When ``resume`` is set to a non-None value by a hook callback, the agent will | ||
| automatically re-invoke itself with the provided input. This enables hooks to | ||
| implement autonomous looping patterns where the agent continues processing | ||
| based on its previous result. The resume triggers a full new invocation cycle | ||
| including ``BeforeInvocationEvent``. | ||
|
|
||
| Attributes: | ||
| invocation_state: State and configuration passed through the agent invocation. | ||
| This can include shared context for multi-agent coordination, request tracking, | ||
| and dynamic configuration. | ||
| result: The result of the agent invocation, if available. | ||
| This will be None when invoked from structured_output methods, as those return typed output directly rather | ||
| than AgentResult. | ||
| resume: When set to a non-None agent input by a hook callback, the agent will | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did we brainstorm any other names? My gut reaction is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, we didn't. I had the original slack canvas where we discussed, I see you mentioned It's a simple find-and-replace, so I just wanted to have the discussion in PR. I'm okay with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From a reviewer perspective, both names are reasonable:
Given that this feature can be used both for "resuming after interrupts" AND for "continuing with new input for validation loops", I slightly lean toward
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed that "resume" makes sense with "resume", but it feels off if it's not paused originally but rather you're attempting to force the agent to keep going.
|
||
| re-invoke itself with this input. The value can be any valid AgentInput | ||
| (str, content blocks, messages, etc.). Defaults to None (no resume). | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Missing safety guidance in documentation The docstring should warn users about the risk of infinite loops if Suggestion: Add a note like:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the pattern that we have with retries for tools and model providers. I wanted to keep the same. |
||
| """ | ||
|
|
||
| invocation_state: dict[str, Any] = field(default_factory=dict) | ||
| result: "AgentResult | None" = None | ||
| resume: AgentInput = None | ||
|
|
||
| def _can_write(self, name: str) -> bool: | ||
| return name == "resume" | ||
|
|
||
| @property | ||
| def should_reverse_callbacks(self) -> bool: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.