diff --git a/packages/pynumaflow/tests/source/utils.py b/packages/pynumaflow/tests/source/utils.py index 04daaa61..0c1c3de7 100644 --- a/packages/pynumaflow/tests/source/utils.py +++ b/packages/pynumaflow/tests/source/utils.py @@ -1,5 +1,3 @@ -from collections.abc import Iterable - from pynumaflow.shared.asynciter import NonBlockingIterator from pynumaflow.sourcer import ReadRequest, Message, UserMetadata from pynumaflow.sourcer import ( @@ -56,28 +54,6 @@ async def partitions_handler(self) -> PartitionsResponse: return PartitionsResponse(partitions=mock_partitions()) -class SyncSource(Sourcer): - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: - payload = b"payload:test_mock_message" - keys = ["test_key"] - offset = mock_offset() - event_time = mock_event_time() - for i in range(10): - yield Message(payload=payload, keys=keys, offset=offset, event_time=event_time) - - def ack_handler(self, ack_request: AckRequest): - return - - def nack_handler(self, nack_request: NackRequest): - return - - def pending_handler(self) -> PendingResponse: - return PendingResponse(count=10) - - def partitions_handler(self) -> PartitionsResponse: - return PartitionsResponse(partitions=mock_partitions()) - - def read_req_source_fn() -> ReadRequest: request = source_pb2.ReadRequest.Request( num_records=10, @@ -128,20 +104,3 @@ async def pending_handler(self) -> PendingResponse: async def partitions_handler(self) -> PartitionsResponse: raise RuntimeError("Got a runtime error from partition handler.") - - -class SyncSourceError(Sourcer): - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: - raise RuntimeError("Got a runtime error from read handler.") - - def ack_handler(self, ack_request: AckRequest): - raise RuntimeError("Got a runtime error from ack handler.") - - def nack_handler(self, nack_request: NackRequest): - raise RuntimeError("Got a runtime error from nack handler.") - - def pending_handler(self) -> PendingResponse: - raise RuntimeError("Got a runtime error from pending handler.") - - def partitions_handler(self) -> PartitionsResponse: - raise RuntimeError("Got a runtime error from partition handler.")