Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/strands/models/litellm.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,18 @@ async def _process_choice_content(
)
yield data_type, chunk

# Process reasoning signature from LiteLLM's thinking attribute
thinking = getattr(content_source, "thinking", None)
if thinking is not None:
sig = getattr(thinking, "signature", None)
if isinstance(sig, str) and sig:
chunks, data_type = self._stream_switch_content("reasoning_content", data_type)
for chunk in chunks:
yield data_type, chunk
yield data_type, {
"contentBlockDelta": {"delta": {"reasoningContent": {"signature": sig}}}
}

# Process text content
if hasattr(content_source, "content") and content_source.content:
chunks, data_type = self._stream_switch_content("text", data_type)
Expand Down
136 changes: 136 additions & 0 deletions tests/strands/models/test_litellm.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,3 +848,139 @@ def test_format_request_messages_with_tool_calls_no_content():
},
]
assert tru_result == exp_result


@pytest.mark.asyncio
async def test_stream_preserves_thinking_signature(litellm_acompletion, api_key, model_id, model, agenerator, alist):
    """Verify that a signature carried on LiteLLM's `thinking` delta attribute is forwarded.

    Gemini thinking models emit a thought_signature that must survive into the
    conversation history for follow-up requests to succeed. LiteLLM exposes that
    signature via the `thinking` attribute of streaming deltas, and the adapter
    is expected to re-emit it as a reasoningContent signature delta.
    """

    class FakeStreamChunk:
        def __init__(self, choices=None):
            self.choices = choices if choices is not None else []

    # Chunk 1: reasoning text arrives, but no signature yet.
    thinking_without_sig = unittest.mock.Mock()
    thinking_without_sig.signature = None
    delta_reasoning = unittest.mock.Mock(
        content=None, tool_calls=None, reasoning_content="Let me think...", thinking=thinking_without_sig
    )

    # Chunk 2: the signature shows up on the thinking attribute.
    thinking_with_sig = unittest.mock.Mock()
    thinking_with_sig.signature = "base64encodedSignature=="
    delta_signature = unittest.mock.Mock(
        content=None, tool_calls=None, reasoning_content=None, thinking=thinking_with_sig
    )

    # Chunk 3: the plain-text answer.
    delta_text = unittest.mock.Mock(content="The answer is 42.", tool_calls=None, reasoning_content=None)
    delta_text.thinking = None

    stream_events = [
        FakeStreamChunk(choices=[unittest.mock.Mock(finish_reason=None, delta=delta_reasoning)]),
        FakeStreamChunk(choices=[unittest.mock.Mock(finish_reason=None, delta=delta_signature)]),
        FakeStreamChunk(choices=[unittest.mock.Mock(finish_reason="stop", delta=delta_text)]),
        FakeStreamChunk(choices=[]),
    ]
    litellm_acompletion.side_effect = unittest.mock.AsyncMock(return_value=agenerator(stream_events))

    messages = [{"role": "user", "content": [{"type": "text", "text": "Think about 42"}]}]
    events = await alist(model.stream(messages))

    # Collect every delta event that carries a reasoningContent signature.
    signature_deltas = []
    for event in events:
        delta = event.get("contentBlockDelta", {}).get("delta", {})
        if "signature" in delta.get("reasoningContent", {}):
            signature_deltas.append(event)

    # Exactly one signature delta must be emitted, carrying the original value.
    assert len(signature_deltas) == 1
    emitted = signature_deltas[0]["contentBlockDelta"]["delta"]["reasoningContent"]["signature"]
    assert emitted == "base64encodedSignature=="


