diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 35a08b7c..fcbce94a 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -26,6 +26,23 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata +async def fetch_next_request_with_exp_backoff(rq: RequestQueue, max_retries: int = 5) -> Request | None: + """Fetch the next request with exponential backoff retries. + + In shared request queue mode, there is a propagation delay before newly added or reclaimed requests become visible + (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle + that delay in integration tests. + """ + for attempt in range(max_retries): + result = await rq.fetch_next_request() + if result is not None: + return result + delay = 2**attempt + Actor.log.info(f'fetch_next_request returned None, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') + await asyncio.sleep(delay) + return None + + async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None: """Test basic functionality of adding and fetching requests.""" @@ -208,8 +225,12 @@ async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) -> ) -async def test_request_reclaim_functionality(request_queue_apify: RequestQueue) -> None: +async def test_request_reclaim_functionality( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test request reclaiming for failed processing.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -219,36 +240,44 @@ async def test_request_reclaim_functionality(request_queue_apify: RequestQueue) Actor.log.info('Added test request') # Fetch and reclaim the request - request = await rq.fetch_next_request() - assert request is not None, f'request={request}' - Actor.log.info(f'Fetched request: {request.url}') + fetched_request = await rq.fetch_next_request() + assert fetched_request is not None + Actor.log.info(f'Fetched request: {fetched_request.url}') # Reclaim the request (simulate failed processing) - reclaim_result = await rq.reclaim_request(request) - assert reclaim_result is not None, f'reclaim_result={reclaim_result}' - assert reclaim_result.was_already_handled is False, ( - f'reclaim_result.was_already_handled={reclaim_result.was_already_handled}' - ) + reclaim_result = await rq.reclaim_request(fetched_request) + assert reclaim_result is not None + assert reclaim_result.was_already_handled is False + Actor.log.info('Request reclaimed successfully') - # Should be able to fetch the same request again - request2 = await rq.fetch_next_request() - assert request2 is not None, f'request2={request2}' - assert request2.url == request.url, ( - f'request2.url={request2.url}', - f'request.url={request.url}', - ) + # Should be able to fetch the same request again. + # In shared mode, there is a propagation delay before the reclaimed request becomes visible + # (see https://github.com/apify/apify-sdk-python/issues/808). + if rq_access_mode == 'shared': + request2 = await fetch_next_request_with_exp_backoff(rq) + else: + request2 = await rq.fetch_next_request() + + assert request2 is not None + assert request2.url == fetched_request.url + Actor.log.info(f'Successfully fetched reclaimed request: {request2.url}') # Mark as handled this time await rq.mark_request_as_handled(request2) is_finished = await rq.is_finished() - assert is_finished is True, f'is_finished={is_finished}' + assert is_finished is True -async def test_request_reclaim_with_forefront(request_queue_apify: RequestQueue) -> None: +async def test_request_reclaim_with_forefront( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test reclaiming requests to the front of the queue.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') + rq = request_queue_apify Actor.log.info('Request queue opened') @@ -260,20 +289,24 @@ async def test_request_reclaim_with_forefront(request_queue_apify: RequestQueue) # Fetch first request first_request = await rq.fetch_next_request() - assert first_request is not None, f'first_request={first_request}' + assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') # Reclaim to forefront await rq.reclaim_request(first_request, forefront=True) Actor.log.info('Request reclaimed to forefront') - # The reclaimed request should be fetched first again - next_request = await rq.fetch_next_request() - assert next_request is not None, f'next_request={next_request}' - assert next_request.url == first_request.url, ( - f'next_request.url={next_request.url}', - f'first_request.url={first_request.url}', - ) + # The reclaimed request should be fetched first again. + # In shared mode, there is a propagation delay before the reclaimed request becomes visible + # (see https://github.com/apify/apify-sdk-python/issues/808). + if rq_access_mode == 'shared': + next_request = await fetch_next_request_with_exp_backoff(rq) + else: + next_request = await rq.fetch_next_request() + + assert next_request is not None + assert next_request.url == first_request.url + Actor.log.info(f'Confirmed reclaimed request came first: {next_request.url}') # Clean up