
Feature/snowflake s3 stage operations #17

Closed

abhishek-pattern wants to merge 46 commits into main from feature/snowflake-s3-stage-operations

Conversation

@abhishek-pattern

No description provided.

- Add query_pandas_from_snowflake_via_s3_stage() for efficient large query results (>10M rows)
- Add publish_pandas_via_s3_stage() for efficient large DataFrame writes (>10M rows)
- Add make_batch_predictions_from_snowflake_via_s3_stage() for batch ML predictions
- Support dev/prod environment switching via current.is_production
- Add helper functions for S3 operations and SQL generation
- Add metaflow_s3/utils.py with S3 utility functions
- Add comprehensive functional tests
- Integrate with existing Metaflow card system and cost tracking
…function for improved readability and maintainability
Copilot AI review requested due to automatic review settings February 12, 2026 11:33

Copilot AI left a comment

Pull request overview

This PR implements S3 stage operations for Snowflake data transfer, providing an alternative to direct Snowflake data loading that's more efficient for large datasets. The feature adds support for exporting data from Snowflake to S3 and importing from S3 to Snowflake using Snowflake's external stage functionality.

Changes:

  • Added S3 stage configuration constants for dev and prod environments
  • Implemented S3 operations module with functions for reading/writing DataFrames via S3
  • Extended publish_pandas and query_pandas_from_snowflake with a use_s3_stage parameter (see the usage sketch after this list)
  • Added new batch_inference function for distributed model inference using S3 staging
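
A minimal usage sketch, assuming use_s3_stage is accepted as a keyword argument by both helpers; the query text and table name below are placeholders, not taken from this PR:

import pandas as pd
from ds_platform_utils.metaflow import publish_pandas, query_pandas_from_snowflake

# Pull a large result set through the S3 external stage instead of fetching it directly.
df = query_pandas_from_snowflake(
    "SELECT * FROM {schema}.big_table",  # hypothetical query
    use_s3_stage=True,
)

# Write a large DataFrame back to Snowflake through the same stage.
publish_pandas(
    df,
    "big_table_copy",  # hypothetical target table
    use_s3_stage=True,
)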

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 19 comments.

Summary per file

src/ds_platform_utils/metaflow/_consts.py: adds S3 bucket and Snowflake stage configuration constants for dev/prod environments.
src/ds_platform_utils/metaflow/s3.py: new module implementing S3 read/write operations for DataFrames using the Metaflow S3 client.
src/ds_platform_utils/metaflow/pandas.py: adds S3 stage support to publish_pandas and query_pandas_from_snowflake, with schema inference.
src/ds_platform_utils/metaflow/batch_inference.py: new function for parallel batch inference using a Snowflake → S3 → model → S3 → Snowflake pipeline.
tests/functional_tests/metaflow/test__pandas_s3.py: test flow covering S3 stage operations with and without an explicit schema.


    parallelism: int = 1,
    warehouse: Optional[str] = None,
    ctx: Optional[dict] = None,
):

Copilot AI Feb 12, 2026

Missing docstring. The batch_inference function is a public function (no leading underscore) that lacks a docstring explaining its purpose, parameters, return value, and usage. This is inconsistent with other public functions in the codebase like publish_pandas and query_pandas_from_snowflake which have comprehensive docstrings. Add a docstring documenting all parameters and the function's behavior.

Suggested change
):
):
    """
    Run a batch inference workflow from Snowflake via S3 and write predictions back to Snowflake.

    The function executes a Snowflake query, exports the result to an S3-backed stage in
    batches, applies the provided ``model_predictor_function`` to each batch, writes the
    predictions back to S3, and then loads the results into a Snowflake table.

    Parameters
    ----------
    input_query:
        Either a SQL query string or a ``pathlib.Path``/string pointing to a file
        containing a SQL query. The query may contain ``{schema}`` and additional
        placeholders that are populated using the current environment and the
        ``ctx`` mapping.
    output_table_name:
        Name of the target Snowflake table that will receive the prediction results.
        The table is created or overwritten in the active schema, depending on the
        behavior implemented by the underlying Snowflake utilities.
    model_predictor_function:
        A callable that takes a Pandas ``DataFrame`` containing a batch of input rows
        from Snowflake and returns a ``DataFrame`` with the corresponding predictions.
        The returned columns must match the schema expected for the output table.
    output_table_schema:
        Optional explicit schema for the output table as a list of ``(column_name,
        snowflake_type)`` tuples. If omitted, the schema may be inferred from the
        ``DataFrame`` returned by ``model_predictor_function``.
    use_utc:
        Whether to use UTC when generating timestamps or time-based metadata used in
        the batch inference process.
    batch_size_in_mb:
        Approximate size, in megabytes, of each batch of data exported from Snowflake
        to S3 and processed by the predictor function.
    parallelism:
        Number of parallel worker threads used to process batches and compute
        predictions.
    warehouse:
        Optional Snowflake warehouse name to use for executing the query and loading
        results. If ``None``, the default warehouse for the environment is used.
    ctx:
        Optional dictionary of additional values to substitute into the SQL query
        template in addition to the ``schema`` placeholder.

    Returns
    -------
    None
        This function performs side effects (Snowflake queries, S3 uploads/downloads,
        and Metaflow card updates) and does not return a value.
    """

],
)

self.next(self.test_publish_pandas_with_warehouse)

Copilot AI Feb 12, 2026

The step flow is broken. Line 48 calls self.next(self.test_publish_pandas_with_warehouse), but that method doesn't exist; the next test method is named test_publish_pandas_without_schema. The call should be self.next(self.test_publish_pandas_without_schema) to match the method defined at line 51.
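
A minimal sketch of the corrected wiring; the flow name and the step preceding test_publish_pandas_without_schema are hypothetical, and step bodies are elided:

from metaflow import FlowSpec, step

class PandasS3TestFlow(FlowSpec):  # hypothetical name for illustration

    @step
    def start(self):
        self.next(self.test_publish_pandas_with_schema)

    @step
    def test_publish_pandas_with_schema(self):  # hypothetical preceding step
        # ... publish with an explicit schema ...
        self.next(self.test_publish_pandas_without_schema)  # corrected handoff

    @step
    def test_publish_pandas_without_schema(self):
        # ... publish relying on schema inference ...
        self.next(self.end)

    @step
    def end(self):
        pass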

Comment on lines +29 to +39
def batch_inference(  # noqa: PLR0913, PLR0915
    input_query: Union[str, Path],
    output_table_name: str,
    model_predictor_function: Callable[[pd.DataFrame], pd.DataFrame],
    output_table_schema: Optional[List[Tuple[str, str]]] = None,
    use_utc: bool = True,
    batch_size_in_mb: int = 128,
    parallelism: int = 1,
    warehouse: Optional[str] = None,
    ctx: Optional[dict] = None,
):

Copilot AI Feb 12, 2026

New public function not exported in __init__.py. The batch_inference function is a new public API function (no leading underscore) but it's not added to the __all__ list in src/ds_platform_utils/metaflow/__init__.py. This means users cannot import it using from ds_platform_utils.metaflow import batch_inference. Either add it to the exports or mark it as private with a leading underscore if it's intended for internal use only.
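
A sketch of the missing export; the other entries in __all__ are illustrative, not copied from the real src/ds_platform_utils/metaflow/__init__.py:

from ds_platform_utils.metaflow.batch_inference import batch_inference
from ds_platform_utils.metaflow.pandas import publish_pandas, query_pandas_from_snowflake

__all__ = [
    "batch_inference",
    "publish_pandas",
    "query_pandas_from_snowflake",
]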

def _generate_snowflake_to_s3_copy_query(
    query: str,
    snowflake_stage_path: str,
    file_name: str = "data.parquet",

Copilot AI Feb 12, 2026

The file_name parameter is defined but never used in the function body. The COPY INTO command generates files with its own naming convention and doesn't use this parameter. Either remove this unused parameter or implement it in the COPY INTO command if file naming control is needed.
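
If file naming control is wanted, one option is a single-file unload so the stage path can end in the requested name; a sketch, assuming the helper only builds the SQL string and a single output file is acceptable for the data sizes involved:

def _generate_snowflake_to_s3_copy_query(
    query: str,
    snowflake_stage_path: str,
    file_name: str = "data.parquet",
) -> str:
    # SINGLE = TRUE makes Snowflake write exactly one file, so the path can carry the
    # requested file name; MAX_FILE_SIZE is raised because the default unload size
    # limit is small, and 5 GB is the single-file ceiling for S3 stages.
    return f"""
        COPY INTO {snowflake_stage_path}/{file_name}
        FROM ({query})
        FILE_FORMAT = (TYPE = PARQUET)
        HEADER = TRUE
        SINGLE = TRUE
        OVERWRITE = TRUE
        MAX_FILE_SIZE = 5368709120
    """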
