generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 12
Open
Labels
enhancementNew feature or requestNew feature or request
Description
What would you like?
Step results require a custom SerDes in case they are not JSON serializable.
Pydantic is an established library within the Python ecosystem and is already mentioned in the documentation.
I suggest to add a PydanticSerDes that could be used for steps.
Possible Implementation
The following code provides a example implementation and tests.
import json
import pytest
from aws_durable_execution_sdk_python import (
durable_execution,
DurableContext,
durable_step,
ValidationError,
)
from aws_durable_execution_sdk_python.config import StepConfig, Duration
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
from pydantic import BaseModel, TypeAdapter
class PydanticSerDes[T: BaseModel](SerDes[T]):
"""SerDes for Pydantic models that supports both single models and lists of models."""
def __init__(self, cls: type[T]):
super().__init__()
self.cls = cls
def serialize(self, value: T, serdes_context: SerDesContext | None = None) -> str:
adapter = TypeAdapter(self.cls | list[self.cls])
return adapter.dump_json(value).decode()
def deserialize(self, data: str, serdes_context: SerDesContext | None = None) -> T:
try:
obj = json.loads(data)
adapter = TypeAdapter(self.cls | list[self.cls])
return adapter.validate_python(obj)
except ValidationError as e:
print(f"Failed to deserialize data into {self.cls.__name__}: {e} \n {data}")
raise
@pytest.fixture
def durable_runner(request):
"""Pytest fixture that provides a test runner."""
marker = request.node.get_closest_marker("durable_execution")
if not marker:
pytest.fail("Test must be marked with @pytest.mark.durable_execution")
handler = marker.kwargs.get("handler")
runner = DurableFunctionTestRunner(handler=handler)
yield runner
class NestedDummy(BaseModel):
value: str
class Dummy(BaseModel):
id: str
nested: NestedDummy
@durable_step
def step_one(context: DurableContext):
return [Dummy(id="1", nested=NestedDummy(value="nested_value"))]
@durable_step
def step_two(context: DurableContext, items: list[Dummy]):
assert len(items) == 1
assert items[0].id == "1"
assert items[0].nested.value == "nested_value"
return "Processed items"
@durable_execution
def handler(event, context: DurableContext):
dummies = context.step(step_one(), config=StepConfig(serdes=PydanticSerDes(Dummy)))
context.wait(Duration.from_seconds(1)) # required to test checkpointing behavior
context.step(step_two(dummies), "step_one")
@pytest.mark.durable_execution(handler=handler, lambda_function_name="my_function")
def test_my_function(durable_runner):
with durable_runner:
result = durable_runner.run(timeout=10)
assert result.status == InvocationStatus.SUCCEEDEDIs this a breaking change?
No
Does this require an RFC?
No
Additional Context
The implementation could also be adjusted to explicitly ask the developer to provide list[Model] in case a list should be processed.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request