diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java index 44a108bc..40800a35 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java @@ -101,7 +101,7 @@ public long getPending() { } @Override - public List getPartitions() { + public List getActivePartitions() { return Sourcer.defaultPartitions(); } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index d327a7a0..6f76a5d7 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -269,11 +269,19 @@ public void partitionsFn( return; } - List partitions = this.sourcer.getPartitions(); + List partitions = this.sourcer.getActivePartitions(); + Integer totalPartitions = this.sourcer.getTotalPartitions(); + + SourceOuterClass.PartitionsResponse.Result.Builder resultBuilder = + SourceOuterClass.PartitionsResponse.Result.newBuilder() + .addAllPartitions(partitions); + + if (totalPartitions != null) { + resultBuilder.setTotalPartitions(totalPartitions); + } + responseObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder() - .setResult( - SourceOuterClass.PartitionsResponse.Result.newBuilder() - .addAllPartitions(partitions)) + .setResult(resultBuilder) .build()); responseObserver.onCompleted(); } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java index 3cf58307..0504f833 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java @@ -62,6 +62,39 @@ public static List defaultPartitions() { * is in a case like Kafka, where a reader can read from multiple Kafka partitions. * * @return list of partitions + * @deprecated Use {@link #getActivePartitions()} instead. This method will be removed in a future release. */ - public abstract List getPartitions(); + @Deprecated + public List getPartitions() { + return null; + } + + /** + * method returns the active partitions associated with the source, will be used by the platform to determine + * the partitions to which the watermark should be published. If the source doesn't have partitions, + * `defaultPartitions()` can be used to return the default partitions. + * In most cases, the defaultPartitions() should be enough; the cases where we need to implement custom getActivePartitions() + * is in a case like Kafka, where a reader can read from multiple Kafka partitions. + *

+ * Note: For backward compatibility, if this method is not overridden, it will fall back to {@link #getPartitions()}. + * New implementations should override this method instead of getPartitions(). + * + * @return list of active partitions + */ + public List getActivePartitions() { + // Fall back to deprecated getPartitions() for backward compatibility + return getPartitions(); + } + + /** + * method returns the total number of partitions in the source. + * This is optional and can be used by the platform for informational purposes. + * By default, this returns null indicating that the total partitions information is not available. + * Override this method if your source knows the total number of partitions. + * + * @return total number of partitions, or null if not available + */ + public Integer getTotalPartitions() { + return null; + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java b/src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java index 21cc6e98..ab6d9bca 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java @@ -245,6 +245,18 @@ public void onCompleted() { * @throws Exception if the request fails */ public List sendGetPartitionsRequest() throws Exception { + return sendGetPartitionsRequestWithTotal().getPartitions(); + } + + /** + * sendGetPartitionsRequestWithTotal sends a getPartitions request to the server + * and returns both the partitions list and total partitions count. + * + * @return the partitions response containing partitions list and optional total partitions + * + * @throws Exception if the request fails + */ + public PartitionsResult sendGetPartitionsRequestWithTotal() throws Exception { CompletableFuture future = new CompletableFuture<>(); StreamObserver observer = new StreamObserver<>() { @@ -267,7 +279,23 @@ public void onCompleted() { } }; sourceStub.partitionsFn(Empty.newBuilder().build(), observer); - return future.get().getResult().getPartitionsList(); + SourceOuterClass.PartitionsResponse.Result result = future.get().getResult(); + Integer totalPartitions = result.hasTotalPartitions() ? result.getTotalPartitions() : null; + return new PartitionsResult(result.getPartitionsList(), totalPartitions); + } + } + + /** + * PartitionsResult holds the result of a partitions request. + */ + @Getter + public static class PartitionsResult { + private final List partitions; + private final Integer totalPartitions; + + public PartitionsResult(List partitions, Integer totalPartitions) { + this.partitions = partitions; + this.totalPartitions = totalPartitions; } } diff --git a/src/main/proto/source/v1/source.proto b/src/main/proto/source/v1/source.proto index 2de4add8..aae5a7c4 100644 --- a/src/main/proto/source/v1/source.proto +++ b/src/main/proto/source/v1/source.proto @@ -192,8 +192,10 @@ message PendingResponse { */ message PartitionsResponse { message Result { - // Required field holding the list of partitions. + // Required field holding the list of active partitions. repeated int32 partitions = 1; + // Total number of partitions in the source + optional int32 total_partitions = 2; } // Required field holding the result. Result result = 1; diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java index f2cc9a5d..5c69a132 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java @@ -190,7 +190,7 @@ public void nack(NackRequest request) { } @Override - public List getPartitions() { + public List getActivePartitions() { return Sourcer.defaultPartitions(); } diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java index 6c3f05c2..877feca0 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java @@ -333,7 +333,7 @@ public void read(ReadRequest request, OutputObserver observer) { } @Override - public List getPartitions() { + public List getActivePartitions() { return Sourcer.defaultPartitions(); }