Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/a2a/client/transports/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,42 +184,44 @@
if isinstance(response.root, JSONRPCErrorResponse):
raise A2AClientJSONRPCError(response.root)
yield response.root.result
except httpx.TimeoutException as e:
raise A2AClientTimeoutError('Client Request timed out') from e
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except SSEError as e:
raise A2AClientHTTPError(
400, f'Invalid SSE response or protocol error: {e}'
) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def _send_request(
self,
rpc_request_payload: dict[str, Any],

Check notice on line 204 in src/a2a/client/transports/jsonrpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/rest.py (166-181)
http_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
try:
response = await self.httpx_client.post(
self.url, json=rpc_request_payload, **(http_kwargs or {})
)
response.raise_for_status()
return response.json()
except httpx.ReadTimeout as e:
except httpx.TimeoutException as e:
raise A2AClientTimeoutError('Client Request timed out') from e
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def get_task(

Check notice on line 224 in src/a2a/client/transports/jsonrpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/rest.py (183-197)
self,
request: TaskQueryParams,
*,
Expand Down Expand Up @@ -365,46 +367,48 @@
if isinstance(response.root, JSONRPCErrorResponse):
raise A2AClientJSONRPCError(response.root)
yield response.root.result
except httpx.TimeoutException as e:
raise A2AClientTimeoutError('Client Request timed out') from e
except SSEError as e:
raise A2AClientHTTPError(
400, f'Invalid SSE response or protocol error: {e}'
) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def get_card(
self,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
signature_verifier: Callable[[AgentCard], None] | None = None,
) -> AgentCard:
"""Retrieves the agent's card."""
modified_kwargs = update_extension_header(
self._get_http_args(context),
extensions if extensions is not None else self.extensions,
)
card = self.agent_card

if not card:
resolver = A2ACardResolver(self.httpx_client, self.url)
card = await resolver.get_agent_card(
http_kwargs=modified_kwargs,
signature_verifier=signature_verifier,
)
self._needs_extended_card = (
card.supports_authenticated_extended_card
)
self.agent_card = card

if not self._needs_extended_card:
return card

request = GetAuthenticatedExtendedCardRequest(id=str(uuid4()))

Check notice on line 411 in src/a2a/client/transports/jsonrpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/rest.py (368-409)
payload, modified_kwargs = await self._apply_interceptors(
request.method,
request.model_dump(mode='json', exclude_none=True),
Expand Down
12 changes: 11 additions & 1 deletion src/a2a/client/transports/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from httpx_sse import SSEError, aconnect_sse

from a2a.client.card_resolver import A2ACardResolver
from a2a.client.errors import A2AClientHTTPError, A2AClientJSONError
from a2a.client.errors import (
A2AClientHTTPError,
A2AClientJSONError,
A2AClientTimeoutError,
)
from a2a.client.middleware import ClientCallContext, ClientCallInterceptor
from a2a.client.transports.base import ClientTransport
from a2a.extensions.common import update_extension_header
Expand Down Expand Up @@ -159,34 +163,38 @@
event = a2a_pb2.StreamResponse()
Parse(sse.data, event)
yield proto_utils.FromProto.stream_response(event)
except httpx.TimeoutException as e:
raise A2AClientTimeoutError('Client Request timed out') from e
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except SSEError as e:
raise A2AClientHTTPError(
400, f'Invalid SSE response or protocol error: {e}'
) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def _send_request(self, request: httpx.Request) -> dict[str, Any]:

Check notice on line 181 in src/a2a/client/transports/rest.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/jsonrpc.py (187-204)

Check notice on line 181 in src/a2a/client/transports/rest.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/rest.py (369-381)
try:
response = await self.httpx_client.send(request)
response.raise_for_status()
return response.json()
except httpx.TimeoutException as e:
raise A2AClientTimeoutError('Client Request timed out') from e
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def _send_post_request(

Check notice on line 197 in src/a2a/client/transports/rest.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/jsonrpc.py (210-224)
self,
target: str,
rpc_request_payload: dict[str, Any],
Expand Down Expand Up @@ -357,18 +365,20 @@
event = a2a_pb2.StreamResponse()
Parse(sse.data, event)
yield proto_utils.FromProto.stream_response(event)
except httpx.TimeoutException as e:
raise A2AClientTimeoutError('Client Request timed out') from e
except SSEError as e:
raise A2AClientHTTPError(
400, f'Invalid SSE response or protocol error: {e}'
) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def get_card(

Check notice on line 381 in src/a2a/client/transports/rest.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/rest.py (169-181)
self,
*,
context: ClientCallContext | None = None,
Expand Down
32 changes: 32 additions & 0 deletions tests/client/transports/test_jsonrpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,38 @@ async def test_send_message_client_timeout(

assert 'Client Request timed out' in str(exc_info.value)

@pytest.mark.asyncio
@patch('a2a.client.transports.jsonrpc.aconnect_sse')
async def test_send_message_streaming_timeout(
self,
mock_aconnect_sse: AsyncMock,
mock_httpx_client: AsyncMock,
mock_agent_card: MagicMock,
):
client = JsonRpcTransport(
httpx_client=mock_httpx_client, agent_card=mock_agent_card
)
params = MessageSendParams(
message=create_text_message_object(content='Hello stream')
)
mock_event_source = AsyncMock(spec=EventSource)
mock_event_source.response = MagicMock(spec=httpx.Response)
mock_event_source.response.raise_for_status.return_value = None
mock_event_source.aiter_sse.side_effect = httpx.TimeoutException(
'Read timed out'
)
mock_aconnect_sse.return_value.__aenter__.return_value = (
mock_event_source
)

with pytest.raises(A2AClientTimeoutError) as exc_info:
_ = [
item
async for item in client.send_message_streaming(request=params)
]

assert 'Client Request timed out' in str(exc_info.value)

@pytest.mark.asyncio
async def test_get_task_success(
self, mock_httpx_client: AsyncMock, mock_agent_card: MagicMock
Expand Down
36 changes: 35 additions & 1 deletion tests/client/transports/test_rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from httpx_sse import EventSource, ServerSentEvent

from a2a.client import create_text_message_object
from a2a.client.errors import A2AClientHTTPError
from a2a.client.errors import A2AClientHTTPError, A2AClientTimeoutError
from a2a.client.transports.rest import RestTransport
from a2a.extensions.common import HTTP_EXTENSION_HEADER
from a2a.grpc import a2a_pb2
Expand Down Expand Up @@ -50,6 +50,40 @@ def _assert_extensions_header(mock_kwargs: dict, expected_extensions: set[str]):
assert actual_extensions == expected_extensions


class TestRestTransport:
@pytest.mark.asyncio
@patch('a2a.client.transports.rest.aconnect_sse')
async def test_send_message_streaming_timeout(
self,
mock_aconnect_sse: AsyncMock,
mock_httpx_client: AsyncMock,
mock_agent_card: MagicMock,
):
client = RestTransport(
httpx_client=mock_httpx_client, agent_card=mock_agent_card
)
params = MessageSendParams(
message=create_text_message_object(content='Hello stream')
)
mock_event_source = AsyncMock(spec=EventSource)
mock_event_source.response = MagicMock(spec=httpx.Response)
mock_event_source.response.raise_for_status.return_value = None
mock_event_source.aiter_sse.side_effect = httpx.TimeoutException(
'Read timed out'
)
mock_aconnect_sse.return_value.__aenter__.return_value = (
mock_event_source
)

with pytest.raises(A2AClientTimeoutError) as exc_info:
_ = [
item
async for item in client.send_message_streaming(request=params)
]

assert 'Client Request timed out' in str(exc_info.value)


class TestRestTransportExtensions:
@pytest.mark.asyncio
async def test_send_message_with_default_extensions(
Expand Down
Loading