[Feature]: Pydantic Support #307

@eruvanos

Description

What would you like?

Step results that are not JSON-serializable require a custom SerDes.
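A quick illustration of the problem, assuming Pydantic v2: the standard library's `json.dumps` rejects a model instance, while the model's own `model_dump_json` produces a JSON string. `Item` is a hypothetical model used only for this example.

```python
import json

from pydantic import BaseModel


class Item(BaseModel):
    # Hypothetical model used only for this illustration
    id: str


item = Item(id="1")

# The standard library encoder rejects arbitrary objects such as model instances
try:
    json.dumps(item)
    stdlib_ok = True
except TypeError:
    stdlib_ok = False

# Pydantic v2 serializes the model to a compact JSON string itself
encoded = item.model_dump_json()
print(stdlib_ok, encoded)  # False {"id":"1"}
```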

Pydantic is an established library within the Python ecosystem and is already mentioned in the documentation.

I suggest adding a PydanticSerDes that can be used with steps.

Possible Implementation

The following code provides an example implementation and a test.

```python
import logging

import pytest
from aws_durable_execution_sdk_python import (
    durable_execution,
    DurableContext,
    durable_step,
)
from aws_durable_execution_sdk_python.config import StepConfig, Duration
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
from pydantic import BaseModel, TypeAdapter, ValidationError

logger = logging.getLogger(__name__)


class PydanticSerDes[T: BaseModel](SerDes[T]):
    """SerDes for Pydantic models that supports both single models and lists of models."""

    def __init__(self, cls: type[T]):
        super().__init__()
        self.cls = cls
        # Build the adapter once; it accepts either a single model or a list of models
        self.adapter = TypeAdapter(cls | list[cls])

    def serialize(self, value: T, serdes_context: SerDesContext | None = None) -> str:
        return self.adapter.dump_json(value).decode()

    def deserialize(self, data: str, serdes_context: SerDesContext | None = None) -> T:
        try:
            # Parse and validate the JSON string in one step
            return self.adapter.validate_json(data)
        except ValidationError as e:
            # Pydantic raises its own ValidationError, so import it from pydantic
            logger.error("Failed to deserialize data into %s: %s\n%s", self.cls.__name__, e, data)
            raise


@pytest.fixture
def durable_runner(request):
    """Pytest fixture that provides a test runner for the handler given in the marker."""
    marker = request.node.get_closest_marker("durable_execution")
    if not marker:
        pytest.fail("Test must be marked with @pytest.mark.durable_execution")

    handler = marker.kwargs.get("handler")
    runner = DurableFunctionTestRunner(handler=handler)

    yield runner


class NestedDummy(BaseModel):
    value: str


class Dummy(BaseModel):
    id: str
    nested: NestedDummy


@durable_step
def step_one(context: DurableContext):
    return [Dummy(id="1", nested=NestedDummy(value="nested_value"))]


@durable_step
def step_two(context: DurableContext, items: list[Dummy]):
    assert len(items) == 1
    assert items[0].id == "1"
    assert items[0].nested.value == "nested_value"
    return "Processed items"


@durable_execution
def handler(event, context: DurableContext):
    # step_one returns a list of models, so it needs the Pydantic SerDes
    dummies = context.step(step_one(), config=StepConfig(serdes=PydanticSerDes(Dummy)))
    context.wait(Duration.from_seconds(1))  # required to test checkpointing behavior
    context.step(step_two(dummies), "step_two")


@pytest.mark.durable_execution(handler=handler, lambda_function_name="my_function")
def test_my_function(durable_runner):
    with durable_runner:
        result = durable_runner.run(timeout=10)
    assert result.status == InvocationStatus.SUCCEEDED
```

Is this a breaking change?

No

Does this require an RFC?

No

Additional Context

Alternatively, the implementation could require the developer to pass list[Model] explicitly when a list should be processed, instead of always accepting both a single model and a list.
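A minimal sketch of that alternative, assuming Pydantic v2: the caller passes the exact type, such as `Item` or `list[Item]`, and the codec validates against nothing else. `ExplicitTypeCodec` and `Item` are hypothetical names, and the class is shown without the SDK's `SerDes` base class so it runs on its own.

```python
from typing import Any, Generic, TypeVar

from pydantic import BaseModel, TypeAdapter, ValidationError

T = TypeVar("T")


class ExplicitTypeCodec(Generic[T]):
    """Hypothetical codec: the caller states the exact type, e.g. list[Model]."""

    def __init__(self, tp: Any):
        # tp may be a model class or any type Pydantic understands, such as list[Model]
        self.adapter = TypeAdapter(tp)

    def serialize(self, value: T) -> str:
        return self.adapter.dump_json(value).decode()

    def deserialize(self, data: str) -> T:
        return self.adapter.validate_json(data)


class Item(BaseModel):
    id: str


list_codec = ExplicitTypeCodec(list[Item])
payload = list_codec.serialize([Item(id="1"), Item(id="2")])
items = list_codec.deserialize(payload)

# A codec built for a single model rejects a JSON array instead of returning a list
single_codec = ExplicitTypeCodec(Item)
try:
    single_codec.deserialize(payload)
    rejected = False
except ValidationError:
    rejected = True
```

Requiring the exact type trades a little convenience for predictability: a shape mismatch surfaces as a `ValidationError` rather than silently producing a different type.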

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests