Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 59 additions & 26 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@
from apify.storage_clients._apify._models import ApifyRequestQueueMetadata


async def fetch_next_request_with_exp_backoff(rq: RequestQueue, max_retries: int = 5) -> Request | None:
    """Fetch the next request, retrying with exponential backoff while the queue returns nothing.

    In shared request queue mode, there is a propagation delay before newly added or reclaimed requests become visible
    (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle
    that delay in integration tests.

    Args:
        rq: Queue to poll via `fetch_next_request`.
        max_retries: Maximum number of fetch attempts. The wait between consecutive attempts doubles each time
            (1s, 2s, 4s, ...). With a value of 0 or less, no fetch is attempted.

    Returns:
        The first non-None request returned by the queue, or None if every attempt returned None.
    """
    for attempt in range(max_retries):
        result = await rq.fetch_next_request()
        if result is not None:
            return result

        # Nothing follows the final attempt, so announcing a retry and sleeping would only
        # delay the caller (by 2**(max_retries - 1) seconds) and leave a misleading log line.
        if attempt == max_retries - 1:
            break

        delay = 2**attempt
        Actor.log.info(f'fetch_next_request returned None, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
        await asyncio.sleep(delay)
    return None


async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None:
"""Test basic functionality of adding and fetching requests."""

Expand Down Expand Up @@ -208,8 +225,12 @@ async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) ->
)


async def test_request_reclaim_functionality(request_queue_apify: RequestQueue) -> None:
async def test_request_reclaim_functionality(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test request reclaiming for failed processing."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

rq = request_queue_apify
Actor.log.info('Request queue opened')
Expand All @@ -219,36 +240,44 @@ async def test_request_reclaim_functionality(request_queue_apify: RequestQueue)
Actor.log.info('Added test request')

# Fetch and reclaim the request
request = await rq.fetch_next_request()
assert request is not None, f'request={request}'
Actor.log.info(f'Fetched request: {request.url}')
fetched_request = await rq.fetch_next_request()
assert fetched_request is not None
Actor.log.info(f'Fetched request: {fetched_request.url}')

# Reclaim the request (simulate failed processing)
reclaim_result = await rq.reclaim_request(request)
assert reclaim_result is not None, f'reclaim_result={reclaim_result}'
assert reclaim_result.was_already_handled is False, (
f'reclaim_result.was_already_handled={reclaim_result.was_already_handled}'
)
reclaim_result = await rq.reclaim_request(fetched_request)
assert reclaim_result is not None
assert reclaim_result.was_already_handled is False

Actor.log.info('Request reclaimed successfully')

# Should be able to fetch the same request again
request2 = await rq.fetch_next_request()
assert request2 is not None, f'request2={request2}'
assert request2.url == request.url, (
f'request2.url={request2.url}',
f'request.url={request.url}',
)
# Should be able to fetch the same request again.
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
# (see https://github.com/apify/apify-sdk-python/issues/808).
if rq_access_mode == 'shared':
request2 = await fetch_next_request_with_exp_backoff(rq)
else:
request2 = await rq.fetch_next_request()

assert request2 is not None
assert request2.url == fetched_request.url

Actor.log.info(f'Successfully fetched reclaimed request: {request2.url}')

# Mark as handled this time
await rq.mark_request_as_handled(request2)
is_finished = await rq.is_finished()
assert is_finished is True, f'is_finished={is_finished}'
assert is_finished is True


async def test_request_reclaim_with_forefront(request_queue_apify: RequestQueue) -> None:
async def test_request_reclaim_with_forefront(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test reclaiming requests to the front of the queue."""

rq_access_mode = request.node.callspec.params.get('request_queue_apify')

rq = request_queue_apify
Actor.log.info('Request queue opened')

Expand All @@ -260,20 +289,24 @@ async def test_request_reclaim_with_forefront(request_queue_apify: RequestQueue)

# Fetch first request
first_request = await rq.fetch_next_request()
assert first_request is not None, f'first_request={first_request}'
assert first_request is not None
Actor.log.info(f'Fetched first request: {first_request.url}')

# Reclaim to forefront
await rq.reclaim_request(first_request, forefront=True)
Actor.log.info('Request reclaimed to forefront')

# The reclaimed request should be fetched first again
next_request = await rq.fetch_next_request()
assert next_request is not None, f'next_request={next_request}'
assert next_request.url == first_request.url, (
f'next_request.url={next_request.url}',
f'first_request.url={first_request.url}',
)
# The reclaimed request should be fetched first again.
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
# (see https://github.com/apify/apify-sdk-python/issues/808).
if rq_access_mode == 'shared':
next_request = await fetch_next_request_with_exp_backoff(rq)
else:
next_request = await rq.fetch_next_request()

assert next_request is not None
assert next_request.url == first_request.url

Actor.log.info(f'Confirmed reclaimed request came first: {next_request.url}')

# Clean up
Expand Down