@pytest.mark.asyncio
async def test_stream_signature_arrives_with_unset_data_type(
    litellm_acompletion, api_key, model_id, model, agenerator, alist
):
    """Verify content switching when a reasoning signature is the very first chunk.

    If the signature arrives before any reasoning text (data_type is still None),
    _stream_switch_content must open a reasoning_content block so the signature
    delta lands inside a well-formed content block, followed by the text block.
    """

    class FakeStreamChunk:
        def __init__(self, choices=None):
            self.choices = choices if choices is not None else []

    # Chunk 1: the signature is the very first thing on the stream.
    thinking_with_sig = unittest.mock.Mock()
    thinking_with_sig.signature = "earlySignature=="
    delta_signature = unittest.mock.Mock(
        content=None, tool_calls=None, reasoning_content=None, thinking=thinking_with_sig
    )

    # Chunk 2: the plain-text answer.
    delta_text = unittest.mock.Mock(content="The answer is 42.", tool_calls=None, reasoning_content=None)
    delta_text.thinking = None

    stream_events = [
        FakeStreamChunk(choices=[unittest.mock.Mock(finish_reason=None, delta=delta_signature)]),
        FakeStreamChunk(choices=[unittest.mock.Mock(finish_reason="stop", delta=delta_text)]),
        FakeStreamChunk(choices=[]),
    ]
    litellm_acompletion.side_effect = unittest.mock.AsyncMock(return_value=agenerator(stream_events))

    messages = [{"role": "user", "content": [{"type": "text", "text": "Think about 42"}]}]
    events = await alist(model.stream(messages))

    # A signature arriving with data_type=None must still produce paired
    # start/stop events: one pair for reasoning_content, one for text.
    content_starts = [e for e in events if "contentBlockStart" in e]
    content_stops = [e for e in events if "contentBlockStop" in e]
    assert len(content_starts) == 2  # one for reasoning_content, one for text
    assert len(content_stops) == 2  # one for reasoning_content, one for text

    # The reasoning (signature) block must precede the text block:
    # messageStart, contentBlockStart, signature delta, contentBlockStop,
    # contentBlockStart, text delta, contentBlockStop, messageStop.
    sig_index = next(
        i for i, e in enumerate(events)
        if "contentBlockDelta" in e and "reasoningContent" in e["contentBlockDelta"]["delta"]
    )
    text_index = next(
        i for i, e in enumerate(events)
        if "text" in e.get("contentBlockDelta", {}).get("delta", {})
    )
    assert sig_index < text_index

    # Exactly one signature delta is emitted, with the expected value.
    signature_deltas = []
    for event in events:
        delta = event.get("contentBlockDelta", {}).get("delta", {})
        if "signature" in delta.get("reasoningContent", {}):
            signature_deltas.append(event)
    assert len(signature_deltas) == 1
    assert signature_deltas[0]["contentBlockDelta"]["delta"]["reasoningContent"]["signature"] == "earlySignature=="

    # Exactly one text delta follows, carrying the answer.
    text_deltas = [
        e for e in events
        if "text" in e.get("contentBlockDelta", {}).get("delta", {})
    ]
    assert len(text_deltas) == 1
    assert text_deltas[0]["contentBlockDelta"]["delta"]["text"] == "The answer is 42."
39 changes: 39 additions & 0 deletions tests_integ/models/test_model_litellm.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,45 @@ def test_agent_invoke_reasoning(agent, model):
assert result.message["content"][0]["reasoningContent"]["reasoningText"]["text"]


@pytest.mark.parametrize("model_fixture", ["streaming_model", "non_streaming_model"])
def test_agent_reasoning_with_signature(model_fixture, request):
    """End-to-end check that reasoning text and its signature reach the response.

    Exercises the full reasoning pipeline: the model's reasoning block must come
    back with a non-empty signature, proving the provider's thought signatures
    are propagated through the LiteLLM adapter.

    Note: tools are intentionally excluded to avoid the Bedrock constraint
    that multi-turn thinking+tools requires thinking blocks in assistant
    messages on subsequent turns.
    """
    model = request.getfixturevalue(model_fixture)
    model.update_config(
        params={
            "thinking": {
                "budget_tokens": 1024,
                "type": "enabled",
            },
        },
    )

    result = Agent(model=model)("What is 2 + 2? Think step by step.")

    # Locate the reasoning content block within the response message.
    reasoning_blocks = [b for b in result.message["content"] if "reasoningContent" in b]
    assert reasoning_blocks, "Expected at least one reasoningContent block"

    reasoning_text = reasoning_blocks[0]["reasoningContent"]["reasoningText"]

    # Both the reasoning text and its signature must be present and non-empty.
    assert reasoning_text.get("text"), "Reasoning text should be non-empty"
    assert reasoning_text.get("signature"), "Reasoning signature should be present and non-empty"


@pytest.mark.parametrize("model_fixture", ["streaming_model", "non_streaming_model"])
def test_structured_output(model_fixture, weather, request):
model = request.getfixturevalue(model_fixture)
Expand Down