From ac59dc1dca0bc427ccedc76a06ce02fc48ed1dd1 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sat, 28 Mar 2026 16:12:08 -0400 Subject: [PATCH] feat: [pynumaflow-lite] Update Sourcer to propagate totalPartitions Signed-off-by: Vaibhav Tiwari --- packages/pynumaflow-lite/Cargo.toml | 2 +- .../manifests/source/simple_source.py | 2 +- .../pynumaflow_lite/_source_dtypes.py | 23 ++++++++-- packages/pynumaflow-lite/src/source/server.rs | 42 +++++++++++++++---- .../tests/examples/source_simple.py | 8 +++- 5 files changed, 64 insertions(+), 13 deletions(-) diff --git a/packages/pynumaflow-lite/Cargo.toml b/packages/pynumaflow-lite/Cargo.toml index c19a0fa7..3629f271 100644 --- a/packages/pynumaflow-lite/Cargo.toml +++ b/packages/pynumaflow-lite/Cargo.toml @@ -9,7 +9,7 @@ name = "pynumaflow_lite" crate-type = ["cdylib", "rlib"] [dependencies] -numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "44ee3068fcf7088ff265df7ae7ce1881a40694ff" } +numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "15c46e8289943a639a46a475b7e0d286e928a8b0" } pyo3 = { version = "0.27.1", features = ["chrono", "experimental-inspect"] } tokio = "1.47.1" tonic = "0.14.2" diff --git a/packages/pynumaflow-lite/manifests/source/simple_source.py b/packages/pynumaflow-lite/manifests/source/simple_source.py index e7d501ba..92b287f8 100644 --- a/packages/pynumaflow-lite/manifests/source/simple_source.py +++ b/packages/pynumaflow-lite/manifests/source/simple_source.py @@ -78,7 +78,7 @@ async def pending_handler(self) -> sourcer.PendingResponse: """ return sourcer.PendingResponse(count=0) - async def partitions_handler(self) -> sourcer.PartitionsResponse: + async def active_partitions_handler(self) -> sourcer.PartitionsResponse: """ The simple source always returns default partitions. 
""" diff --git a/packages/pynumaflow-lite/pynumaflow_lite/_source_dtypes.py b/packages/pynumaflow-lite/pynumaflow_lite/_source_dtypes.py index 1ab72137..7cd43027 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/_source_dtypes.py +++ b/packages/pynumaflow-lite/pynumaflow_lite/_source_dtypes.py @@ -8,6 +8,7 @@ NackRequest, PendingResponse, PartitionsResponse, + TotalPartitionsResponse, ) @@ -19,10 +20,11 @@ class Sourcer(metaclass=ABCMeta): - read_handler: Read messages from the source - ack_handler: Acknowledge processed messages - pending_handler: Return the number of pending messages - - partitions_handler: Return the partitions this source handles + - active_partitions_handler: Return the partitions this source handles Optionally, you can implement: - nack_handler: Negatively acknowledge messages (default: no-op) + - total_partitions_handler: Return the total number of partitions in the source """ def __call__(self, *args, **kwargs): @@ -88,9 +90,9 @@ async def pending_handler(self) -> PendingResponse: pass @abstractmethod - async def partitions_handler(self) -> PartitionsResponse: + async def active_partitions_handler(self) -> PartitionsResponse: """ - Return the partitions associated with this source. + Return the active partitions associated with this source. This is used by the platform to determine the partitions to which the watermark should be published. If your source doesn't have the @@ -105,6 +107,21 @@ async def partitions_handler(self) -> PartitionsResponse: """ pass + async def total_partitions_handler(self) -> int | None: + """ + Optional. + + Returns the total number of partitions in the source. + Used by the platform for watermark progression to know when all + processors have reported in. + + Returns None by default, indicating the source does not report total partitions. 
+ + :return: + int | None: The total number of partitions, or None if the source does not report it + """ + return None + async def nack_handler(self, request: NackRequest) -> None: """ Negatively acknowledge messages (optional). diff --git a/packages/pynumaflow-lite/src/source/server.rs b/packages/pynumaflow-lite/src/source/server.rs index 103105c3..f18bc352 100644 --- a/packages/pynumaflow-lite/src/source/server.rs +++ b/packages/pynumaflow-lite/src/source/server.rs @@ -145,27 +145,27 @@ impl numaflow::source::Sourcer for PySourceRunner { } } - /// Returns the partitions associated with the source. This will be used by the platform to determine + /// Returns the active partitions associated with the source. This will be used by the platform to determine /// the partitions to which the watermark should be published. Some sources might not have the concept of partitions. /// Kafka is an example of source where a reader can read from multiple partitions. /// If None is returned, Numaflow replica-id will be returned as the partition. 
- async fn partitions(&self) -> Option<Vec<i32>> { - // Call the Python partitions_handler + async fn active_partitions(&self) -> Option<Vec<i32>> { + // Call the Python active_partitions_handler let fut = Python::attach(|py| { let py_handler = self.py_handler.clone(); let locals = pyo3_async_runtimes::TaskLocals::new(self.event_loop.bind(py).clone()); let coro = py_handler - .call_method0(py, "partitions_handler") - .expect("failed to call partitions_handler") + .call_method0(py, "active_partitions_handler") + .expect("failed to call active_partitions_handler") .into_bound(py); pyo3_async_runtimes::into_future_with_locals(&locals, coro) - .expect("failed to convert partitions_handler to future") + .expect("failed to convert active_partitions_handler to future") }); // Await the Python coroutine and extract the result - let result = fut.await.expect("partitions_handler failed"); + let result = fut.await.expect("active_partitions_handler failed"); let partitions_response = Python::attach(|py| { result @@ -175,6 +175,34 @@ impl numaflow::source::Sourcer for PySourceRunner { Some(partitions_response.partitions) } + + /// Returns the total number of partitions in the source. This is used by the platform for + /// watermark progression to know when all processors have reported in. + /// If None is returned, the platform will not use total partitions for watermark tracking. 
+ async fn total_partitions(&self) -> Option { + // Call the Python total_partitions_handler + let fut = Python::attach(|py| { + let py_handler = self.py_handler.clone(); + let locals = pyo3_async_runtimes::TaskLocals::new(self.event_loop.bind(py).clone()); + + let coro = py_handler + .call_method0(py, "total_partitions_handler") + .expect("failed to call total_partitions_handler") + .into_bound(py); + + pyo3_async_runtimes::into_future_with_locals(&locals, coro) + .expect("failed to convert total_partitions_handler to future") + }); + + // Await the Python coroutine and extract the result + let result = fut.await.expect("total_partitions_handler failed"); + + Python::attach(|py| { + result + .extract::>(py) + .expect("failed to extract Option from total_partitions_handler") + }) + } } /// Start the source server by spinning up a dedicated Python asyncio loop and wiring shutdown. diff --git a/packages/pynumaflow-lite/tests/examples/source_simple.py b/packages/pynumaflow-lite/tests/examples/source_simple.py index ca90ec03..da88ded7 100644 --- a/packages/pynumaflow-lite/tests/examples/source_simple.py +++ b/packages/pynumaflow-lite/tests/examples/source_simple.py @@ -87,12 +87,18 @@ async def pending_handler(self) -> sourcer.PendingResponse: """ return sourcer.PendingResponse(count=0) - async def partitions_handler(self) -> sourcer.PartitionsResponse: + async def active_partitions_handler(self) -> sourcer.PartitionsResponse: """ The simple source always returns default partitions. """ return sourcer.PartitionsResponse(partitions=[self.partition_idx]) + async def total_partitions_handler(self) -> int: + """ + The simple source has only one partition. + """ + return 1 + async def start(): sock_file = "/tmp/var/run/numaflow/source.sock"