[FEAT] Expose total_pages_processed in execution API response#1801
pk-zipstack wants to merge 3 commits into main from
Conversation
…adata Surface page usage data from PageUsage model in API responses to support tracking total pages processed per file execution and per workflow execution. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
for more information, see https://pre-commit.ci
Summary by CodeRabbit
Walkthrough

Adds aggregated pages-processed tracking: a helper that sums `PageUsage` by run_id(s), a `WorkflowExecution` property exposing the aggregate, a serializer field, and enrichment of endpoint/task metadata with the aggregated `total_pages_processed`. Returns `None` when no input or no matching records.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Endpoint as Endpoint/Destination
    participant Task as FileExecutionTasks
    participant Serializer
    participant Model as WorkflowExecution
    participant Helper as UsageHelper
    participant DB as PageUsage

    Client->>Endpoint: request combined metadata
    Endpoint->>Helper: get_aggregated_pages_processed(run_id)
    Helper->>DB: query PageUsage by run_id/run_ids
    DB-->>Helper: return records
    Helper-->>Endpoint: return summed pages_processed
    Endpoint-->>Client: return metadata (includes total_pages_processed)

    Task->>Helper: get_aggregated_pages_processed(run_id)
    Helper->>DB: query PageUsage
    DB-->>Helper: return records
    Helper-->>Task: return sum
    Task->>Serializer: include total_pages_processed in execution metadata

    Serializer->>Model: access aggregated_total_pages_processed
    Model->>Helper: get_aggregated_pages_processed(run_ids)
    Helper-->>Model: return sum
    Serializer-->>Client: serialized execution with aggregated_total_pages_processed
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ Passed checks (3 passed)
🧹 Nitpick comments (4)
backend/workflow_manager/workflow_v2/models/execution.py (2)
262-280: `PageUsage.run_id` has no database index, but is now on multiple hot query paths.

The `PageUsage` model only indexes `organization_id`. Both `filter(run_id=run_id)` (in `UsageHelper.get_aggregated_pages_processed`) and `filter(run_id__in=str_ids)` (here) will full-scan the `page_usage` table as it grows. These queries are now triggered per-file in API deployments and per-row in execution list views.

A migration adding a `db_index=True` on `run_id` (or a `Meta.indexes` entry) is recommended:

```python
# In account_usage/models.py – PageUsage.Meta
indexes = [
    models.Index(fields=["organization_id"]),
    models.Index(fields=["run_id"]),  # +
]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/workflow_v2/models/execution.py` around lines 262 - 280, Add a database index on PageUsage.run_id to avoid full table scans for queries used by aggregated_total_pages_processed and UsageHelper.get_aggregated_pages_processed: update the PageUsage model Meta to include an index for "run_id" (e.g., add models.Index(fields=["run_id"]) alongside the existing organization_id index) and create/apply a Django migration so the new index is created in the database.
274-280: Same redundant `.exists()` + `.aggregate()` double-query as in `UsageHelper`.

`Sum("pages_processed")` on an empty queryset returns `None` for the key, so the `.exists()` check buys nothing. Collapsing to a single `.aggregate()` call saves one round-trip per property access (and this property is called per row in list-view serialization).

♻️ Proposed fix

```diff
-        str_ids = [str(fid) for fid in file_execution_ids]
-        queryset = PageUsage.objects.filter(run_id__in=str_ids)
-        if not queryset.exists():
-            return None
-
-        result = queryset.aggregate(total_pages=Sum("pages_processed"))
-        return result.get("total_pages")
+        str_ids = [str(fid) for fid in file_execution_ids]
+        result = PageUsage.objects.filter(run_id__in=str_ids).aggregate(
+            total_pages=Sum("pages_processed")
+        )
+        return result.get("total_pages")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/workflow_v2/models/execution.py` around lines 262 - 280, Add a database index on PageUsage.run_id to avoid full table scans for queries used by aggregated_total_pages_processed and UsageHelper.get_aggregated_pages_processed: update the PageUsage model Meta to include an index for "run_id" (e.g., add models.Index(fields=["run_id"]) alongside the existing organization_id index) and create/apply a Django migration so the new index is created in the database.

backend/workflow_manager/execution/serializer/execution.py (1)
36-38: `aggregated_total_pages_processed` adds up to 3 extra DB queries per execution in list views.

The model property fires: (1) a `values_list` on `file_executions`, (2) a `PageUsage` `.exists()`, (3) a `PageUsage` `.aggregate()`. Combined with the existing per-item queries for `get_successful_files` and `get_failed_files`, execution list endpoints are now executing ~7 queries per row. The class-level TODO already calls this out; this field makes addressing it more urgent.

Consider annotating the aggregate in the queryset that feeds the list view (e.g., via a subquery annotation or a bulk `prefetch_related` approach) rather than resolving it lazily per object.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/execution/serializer/execution.py` around lines 36 - 38, The current get_aggregated_total_pages_processed serializer method calls the WorkflowExecution model property aggregated_total_pages_processed which triggers multiple DB queries per row; instead annotate the queryset that feeds the list view with the aggregated total (using a Subquery/OuterRef or a bulk aggregation with Prefetch over file_executions/PageUsage) and change get_aggregated_total_pages_processed to return that annotated value (e.g., read obj.annotated_aggregated_total_pages_processed or similar) so no per-object queries are run; update the view/queryset that constructs the list to include the annotation name you choose and ensure the serializer reads that attribute rather than accessing the model property.

backend/usage_v2/helper.py (1)
37-45: Redundant `.exists()` check creates an unnecessary extra DB query.

`Sum` on an empty queryset already returns `None` for the aggregated key, so `result.get("total_pages")` will return `None` when there are no records — the `.exists()` guard is superfluous. The sibling method `get_aggregated_token_count` uses a single `.aggregate()` call for this reason.

Additionally, two static-analysis hints are valid here:

- BLE001: Replace bare `except Exception` with a narrower exception type, or at minimum annotate the intent.
- TRY400: `logger.error` suppresses the traceback; `logger.exception` (or `logger.error(..., exc_info=True)`) is preferred.

♻️ Proposed fix

```diff
-        try:
-            queryset = PageUsage.objects.filter(run_id=run_id)
-            if not queryset.exists():
-                return None
-            result = queryset.aggregate(total_pages=Sum("pages_processed"))
-            return result.get("total_pages")
-        except Exception as e:
-            logger.error(f"Error aggregating pages processed for run_id {run_id}: {e}")
-            return None
+        try:
+            result = PageUsage.objects.filter(run_id=run_id).aggregate(
+                total_pages=Sum("pages_processed")
+            )
+            return result.get("total_pages")
+        except Exception as e:  # noqa: BLE001
+            logger.exception(f"Error aggregating pages processed for run_id {run_id}: {e}")
+            return None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/usage_v2/helper.py` around lines 37 - 45, Remove the redundant .exists() check and perform a single aggregate call on PageUsage.objects.filter(run_id=run_id) returning result.get("total_pages") (same pattern as get_aggregated_token_count); replace the bare except Exception with a narrower exception (e.g., catch django.db.DatabaseError) and log the failure with full traceback using logger.exception(...) (or logger.error(..., exc_info=True)) to preserve stack information while keeping the behavior of returning None on error.
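As a pure-Python illustration of the point above (a sketch with made-up record shapes, not the actual Django code — the real method aggregates over the `PageUsage` ORM queryset): a single aggregation pass can itself yield `None` for an empty match set, which is why a separate existence check is redundant.

```python
def aggregate_pages(records, run_id):
    """Sum pages_processed for one run_id; None when nothing matches.

    Mirrors the single-.aggregate() behavior in Django: Sum over an
    empty queryset yields None, so no separate exists() check is needed.
    """
    matched = [r["pages_processed"] for r in records if r["run_id"] == run_id]
    return sum(matched) if matched else None


records = [
    {"run_id": "a", "pages_processed": 3},
    {"run_id": "a", "pages_processed": 2},
    {"run_id": "b", "pages_processed": 7},
]
print(aggregate_pages(records, "a"))        # 5
print(aggregate_pages(records, "missing"))  # None
```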
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (5)
- backend/usage_v2/helper.py
- backend/workflow_manager/endpoint_v2/destination.py
- backend/workflow_manager/execution/serializer/execution.py
- backend/workflow_manager/workflow_v2/file_execution_tasks.py
- backend/workflow_manager/workflow_v2/models/execution.py
backend/usage_v2/helper.py
Outdated
```python
        except Exception as e:
            logger.error(f"Error aggregating pages processed for run_id {run_id}: {e}")
            return None
```
NIT: Check this exception handling behaviour. In almost all cases it's better to let the exception bubble up and let it get handled by the middleware. Test it after removing this; we need to ensure that the user sees an appropriate error message (it need not be too specific), but at the same time we should log it with a traceback if it's a 5xx error.
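To illustrate the traceback point in plain Python (stdlib `logging` only; this is not the project's logging setup): inside an `except` block, `logger.exception` records the stack trace, while a bare `logger.error` does not.

```python
import io
import logging

# Capture log output in memory so the difference is visible.
buf = io.StringIO()
handler = logging.StreamHandler(buf)
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.ERROR)

try:
    raise ValueError("boom")
except ValueError:
    logger.error("plain error")         # message only, traceback lost
    logger.exception("with traceback")  # message plus the full traceback

output = buf.getvalue()
print(output)
```

The captured output contains the "plain error" message with no stack, followed by "with traceback" and the full `Traceback (most recent call last): ... ValueError: boom` block.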
backend/usage_v2/helper.py
Outdated
```python
class UsageHelper:
    @staticmethod
    def get_aggregated_pages_processed(run_id: str) -> int | None:
```
There's some code duplication between this method and `aggregated_total_pages_processed()` of the `WorkflowExecution` model. Unify these implementations.
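One shape the unification could take, sketched in pure Python (illustrative only; the real helper would run a single `PageUsage` ORM aggregate rather than iterating records in memory, and the parameter names here are assumptions):

```python
from __future__ import annotations


def get_aggregated_pages_processed(
    records: list[dict],
    run_id: str | None = None,
    run_ids: list[str] | None = None,
) -> int | None:
    """Accept either a single run_id or a list, sharing one aggregation path."""
    if run_id is not None:
        run_ids = [run_id]
    if not run_ids:
        return None
    wanted = set(run_ids)
    matched = [r["pages_processed"] for r in records if r["run_id"] in wanted]
    return sum(matched) if matched else None


usage = [
    {"run_id": "fe-1", "pages_processed": 4},
    {"run_id": "fe-2", "pages_processed": 6},
]
print(get_aggregated_pages_processed(usage, run_id="fe-1"))             # 4
print(get_aggregated_pages_processed(usage, run_ids=["fe-1", "fe-2"]))  # 10
```

With this signature, the model property can simply delegate with `run_ids`, removing the duplicated query logic.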
```python
        queryset = PageUsage.objects.filter(run_id__in=str_ids)
        if not queryset.exists():
            return None

        result = queryset.aggregate(total_pages=Sum("pages_processed"))
```
NIT: Code duplication
…ption handling

- Unify UsageHelper.get_aggregated_pages_processed() to accept either run_id or run_ids, eliminating duplicate PageUsage query logic
- WorkflowExecution.aggregated_total_pages_processed now delegates to UsageHelper instead of duplicating the aggregation
- Remove broad try/except so exceptions bubble up to middleware

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Test Results

Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/usage_v2/helper.py (1)
49-52: Remove the redundant `exists()` check — it causes an extra DB round-trip.

Django's `Sum` on an empty queryset returns `{"total_pages": None}`, so `result.get("total_pages")` already returns `None` with no records present. The `exists()` guard adds a second database hit for every call without changing the outcome.

♻️ Proposed simplification

```diff
-        if not queryset.exists():
-            return None
-        result = queryset.aggregate(total_pages=Sum("pages_processed"))
-        return result.get("total_pages")
+        result = queryset.aggregate(total_pages=Sum("pages_processed"))
+        return result.get("total_pages")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/usage_v2/helper.py` around lines 49 - 52, The current code performs an extra DB round-trip by calling queryset.exists() before aggregating; remove the exists() check and directly call queryset.aggregate(total_pages=Sum("pages_processed")) and return result.get("total_pages") — keep the aggregate on the same queryset (using Sum and "pages_processed") so empty querysets yield None without the extra exists() query.
📒 Files selected for processing (2)
- backend/usage_v2/helper.py
- backend/workflow_manager/workflow_v2/models/execution.py
```python
    @property
    def aggregated_total_pages_processed(self) -> int | None:
        """Retrieve aggregated total pages processed for this execution.

        Returns:
            int | None: Total pages processed across all file executions,
            or None if no page usage data exists.
        """
        from usage_v2.helper import UsageHelper

        file_execution_ids = list(self.file_executions.values_list("id", flat=True))
        if not file_execution_ids:
            return None

        return UsageHelper.get_aggregated_pages_processed(
            run_ids=[str(fid) for fid in file_execution_ids]
        )
```
N+1 query risk when aggregated_total_pages_processed is serialized in a list endpoint.
Each call to this property issues at minimum 2 queries (1 for file_executions.values_list, 1 for PageUsage.aggregate). The AI summary confirms this property is exposed in ExecutionSerializer for the execution list API, making this O(2N) extra queries for N executions — on top of any existing per-object properties already doing the same.
Unlike aggregated_usage_cost (which filters Usage directly by execution_id), this property must first resolve file_execution_ids and then fan out to PageUsage, because there is no direct execution_id column on PageUsage (only run_id, which maps to a file execution ID). The fan-out is structurally inherent to the data model.
Mitigation options to consider:

- Add a DB index on `PageUsage.run_id` to at least make each aggregate query fast.
- Batch-load in the serializer: override the list view's queryset to prefetch/annotate page totals per execution, bypassing the per-object property for list responses.
- Accept the cost if the list endpoint is paginated tightly (e.g., page size ≤ 10) and usage is low, but make this explicit.
Run the following to confirm there's no existing index on PageUsage.run_id:
#!/bin/bash
# Confirm PageUsage model's Meta.indexes and any migrations that add an index on run_id
rg -n "run_id" --type py -C3 backend/account_usage/models.py
# Also check migrations for any index on page_usage.run_id
rg -rn "page_usage" --type py -g "**/migrations/**" -C2🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/workflow_manager/workflow_v2/models/execution.py` around lines 261 -
277, The aggregated_total_pages_processed property issues per-object DB queries
(file_executions.values_list(...) and
UsageHelper.get_aggregated_pages_processed(...)) causing an N+1 when used in
ExecutionSerializer list endpoints; fix by either adding a DB index on
PageUsage.run_id to speed each aggregate query and adding a migration for that
index, or (preferred) batch the totals in the list view/serializer by computing
aggregated page totals for all execution IDs in one query and attaching them to
the queryset (override the list view or ExecutionSerializer to accept a
precomputed map keyed by execution id and avoid calling
aggregated_total_pages_processed per instance); reference the property
aggregated_total_pages_processed, the call to file_executions.values_list, and
UsageHelper.get_aggregated_pages_processed when implementing the change.
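The batch-loading idea from the review can be sketched in plain Python (an in-memory analogue under assumed data shapes; in Django this would be one annotated or aggregated query rather than Python loops, and the function and variable names here are hypothetical):

```python
from __future__ import annotations

from collections import defaultdict


def page_totals_by_execution(
    file_execs: list[tuple[str, str]],   # (execution_id, file_execution_id)
    page_usages: list[tuple[str, int]],  # (run_id, pages_processed)
) -> dict[str, int]:
    """Precompute page totals for a whole page of executions in one pass,
    so the serializer reads a map instead of querying per row."""
    run_to_execution = {fe_id: ex_id for ex_id, fe_id in file_execs}
    totals: dict[str, int] = defaultdict(int)
    for run_id, pages in page_usages:
        ex_id = run_to_execution.get(run_id)
        if ex_id is not None:
            totals[ex_id] += pages
    return dict(totals)


file_execs = [("ex-1", "fe-1"), ("ex-1", "fe-2"), ("ex-2", "fe-3")]
page_usages = [("fe-1", 3), ("fe-2", 2), ("fe-3", 10)]
print(page_totals_by_execution(file_execs, page_usages))  # {'ex-1': 5, 'ex-2': 10}
```

The list view would build such a map once per page of results and hand it to the serializer (e.g., via context), so each row becomes a dictionary lookup instead of two queries.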



What
- Added `UsageHelper.get_aggregated_pages_processed()` to aggregate page count from the `PageUsage` model per `run_id`
- Added an `aggregated_total_pages_processed` property on the `WorkflowExecution` model to aggregate pages across all file executions
- Included `total_pages_processed` for API destinations in `_process_final_output()`
- Included `total_pages_processed` for DB destinations in `get_combined_metadata()`
- Exposed `aggregated_total_pages_processed` in `ExecutionSerializer` for the execution list API

Why
The `PageUsage` model already tracks `pages_processed` per `run_id` (file execution ID), but this data was not surfaced in API responses.

How
- `usage_v2/helper.py`: Added `get_aggregated_pages_processed(run_id)` static method that queries `PageUsage.objects.filter(run_id=run_id)` and aggregates `Sum('pages_processed')`, returning `int | None`
- `workflow_v2/models/execution.py`: Added `aggregated_total_pages_processed` property that collects file execution IDs via `self.file_executions`, converts to strings, and queries `PageUsage` with `run_id__in`
- `workflow_v2/file_execution_tasks.py`: In `_process_final_output()`, after `destination.get_metadata()` for API destinations, injects `total_pages_processed` into `execution_metadata`
- `endpoint_v2/destination.py`: In `get_combined_metadata()`, adds `total_pages_processed` alongside existing `usage` token data
- `execution/serializer/execution.py`: Added `aggregated_total_pages_processed` as a `SerializerMethodField`

Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
The `total_pages_processed` field gracefully returns `None` when no `PageUsage` data exists.

Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
- `total_pages_processed` appears in per-file response metadata
- `total_pages_processed` appears in combined metadata
- `aggregated_total_pages_processed` appears in the response
- `None` is returned gracefully when no `PageUsage` records exist for an execution

Screenshots
Checklist
I have read and understood the Contribution Guidelines.