From 42f17c247ab48c755ec50b4b72fbcaa223c40d10 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 18 Mar 2026 06:14:55 +0530 Subject: [PATCH 1/2] Kotlin SDK Signed-off-by: Sreekanth --- .github/workflows/maven-publish.yml | 30 ++- .gitignore | 2 + numaflow-kotlin/pom.xml | 242 ++++++++++++++++++ .../numaflow/kt/sinker/Conversions.kt | 38 +++ .../numaproj/numaflow/kt/sinker/Extensions.kt | 16 ++ .../numaflow/kt/sinker/FlowAdapter.kt | 14 + .../numaproj/numaflow/kt/sinker/SinkDatum.kt | 51 ++++ .../numaflow/kt/sinker/SinkHandler.kt | 7 + .../numaflow/kt/sinker/SinkResponse.kt | 45 ++++ .../numaproj/numaflow/kt/sinker/SinkServer.kt | 51 ++++ .../numaflow/kt/sinker/SinkTestKit.kt | 68 +++++ .../numaflow/kt/sinker/SinkerBridge.kt | 19 ++ .../numaflow/kt/sinker/ExtensionsTest.kt | 64 +++++ .../numaflow/kt/sinker/FlowAdapterTest.kt | 75 ++++++ .../numaflow/kt/sinker/SinkDatumTest.kt | 81 ++++++ .../numaflow/kt/sinker/SinkResponseTest.kt | 98 +++++++ .../numaflow/kt/sinker/SinkServerTest.kt | 52 ++++ .../numaflow/kt/sinker/SinkTestKitTest.kt | 78 ++++++ .../numaflow/kt/sinker/SinkerBridgeTest.kt | 74 ++++++ releases.md | 4 +- 20 files changed, 1100 insertions(+), 9 deletions(-) create mode 100644 numaflow-kotlin/pom.xml create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Conversions.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Extensions.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapter.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatum.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkHandler.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponse.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkServer.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKit.kt create mode 100644 numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridge.kt create mode 100644 numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/ExtensionsTest.kt create mode 100644 numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapterTest.kt create mode 100644 numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatumTest.kt create mode 100644 numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponseTest.kt create mode 100644 numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkServerTest.kt create mode 100644 numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKitTest.kt create mode 100644 numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridgeTest.kt diff --git a/.github/workflows/maven-publish.yml b/.github/workflows/maven-publish.yml index 6502cd8f..403176c9 100644 --- a/.github/workflows/maven-publish.yml +++ b/.github/workflows/maven-publish.yml @@ -20,7 +20,9 @@ jobs: gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} passphrase: ${{ secrets.GPG_PASSPHRASE }} - - name: Set up Java for publishing to Maven Central Repository + # --- Java SDK (JDK 11) --- + + - name: Set up Java 11 for publishing to Maven Central Repository uses: actions/setup-java@v3 with: java-version: '11' @@ -28,22 +30,36 @@ jobs: server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD - settings-path: ${{ github.workspace }} # location for the settings.xml file + settings-path: ${{ github.workspace }} - - name: Publish to the Maven Central Repository + - name: Publish Java SDK to Maven Central run: mvn -DcentralRelease=true -P central deploy -s $GITHUB_WORKSPACE/settings.xml env: MAVEN_USERNAME: ${{ secrets.MVN_CENTRAL_USERNAME }} MAVEN_PASSWORD: ${{ secrets.MVN_CENTRAL_PASSWORD }} - - name: Set up Java for publishing to GitHub Packages + - name: Set up Java 11 for publishing to GitHub Packages uses: actions/setup-java@v3 with: java-version: '11' distribution: 'temurin' - server-id: github # Value of the distributionManagement/repository/id field of the pom.xml - settings-path: ${{ github.workspace }} #location for the settings.xml file - - name: Publish to GitHub Packages + server-id: github + settings-path: ${{ github.workspace }} + + - name: Publish Java SDK to GitHub Packages run: mvn -DgithubRelease=true -P github deploy -s $GITHUB_WORKSPACE/settings.xml env: GITHUB_TOKEN: ${{ github.token }} + + # --- Kotlin SDK --- + + - name: Publish Kotlin SDK to Maven Central + run: cd numaflow-kotlin && mvn -DcentralRelease=true -P central deploy -s $GITHUB_WORKSPACE/settings.xml + env: + MAVEN_USERNAME: ${{ secrets.MVN_CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.MVN_CENTRAL_PASSWORD }} + + - name: Publish Kotlin SDK to GitHub Packages + run: cd numaflow-kotlin && mvn -DgithubRelease=true -P github deploy -s $GITHUB_WORKSPACE/settings.xml + env: + GITHUB_TOKEN: ${{ github.token }} diff --git a/.gitignore b/.gitignore index 195a9c4b..484a2548 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Compiled class file *.class +numaflow-kotlin/target/ + # Log file *.log diff --git a/numaflow-kotlin/pom.xml b/numaflow-kotlin/pom.xml new file mode 100644 index 00000000..967976e5 --- /dev/null +++ b/numaflow-kotlin/pom.xml @@ -0,0 +1,242 @@ + + + 4.0.0 + + io.numaproj.numaflow + numaflow-kotlin + 0.11.0 + jar + + numaflow-kotlin + Kotlin wrapper for Numaflow Java SDK — coroutines, Flow, sealed classes, DSL builders. + https://numaflow.numaproj.io/ + + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + Numaproj Authors + numaproj@gmail.com + Numaproj + https://numaproj.io/ + + + + + scm:git:git://github.com/numaproj/numaflow-java.git + scm:git:ssh://github.com:numaproj/numaflow-java.git + http://github.com/numaproj/numaflow-java/tree/main + + + + UTF-8 + 2.1.0 + 1.9.0 + 5.10.2 + + + + + github + + + githubRelease + true + + + + + github + Numaflow Kotlin SDK + https://maven.pkg.github.com/numaproj/numaflow-java + + + + + central + + + centralRelease + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + org.sonatype.central + central-publishing-maven-plugin + 0.8.0 + true + + central + true + published + + + + + + + + + + + io.numaproj.numaflow + numaflow-java + 0.11.0 + + + + + org.jetbrains.kotlin + kotlin-stdlib + ${kotlin.version} + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + ${kotlinx.coroutines.version} + + + org.jetbrains.kotlinx + kotlinx-coroutines-jdk8 + ${kotlinx.coroutines.version} + + + + + org.jetbrains.kotlin + kotlin-test + ${kotlin.version} + test + + + org.jetbrains.kotlinx + kotlinx-coroutines-test + ${kotlinx.coroutines.version} + test + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + test + + + + + src/main/kotlin + src/test/kotlin + + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + + compile + compile + + compile + + + + test-compile + test-compile + + test-compile + + + + + 11 + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.2.5 + + + + org.apache.maven.plugins + maven-source-plugin + 3.3.0 + + + attach-sources + + jar-no-fork + + + + + + + org.jetbrains.dokka + dokka-maven-plugin + 1.9.20 + + + prepare-package + + dokka + javadocJar + + + + + + + org.jacoco + jacoco-maven-plugin + 0.8.12 + + + + prepare-agent + + + + report + prepare-package + + report + + + + + + + maven-compiler-plugin + 3.12.1 + + 11 + + + + + diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Conversions.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Conversions.kt new file mode 100644 index 00000000..a3597a15 --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Conversions.kt @@ -0,0 +1,38 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.shared.UserMetadata +import io.numaproj.numaflow.sinker.Message as JavaMessage +import io.numaproj.numaflow.sinker.Response as JavaResponse + +internal fun SinkResponse.toJava(): JavaResponse = when (this) { + is SinkResponse.Ok -> JavaResponse.responseOK(id) + is SinkResponse.Failure -> JavaResponse.responseFailure(id, error) + is SinkResponse.Fallback -> JavaResponse.responseFallback(id) + is SinkResponse.Serve -> JavaResponse.responseServe(id, data) + is SinkResponse.OnSuccess -> JavaResponse.responseOnSuccess(id, message?.toJava()) +} + +internal fun SinkMessage.toJava(): JavaMessage = JavaMessage( + value, + keys.toTypedArray(), + userMetadata ?: UserMetadata(), +) + +internal fun JavaResponse.toKotlin(): SinkResponse { + return when { + success == true -> SinkResponse.Ok(id) + fallback == true -> SinkResponse.Fallback(id) + serve == true -> SinkResponse.Serve(id, serveResponse ?: byteArrayOf()) + onSuccess == true -> SinkResponse.OnSuccess( + id, + onSuccessMessage?.let { msg -> + SinkMessage( + value = msg.value ?: byteArrayOf(), + keys = msg.keys?.toList() ?: emptyList(), + userMetadata = msg.userMetadata, + ) + }, + ) + else -> SinkResponse.Failure(id, err ?: "unknown error") + } +} diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Extensions.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Extensions.kt new file mode 100644 index 00000000..25a32a5a --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Extensions.kt @@ -0,0 +1,16 @@ +package io.numaproj.numaflow.kt.sinker + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList + +suspend fun Flow.processEach(block: suspend (SinkDatum) -> SinkResponse): List = + map { block(it) }.toList() + +fun SinkDatum.ok(): SinkResponse.Ok = SinkResponse.Ok(id) + +fun SinkDatum.failure(error: String): SinkResponse.Failure = SinkResponse.Failure(id, error) + +fun SinkDatum.fallback(): SinkResponse.Fallback = SinkResponse.Fallback(id) + +fun SinkDatum.valueAsString(): String = String(value) diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapter.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapter.kt new file mode 100644 index 00000000..d6d9bde0 --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapter.kt @@ -0,0 +1,14 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.sinker.DatumIterator +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn + +internal fun DatumIterator.asFlow(): Flow = flow { + while (true) { + val datum = next() ?: break + emit(SinkDatum.from(datum)) + } +}.flowOn(Dispatchers.IO) diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatum.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatum.kt new file mode 100644 index 00000000..23913280 --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatum.kt @@ -0,0 +1,51 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.shared.SystemMetadata +import io.numaproj.numaflow.shared.UserMetadata +import io.numaproj.numaflow.sinker.Datum +import java.time.Instant + +data class SinkDatum( + val id: String, + val keys: List, + val value: ByteArray, + val eventTime: Instant, + val watermark: Instant, + val headers: Map, + val userMetadata: UserMetadata?, + val systemMetadata: SystemMetadata?, +) { + companion object { + fun from(datum: Datum): SinkDatum = SinkDatum( + id = datum.id ?: "", + keys = datum.keys?.toList() ?: emptyList(), + value = datum.value?.clone() ?: byteArrayOf(), + eventTime = datum.eventTime ?: Instant.EPOCH, + watermark = datum.watermark ?: Instant.EPOCH, + headers = datum.headers ?: emptyMap(), + userMetadata = datum.userMetadata, + systemMetadata = datum.systemMetadata, + ) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is SinkDatum) return false + return id == other.id && + keys == other.keys && + value.contentEquals(other.value) && + eventTime == other.eventTime && + watermark == other.watermark && + headers == other.headers + } + + override fun hashCode(): Int { + var result = id.hashCode() + result = 31 * result + keys.hashCode() + result = 31 * result + value.contentHashCode() + result = 31 * result + eventTime.hashCode() + result = 31 * result + watermark.hashCode() + result = 31 * result + headers.hashCode() + return result + } +} diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkHandler.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkHandler.kt new file mode 100644 index 00000000..57e34c74 --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkHandler.kt @@ -0,0 +1,7 @@ +package io.numaproj.numaflow.kt.sinker + +import kotlinx.coroutines.flow.Flow + +fun interface SinkHandler { + suspend fun processMessages(datums: Flow): List +} diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponse.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponse.kt new file mode 100644 index 00000000..9c38794c --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponse.kt @@ -0,0 +1,45 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.shared.UserMetadata + +sealed interface SinkResponse { + val id: String + + data class Ok(override val id: String) : SinkResponse + data class Failure(override val id: String, val error: String) : SinkResponse + data class Fallback(override val id: String) : SinkResponse + + data class Serve(override val id: String, val data: ByteArray) : SinkResponse { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is Serve) return false + return id == other.id && data.contentEquals(other.data) + } + + override fun hashCode(): Int = 31 * id.hashCode() + data.contentHashCode() + } + + data class OnSuccess(override val id: String, val message: SinkMessage? = null) : SinkResponse +} + +data class SinkMessage( + val value: ByteArray, + val keys: List = emptyList(), + val userMetadata: UserMetadata? = null, +) { + companion object { + fun fromDatum(datum: SinkDatum): SinkMessage = SinkMessage( + value = datum.value.clone(), + keys = datum.keys, + userMetadata = datum.userMetadata?.let { UserMetadata(it) }, + ) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is SinkMessage) return false + return value.contentEquals(other.value) && keys == other.keys + } + + override fun hashCode(): Int = 31 * value.contentHashCode() + keys.hashCode() +} diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkServer.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkServer.kt new file mode 100644 index 00000000..47748d9e --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkServer.kt @@ -0,0 +1,51 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.sinker.GRPCConfig +import io.numaproj.numaflow.sinker.Server +import kotlinx.coroutines.flow.Flow + +class SinkServerConfig { + var socketPath: String? = null + var maxMessageSize: Int? = null + var port: Int? = null + var isLocal: Boolean? = null + var infoFilePath: String? = null + + internal fun toGrpcConfig(): GRPCConfig { + val builder = GRPCConfig.newBuilder() + socketPath?.let { builder.socketPath(it) } + maxMessageSize?.let { builder.maxMessageSize(it) } + port?.let { builder.port(it) } + isLocal?.let { builder.isLocal(it) } + infoFilePath?.let { builder.infoFilePath(it) } + return builder.build() + } +} + +class SinkServer internal constructor(private val javaServer: Server) { + fun start() { javaServer.start() } + fun awaitTermination() { javaServer.awaitTermination() } + fun stop() { javaServer.stop() } + + fun run() { + start() + awaitTermination() + } +} + +fun sinkServer(config: SinkServerConfig.() -> Unit = {}, handler: SinkHandler): SinkServer { + val cfg = SinkServerConfig().apply(config) + val bridge = SinkerBridge(handler) + val javaServer = if (cfg.socketPath != null || cfg.maxMessageSize != null || + cfg.port != null || cfg.isLocal != null || cfg.infoFilePath != null + ) { + Server(bridge, cfg.toGrpcConfig()) + } else { + Server(bridge) + } + return SinkServer(javaServer) +} + +fun sinkServer(handler: suspend (Flow) -> List): SinkServer { + return sinkServer(handler = SinkHandler { datums -> handler(datums) }) +} diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKit.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKit.kt new file mode 100644 index 00000000..c6109705 --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKit.kt @@ -0,0 +1,68 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.sinker.GRPCConfig +import io.numaproj.numaflow.sinker.SinkerTestKit +import kotlinx.coroutines.flow.asFlow +import java.time.Instant + +object SinkTestKit { + + fun datum( + id: String, + value: ByteArray = byteArrayOf(), + keys: List = emptyList(), + eventTime: Instant = Instant.now(), + watermark: Instant = Instant.now(), + headers: Map = emptyMap(), + ): SinkDatum = SinkDatum( + id = id, + keys = keys, + value = value, + eventTime = eventTime, + watermark = watermark, + headers = headers, + userMetadata = null, + systemMetadata = null, + ) + + suspend fun test(handler: SinkHandler, datums: List): List = + handler.processMessages(datums.asFlow()) + + fun grpcTest(handler: SinkHandler): GrpcTestHarness = GrpcTestHarness(handler) + + class GrpcTestHarness(private val handler: SinkHandler) { + private val testKit = SinkerTestKit( + SinkerBridge(handler), + GRPCConfig.newBuilder().isLocal(true).build(), + ) + + fun start() { testKit.startServer() } + fun stop() { testKit.stopServer() } + + fun client(): SinkerTestKit.Client = SinkerTestKit.Client() + + fun sendRequest(datums: List): List { + val iterator = SinkerTestKit.TestListIterator() + for (datum in datums) { + iterator.addDatum(datum.toTestDatum()) + } + val client = client() + try { + val responseList = client.sendRequest(iterator) + return responseList.responses.map { it.toKotlin() } + } finally { + client.close() + } + } + + private fun SinkDatum.toTestDatum(): SinkerTestKit.TestDatum = + SinkerTestKit.TestDatum.builder() + .id(id) + .keys(keys.toTypedArray()) + .value(value) + .eventTime(eventTime) + .watermark(watermark) + .headers(headers) + .build() + } +} diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridge.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridge.kt new file mode 100644 index 00000000..dcf3c0c2 --- /dev/null +++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridge.kt @@ -0,0 +1,19 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.sinker.DatumIterator +import io.numaproj.numaflow.sinker.ResponseList +import io.numaproj.numaflow.sinker.Sinker +import kotlinx.coroutines.runBlocking + +internal class SinkerBridge(private val handler: SinkHandler) : Sinker() { + override fun processMessages(datumStream: DatumIterator): ResponseList { + val responses = runBlocking { + handler.processMessages(datumStream.asFlow()) + } + val builder = ResponseList.newBuilder() + for (response in responses) { + builder.addResponse(response.toJava()) + } + return builder.build() + } +} diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/ExtensionsTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/ExtensionsTest.kt new file mode 100644 index 00000000..840d418d --- /dev/null +++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/ExtensionsTest.kt @@ -0,0 +1,64 @@ +package io.numaproj.numaflow.kt.sinker + +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import java.time.Instant +import kotlin.test.assertEquals +import kotlin.test.assertIs + +class ExtensionsTest { + + private val now = Instant.now() + + private fun testDatum(id: String, value: String = "") = SinkDatum( + id = id, keys = listOf("k"), value = value.toByteArray(), + eventTime = now, watermark = now, headers = emptyMap(), + userMetadata = null, systemMetadata = null, + ) + + @Test + fun `processEach maps each datum to a response`() = runTest { + val flow = flowOf(testDatum("1"), testDatum("2"), testDatum("3")) + val results = flow.processEach { it.ok() } + + assertEquals(3, results.size) + results.forEachIndexed { i, r -> + assertIs(r) + assertEquals("${i + 1}", r.id) + } + } + + @Test + fun `ok extension creates Ok response`() { + val datum = testDatum("x") + val response = datum.ok() + assertEquals(SinkResponse.Ok("x"), response) + } + + @Test + fun `failure extension creates Failure response`() { + val datum = testDatum("y") + val response = datum.failure("oops") + assertEquals(SinkResponse.Failure("y", "oops"), response) + } + + @Test + fun `fallback extension creates Fallback response`() { + val datum = testDatum("z") + val response = datum.fallback() + assertEquals(SinkResponse.Fallback("z"), response) + } + + @Test + fun `valueAsString decodes ByteArray`() { + val datum = testDatum("1", "hello world") + assertEquals("hello world", datum.valueAsString()) + } + + @Test + fun `processEach with empty flow`() = runTest { + val results = flowOf().processEach { it.ok() } + assertEquals(0, results.size) + } +} diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapterTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapterTest.kt new file mode 100644 index 00000000..713556ec --- /dev/null +++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapterTest.kt @@ -0,0 +1,75 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.sinker.DatumIterator +import io.numaproj.numaflow.sinker.SinkerTestKit +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import java.time.Instant +import kotlin.test.assertEquals + +class FlowAdapterTest { + + @Test + fun `asFlow converts iterator elements to SinkDatum`() = runTest { + val now = Instant.now() + val iterator = SinkerTestKit.TestListIterator() + iterator.addDatum( + SinkerTestKit.TestDatum.builder() + .id("1") + .value("a".toByteArray()) + .keys(arrayOf("k")) + .eventTime(now) + .watermark(now) + .headers(mapOf("h" to "v")) + .build() + ) + iterator.addDatum( + SinkerTestKit.TestDatum.builder() + .id("2") + .value("b".toByteArray()) + .keys(emptyArray()) + .eventTime(now) + .watermark(now) + .headers(emptyMap()) + .build() + ) + + val results = iterator.asFlow().toList() + + assertEquals(2, results.size) + assertEquals("1", results[0].id) + assertEquals("a", String(results[0].value)) + assertEquals(listOf("k"), results[0].keys) + assertEquals("2", results[1].id) + } + + @Test + fun `asFlow handles empty iterator`() = runTest { + val iterator = SinkerTestKit.TestListIterator() + + val results = iterator.asFlow().toList() + + assertEquals(0, results.size) + } + + @Test + fun `asFlow stops at null (EOF)`() = runTest { + val iterator = object : DatumIterator { + private var count = 0 + override fun next() = if (count++ < 3) { + SinkerTestKit.TestDatum.builder() + .id("msg-$count") + .value(byteArrayOf()) + .keys(emptyArray()) + .eventTime(Instant.now()) + .watermark(Instant.now()) + .headers(emptyMap()) + .build() + } else null + } + + val results = iterator.asFlow().toList() + assertEquals(3, results.size) + } +} diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatumTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatumTest.kt new file mode 100644 index 00000000..92382438 --- /dev/null +++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatumTest.kt @@ -0,0 +1,81 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.shared.UserMetadata +import io.numaproj.numaflow.sinker.Datum +import io.numaproj.numaflow.shared.SystemMetadata +import org.junit.jupiter.api.Test +import java.time.Instant +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals + +class SinkDatumTest { + + @Test + fun `from converts Java Datum correctly`() { + val now = Instant.now() + val javaDatum = object : Datum { + override fun getKeys() = arrayOf("key1", "key2") + override fun getValue() = "hello".toByteArray() + override fun getEventTime() = now + override fun getWatermark() = now + override fun getId() = "msg-1" + override fun getHeaders() = mapOf("h1" to "v1") + override fun getUserMetadata(): UserMetadata? = null + override fun getSystemMetadata(): SystemMetadata? = null + } + + val datum = SinkDatum.from(javaDatum) + + assertEquals("msg-1", datum.id) + assertEquals(listOf("key1", "key2"), datum.keys) + assertEquals("hello", String(datum.value)) + assertEquals(now, datum.eventTime) + assertEquals(now, datum.watermark) + assertEquals(mapOf("h1" to "v1"), datum.headers) + } + + @Test + fun `from handles null fields gracefully`() { + val javaDatum = object : Datum { + override fun getKeys(): Array? = null + override fun getValue(): ByteArray? = null + override fun getEventTime(): Instant? = null + override fun getWatermark(): Instant? = null + override fun getId(): String? = null + override fun getHeaders(): Map? = null + override fun getUserMetadata(): UserMetadata? = null + override fun getSystemMetadata(): SystemMetadata? = null + } + + val datum = SinkDatum.from(javaDatum) + + assertEquals("", datum.id) + assertEquals(emptyList(), datum.keys) + assertEquals(0, datum.value.size) + assertEquals(Instant.EPOCH, datum.eventTime) + } + + @Test + fun `equals and hashCode handle ByteArray correctly`() { + val now = Instant.now() + val datum1 = SinkDatum( + id = "1", keys = listOf("k"), value = "abc".toByteArray(), + eventTime = now, watermark = now, headers = emptyMap(), + userMetadata = null, systemMetadata = null, + ) + val datum2 = SinkDatum( + id = "1", keys = listOf("k"), value = "abc".toByteArray(), + eventTime = now, watermark = now, headers = emptyMap(), + userMetadata = null, systemMetadata = null, + ) + val datum3 = SinkDatum( + id = "1", keys = listOf("k"), value = "xyz".toByteArray(), + eventTime = now, watermark = now, headers = emptyMap(), + userMetadata = null, systemMetadata = null, + ) + + assertEquals(datum1, datum2) + assertEquals(datum1.hashCode(), datum2.hashCode()) + assertNotEquals(datum1, datum3) + } +} diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponseTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponseTest.kt new file mode 100644 index 00000000..241384f8 --- /dev/null +++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponseTest.kt @@ -0,0 +1,98 @@ +package io.numaproj.numaflow.kt.sinker + +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotEquals + +class SinkResponseTest { + + @Test + fun `Ok variant properties`() { + val ok = SinkResponse.Ok("id-1") + assertEquals("id-1", ok.id) + assertIs(ok) + } + + @Test + fun `Failure variant properties`() { + val fail = SinkResponse.Failure("id-2", "bad data") + assertEquals("id-2", fail.id) + assertEquals("bad data", fail.error) + } + + @Test + fun `Fallback variant properties`() { + val fb = SinkResponse.Fallback("id-3") + assertEquals("id-3", fb.id) + } + + @Test + fun `Serve variant equals handles ByteArray`() { + val s1 = SinkResponse.Serve("id-4", "data".toByteArray()) + val s2 = SinkResponse.Serve("id-4", "data".toByteArray()) + val s3 = SinkResponse.Serve("id-4", "other".toByteArray()) + assertEquals(s1, s2) + assertEquals(s1.hashCode(), s2.hashCode()) + assertNotEquals(s1, s3) + } + + @Test + fun `OnSuccess variant with and without message`() { + val os1 = SinkResponse.OnSuccess("id-5") + assertEquals(null, os1.message) + + val msg = SinkMessage(value = "v".toByteArray(), keys = listOf("k")) + val os2 = SinkResponse.OnSuccess("id-6", msg) + assertEquals(msg, os2.message) + } + + @Test + fun `toJava round-trip for Ok`() { + val ok = SinkResponse.Ok("id-1") + val java = ok.toJava() + assertEquals("id-1", java.id) + assertEquals(true, java.success) + } + + @Test + fun `toJava round-trip for Failure`() { + val fail = SinkResponse.Failure("id-2", "err") + val java = fail.toJava() + assertEquals("id-2", java.id) + assertEquals(false, java.success) + assertEquals("err", java.err) + } + + @Test + fun `toJava round-trip for Fallback`() { + val fb = SinkResponse.Fallback("id-3") + val java = fb.toJava() + assertEquals(true, java.fallback) + } + + @Test + fun `toJava round-trip for Serve`() { + val serve = SinkResponse.Serve("id-4", "data".toByteArray()) + val java = serve.toJava() + assertEquals(true, java.serve) + assertEquals("data", String(java.serveResponse)) + } + + @Test + fun `toJava round-trip for OnSuccess`() { + val os = SinkResponse.OnSuccess("id-5") + val java = os.toJava() + assertEquals(true, java.onSuccess) + } + + @Test + fun `toKotlin conversions`() { + assertEquals(SinkResponse.Ok("1"), io.numaproj.numaflow.sinker.Response.responseOK("1").toKotlin()) + assertEquals(SinkResponse.Fallback("2"), io.numaproj.numaflow.sinker.Response.responseFallback("2").toKotlin()) + + val fail = io.numaproj.numaflow.sinker.Response.responseFailure("3", "oops").toKotlin() + assertIs(fail) + assertEquals("oops", fail.error) + } +} diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkServerTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkServerTest.kt new file mode 100644 index 00000000..1172d05b --- /dev/null +++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkServerTest.kt @@ -0,0 +1,52 @@ +package io.numaproj.numaflow.kt.sinker + +import kotlinx.coroutines.flow.toList +import org.junit.jupiter.api.Test +import kotlin.test.assertNotNull + +class SinkServerTest { + + @Test + fun `sinkServer with handler creates server`() { + val server = sinkServer(handler = SinkHandler { datums -> + datums.toList().map { SinkResponse.Ok(it.id) } + }) + assertNotNull(server) + } + + @Test + fun `sinkServer with lambda creates server`() { + val server = sinkServer { datums -> + datums.toList().map { SinkResponse.Ok(it.id) } + } + assertNotNull(server) + } + + @Test + fun `sinkServer with config creates server`() { + val server = sinkServer( + config = { + isLocal = true + port = 50052 + maxMessageSize = 1024 * 1024 * 8 + }, + handler = SinkHandler { datums -> + datums.toList().map { SinkResponse.Ok(it.id) } + }, + ) + assertNotNull(server) + } + + @Test + fun `SinkServerConfig toGrpcConfig applies values`() { + val cfg = SinkServerConfig().apply { + socketPath = "/tmp/test.sock" + maxMessageSize = 999 + port = 12345 + isLocal = true + infoFilePath = "/tmp/info" + } + val grpc = cfg.toGrpcConfig() + assertNotNull(grpc) + } +} diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKitTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKitTest.kt new file mode 100644 index 00000000..63b44032 --- /dev/null +++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKitTest.kt @@ -0,0 +1,78 @@ +package io.numaproj.numaflow.kt.sinker + +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import java.time.Instant +import kotlin.test.assertEquals +import kotlin.test.assertIs + +class SinkTestKitTest { + + @Test + fun `datum factory creates SinkDatum with defaults`() { + val datum = SinkTestKit.datum(id = "1") + assertEquals("1", datum.id) + assertEquals(emptyList(), datum.keys) + assertEquals(0, datum.value.size) + assertEquals(emptyMap(), datum.headers) + } + + @Test + fun `datum factory creates SinkDatum with custom values`() { + val now = Instant.now() + val datum = SinkTestKit.datum( + id = "2", + value = "payload".toByteArray(), + keys = listOf("k1", "k2"), + eventTime = now, + watermark = now, + headers = mapOf("header" to "value"), + ) + assertEquals("2", datum.id) + assertEquals(listOf("k1", "k2"), datum.keys) + assertEquals("payload", String(datum.value)) + assertEquals(now, datum.eventTime) + assertEquals(mapOf("header" to "value"), datum.headers) + } + + @Test + fun `test invokes handler directly with Flow`() = runTest { + val handler = SinkHandler { datums -> + datums.processEach { it.ok() } + } + + val results = SinkTestKit.test( + handler, + listOf( + SinkTestKit.datum(id = "1", value = "hello".toByteArray()), + SinkTestKit.datum(id = "2", value = "world".toByteArray()), + ), + ) + + assertEquals(2, results.size) + assertEquals(SinkResponse.Ok("1"), results[0]) + assertEquals(SinkResponse.Ok("2"), results[1]) + } + + @Test + fun `test handles mixed response types`() = runTest { + val handler = SinkHandler { datums -> + datums.processEach { datum -> + if (datum.valueAsString() == "good") datum.ok() + else datum.failure("bad data") + } + } + + val results = SinkTestKit.test( + handler, + listOf( + SinkTestKit.datum(id = "1", value = "good".toByteArray()), + SinkTestKit.datum(id = "2", value = "bad".toByteArray()), + ), + ) + + assertIs(results[0]) + assertIs(results[1]) + assertEquals("bad data", (results[1] as SinkResponse.Failure).error) + } +} diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridgeTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridgeTest.kt new file mode 100644 index 00000000..6d82b05c --- /dev/null +++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridgeTest.kt @@ -0,0 +1,74 @@ +package io.numaproj.numaflow.kt.sinker + +import io.numaproj.numaflow.sinker.DatumIterator +import io.numaproj.numaflow.sinker.SinkerTestKit +import kotlinx.coroutines.flow.toList +import org.junit.jupiter.api.Test +import java.time.Instant +import kotlin.test.assertEquals + +class SinkerBridgeTest { + + @Test + fun `bridge delegates to handler and returns correct ResponseList`() { + val handler = SinkHandler { datums -> + datums.toList().map { SinkResponse.Ok(it.id) } + } + val bridge = SinkerBridge(handler) + + val iterator = SinkerTestKit.TestListIterator() + iterator.addDatum( + SinkerTestKit.TestDatum.builder() + .id("msg-1") + .value("hello".toByteArray()) + .keys(arrayOf("k1")) + .eventTime(Instant.now()) + .watermark(Instant.now()) + .headers(emptyMap()) + .build() + ) + iterator.addDatum( + SinkerTestKit.TestDatum.builder() + .id("msg-2") + .value("world".toByteArray()) + .keys(arrayOf("k2")) + .eventTime(Instant.now()) + .watermark(Instant.now()) + .headers(emptyMap()) + .build() + ) + + val result = bridge.processMessages(iterator) + + assertEquals(2, result.responses.size) + assertEquals("msg-1", result.responses[0].id) + assertEquals(true, result.responses[0].success) + assertEquals("msg-2", result.responses[1].id) + } + + @Test + fun `bridge handles failure responses`() { + val handler = SinkHandler { datums -> + datums.toList().map { SinkResponse.Failure(it.id, "failed") } + } + val bridge = SinkerBridge(handler) + + val iterator = SinkerTestKit.TestListIterator() + iterator.addDatum( + SinkerTestKit.TestDatum.builder() + .id("msg-1") + .value("data".toByteArray()) + .keys(emptyArray()) + .eventTime(Instant.now()) + .watermark(Instant.now()) + .headers(emptyMap()) + .build() + ) + + val result = bridge.processMessages(iterator) + + assertEquals(1, result.responses.size) + assertEquals(false, result.responses[0].success) + assertEquals("failed", result.responses[0].err) + } +} diff --git a/releases.md b/releases.md index 481546b1..69bdec66 100644 --- a/releases.md +++ b/releases.md @@ -5,8 +5,8 @@ This document explains the release process for the Java SDK. You can find the mo ### Before Release 1. Before releasing a new SDK version, make sure to update all references from the old version to the new one. -For example, the version in the `README.md`, as well as the `pom.xml` in the root and example directories should be updated (for [reference -](https://github.com/numaproj/numaflow-java/pull/89/files#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8)). The specified version should follow the [semantic versioning](https://semver.org/) specification +For example, the version in the `README.md`, as well as the `pom.xml` in the root, example, and `numaflow-kotlin` directories should be updated (for [reference +](https://github.com/numaproj/numaflow-java/pull/89/files#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8)). For `numaflow-kotlin/pom.xml`, update both its own `` and the `numaflow-java` dependency version. The specified version should follow the [semantic versioning](https://semver.org/) specification 2. If version to be released has backwards incompatible changes, i.e. it does not support older versions of the Numaflow platform, you must update the `MINIMUM_NUMAFLOW_VERSION` constant in the `src/main/java/io/numaproj/numaflow/info/ServerInfo.java` file to the minimum Numaflow version that is supported by your new SDK version From ef838ff89121a75622717dc6d4a16f5a759d60bb Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 18 Mar 2026 19:04:33 +0530 Subject: [PATCH 2/2] feat: Kotlin SDK Signed-off-by: Sreekanth --- .github/workflows/kotlin-publish.yml | 56 ++++++++++++++++++++++++++++ .github/workflows/maven-publish.yml | 27 ++++---------- numaflow-kotlin/pom.xml | 2 +- releases.md | 44 ++++++++++++++++++---- 4 files changed, 101 insertions(+), 28 deletions(-) create mode 100644 .github/workflows/kotlin-publish.yml diff --git a/.github/workflows/kotlin-publish.yml b/.github/workflows/kotlin-publish.yml new file mode 100644 index 00000000..d43ff9b5 --- /dev/null +++ b/.github/workflows/kotlin-publish.yml @@ -0,0 +1,56 @@ +# Publishes the Kotlin SDK to Maven Central and GitHub Packages. +# Triggered by GitHub releases with tags matching kotlin-v* (e.g. kotlin-v0.1.0). + +name: Publish Kotlin SDK to Maven Central and Github Packages +on: + release: + types: [ created ] +jobs: + publish: + if: startsWith(github.event.release.tag_name, 'kotlin-v') + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - uses: actions/checkout@v3 + - name: Import GPG key + id: import_gpg + uses: crazy-max/ghaction-import-gpg@v5 + with: + gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} + passphrase: ${{ secrets.GPG_PASSPHRASE }} + + # Build the Java SDK first so it's available as a local dependency + - name: Set up Java + uses: actions/setup-java@v3 + with: + java-version: '11' + distribution: 'temurin' + server-id: central + server-username: MAVEN_USERNAME + server-password: MAVEN_PASSWORD + settings-path: ${{ github.workspace }} + + - name: Install Java SDK to local repository + run: mvn clean install -DskipTests -q + + - name: Publish Kotlin SDK to Maven Central + run: cd numaflow-kotlin && mvn -DcentralRelease=true -P central deploy -s $GITHUB_WORKSPACE/settings.xml + env: + MAVEN_USERNAME: ${{ secrets.MVN_CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.MVN_CENTRAL_PASSWORD }} + + - name: Set up Java for publishing to GitHub Packages + uses: actions/setup-java@v3 + with: + java-version: '11' + distribution: 'temurin' + server-id: github + settings-path: ${{ github.workspace }} + + - name: Publish Kotlin SDK to GitHub Packages + run: cd numaflow-kotlin && mvn -DgithubRelease=true -P github deploy -s $GITHUB_WORKSPACE/settings.xml + env: + GITHUB_TOKEN: ${{ github.token }} diff --git a/.github/workflows/maven-publish.yml b/.github/workflows/maven-publish.yml index 403176c9..f785927b 100644 --- a/.github/workflows/maven-publish.yml +++ b/.github/workflows/maven-publish.yml @@ -1,11 +1,13 @@ # This workflow will build a package using Maven and then publish it to Maven Central and GitHub packages when a release is created +# Triggered by tags matching v* (e.g. v0.11.0). Kotlin SDK uses a separate workflow. -name: Publish to Maven Central and Github Packages +name: Publish Java SDK to Maven Central and Github Packages on: release: types: [ created ] jobs: publish: + if: startsWith(github.event.release.tag_name, 'v') && !startsWith(github.event.release.tag_name, 'v-kotlin') runs-on: ubuntu-latest permissions: contents: read @@ -20,9 +22,7 @@ jobs: gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} passphrase: ${{ secrets.GPG_PASSPHRASE }} - # --- Java SDK (JDK 11) --- - - - name: Set up Java 11 for publishing to Maven Central Repository + - name: Set up Java for publishing to Maven Central Repository uses: actions/setup-java@v3 with: java-version: '11' @@ -32,13 +32,13 @@ jobs: server-password: MAVEN_PASSWORD settings-path: ${{ github.workspace }} - - name: Publish Java SDK to Maven Central + - name: Publish to the Maven Central Repository run: mvn -DcentralRelease=true -P central deploy -s $GITHUB_WORKSPACE/settings.xml env: MAVEN_USERNAME: ${{ secrets.MVN_CENTRAL_USERNAME }} MAVEN_PASSWORD: ${{ secrets.MVN_CENTRAL_PASSWORD }} - - name: Set up Java 11 for publishing to GitHub Packages + - name: Set up Java for publishing to GitHub Packages uses: actions/setup-java@v3 with: java-version: '11' @@ -46,20 +46,7 @@ jobs: server-id: github settings-path: ${{ github.workspace }} - - name: Publish Java SDK to GitHub Packages + - name: Publish to GitHub Packages run: mvn -DgithubRelease=true -P github deploy -s $GITHUB_WORKSPACE/settings.xml env: GITHUB_TOKEN: ${{ github.token }} - - # --- Kotlin SDK --- - - - name: Publish Kotlin SDK to Maven Central - run: cd numaflow-kotlin && mvn -DcentralRelease=true -P central deploy -s $GITHUB_WORKSPACE/settings.xml - env: - MAVEN_USERNAME: ${{ secrets.MVN_CENTRAL_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.MVN_CENTRAL_PASSWORD }} - - - name: Publish Kotlin SDK to GitHub Packages - run: cd numaflow-kotlin && mvn -DgithubRelease=true -P github deploy -s $GITHUB_WORKSPACE/settings.xml - env: - GITHUB_TOKEN: ${{ github.token }} diff --git a/numaflow-kotlin/pom.xml b/numaflow-kotlin/pom.xml index 967976e5..f284d8c8 100644 --- a/numaflow-kotlin/pom.xml +++ b/numaflow-kotlin/pom.xml @@ -6,7 +6,7 @@ io.numaproj.numaflow numaflow-kotlin - 0.11.0 + 0.1.0 jar numaflow-kotlin diff --git a/releases.md b/releases.md index 69bdec66..99e3abe0 100644 --- a/releases.md +++ b/releases.md @@ -1,14 +1,20 @@ # Release Guide -This document explains the release process for the Java SDK. You can find the most recent version under [Github Releases](https://github.com/numaproj/numaflow-java/releases). +This document explains the release process for the Java SDK and Kotlin SDK. You can find the most recent versions under [Github Releases](https://github.com/numaproj/numaflow-java/releases). + +The two SDKs are versioned and released independently. The Kotlin SDK depends on the Java SDK, so a Java SDK release may need to happen first if the Kotlin SDK needs new Java SDK features. + +--- + +## Java SDK ### Before Release 1. Before releasing a new SDK version, make sure to update all references from the old version to the new one. -For example, the version in the `README.md`, as well as the `pom.xml` in the root, example, and `numaflow-kotlin` directories should be updated (for [reference -](https://github.com/numaproj/numaflow-java/pull/89/files#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8)). For `numaflow-kotlin/pom.xml`, update both its own `` and the `numaflow-java` dependency version. The specified version should follow the [semantic versioning](https://semver.org/) specification +For example, the version in the `README.md`, as well as the `pom.xml` in the root and example directories should be updated (for [reference +](https://github.com/numaproj/numaflow-java/pull/89/files#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8)). The specified version should follow the [semantic versioning](https://semver.org/) specification 2. If version to be released has backwards incompatible changes, i.e. it does not support older versions of the Numaflow platform, -you must update the `MINIMUM_NUMAFLOW_VERSION` constant in the `src/main/java/io/numaproj/numaflow/info/ServerInfo.java` file to the minimum Numaflow version +you must update the `MINIMUM_NUMAFLOW_VERSION` constant in the `src/main/java/io/numaproj/numaflow/info/ServerInfo.java` file to the minimum Numaflow version that is supported by your new SDK version 3. After making these changes, create a PR. Once merged, it will trigger the `Docker Publish` workflow, and should be included in the release. As a result, the correct SDK version will always be printed in the server information logs, and the example images will @@ -16,10 +22,10 @@ always be using the latest changes (due to pointing to the local maven repositor ### How to Release -This can be done via the Github UI. +This can be done via the Github UI. 1. In the `Releases` section of the Java SDK repo, click `Draft a new release` 2. Create a tag that has the same name as the version that you specified in the root -`pom.xml`, prefix it with a `'v'`, and select it +`pom.xml`, prefix it with a `'v'`, and select it (e.g. `v0.11.0`) 3. Make the title the same as the tag 4. Click `Generate release notes` so that all the changes made since the last release are documented. If there are any major features or breaking changes that you would like to highlight as part of the release, add those to the description as well @@ -28,4 +34,28 @@ changes that you would like to highlight as part of the release, add those to th ### After Release -After your release, a Github Actions workflow, `Publish to Maven Central and Github Packages`, will be triggered. Monitor the workflow run and ensure that it succeeds. +After your release, the `Publish Java SDK to Maven Central and Github Packages` workflow will be triggered. Monitor the workflow run and ensure that it succeeds. + +--- + +## Kotlin SDK + +The Kotlin SDK lives in `numaflow-kotlin/` and has its own version in `numaflow-kotlin/pom.xml`. It depends on a published version of the Java SDK. + +### Before Release + +1. Update the version in `numaflow-kotlin/pom.xml` +2. If needed, update the `numaflow-java` dependency version in `numaflow-kotlin/pom.xml` to the Java SDK version it should depend on +3. Create a PR and merge + +### How to Release + +1. In the `Releases` section, click `Draft a new release` +2. Create a tag prefixed with `kotlin-v` (e.g. `kotlin-v0.1.0`) +3. Make the title the same as the tag +4. Click `Generate release notes` and add any highlights +5. Click `Publish release` + +### After Release + +The `Publish Kotlin SDK to Maven Central and Github Packages` workflow will be triggered. Monitor the workflow run and ensure that it succeeds.