Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/pynumaflow-lite/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,17 @@ test-rust:
clean:
cargo clean

py-fmt:
uv run black pynumaflow_lite/ tests/ manifests/

py-lint: py-fmt
uv run ruff check --fix .

fmt:
cargo fmt --all

.PHONY: lint
lint: test-fmt clippy
lint: test-fmt clippy py-lint
Comment on lines +46 to +56
Copy link
Copy Markdown
Contributor Author

@vaibhavtiwari33 vaibhavtiwari33 Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review requested
manual change: makefile lint command updated to include python linting


.PHONY: test-fmt
test-fmt:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@
This accumulator buffers incoming data and sorts it by event time,
flushing sorted data when the watermark advances.
"""

import signal
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review requested
manual change: import moved to the top

import asyncio
from datetime import datetime
from typing import AsyncIterator

from pynumaflow_lite.accumulator import Datum, Message, AccumulatorAsyncServer, Accumulator
from pynumaflow_lite.accumulator import (
Datum,
Message,
AccumulatorAsyncServer,
Accumulator,
)


class StreamSorter(Accumulator):
Expand All @@ -19,6 +26,7 @@ class StreamSorter(Accumulator):

def __init__(self):
from datetime import timezone

# Initialize with a very old timestamp (timezone-aware)
self.latest_wm = datetime.fromtimestamp(-1, tz=timezone.utc)
self.sorted_buffer: list[Datum] = []
Expand All @@ -33,8 +41,10 @@ async def handler(self, datums: AsyncIterator[Datum]) -> AsyncIterator[Message]:

async for datum in datums:
datum_count += 1
print(f"Received datum #{datum_count}: event_time={datum.event_time}, "
f"watermark={datum.watermark}, value={datum.value}")
print(
f"Received datum #{datum_count}: event_time={datum.event_time}, "
f"watermark={datum.watermark}, value={datum.value}"
)

# If watermark has moved forward
if datum.watermark and datum.watermark > self.latest_wm:
Expand Down Expand Up @@ -122,7 +132,7 @@ async def main():


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
import signal

signal.signal(signal.SIGINT, signal.default_int_handler)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
Expand Down
10 changes: 8 additions & 2 deletions packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@


class SimpleBatchCat(batchmapper.BatchMapper):
async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.BatchResponses:
async def handler(
self, batch: AsyncIterable[batchmapper.Datum]
) -> batchmapper.BatchResponses:
responses = batchmapper.BatchResponses()
async for d in batch:
resp = batchmapper.BatchResponse(d.id)
Expand All @@ -29,7 +31,11 @@ async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.
pass


async def start(f: Callable[[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]]):
async def start(
f: Callable[
[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]
],
):
server = batchmapper.BatchMapAsyncServer()

# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
Expand Down
4 changes: 1 addition & 3 deletions packages/pynumaflow-lite/manifests/map/map_cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@


class SimpleCat(mapper.Mapper):
async def handler(
self, keys: list[str], payload: mapper.Datum
) -> mapper.Messages:
async def handler(self, keys: list[str], payload: mapper.Datum) -> mapper.Messages:

messages = mapper.Messages()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@


class SimpleStreamCat(mapstreamer.MapStreamer):
async def handler(self, keys: list[str], datum: mapstreamer.Datum) -> AsyncIterator[Message]:
async def handler(
self, keys: list[str], datum: mapstreamer.Datum
) -> AsyncIterator[Message]:
parts = datum.value.decode("utf-8").split(",")
if not parts:
yield Message.to_drop()
Expand Down Expand Up @@ -51,4 +53,3 @@ async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Messag
if __name__ == "__main__":
async_handler = SimpleStreamCat()
asyncio.run(start(async_handler))

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ def __init__(self, initial: int = 0) -> None:
self.counter = initial

async def handler(
self, keys: list[str], datums: AsyncIterable[reducer.Datum], md: reducer.Metadata
self,
keys: list[str],
datums: AsyncIterable[reducer.Datum],
md: reducer.Metadata,
) -> reducer.Messages:
iw = md.interval_window
self.counter = 0
Expand Down Expand Up @@ -57,4 +60,3 @@ async def start(creator: type[reducer.Reducer], init_args: tuple):

if __name__ == "__main__":
asyncio.run(start(ReduceCounter, (0,)))

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
The counter increments for each datum and emits a message every 10 items,
plus a final message at the end.
"""

