Describe the bug
MapStreamAsyncServer doesn't seem to stream data out as it's received. This looks like a regression: a pipeline that streamed correctly around October last year (on pynumaflow v0.9.0 and numaflow v1.4.2) now waits for all processing to complete before emitting any output. This is a problem for sufficiently large jobs, because the data is held in memory until it is emitted.
We've traced this to the MapStreamAsyncServer call: running an equivalent function directly with asyncio, instead of through pynumaflow, yields messages as expected.
In particular, this seems to be related to the underlying gRPC EoT (end-of-transmission) call not being respected. While recovering from this issue we cleared the isbsvc, which in effect cleared the incoming buffer, and messages were then yielded as expected.
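For comparison, this is roughly what "running an equivalent function via asyncio" looks like: a minimal sketch, assuming a hypothetical `split_into_chunks` async generator standing in for our UDF (it does not use pynumaflow at all). Recording events on both sides shows each chunk is consumed immediately after it is produced, which is the streaming behaviour we expect from the server.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def split_into_chunks(data: bytes, size: int, events: List[str]) -> AsyncIterator[bytes]:
    # Hypothetical stand-in for the stream UDF: yields each chunk as soon as it is cut
    for start in range(0, len(data), size):
        events.append(f"produced {start // size}")
        yield data[start:start + size]


async def consume(data: bytes, size: int) -> Tuple[List[bytes], List[str]]:
    # Plain asyncio consumer: pulls one chunk at a time, no server in between
    events: List[str] = []
    received: List[bytes] = []
    async for chunk in split_into_chunks(data, size, events):
        events.append(f"consumed {len(received)}")
        received.append(chunk)
    return received, events


if __name__ == "__main__":
    received, events = asyncio.run(consume(b"abcdefghij", 4))
    print(events)  # produced/consumed entries alternate, one chunk at a time
```

With a streaming consumer the event log alternates "produced N" / "consumed N"; what we observe behind MapStreamAsyncServer is closer to every chunk being produced before any reaches the sink.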
To Reproduce
Steps to reproduce the behavior:
- Create a basic source -> async map stream -> sink pipeline.
- Start a job that causes significant memory inflation. In our case, this is one that downloads a file and splits it into chunks; these chunks should be yielded, and therefore pushed to the sink, as they're created.
- Note that the stream UDF's memory usage grows during processing (indicating that data is being held).
- Note that the stream UDF's memory usage drops once processing has finished (indicating that data is being emitted).
In our case specifically, the processing runs in two threads: one appends messages to a deque, and the other (the main thread) pops them from that deque with popleft() and yields them.
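The two-thread setup described above can be sketched roughly as follows. This is an illustration under assumptions, not our actual UDF: `producer` and `stream_chunks` are hypothetical names, and the chunking of an in-memory `bytes` object stands in for the file download. `collections.deque` is used because its `append` and `popleft` are thread-safe.

```python
import asyncio
import collections
import threading
from typing import AsyncIterator, Deque


def producer(chunks: Deque[bytes], done: threading.Event, data: bytes, size: int) -> None:
    # Runs in a background thread: cuts data into fixed-size chunks
    for start in range(0, len(data), size):
        chunks.append(data[start:start + size])
    done.set()  # set only after the final append


async def stream_chunks(data: bytes, size: int) -> AsyncIterator[bytes]:
    chunks: Deque[bytes] = collections.deque()
    done = threading.Event()
    worker = threading.Thread(target=producer, args=(chunks, done, data, size), daemon=True)
    worker.start()
    while True:
        # Read the flag before checking the deque: if it was already set,
        # every chunk has been appended, so an empty deque means we're finished
        finished = done.is_set()
        if chunks:
            yield chunks.popleft()
        elif finished:
            break
        else:
            await asyncio.sleep(0.01)  # give the event loop (and the sender) a turn
    worker.join()  # near-instant: the worker has already set `done`


async def main() -> list:
    return [chunk async for chunk in stream_chunks(b"abcdefghij", 4)]
```

Each `yield` hands a chunk to whoever is iterating the generator as soon as it is available; in our pipeline that consumer is MapStreamAsyncServer, which is where the chunks appear to be held.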
Messages are not being sent to the sink at all (the buffer shows 0 messages and the sink shows a processing rate of 0), which indicates they are being held by the server call. Nor is this a case of a yield being blocked (as online searches might suggest), because a logging call immediately after the yield is executed.
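To show why the post-yield log rules out a blocked yield: code after `yield` in an async generator only resumes once the consumer asks for the next item, so the log proves the message was taken. It does not prove the message was forwarded. A minimal sketch, assuming a hypothetical `buffering_consumer` that mimics the behaviour we're seeing by holding everything until the stream ends:

```python
import asyncio
import logging
from typing import AsyncIterator, List

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("udf")


async def handler(events: List[str]) -> AsyncIterator[str]:
    for i in range(3):
        yield f"msg-{i}"
        # Resumes only after the consumer has taken msg-i and asked for more
        log.info("yielded msg-%d", i)
        events.append(f"after-yield {i}")


async def buffering_consumer(events: List[str]) -> List[str]:
    # Hypothetical consumer that holds every message until the stream ends
    held: List[str] = []
    async for msg in handler(events):
        held.append(msg)
    events.append("forwarded all")
    return held
```

Every "after-yield" entry is recorded before "forwarded all", so a consumer can pull each message promptly (satisfying the post-yield log) and still send nothing downstream until the end.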
Expected behavior
Messages that are yielded before processing completes are passed on to the sink as they are yielded.
Environment (please complete the following information):
- Numaflow: [v1.7.0]
- Numaflow-python: [v0.12.1]
Message from the maintainers:
Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.