feat: add CancellationToken for graceful agent execution cancellation #1772
jgoyani1 wants to merge 5 commits into strands-agents:main
Conversation
src/strands/types/cancellation.py (outdated)

```python
import threading


class CancellationToken:
```
During design review, I would recommend bringing up the naming of this. Throughout the SDK we now use stop, interrupt, and cancel. In my mind, they have the following meanings:

- stop: Stop the agent loop and exit out.
- interrupt: Interrupt the agent loop to get a user response. Once received, resume from where the agent loop was interrupted.
- cancel: We use this specifically on components. For example, we can cancel a tool call that is about to happen. So it isn't so much stopping as it is about preventing something from getting started.

I am thinking cancel is the right word here because we are checking it at certain points in the loop to essentially say, don't go forward with this action. stop as a verb is probably something we should avoid using going forward. So we could still have a "stop event" or "stop reason", but the underlying reason is because of things like "interrupted", "cancelled", etc.

So with all that said, I would probably recommend calling this CancelSignal. It also fits in line with Python's asyncio task.cancel() terminology.
I agree with this naming discussion. The current implementation uses StopSignal as the class name but cancel() as the method name, which creates some terminology inconsistency.
Looking at the docstrings in event_loop.py, they still reference "cancellation token" (lines 62, 67) which adds to the confusion.
If cancel is the preferred verb, then CancelSignal would be more intuitive. This would also align with Python's asyncio.Task.cancel() pattern.
Additionally, the file is currently named stop_signal.py - if renamed to CancelSignal, consider renaming the file to cancel_signal.py for consistency.
Makes sense, will update the class naming to StopSignal instead.
src/strands/types/stop_signal.py (outdated)

```python
    Multiple calls to cancel() are safe and idempotent.
    """
    with self._lock:
        self._stopped = True
```
Issue: No reset mechanism for reusing agents after cancellation.
Once agent.cancel() is called, _stopped remains True forever. The agent cannot be reused for subsequent invocations - all future calls will immediately return stop_reason="cancelled".
Suggestion: Add a reset() method to allow clearing the cancellation state:
```python
def reset(self) -> None:
    """Reset the cancellation state.

    This allows the agent to be reused after cancellation.
    """
    with self._lock:
        self._stopped = False
```

Then call it at the start of each invocation in agent.stream_async(), or alternatively create a new StopSignal for each invocation.
Without this, developers will need to create a new Agent instance after each cancellation, which may not be the expected behavior.
src/strands/event_loop/event_loop.py (outdated)

```python
def _should_cancel(invocation_state: dict[str, Any]) -> bool:
    """Check if cancellation has been requested.

    This helper function checks the cancellation token in the invocation state
```
Issue: Inconsistent terminology - docstring mentions "cancellation token" but the class is named StopSignal.
Suggestion: Update the docstring to use consistent terminology:
```python
"""Check if cancellation has been requested.

This helper function checks the stop signal in the invocation state
...

Args:
    invocation_state: Invocation state containing optional stop signal.
```

This aligns with pgrayy's comment about naming consistency throughout the SDK.
```python
agent.cancel()

result = await agent.invoke_async("Hello")
```
Suggestion: Add a test for reusing an agent after cancellation to clarify the expected behavior.
Currently there are no tests that verify whether an agent can be reused after cancel() is called. Adding such a test would help clarify the intended design:
```python
@pytest.mark.asyncio
async def test_agent_reuse_after_cancellation():
    """Test that agent can be reused after cancellation.

    Verifies that the agent can be used for new invocations
    after a previous invocation was cancelled.
    """
    agent = Agent(model=MockedModelProvider([DEFAULT_RESPONSE, DEFAULT_RESPONSE]))

    # First invocation - cancelled
    agent.cancel()
    result1 = await agent.invoke_async("Hello")
    assert result1.stop_reason == "cancelled"

    # Second invocation - should work normally (after reset)
    result2 = await agent.invoke_async("Hello again")
    assert result2.stop_reason == "end_turn"  # or "cancelled" if no reset?
```

This would help document the expected behavior.
Issue: Code coverage gap noted by Codecov (74.4% patch coverage). The Codecov report indicates 11 missing lines in the patch.

While the integration tests cover many scenarios, targeted unit tests with mock signals would improve coverage and make it easier to verify each checkpoint works correctly in isolation.

Review Summary

Assessment: Request Changes

This is a well-designed feature that adds graceful cancellation support to agents. The implementation is thread-safe, has clear checkpoints, and includes comprehensive tests. However, there's a critical issue that needs to be addressed before merging.

The core implementation is solid - the four-checkpoint design is sensible and the thread-safety approach using threading.Lock is appropriate.
src/strands/types/stop_signal.py (outdated)

```python
    This method is thread-safe and can be called from any thread.
    Multiple calls to cancel() are safe and idempotent.
    """
    with self._lock:
```
Does there need to be a lock around this since this is just a boolean? Assuming multiple threads call cancel, the end result is the same, correct? Same with reading.

I think we can remove the signal class by using threading.Event. So this would be dead code.
src/strands/types/stop_signal.py (outdated)

```python
import threading


class StopSignal:
```
Do we need our own signal class? Could we instead use threading.Event?

Makes sense to use threading.Event since it has everything our Signal class is doing.
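For reference, threading.Event already covers the whole set/check/reset lifecycle the custom class implements by hand — a minimal sketch (the name cancel_signal is illustrative, not the SDK's actual attribute):

```python
import threading

# threading.Event provides the same thread-safe flag the custom
# StopSignal/CancellationToken class implements manually.
cancel_signal = threading.Event()

cancel_signal.set()            # request cancellation; safe from any thread
cancel_signal.set()            # idempotent - repeated calls are harmless
print(cancel_signal.is_set())  # True

cancel_signal.clear()          # reset, e.g. so an agent can be reused
print(cancel_signal.is_set())  # False
```

There is no lock to manage, and set(), clear(), and is_set() are already safe to call from any thread.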
src/strands/event_loop/streaming.py (outdated)

```python
metrics: Metrics = Metrics(latencyMs=0, timeToFirstByteMs=0)

async for chunk in chunks:
    # CHECKPOINT 3: Check for cancellation before processing stream chunks
```
We need to consider the state of the agent when cancelling at each of these checkpoints. Here for example we would have an incomplete assistant message. Cancelling before tool calls means the last message is a tool use which would be rejected on next invocation. At least in that particular case we can add a tool result with a cancellation message (similar to single tool cancellation here).
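A sketch of what synthesizing such a tool result could look like — the toolResult shape follows the snippets quoted elsewhere in this PR, but the helper name is hypothetical:

```python
def make_cancelled_tool_result_message(tool_use_id: str) -> dict:
    """Build a user message carrying an error tool result for a cancelled tool call.

    This keeps the conversation valid: the dangling toolUse in the last
    assistant message gets a matching toolResult instead of being rejected
    on the next invocation.
    """
    return {
        "role": "user",
        "content": [
            {
                "toolResult": {
                    "toolUseId": tool_use_id,
                    "status": "error",
                    "content": [{"text": "Tool execution cancelled by user"}],
                }
            }
        ],
    }
```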
Force-pushed ae46812 to c9535ce.
Force-pushed c9535ce to 05bc0bf.
This test file doesn't appear to target any logic in the SDK. It is only testing threading. With that said, I would recommend deleting the file.
Looks to be linting from running hatch run prepare.
src/strands/event_loop/event_loop.py (outdated)

```python
while True:
    # CHECKPOINT 2: Check for cancellation before model call
    # This prevents unnecessary model invocations when cancellation is requested
    if _should_cancel(invocation_state):
```
Looks like we are missing some telemetry clean up here. stream_trace.end() needs a call which happens in the try/except down below outside of the while loop.
One other thing is that we are emitting a ModelStopReason with a message payload but we aren't adding it to the messages array. This could cause confusion. With that said, I think it would be a good idea to place this after the BeforeModelCallEvent fire and use a break. It should end up looking something like:
```python
if _should_cancel(invocation_state):
    message = {"role": "assistant", "content": []}
    usage = Usage(inputTokens=0, outputTokens=0, totalTokens=0)
    metrics = Metrics(latencyMs=0)
    tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, "cancelled")
    break
```

Alternatively, I would be okay if we remove this for now as well. If someone cancels at the start, it is a small code path forward anyway to the mid-model-execution checkpoint. If we discover latency concerns in the future, we can always add this checkpoint back. But if we do keep it for now, I think we want to take the approach described above to ensure we get the proper clean up with the cancel message added to the messages array.
Good points on the telemetry cleanup and the message not being added to the messages array. Removed this checkpoint for now — the path from here to the mid-stream checkpoint (CHECKPOINT 3) is short, so the risk of missing a cancellation signal in that window is low. We can add it back with the proper cleanup approach you described if we see latency concerns.
src/strands/event_loop/event_loop.py (outdated)

```python
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
yield EventLoopStopEvent(
    "cancelled",
    {"role": "assistant", "content": [{"text": "Cancelled by user"}]},
```
One concern I have here is that we are yielding a message that doesn't get added to the messages array. I think this could cause some confusion. Honestly, I'm thinking we remove this checkpoint so we don't even have to worry about it right now. It is a short path from the start of invocation and so I would think has a much lower chance of triggering compared to the other checkpoints. We can always add it back in the future. To me, the most critical checkpoints to start with are mid model execution and before tool execution.
Good call — removed this checkpoint. Agreed that the path from invocation start to the model call is short enough that the other two checkpoints (mid-stream and before tool execution) cover the important cases. We can revisit if needed.
src/strands/event_loop/event_loop.py (outdated)

```python
    True if cancellation has been requested, False otherwise.
    """
    signal = invocation_state.get("cancel_signal")
    return signal.is_set() if signal else False
```
If you access through the agent instead, it could then just be agent._cancel_signal.is_set(). You wouldn't need this _should_cancel helper then. Note, we already have precedent in this module for accessing _ member variables. The event loop is considered a part of Agent.

Makes sense, will update it to use agent._cancel_signal.is_set().
src/strands/event_loop/streaming.py (outdated)

```python
chunks: AsyncIterable[StreamEvent],
start_time: float | None = None,
cancel_signal: Any | None = None,
```
Nit: Should be typing this as threading.Event instead of Any.
src/strands/event_loop/event_loop.py (outdated)

```python
    invocation_state["request_state"],
)
if cycle_span:
    tracer.end_event_loop_cycle_span(span=cycle_span, message=message)
```
This also needs tool_result_message=cancelled_tool_result_message.
Fixed — passing tool_result_message=cancelled_tool_result_message to end_event_loop_cycle_span now. Also initialized the variable to None before the conditional so it's safe when there are no tool results.
src/strands/event_loop/streaming.py (outdated)

```python
chunks: AsyncIterable[StreamEvent],
start_time: float | None = None,
cancel_signal: Any | None = None,
```
+1 on the type hint fix. Using threading.Event | None instead of Any | None provides better type safety and IDE support.
Issue: Missing Documentation PR

This PR introduces a new public API (agent.cancel()). Please either link a documentation PR or explain why one is not needed.

Documentation for this feature should cover usage examples, behavior explanation, and limitations.
Review Summary (Second Pass)

Assessment: Request Changes

The implementation has improved significantly since the last review - using threading.Event in place of the custom signal class.

Remaining Items

From pgrayy's review (still open):

Documentation:

The core design is sound - once pgrayy's feedback is addressed and documentation is added, this should be good to merge.
```python
class DelayedModelProvider(MockedModelProvider):
    async def stream(self, *args, **kwargs):
        # Add a small delay before streaming
        await asyncio.sleep(0.1)
```
Typically we should avoid using sleeps in unit tests since unit tests are meant to be fast. In this particular case, you can simulate delay with asyncio.Event, which also makes it more deterministic as you are in control of the pause and resume.
Good point — replaced all time.sleep and asyncio.sleep calls with asyncio.Event for deterministic synchronization. Tests now use explicit signaling between the model provider and the cancel trigger, no timing dependencies.
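The pattern, sketched with stand-in names and no SDK dependency: the stream signals when it has started, then parks on a second event until the test releases it, so the ordering is fully controlled and no sleeps are needed:

```python
import asyncio

async def demo() -> list[str]:
    started = asyncio.Event()   # stream -> test: first chunk emitted
    resume = asyncio.Event()    # test -> stream: continue after cancel
    order: list[str] = []

    async def fake_stream():
        order.append("first-chunk")
        started.set()           # let the test know streaming began
        await resume.wait()     # pause deterministically, no sleep
        order.append("second-chunk")

    task = asyncio.create_task(fake_stream())
    await started.wait()        # guaranteed to be mid-stream here
    order.append("cancel-requested")
    resume.set()
    await task
    return order

print(asyncio.run(demo()))  # ['first-chunk', 'cancel-requested', 'second-chunk']
```

Because the test only proceeds after started is set, the "cancel" always lands between the first and second chunks, regardless of scheduler timing.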
```python
    Verifies that cancellation works correctly when tools are being executed.
    """
    from strands import tool
```
Imports should go at the top of the module.
```python
@pytest.mark.asyncio
async def test_agent_without_cancellation():
```
We shouldn't need this test. We are already covered with the tests in test_agent.py.
```python
@pytest.mark.asyncio
async def test_all_checkpoints_and_reinvocation():
```
This test is doing a lot. I think splitting as you were doing above is sufficient.
tests_integ/test_cancellation.py (outdated)

```python
@pytest.mark.asyncio
@pytest.mark.skipif(not os.getenv("ANTHROPIC_API_KEY"), reason="Anthropic API key not available")
async def test_cancel_with_anthropic():
```
We don't need to test with different model providers as this is strictly SDK logic. Testing with Bedrock alone is sufficient.
Implement CancellationToken to enable graceful cancellation of agent execution from external contexts (e.g., web requests, background threads).

Key features:
- Thread-safe CancellationToken class with cancel() and is_cancelled() methods
- Agent accepts optional cancellation_token parameter at creation time
- Four cancellation checkpoints in event loop:
  1. Start of event loop cycle
  2. Before model execution
  3. During model response streaming
  4. Before tool execution
- New 'cancelled' stop reason in StopReason type
- Token automatically added to invocation_state for event loop access

Implementation details:
- CancellationToken uses threading.Lock for thread safety
- Zero usage/metrics returned when cancelled (no model execution occurred)
- Cancellation detected at checkpoints returns proper empty message structure
- Token shared by reference across packages in same process

Tests included:
- Unit tests for CancellationToken thread safety and behavior
- Unit tests for agent cancellation at different checkpoints
- Integration tests with real model providers (require credentials)
- Rename CancellationToken to StopSignal based on team feedback
- Make StopSignal internal (not exported in public API)
- Users access cancellation via agent.cancel() method instead
- Change invocation_state key from 'cancellation_token' to 'stop_signal'
- Rename internal variable from _cancelled to _stopped
- Update all tests to reflect new naming
- All 1997 tests passing
- Remove agent.is_cancelled() method to simplify API
- Users can check result.stop_reason == 'cancelled' instead
- Keep only agent.cancel() as the public cancellation API
- Update StopSignal docstring to reflect internal usage
- Remove test for is_cancelled() method
- All 1996 tests passing
Rename the internal signal from _stop_signal to _cancel_signal throughout the codebase to better align with the public cancel() API and improve code clarity.
Force-pushed 05bc0bf to 9693939.
src/strands/agent/agent.py (outdated)

```python
- Start of event loop cycle
- Before model execution
- During model response streaming
- Before tool execution
```
```python
tool_result = tool_result_content[0]["toolResult"]
assert tool_result["toolUseId"] == "tool_1"
assert tool_result["status"] == "error"
assert "cancelled" in tool_result["content"][0]["text"].lower()
```
We should have a test for calling invoke again after a cancel. Something like:
```python
async def test_agent_cancel_continue_after():
    agent = Agent(model=MockedModelProvider([DEFAULT_RESPONSE, DEFAULT_RESPONSE]))

    agent.cancel()
    result1 = await agent.invoke_async("Hello")
    assert result1.stop_reason == "cancelled"

    # Second invocation should work normally
    result2 = await agent.invoke_async("Hello again")
    assert result2.stop_reason == "end_turn"
```
tests_integ/test_cancellation.py (outdated)

```python
    Verifies that cancellation works correctly when using the
    streaming API with a real Bedrock model.
    """
    from strands.models import BedrockModel
```
Forgot about this test file, will update again.
tests_integ/test_cancellation.py (outdated)

```python
"""Integration tests for cancellation with real model providers.

These tests verify that cancellation works correctly with actual model providers
like Bedrock, Anthropic, OpenAI, etc. They require valid credentials and may
```
tests_integ/test_cancellation.py (outdated)

```python
os.getenv("ANTHROPIC_API_KEY"),  # Anthropic
os.getenv("OPENAI_API_KEY"),  # OpenAI
```
tests_integ/test_cancellation.py (outdated)

```python
@pytest.mark.asyncio
@pytest.mark.skipif(not os.getenv("AWS_REGION"), reason="AWS credentials not available")
```
Only need the module-level check.
tests_integ/test_cancellation.py (outdated)

```python
@pytest.mark.asyncio
@pytest.mark.skipif(not os.getenv("AWS_REGION"), reason="AWS credentials not available")
async def test_agent_without_cancellation_bedrock():
```
tests_integ/test_cancellation.py (outdated)

```python
assert result.stop_reason == "cancelled"
# The message might be empty or partially complete
assert result.message is not None
```
Isn't it guaranteed to be something now? It should be a cancel message right?
```python
cancel_task = asyncio.create_task(cancel_after_delay())

events = []
async for event in agent.stream_async(
```
You could make this more deterministic and avoid the sleep by cancelling inside this loop after detecting a model delta event.
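The shape of that idea, with a stand-in stream so the snippet runs without a model (the event names and the cancelled flag are illustrative; with the real SDK you would call agent.cancel() inside the loop):

```python
import asyncio

async def consume_and_cancel_on_first_delta() -> list[str]:
    cancelled = False
    seen: list[str] = []

    async def fake_stream():
        # Stands in for agent.stream_async(); the real stream would be cut
        # short by the SDK's mid-stream checkpoint once cancel() is called.
        for chunk in ["message-start", "delta", "delta", "message-stop"]:
            if cancelled:
                yield "cancelled"
                return
            yield chunk

    async for event in fake_stream():
        seen.append(event)
        if event == "delta" and not cancelled:
            cancelled = True  # i.e. agent.cancel() - inside the loop, no sleep
    return seen

print(asyncio.run(consume_and_cancel_on_first_delta()))
```

Cancelling on the first observed delta guarantees the request lands mid-stream without any timing assumptions.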
tests_integ/test_cancellation.py (outdated)

```python
# Cancel after 3 seconds (should be during tool execution)
async def cancel_after_delay():
    await asyncio.sleep(3.0)
```
Similar to the unit tests, we could make this a bit more deterministic with asyncio.Event (assuming the tools are made async, which is allowed). I think this might be worthwhile because 3 seconds may not even be enough time for the model to generate the tool uses.
- Remove _should_cancel helper, use agent._cancel_signal.is_set() directly
- Remove checkpoint 1 (cycle start) to avoid yielding unadded messages
- Remove checkpoint 2 (before model call) to avoid telemetry cleanup issues
- Add ToolResultMessageEvent yield in cancellation path for consistency
- Pass tool_result_message to end_event_loop_cycle_span in cancellation path
- Type cancel_signal as threading.Event instead of Any in streaming.py
- Replace sleep-based test synchronization with asyncio.Event
- Add _cancel_signal to test fixtures for mock agents
- Remove obsolete test_stop_signal.py and test_cancellation.py
Force-pushed 9693939 to 905c9df.
Review Summary (Third Pass)

Assessment: Ready for approval pending documentation

The implementation is now complete and all technical feedback from previous reviews has been addressed.

Resolved Items

| Issue | Resolution |
|---|---|
| Reset mechanism | ✅ _cancel_signal.clear() in finally block |
| Custom StopSignal class | ✅ Removed - uses threading.Event directly |
| Checkpoints 1 & 2 | ✅ Removed as suggested |
| Type hint | ✅ threading.Event \| None |
| _should_cancel helper | ✅ Removed - uses agent._cancel_signal.is_set() |
| test_stop_signal.py | ✅ Deleted |
| Tool cancellation | ✅ Yields ToolResultMessageEvent, passes to tracer |
| Test determinism | ✅ Uses asyncio.Event instead of sleeps |
| Agent reuse test | ✅ test_agent_cancel_continue_after added |
| Integration tests | ✅ Uses hooks for deterministic cancellation |

Remaining blocker: The Documentation PR section is still empty. This introduces a new public API (agent.cancel()) that needs documentation covering usage examples, behavior explanation, and limitations.

Once documentation is addressed, this is ready to merge. The implementation is clean and well-tested.
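A sketch of the clear()-in-finally reset pattern mentioned in the resolved items — the class and method names here are illustrative stand-ins, not the SDK's real Agent:

```python
import threading

class MiniAgent:
    """Toy stand-in demonstrating clear()-in-finally, not the real Agent."""

    def __init__(self) -> None:
        self._cancel_signal = threading.Event()

    def cancel(self) -> None:
        self._cancel_signal.set()

    def invoke(self, prompt: str) -> str:
        try:
            if self._cancel_signal.is_set():  # a cancellation checkpoint
                return "cancelled"
            return "end_turn"
        finally:
            self._cancel_signal.clear()  # reset state even when cancelled

agent = MiniAgent()
agent.cancel()
print(agent.invoke("Hello"))        # cancelled
print(agent.invoke("Hello again"))  # end_turn - signal was cleared
```

Clearing in finally means a cancelled agent is immediately reusable, addressing the reset concern raised in the first review pass.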
```python
    model by cancelling before the model call starts.
    """

agent = Agent(model=BedrockModel(model_id="anthropic.claude-3-haiku-20240307-v1:0"))
```
I believe this model is close to deprecation.
Description

Implement graceful agent cancellation via the agent.cancel() API to enable stopping agent execution from external contexts (e.g., web requests, background threads).

Key features:

Implementation details:
- Empty message and zero metrics when cancelled before model execution
- Current accumulated state when cancelled during streaming or after the model call

Tests included:
Note: Cancellation is checked at strategic checkpoints, not continuously. Long-running tools will complete their current execution before cancellation takes effect.
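As a usage sketch of the cross-thread flow described above — FakeAgent is a stand-in so the example runs without the SDK or credentials; with the real SDK you would construct strands.Agent, call agent.cancel() from the other thread, and check result.stop_reason:

```python
import threading
import time

class FakeAgent:
    """Stand-in for strands.Agent illustrating the cancellation flow in this PR."""

    def __init__(self) -> None:
        self._cancel_signal = threading.Event()

    def cancel(self) -> None:
        self._cancel_signal.set()  # thread-safe, callable from any thread

    def invoke(self, prompt: str) -> str:
        # Simulated event loop: poll the signal at checkpoints, analogous to
        # the SDK checking mid-stream and before tool execution.
        for _ in range(200):
            if self._cancel_signal.is_set():
                self._cancel_signal.clear()
                return "cancelled"
            time.sleep(0.005)
        return "end_turn"

agent = FakeAgent()
# e.g. a web-request handler or background thread requests cancellation
threading.Thread(target=lambda: (time.sleep(0.02), agent.cancel())).start()
print(agent.invoke("Write a long essay"))  # cancelled
```

The key property is that cancel() only sets a flag; the running invocation notices it at the next checkpoint, so long-running work in between (such as a tool call) finishes first.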
Related Issues
#81
Documentation PR
Type of Change
New feature
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
Tests included:
- Unit tests for StopSignal thread safety and behavior
- Unit tests for agent cancellation at different checkpoints
- Tests for idempotency, threading, and edge cases
- Integration tests with real model providers
I ran hatch run prepare.

Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.