From 425093bc7cb13a7306efce6a7b0a0414a012719b Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Mon, 9 Feb 2026 11:24:36 +0100 Subject: [PATCH 1/4] Fix rare scenario when RQ state is wrong due to API failure during marking request as failed --- .../_apify/_request_queue_shared_client.py | 4 +- .../_apify/_request_queue_single_client.py | 25 ++++++++----- .../apify_api/test_request_queue.py | 37 +++++++++++++++++++ 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 4a00e8bc..278b79f2 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -235,8 +235,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | processed_request=processed_request, hydrated_request=request, ) - except Exception as exc: - logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}') + except Exception: + logger.exception(f'Error marking request {request.unique_key} as handled.') return None else: return processed_request diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 7cc202bb..e1a4fa41 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -201,27 +201,27 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | # Set the handled_at timestamp if not already set request_id = unique_key_to_request_id(request.unique_key) + if cached_request := self._requests_cache.get(request_id): + cached_request.handled_at = request.handled_at + if request.handled_at is None: request.handled_at = datetime.now(tz=timezone.utc) self.metadata.handled_request_count += 1 
self.metadata.pending_request_count -= 1 - if cached_request := self._requests_cache.get(request_id): - cached_request.handled_at = request.handled_at - try: + # Remember that we handled this request, to optimize local deduplication. + self._requests_already_handled.add(request_id) + self._requests_in_progress.discard(request_id) + # Remove request from cache, it will most likely not be needed. + self._requests_cache.pop(request_id, None) # Update the request in the API # Works as upsert - adds the request if it does not exist yet. (Local request that was handled before # adding to the queue.) processed_request = await self._update_request(request) - # Remember that we handled this request, to optimize local deduplication. - self._requests_already_handled.add(request_id) - # Remove request from cache. It will most likely not be needed. - self._requests_cache.pop(request_id) - self._requests_in_progress.discard(request_id) - except Exception as exc: - logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}') + except Exception: + logger.exception(f'Error marking request {request.unique_key} as handled.') return None else: return processed_request @@ -303,6 +303,11 @@ async def _list_head(self) -> None: # Ignore requests that are already in progress, we will not process them again. continue + if request_id in self._requests_already_handled: + # Request is locally known to be handled, but platform is not aware of it. + # This can be either due to delay in API data propagation or failed API call to mark it as handled. + continue + if request.was_already_handled: # Do not cache fully handled requests, we do not need them. Just cache their id. 
self._requests_already_handled.add(request_id) diff --git a/tests/integration/apify_api/test_request_queue.py b/tests/integration/apify_api/test_request_queue.py index e90c1600..9c8c7f96 100644 --- a/tests/integration/apify_api/test_request_queue.py +++ b/tests/integration/apify_api/test_request_queue.py @@ -1192,3 +1192,40 @@ def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dic Actor.log.info(stats_after) assert (stats_after['writeCount'] - stats_before['writeCount']) == 1 + + +async def test_request_queue_api_fail_when_marking_as_handled( + apify_token: str, monkeypatch: pytest.MonkeyPatch +) -> None: + """Test that single-access based Apify RQ can deal with API failures when marking requests as handled. + + Single-access based Apify RQ is aware that local information is reliable, so even if marking as handled fails + during API call, the RQ correctly tracks the handling information locally. It can even fix the missing handled + information on the platform later, when fetching next request later. + """ + + monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token) + async with Actor: + rq = await RequestQueue.open(storage_client=ApifyStorageClient(request_queue_access='single')) + + try: + request = Request.from_url('http://example.com') + # Fetch request + await rq.add_request(request) + assert request == await rq.fetch_next_request() + + # Mark as handled, but simulate API failure. + with mock.patch.object( + rq._client._api_client, 'update_request', side_effect=Exception('Simulated API failure') + ): + await rq.mark_request_as_handled(request) + assert not (await rq.get_request(request.unique_key)).was_already_handled + + # RQ with `request_queue_access="single"` knows, that the local information is reliable, so it knows it + # handled this request already despite the platform not being aware of it. 
+ assert not await rq.fetch_next_request() + assert await rq.is_finished() + assert await rq.is_empty() + + finally: + await rq.drop() From 103474d4f4191486f7bb1986a698dde386d6448b Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 13 Feb 2026 09:33:53 +0100 Subject: [PATCH 2/4] Reduce e2e `TESTS_CONCURRENCY` to avoid platform overload --- .github/workflows/_tests.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/_tests.yaml b/.github/workflows/_tests.yaml index 9151c61f..c1e484b2 100644 --- a/.github/workflows/_tests.yaml +++ b/.github/workflows/_tests.yaml @@ -87,7 +87,7 @@ jobs: strategy: # E2E tests build and run Actors on the platform. Limit parallel workflows to 1 to avoid exceeding - # the platform's memory limits. A single workflow with 16 pytest workers provides good test + # the platform's memory limits. A single workflow with 4 pytest workers provides good test # parallelization while staying within platform constraints. max-parallel: 1 matrix: @@ -97,7 +97,7 @@ runs-on: ${{ matrix.os }} env: - TESTS_CONCURRENCY: "16" + TESTS_CONCURRENCY: "4" steps: - name: Checkout repository From 5e96c03d831b2da378620ddd72a3f1fe85493619 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 13 Feb 2026 09:58:19 +0100 Subject: [PATCH 3/4] Revert concurrency changes --- .github/workflows/_tests.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/_tests.yaml b/.github/workflows/_tests.yaml index c1e484b2..9151c61f 100644 --- a/.github/workflows/_tests.yaml +++ b/.github/workflows/_tests.yaml @@ -87,7 +87,7 @@ jobs: strategy: # E2E tests build and run Actors on the platform. Limit parallel workflows to 1 to avoid exceeding - # the platform's memory limits. A single workflow with 4 pytest workers provides good test + # the platform's memory limits. A single workflow with 16 pytest workers provides good test # parallelization while staying within platform constraints. 
max-parallel: 1 matrix: @@ -97,7 +97,7 @@ jobs: runs-on: ${{ matrix.os }} env: - TESTS_CONCURRENCY: "4" + TESTS_CONCURRENCY: "16" steps: - name: Checkout repository From b6f52843706acf4672593a7ddab83a7848743e91 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 13 Feb 2026 13:42:29 +0100 Subject: [PATCH 4/4] Update test docstring --- tests/integration/test_request_queue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index d1cc6b1d..a1fa7305 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -1200,8 +1200,7 @@ async def test_request_queue_api_fail_when_marking_as_handled( """Test that single-access based Apify RQ can deal with API failures when marking requests as handled. Single-access based Apify RQ is aware that local information is reliable, so even if marking as handled fails - during API call, the RQ correctly tracks the handling information locally. It can even fix the missing handled - information on the platform later, when fetching next request later. + during API call, the RQ correctly tracks the handling information locally. """ monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)