import asyncio
import signal
from collections.abc import AsyncIterable, AsyncIterator
Expand All @@ -17,12 +18,12 @@
class ReduceCounter(reducestreamer.ReduceStreamer):
"""
A reduce streaming counter that emits intermediate results.

This demonstrates the key difference from regular Reducer:
- Regular Reducer: waits for all data, then returns Messages
- ReduceStreamer: yields Message objects incrementally as an async iterator
"""

def __init__(self, initial: int = 0) -> None:
self.counter = initial

Expand All @@ -34,21 +35,21 @@ async def handler(
) -> AsyncIterator[reducestreamer.Message]:
"""
Process datums and yield messages incrementally.

Args:
keys: List of keys for this window
datums: Async iterable of incoming data
md: Metadata containing window information

Yields:
Message objects to send to the next vertex
"""
iw = md.interval_window
print(f"Handler started for keys={keys}, window=[{iw.start}, {iw.end}]")

async for _ in datums:
self.counter += 1

# Emit intermediate result every 10 items
if self.counter % 10 == 0:
msg = (
Expand All @@ -59,7 +60,7 @@ async def handler(
print(f"Yielding intermediate result: counter={self.counter}")
# Early release of data - this is the key feature of reduce streaming!
yield reducestreamer.Message(msg, keys=keys)

# Emit final result
msg = (
f"counter:{self.counter} (FINAL) "
Expand Down Expand Up @@ -105,4 +106,3 @@ async def start(creator: type, init_args: tuple):

if __name__ == "__main__":
asyncio.run(start(ReduceCounter, (0,)))

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, initial: int = 0) -> None:
self.counter = initial

async def session_reduce(
self, keys: list[str], datums: AsyncIterable[session_reducer.Datum]
self, keys: list[str], datums: AsyncIterable[session_reducer.Datum]
) -> AsyncIterator[session_reducer.Message]:
"""
Count all incoming messages in this session and yield the count.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- If MAPPER is set to "true", runs as a Mapper that reads side input files
- Otherwise, runs as a SideInput retriever that broadcasts values
"""

import asyncio
import os
import signal
Expand Down Expand Up @@ -142,4 +143,3 @@ async def start_mapper():
else:
print("Starting as SideInput retriever...")
asyncio.run(start_sideinput())

6 changes: 3 additions & 3 deletions packages/pynumaflow-lite/manifests/sink/sink_log.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import collections
import logging
import signal
from collections.abc import AsyncIterable, AsyncIterator
Expand Down Expand Up @@ -36,7 +35,9 @@ async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses
pass


async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]]):
async def start(
f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]],
):
server = sinker.SinkAsyncServer()

# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
Expand All @@ -61,4 +62,3 @@ async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Resp
if __name__ == "__main__":
async_handler = SimpleLogSink()
asyncio.run(start(async_handler))

20 changes: 14 additions & 6 deletions packages/pynumaflow-lite/manifests/source/simple_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ def __init__(self):
self.counter = 0
self.partition_idx = 0

async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[sourcer.Message]:
async def read_handler(
self, datum: sourcer.ReadRequest
) -> AsyncIterator[sourcer.Message]:
"""
The simple source generates messages with incrementing numbers.
"""
_LOGGER.info(f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}")
_LOGGER.info(
f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}"
)

