Feature/snowflake s3 stage operations #17
Conversation
- Add `query_pandas_from_snowflake_via_s3_stage()` for efficient large query results (>10M rows)
- Add `publish_pandas_via_s3_stage()` for efficient large DataFrame writes (>10M rows)
- Add `make_batch_predictions_from_snowflake_via_s3_stage()` for batch ML predictions
- Support dev/prod environment switching via `current.is_production`
- Add helper functions for S3 operations and SQL generation
- Add `metaflow_s3/utils.py` with S3 utility functions
- Add comprehensive functional tests
- Integrate with existing Metaflow card system and cost tracking
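As a rough illustration of the read/write helpers named above, a hypothetical call might look like the sketch below. Only the function names appear in this PR; the import path, parameter names, and query are assumptions.

```python
# Sketch only: import path, parameter names, and the query are assumed, not taken from the diff.
import pandas as pd

from ds_platform_utils.metaflow import (
    publish_pandas_via_s3_stage,
    query_pandas_from_snowflake_via_s3_stage,
)

# Unload a large (>10M row) result set through the S3-backed stage instead of fetching it directly.
df: pd.DataFrame = query_pandas_from_snowflake_via_s3_stage(
    "SELECT * FROM {schema}.EVENTS"  # {schema} is resolved from current.is_production
)

# Write a large DataFrame back by staging Parquet files in S3 and COPYing them into Snowflake.
publish_pandas_via_s3_stage(df, table="EVENTS_SNAPSHOT")  # keyword name is a placeholder
```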
Pull request overview
This PR implements S3 stage operations for Snowflake data transfer, providing an alternative to direct Snowflake data loading that's more efficient for large datasets. The feature adds support for exporting data from Snowflake to S3 and importing from S3 to Snowflake using Snowflake's external stage functionality.
Changes:
- Added S3 stage configuration constants for dev and prod environments
- Implemented S3 operations module with functions for reading/writing DataFrames via S3
- Extended `publish_pandas` and `query_pandas_from_snowflake` with a `use_s3_stage` parameter (see the sketch after this list)
- Added a new `batch_inference` function for distributed model inference using S3 staging
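A minimal sketch of the extended call pattern, assuming the existing signatures stay the same; only the `use_s3_stage` flag is named in this PR.

```python
# Sketch only: everything except the use_s3_stage flag is assumed.
from ds_platform_utils.metaflow import publish_pandas, query_pandas_from_snowflake

df = query_pandas_from_snowflake(
    "SELECT * FROM {schema}.TRANSACTIONS",
    use_s3_stage=True,  # route the result through the S3 stage for large result sets
)

publish_pandas(
    df,
    table="TRANSACTIONS_ENRICHED",  # keyword name is a placeholder
    use_s3_stage=True,  # unload the DataFrame to S3, then COPY it into Snowflake
)
```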
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 19 comments.
Summary per file:
| File | Description |
|---|---|
| src/ds_platform_utils/metaflow/_consts.py | Adds S3 bucket and Snowflake stage configuration constants for dev/prod environments |
| src/ds_platform_utils/metaflow/s3.py | New module implementing S3 read/write operations for DataFrames using the Metaflow S3 client |
| src/ds_platform_utils/metaflow/pandas.py | Adds S3 stage support to publish_pandas and query_pandas_from_snowflake, with schema inference |
| src/ds_platform_utils/metaflow/batch_inference.py | New function for parallel batch inference using a Snowflake → S3 → model → S3 → Snowflake pipeline |
| tests/functional_tests/metaflow/test__pandas_s3.py | Test flow for S3 stage operations with and without an explicit schema |
```python
    parallelism: int = 1,
    warehouse: Optional[str] = None,
    ctx: Optional[dict] = None,
):
```
Missing docstring. The `batch_inference` function is a public function (no leading underscore) that lacks a docstring explaining its purpose, parameters, return value, and usage. This is inconsistent with other public functions in the codebase, like `publish_pandas` and `query_pandas_from_snowflake`, which have comprehensive docstrings. Add a docstring documenting all parameters and the function's behavior.
Suggested change:

```python
):
    """
    Run a batch inference workflow from Snowflake via S3 and write predictions back to Snowflake.

    The function executes a Snowflake query, exports the result to an S3-backed stage in
    batches, applies the provided ``model_predictor_function`` to each batch, writes the
    predictions back to S3, and then loads the results into a Snowflake table.

    Parameters
    ----------
    input_query:
        Either a SQL query string or a ``pathlib.Path``/string pointing to a file
        containing a SQL query. The query may contain ``{schema}`` and additional
        placeholders that are populated using the current environment and the
        ``ctx`` mapping.
    output_table_name:
        Name of the target Snowflake table that will receive the prediction results.
        The table is created or overwritten in the active schema, depending on the
        behavior implemented by the underlying Snowflake utilities.
    model_predictor_function:
        A callable that takes a Pandas ``DataFrame`` containing a batch of input rows
        from Snowflake and returns a ``DataFrame`` with the corresponding predictions.
        The returned columns must match the schema expected for the output table.
    output_table_schema:
        Optional explicit schema for the output table as a list of ``(column_name,
        snowflake_type)`` tuples. If omitted, the schema may be inferred from the
        ``DataFrame`` returned by ``model_predictor_function``.
    use_utc:
        Whether to use UTC when generating timestamps or time-based metadata used in
        the batch inference process.
    batch_size_in_mb:
        Approximate size, in megabytes, of each batch of data exported from Snowflake
        to S3 and processed by the predictor function.
    parallelism:
        Number of parallel worker threads used to process batches and compute
        predictions.
    warehouse:
        Optional Snowflake warehouse name to use for executing the query and loading
        results. If ``None``, the default warehouse for the environment is used.
    ctx:
        Optional dictionary of additional values to substitute into the SQL query
        template in addition to the ``schema`` placeholder.

    Returns
    -------
    None
        This function performs side effects (Snowflake queries, S3 uploads/downloads,
        and Metaflow card updates) and does not return a value.
    """
```
```python
    ],
)

self.next(self.test_publish_pandas_with_warehouse)
```
The step flow is broken. Line 48 calls `self.test_publish_pandas_with_warehouse`, but this method doesn't exist; the next test method is named `test_publish_pandas_without_schema`. This should be `self.next(self.test_publish_pandas_without_schema)` to match the method defined at line 51.
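A minimal sketch of the corrected transition; the flow name and step bodies are hypothetical, and only the two method names come from the review comment.

```python
# Hypothetical minimal flow: only the self.next(...) target names come from the review comment.
from metaflow import FlowSpec, step


class PandasS3StageTestFlow(FlowSpec):  # class name and step bodies are placeholders
    @step
    def start(self):
        # ... publish a DataFrame with an explicit schema here ...
        # Before: self.next(self.test_publish_pandas_with_warehouse)  # method does not exist
        self.next(self.test_publish_pandas_without_schema)

    @step
    def test_publish_pandas_without_schema(self):
        # ... publish a DataFrame and let the schema be inferred ...
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    PandasS3StageTestFlow()
```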
```python
def batch_inference(  # noqa: PLR0913, PLR0915
    input_query: Union[str, Path],
    output_table_name: str,
    model_predictor_function: Callable[[pd.DataFrame], pd.DataFrame],
    output_table_schema: Optional[List[Tuple[str, str]]] = None,
    use_utc: bool = True,
    batch_size_in_mb: int = 128,
    parallelism: int = 1,
    warehouse: Optional[str] = None,
    ctx: Optional[dict] = None,
):
```
New public function not exported in `__init__.py`. The `batch_inference` function is a new public API function (no leading underscore), but it's not added to the `__all__` list in `src/ds_platform_utils/metaflow/__init__.py`. This means users cannot import it using `from ds_platform_utils.metaflow import batch_inference`. Either add it to the exports or mark it as private with a leading underscore if it's intended for internal use only.
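A minimal sketch of the suggested export; the other entries shown in `__all__` are assumptions based on the files listed in this PR.

```python
# src/ds_platform_utils/metaflow/__init__.py -- sketch; existing exports are assumed, not verified.
from ds_platform_utils.metaflow.batch_inference import batch_inference
from ds_platform_utils.metaflow.pandas import publish_pandas, query_pandas_from_snowflake

__all__ = [
    "batch_inference",
    "publish_pandas",
    "query_pandas_from_snowflake",
]
```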
```python
def _generate_snowflake_to_s3_copy_query(
    query: str,
    snowflake_stage_path: str,
    file_name: str = "data.parquet",
```
The `file_name` parameter is defined but never used in the function body. The `COPY INTO` command generates files with its own naming convention and doesn't use this parameter. Either remove this unused parameter or implement it in the `COPY INTO` command if file-naming control is needed.
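If file-naming control is wanted, one possible approach (a sketch, not the module's actual implementation) is to append `file_name` to the stage path and unload a single file with Snowflake's `SINGLE` option; `snowflake_stage_path` is assumed here to exclude the leading `@`.

```python
def _generate_snowflake_to_s3_copy_query(
    query: str,
    snowflake_stage_path: str,
    file_name: str = "data.parquet",
) -> str:
    """Sketch: unload a query result to a single, explicitly named Parquet file on the stage."""
    # SINGLE = TRUE makes Snowflake honor the full file name in the target path instead of
    # appending its own numeric suffixes; MAX_FILE_SIZE may need raising for large unloads.
    return f"""
    COPY INTO @{snowflake_stage_path}/{file_name}
    FROM ({query})
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE
    SINGLE = TRUE
    """
```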