Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public long getPending() {
}

@Override
public List<Integer> getPartitions() {
public List<Integer> getActivePartitions() {
return Sourcer.defaultPartitions();
}

Expand Down
16 changes: 12 additions & 4 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,19 @@ public void partitionsFn(
return;
}

List<Integer> partitions = this.sourcer.getPartitions();
List<Integer> partitions = this.sourcer.getActivePartitions();
Integer totalPartitions = this.sourcer.getTotalPartitions();

SourceOuterClass.PartitionsResponse.Result.Builder resultBuilder =
SourceOuterClass.PartitionsResponse.Result.newBuilder()
.addAllPartitions(partitions);

if (totalPartitions != null) {
resultBuilder.setTotalPartitions(totalPartitions);
}

responseObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder()
.setResult(
SourceOuterClass.PartitionsResponse.Result.newBuilder()
.addAllPartitions(partitions))
.setResult(resultBuilder)
.build());
responseObserver.onCompleted();
}
Expand Down
35 changes: 34 additions & 1 deletion src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,39 @@ public static List<Integer> defaultPartitions() {
* is in a case like Kafka, where a reader can read from multiple Kafka partitions.
*
* @return list of partitions
* @deprecated Use {@link #getActivePartitions()} instead. This method will be removed in a future release.
*/
public abstract List<Integer> getPartitions();
@Deprecated
public List<Integer> getPartitions() {
return null;
}

/**
* method returns the active partitions associated with the source, will be used by the platform to determine
* the partitions to which the watermark should be published. If the source doesn't have partitions,
* `defaultPartitions()` can be used to return the default partitions.
* In most cases, the defaultPartitions() should be enough; the cases where we need to implement custom getActivePartitions()
* is in a case like Kafka, where a reader can read from multiple Kafka partitions.
* <p>
* Note: For backward compatibility, if this method is not overridden, it will fall back to {@link #getPartitions()}.
* New implementations should override this method instead of getPartitions().
*
* @return list of active partitions
*/
public List<Integer> getActivePartitions() {
// Fall back to deprecated getPartitions() for backward compatibility
return getPartitions();
}

/**
* method returns the total number of partitions in the source.
* This is optional and can be used by the platform for informational purposes.
* By default, this returns null indicating that the total partitions information is not available.
* Override this method if your source knows the total number of partitions.
*
* @return total number of partitions, or null if not available
*/
public Integer getTotalPartitions() {
return null;
}
}
30 changes: 29 additions & 1 deletion src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@ public void onCompleted() {
* @throws Exception if the request fails
*/
public List<Integer> sendGetPartitionsRequest() throws Exception {
return sendGetPartitionsRequestWithTotal().getPartitions();
}

/**
* sendGetPartitionsRequestWithTotal sends a getPartitions request to the server
* and returns both the partitions list and total partitions count.
*
* @return the partitions response containing partitions list and optional total partitions
*
* @throws Exception if the request fails
*/
public PartitionsResult sendGetPartitionsRequestWithTotal() throws Exception {
CompletableFuture<SourceOuterClass.PartitionsResponse> future = new CompletableFuture<>();
StreamObserver<SourceOuterClass.PartitionsResponse> observer = new StreamObserver<>() {

Expand All @@ -267,7 +279,23 @@ public void onCompleted() {
}
};
sourceStub.partitionsFn(Empty.newBuilder().build(), observer);
return future.get().getResult().getPartitionsList();
SourceOuterClass.PartitionsResponse.Result result = future.get().getResult();
Integer totalPartitions = result.hasTotalPartitions() ? result.getTotalPartitions() : null;
return new PartitionsResult(result.getPartitionsList(), totalPartitions);
}
}

/**
* PartitionsResult holds the result of a partitions request.
*/
@Getter
public static class PartitionsResult {
private final List<Integer> partitions;
private final Integer totalPartitions;

public PartitionsResult(List<Integer> partitions, Integer totalPartitions) {
this.partitions = partitions;
this.totalPartitions = totalPartitions;
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/main/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,10 @@ message PendingResponse {
*/
message PartitionsResponse {
message Result {
// Required field holding the list of partitions.
// Required field holding the list of active partitions.
repeated int32 partitions = 1;
// Total number of partitions in the source
optional int32 total_partitions = 2;
}
// Required field holding the result.
Result result = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void nack(NackRequest request) {
}

@Override
public List<Integer> getPartitions() {
public List<Integer> getActivePartitions() {
return Sourcer.defaultPartitions();
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void read(ReadRequest request, OutputObserver observer) {
}

@Override
public List<Integer> getPartitions() {
public List<Integer> getActivePartitions() {
return Sourcer.defaultPartitions();
}

Expand Down
Loading