# Generate the requested number of messages
for i in range(datum.num_records):
Expand All @@ -36,7 +40,7 @@ async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[source
# Create offset
offset = sourcer.Offset(
offset=str(self.counter).encode("utf-8"),
partition_id=self.partition_idx
partition_id=self.partition_idx,
)

# Create message
Expand All @@ -45,7 +49,7 @@ async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[source
offset=offset,
event_time=datetime.now(timezone.utc),
keys=["key1"],
headers={"source": "simple"}
headers={"source": "simple"},
)

_LOGGER.info(f"Generated message: {self.counter}")
Expand All @@ -62,15 +66,19 @@ async def ack_handler(self, request: sourcer.AckRequest) -> None:
"""
_LOGGER.info(f"Acknowledging {len(request.offsets)} offsets")
for offset in request.offsets:
_LOGGER.debug(f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
_LOGGER.debug(
f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
)

async def nack_handler(self, request: sourcer.NackRequest) -> None:
"""
The simple source negatively acknowledges the offsets.
"""
_LOGGER.info(f"Negatively acknowledging {len(request.offsets)} offsets")
for offset in request.offsets:
_LOGGER.warning(f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
_LOGGER.warning(
f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
)

async def pending_handler(self) -> sourcer.PendingResponse:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
class EventFilter(sourcetransformer.SourceTransformer):
"""
A source transformer that filters and routes messages based on event time.

- Messages before 2022 are dropped
- Messages within 2022 are tagged with "within_year_2022"
- Messages after 2022 are tagged with "after_year_2022"
"""

async def handler(
self, keys: list[str], datum: sourcetransformer.Datum
self, keys: list[str], datum: sourcetransformer.Datum
) -> sourcetransformer.Messages:
val = datum.value
event_time = datum.event_time
Expand All @@ -30,23 +30,27 @@ async def handler(
print(f"Got event time: {event_time}, it is before 2022, so dropping")
messages.append(sourcetransformer.Message.message_to_drop(event_time))
elif event_time < january_first_2023:
print(f"Got event time: {event_time}, it is within year 2022, so forwarding to within_year_2022")
print(
f"Got event time: {event_time}, it is within year 2022, so forwarding to within_year_2022"
)
messages.append(
sourcetransformer.Message(
value=val,
event_time=january_first_2022,
keys=keys,
tags=["within_year_2022"]
tags=["within_year_2022"],
)
)
else:
print(f"Got event time: {event_time}, it is after year 2022, so forwarding to after_year_2022")
print(
f"Got event time: {event_time}, it is after year 2022, so forwarding to after_year_2022"
)
messages.append(
sourcetransformer.Message(
value=val,
event_time=january_first_2023,
keys=keys,
tags=["after_year_2022"]
tags=["after_year_2022"],
)
)

Expand All @@ -61,7 +65,9 @@ async def handler(
pass


async def start(f: Callable[[list[str], sourcetransformer.Datum], sourcetransformer.Messages]):
async def start(
f: Callable[[list[str], sourcetransformer.Datum], sourcetransformer.Messages],
):
server = sourcetransformer.SourceTransformAsyncServer()

# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
Expand Down Expand Up @@ -92,4 +98,3 @@ async def start(f: Callable[[list[str], sourcetransformer.Datum], sourcetransfor
if __name__ == "__main__":
async_handler = EventFilter()
asyncio.run(start(async_handler))

18 changes: 15 additions & 3 deletions packages/pynumaflow-lite/pynumaflow_lite/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .pynumaflow_lite import *
from . import pynumaflow_lite # type: ignore[attr-defined] # Rust extension, resolved at runtime
from .pynumaflow_lite import * # noqa: F403 # Rust extension; exports resolved at runtime
Comment on lines +1 to +2
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review requested
manual change:

  • warning suppression for imports
  • explicitly import from . import pynumaflow_lite


# Ensure the `mapper`, `batchmapper`, and `mapstreamer` submodules are importable as attributes of the package
# even though they're primarily registered by the extension module.
Expand Down Expand Up @@ -139,8 +140,19 @@
pass

# Public API
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator",
"sinker", "sourcer", "sourcetransformer", "sideinputer"]
__all__ = [
"mapper",
"batchmapper",
"mapstreamer",
"reducer",
"session_reducer",
"reducestreamer",
"accumulator",
"sinker",
"sourcer",
"sourcetransformer",
"sideinputer",
]

__doc__ = pynumaflow_lite.__doc__
if hasattr(pynumaflow_lite, "__all__"):
Expand Down
Loading
Loading