Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions synapseclient/core/upload/multipart_upload_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ async def multipart_upload_dataframe_async(
force_restart: True to restart a previously initiated upload from scratch, False
to try to resume.
storage_str: Optional string to append to the upload message.
to_csv_kwargs: Additional arguments to pass to the `pd.DataFrame.to_csv`
function when writing the data to a CSV file.
"""
trace.get_current_span().set_attributes(
{
Expand Down
74 changes: 65 additions & 9 deletions synapseclient/models/mixins/table_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from io import BytesIO
from typing import Any, Dict, List, Optional, Protocol, Tuple, Union

import pandas as pd
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from typing_extensions import Self
Expand Down Expand Up @@ -138,9 +139,10 @@ def row_labels_from_rows(rows: List[Row]) -> List[Row]:
)


def convert_dtypes_to_json_serializable(df):
def convert_dtypes_to_json_serializable(df) -> pd.DataFrame:
"""
Convert the dtypes of the int64 and float64 columns to object columns which are JSON serializable types.
Replace both Ellipsis and pandas NA within nested structures which are not JSON serializable types.
Also, convert the ROW_ID, ROW_VERSION, and ROW_ID.1 columns to int columns which are JSON serializable types.
Arguments:
df: The dataframe to convert the dtypes of.
Expand All @@ -163,16 +165,63 @@ def convert_dtypes_to_json_serializable(df):
"datetime_list_col": [[datetime(2021, 1, 1), datetime(2021, 1, 2), datetime(2021, 1, 3)], [datetime(2021, 1, 4), datetime(2021, 1, 5), datetime(2021, 1, 6)], None, [datetime(2021, 1, 7), datetime(2021, 1, 8), datetime(2021, 1, 9)]],
"entityid_list_col": [["syn123", "syn456", None], ["syn101", "syn102", "syn103"], None, ["syn104", "syn105", "syn106"]],
"userid_list_col": [["user1", "user2", "user3"], ["user4", "user5", None], None, ["user7", "user8", "user9"]],
"json_col_with_quotes": [
{
"id": 1,
"description": 'Text with "quotes" in the description field',
"references": []
},
{
"id": 2,
"description": 'Another description with "quoted text" here',
"references": ["ref1", "ref2"]
},
{
"id": 3,
"description": 'Description containing "multiple" quoted "words"',
"references": [...]
},
{
"id": 4,
"description": 'Description containing apostrophes sage\'s',
"references": [...]
}

],
}).convert_dtypes()
df = convert_dtypes_to_json_serializable(df)
print(df)
"""
import pandas as pd

def _serialize_json_value(x):
if isinstance(x, (list, dict)):

def _reformat_special_values(obj):
if obj is ...:
return "..."
if obj is pd.NA:
return None
if isinstance(obj, dict):
return {k: _reformat_special_values(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_reformat_special_values(item) for item in obj]
return obj

cleaned_x = _reformat_special_values(x)
return cleaned_x
# Handle standalone ellipsis
if x is ...:
return "..."
return x

for col in df.columns:
df[col] = (
df[col].replace({pd.NA: None}).astype(object)
) # this will convert the int64 and float64 columns to object columns
sample_values = df[col].dropna()
if len(sample_values):
df[col] = df[col].apply(_serialize_json_value)
# restore the original dtypes of the column, especially for the int64 and float64 columns, since the apply function changes the dtype
df[col] = df[col].convert_dtypes()
df[col] = df[col].replace({pd.NA: None}).astype(object)

# Convert ROW_ prefixed columns back to int (like ROW_ID, ROW_VERSION)
if col in [
"ROW_ID",
Expand Down Expand Up @@ -2809,7 +2858,6 @@ async def main():
timeout=timeout,
synapse_client=synapse_client,
)

if download_location:
return csv_path

Expand Down Expand Up @@ -3168,7 +3216,9 @@ async def store_rows_async(
insert_size_bytes: int = 900 * MB,
csv_table_descriptor: Optional[CsvTableDescriptor] = None,
read_csv_kwargs: Optional[Dict[str, Any]] = None,
to_csv_kwargs: Optional[Dict[str, Any]] = None,
to_csv_kwargs: Optional[Dict[str, Any]] = {
"escapechar": "\\",
},
job_timeout: int = 600,
synapse_client: Optional[Synapse] = None,
) -> None:
Expand Down Expand Up @@ -3387,7 +3437,8 @@ async def store_rows_async(
function when writing the data to a CSV file. This is only used when
the `values` argument is a Pandas DataFrame. See
<https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html>
for complete list of supported arguments.
for complete list of supported arguments. The default is {"escapechar": "\\"}.
Ensure escapechar="\\" is set, along with other relevant kwargs, if your data contains double quotes.

job_timeout: The maximum amount of time to wait for a job to complete.
This is used when inserting, and updating rows of data. Each individual
Expand Down Expand Up @@ -3786,6 +3837,7 @@ async def _stream_and_update_from_df(
"AppendableRowSetRequest",
]
] = None,
to_csv_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
"""
Organize the process of reading in and uploading parts of the DataFrame we are
Expand Down Expand Up @@ -3816,6 +3868,8 @@ async def _stream_and_update_from_df(
being uploaded.
changes: Additional changes to the table that should
execute within this transaction.
to_csv_kwargs: Additional arguments to pass to the `pd.DataFrame.to_csv`
function when writing the data to a CSV file.
"""
file_handle_id = await multipart_upload_dataframe_async(
syn=client,
Expand All @@ -3828,6 +3882,7 @@ async def _stream_and_update_from_df(
line_start=line_start,
line_end=line_end,
bytes_to_prepend=header,
to_csv_kwargs=to_csv_kwargs,
)
# We are using a semaphore here because large tables can take a very long time
# for the update to complete. This will allow us to wait for the update to
Expand Down Expand Up @@ -4031,8 +4086,8 @@ async def _chunk_and_upload_df(
to_csv_kwargs: Additional arguments to pass to the `pd.DataFrame.to_csv`
function when writing the data to a CSV file.
"""
df = convert_dtypes_to_json_serializable(df)
# Loop over the rows of the DF to determine the size/boundaries we'll be uploading

chunks_to_upload = []
size_of_chunk = 0
buffer = BytesIO()
Expand Down Expand Up @@ -4142,6 +4197,7 @@ async def _chunk_and_upload_df(
header=header_line,
changes=changes,
file_suffix=f"{part}",
to_csv_kwargs=to_csv_kwargs,
)
)
)
Expand Down
185 changes: 185 additions & 0 deletions tests/integration/synapseclient/models/async/test_table_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import pandas as pd
import pytest
from pandas.api.types import is_object_dtype
from pytest_mock import MockerFixture

import synapseclient.models.mixins.asynchronous_job as asynchronous_job_module
Expand Down Expand Up @@ -351,6 +352,8 @@ async def test_store_rows_from_csv_infer_columns(
"float_string": [1.1, 2.2, 3.3, None],
}
)
data_for_table = data_for_table.convert_dtypes()
data_for_table = data_for_table.replace({pd.NA: None})
filepath = f"{tempfile.mkdtemp()}/upload_{uuid.uuid4()}.csv"
self.schedule_for_cleanup(filepath)
data_for_table.to_csv(filepath, index=False, float_format="%.12g")
Expand Down Expand Up @@ -512,6 +515,8 @@ async def test_store_rows_from_manually_defined_columns(
"float_column": [1.1, 2.2, 3.3, None],
}
)
data_for_table = data_for_table.convert_dtypes()
data_for_table = data_for_table.replace({pd.NA: None})
filepath = f"{tempfile.mkdtemp()}/upload_{uuid.uuid4()}.csv"
self.schedule_for_cleanup(filepath)
data_for_table.to_csv(filepath, index=False, float_format="%.12g")
Expand Down Expand Up @@ -977,6 +982,179 @@ async def test_store_rows_as_large_df_being_split_and_uploaded(
# AND The spy should have been called in multiple batches
assert spy_send_job.call_count == 1

async def test_store_rows_with_quotes_and_apostrophes_ellipses(
self, project_model: Project
) -> None:
"""Test that values containing quotes, apostrophes, and ellipses (in lists, dicts, and standalone) are properly stored in and retrieved from the table"""
# GIVEN a table with a JSON column
table_name = str(uuid.uuid4())
table = Table(
name=table_name,
parent_id=project_model.id,
columns=[
Column(name="id", column_type=ColumnType.INTEGER),
Column(name="json_data", column_type=ColumnType.JSON),
Column(
name="string_list_with_ellipses", column_type=ColumnType.STRING_LIST
),
Column(name="string_col_with_ellipses", column_type=ColumnType.STRING),
Column(name="int_list_with_pa_na", column_type=ColumnType.INTEGER_LIST),
Column(name="nullable_int", column_type=ColumnType.INTEGER),
Column(name="nullable_float", column_type=ColumnType.DOUBLE),
],
)
table = await table.store_async(synapse_client=self.syn)
self.schedule_for_cleanup(table.id)

# AND data with quotes in JSON values
data_for_table = pd.DataFrame(
{
"id": [1, 2, 3, 4, 5, 6, 7],
"json_data": [
{"description": 'Text with "quotes" here', "value": 100},
{
"description": 'Multiple "quoted" "words" here',
"value": 300,
},
{
"description": ...,
"value": 200,
}, # standalone ellipses in the json value
{
"description": [1, 2, ...],
"value": 400,
}, # list with ellipses in the json value
{
"description": {"inner": ...},
"value": 500,
}, # dict with ellipses in the json value
{
"description": "single apostrophe's",
"author": "D'Angelo",
}, # single apostrophe in the json value
{
"description": "Multiple's apostrophe's",
"author": "McDonald's",
},  # multiple apostrophes in the json value
],
"string_list_with_ellipses": [
["a", "b", ...],
["d", ..., "f"],
["g", "h", "i"],
[...],
["m", "n", "..."],
["p", "q", "r"],
["s", "t", "u"],
],
"string_col_with_ellipses": [
"value1",
...,
"value3",
...,
"value6",
...,
"value8",
],
"int_list_with_pa_na": [
[1, 2, 3],
pd.NA,
[7, 8, 9],
pd.NA,
[11, 12, 13],
pd.NA,
[15, 16, 17],
],
"nullable_int": pd.array([10, pd.NA, 30, pd.NA, 31, pd.NA, 32]),
"nullable_float": pd.array([1.1, pd.NA, 3.3, pd.NA, 3.4, pd.NA, 3.5]),
}
)
# WHEN I store the rows
await table.store_rows_async(
values=data_for_table,
synapse_client=self.syn,
)
# THEN I can query the table and retrieve the data correctly
results = await query_async(
f"SELECT * FROM {table.id}",
synapse_client=self.syn,
timeout=QUERY_TIMEOUT_SEC,
)
# AND the JSON data should be properly preserved with quotes
assert len(results) == 7
expected_result = pd.DataFrame(
{
"id": [1, 2, 3, 4, 5, 6, 7],
"json_data": [
{"description": 'Text with "quotes" here', "value": 100},
{
"description": 'Multiple "quoted" "words" here',
"value": 300,
},
{
"description": "...",
"value": 200,
}, # standalone ellipses in the json value
{
"description": [1, 2, "..."],
"value": 400,
}, # list with ellipses in the json value
{
"description": {"inner": "..."},
"value": 500,
}, # dict with ellipses in the json value
{
"description": "single apostrophe's",
"author": "D'Angelo",
}, # single apostrophe in the json value
{
"description": "Multiple's apostrophe's",
"author": "McDonald's",
}, # multiple apostrophe's in the json value
],
"string_list_with_ellipses": [
["a", "b", "..."],
["d", "...", "f"],
["g", "h", "i"],
["..."],
["m", "n", "..."],
["p", "q", "r"],
["s", "t", "u"],
],
"string_col_with_ellipses": [
"value1",
"...",
"value3",
"...",
"value6",
"...",
"value8",
],
"int_list_with_pa_na": [
[1, 2, 3],
[],
[7, 8, 9],
[],
[11, 12, 13],
[],
[15, 16, 17],
],
"nullable_int": pd.array([10, None, 30, None, 31, None, 32]),
"nullable_float": pd.array([1.1, None, 3.3, None, 3.4, None, 3.5]),
}
)
assert is_object_dtype(results.json_data)
assert is_object_dtype(results.int_list_with_pa_na)
assert is_object_dtype(results.nullable_int)
assert is_object_dtype(results.nullable_float)

expected_result = expected_result.convert_dtypes()
expected_result = expected_result.replace({pd.NA: None})
pd.testing.assert_frame_equal(
results.drop(columns=["ROW_ID", "ROW_VERSION"]),
expected_result,
check_dtype=False,
)


class TestUpsertRows:
@pytest.fixture(autouse=True, scope="function")
Expand Down Expand Up @@ -1549,9 +1727,13 @@ async def test_upsert_all_data_types(self, project_model: Project) -> None:
],
}
)

expected_results = expected_results.convert_dtypes()
expected_results = expected_results.replace({pd.NA: None})
pd.testing.assert_frame_equal(
results_after_insert, expected_results, check_dtype=False
)

# Create a second test file to update references
path2 = utils.make_bogus_data_file()
self.schedule_for_cleanup(path2)
Expand Down Expand Up @@ -1733,7 +1915,10 @@ async def test_upsert_all_data_types(self, project_model: Project) -> None:
],
}
)
expected_results = expected_results.convert_dtypes()
expected_results = expected_results.replace({pd.NA: None})
pd.testing.assert_frame_equal(results, expected_results, check_dtype=False)

# WHEN I upsert with multiple primary keys and null values
multi_key_data = pd.DataFrame(
{
Expand Down
Loading
Loading