diff --git a/.github/workflows/kotlin-publish.yml b/.github/workflows/kotlin-publish.yml
new file mode 100644
index 00000000..d43ff9b5
--- /dev/null
+++ b/.github/workflows/kotlin-publish.yml
@@ -0,0 +1,56 @@
+# Publishes the Kotlin SDK to Maven Central and GitHub Packages.
+# Triggered by GitHub releases with tags matching kotlin-v* (e.g. kotlin-v0.1.0).
+
+name: Publish Kotlin SDK to Maven Central and Github Packages
+on:
+ release:
+ types: [ created ]
+jobs:
+ publish:
+ if: startsWith(github.event.release.tag_name, 'kotlin-v')
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+ packages: write
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Import GPG key
+ id: import_gpg
+ uses: crazy-max/ghaction-import-gpg@v5
+ with:
+ gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
+ passphrase: ${{ secrets.GPG_PASSPHRASE }}
+
+ # Build the Java SDK first so it's available as a local dependency
+ - name: Set up Java
+ uses: actions/setup-java@v3
+ with:
+ java-version: '11'
+ distribution: 'temurin'
+ server-id: central
+ server-username: MAVEN_USERNAME
+ server-password: MAVEN_PASSWORD
+ settings-path: ${{ github.workspace }}
+
+ - name: Install Java SDK to local repository
+ run: mvn clean install -DskipTests -q
+
+ - name: Publish Kotlin SDK to Maven Central
+ run: cd numaflow-kotlin && mvn -DcentralRelease=true -P central deploy -s $GITHUB_WORKSPACE/settings.xml
+ env:
+ MAVEN_USERNAME: ${{ secrets.MVN_CENTRAL_USERNAME }}
+ MAVEN_PASSWORD: ${{ secrets.MVN_CENTRAL_PASSWORD }}
+
+ - name: Set up Java for publishing to GitHub Packages
+ uses: actions/setup-java@v3
+ with:
+ java-version: '11'
+ distribution: 'temurin'
+ server-id: github
+ settings-path: ${{ github.workspace }}
+
+ - name: Publish Kotlin SDK to GitHub Packages
+ run: cd numaflow-kotlin && mvn -DgithubRelease=true -P github deploy -s $GITHUB_WORKSPACE/settings.xml
+ env:
+ GITHUB_TOKEN: ${{ github.token }}
diff --git a/.github/workflows/maven-publish.yml b/.github/workflows/maven-publish.yml
index 6502cd8f..f785927b 100644
--- a/.github/workflows/maven-publish.yml
+++ b/.github/workflows/maven-publish.yml
@@ -1,11 +1,13 @@
# This workflow will build a package using Maven and then publish it to Maven Central and GitHub packages when a release is created
+# Triggered by tags matching v* (e.g. v0.11.0). Kotlin SDK uses a separate workflow.
-name: Publish to Maven Central and Github Packages
+name: Publish Java SDK to Maven Central and Github Packages
on:
release:
types: [ created ]
jobs:
publish:
+ if: startsWith(github.event.release.tag_name, 'v') && !startsWith(github.event.release.tag_name, 'v-kotlin')
runs-on: ubuntu-latest
permissions:
contents: read
@@ -28,7 +30,7 @@ jobs:
server-id: central
server-username: MAVEN_USERNAME
server-password: MAVEN_PASSWORD
- settings-path: ${{ github.workspace }} # location for the settings.xml file
+ settings-path: ${{ github.workspace }}
- name: Publish to the Maven Central Repository
run: mvn -DcentralRelease=true -P central deploy -s $GITHUB_WORKSPACE/settings.xml
@@ -41,8 +43,9 @@ jobs:
with:
java-version: '11'
distribution: 'temurin'
- server-id: github # Value of the distributionManagement/repository/id field of the pom.xml
- settings-path: ${{ github.workspace }} #location for the settings.xml file
+ server-id: github
+ settings-path: ${{ github.workspace }}
+
- name: Publish to GitHub Packages
run: mvn -DgithubRelease=true -P github deploy -s $GITHUB_WORKSPACE/settings.xml
env:
diff --git a/.gitignore b/.gitignore
index 195a9c4b..484a2548 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
# Compiled class file
*.class
+numaflow-kotlin/target/
+
# Log file
*.log
diff --git a/numaflow-kotlin/pom.xml b/numaflow-kotlin/pom.xml
new file mode 100644
index 00000000..f284d8c8
--- /dev/null
+++ b/numaflow-kotlin/pom.xml
@@ -0,0 +1,242 @@
+
+
+ 4.0.0
+
+ io.numaproj.numaflow
+ numaflow-kotlin
+ 0.1.0
+ jar
+
+ numaflow-kotlin
+ Kotlin wrapper for Numaflow Java SDK — coroutines, Flow, sealed classes, DSL builders.
+ https://numaflow.numaproj.io/
+
+
+
+ The Apache License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+
+ Numaproj Authors
+ numaproj@gmail.com
+ Numaproj
+ https://numaproj.io/
+
+
+
+
+ scm:git:git://github.com/numaproj/numaflow-java.git
+ scm:git:ssh://github.com:numaproj/numaflow-java.git
+ http://github.com/numaproj/numaflow-java/tree/main
+
+
+
+ UTF-8
+ 2.1.0
+ 1.9.0
+ 5.10.2
+
+
+
+
+ github
+
+
+ githubRelease
+ true
+
+
+
+
+ github
+ Numaflow Kotlin SDK
+ https://maven.pkg.github.com/numaproj/numaflow-java
+
+
+
+
+ central
+
+
+ centralRelease
+ true
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.5
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+ org.sonatype.central
+ central-publishing-maven-plugin
+ 0.8.0
+ true
+
+ central
+ true
+ published
+
+
+
+
+
+
+
+
+
+
+ io.numaproj.numaflow
+ numaflow-java
+ 0.11.0
+
+
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+ ${kotlin.version}
+
+
+ org.jetbrains.kotlinx
+ kotlinx-coroutines-core
+ ${kotlinx.coroutines.version}
+
+
+ org.jetbrains.kotlinx
+ kotlinx-coroutines-jdk8
+ ${kotlinx.coroutines.version}
+
+
+
+
+ org.jetbrains.kotlin
+ kotlin-test
+ ${kotlin.version}
+ test
+
+
+ org.jetbrains.kotlinx
+ kotlinx-coroutines-test
+ ${kotlinx.coroutines.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+
+
+ src/main/kotlin
+ src/test/kotlin
+
+
+
+ org.jetbrains.kotlin
+ kotlin-maven-plugin
+ ${kotlin.version}
+
+
+ compile
+ compile
+
+ compile
+
+
+
+ test-compile
+ test-compile
+
+ test-compile
+
+
+
+
+ 11
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.2.5
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.3.0
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+
+ org.jetbrains.dokka
+ dokka-maven-plugin
+ 1.9.20
+
+
+ prepare-package
+
+ dokka
+ javadocJar
+
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.12
+
+
+
+ prepare-agent
+
+
+
+ report
+ prepare-package
+
+ report
+
+
+
+
+
+
+ maven-compiler-plugin
+ 3.12.1
+
+ 11
+
+
+
+
+
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Conversions.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Conversions.kt
new file mode 100644
index 00000000..a3597a15
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Conversions.kt
@@ -0,0 +1,38 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.shared.UserMetadata
+import io.numaproj.numaflow.sinker.Message as JavaMessage
+import io.numaproj.numaflow.sinker.Response as JavaResponse
+
+internal fun SinkResponse.toJava(): JavaResponse = when (this) {
+ is SinkResponse.Ok -> JavaResponse.responseOK(id)
+ is SinkResponse.Failure -> JavaResponse.responseFailure(id, error)
+ is SinkResponse.Fallback -> JavaResponse.responseFallback(id)
+ is SinkResponse.Serve -> JavaResponse.responseServe(id, data)
+ is SinkResponse.OnSuccess -> JavaResponse.responseOnSuccess(id, message?.toJava())
+}
+
+internal fun SinkMessage.toJava(): JavaMessage = JavaMessage(
+ value,
+ keys.toTypedArray(),
+ userMetadata ?: UserMetadata(),
+)
+
+internal fun JavaResponse.toKotlin(): SinkResponse {
+ return when {
+ success == true -> SinkResponse.Ok(id)
+ fallback == true -> SinkResponse.Fallback(id)
+ serve == true -> SinkResponse.Serve(id, serveResponse ?: byteArrayOf())
+ onSuccess == true -> SinkResponse.OnSuccess(
+ id,
+ onSuccessMessage?.let { msg ->
+ SinkMessage(
+ value = msg.value ?: byteArrayOf(),
+ keys = msg.keys?.toList() ?: emptyList(),
+ userMetadata = msg.userMetadata,
+ )
+ },
+ )
+ else -> SinkResponse.Failure(id, err ?: "unknown error")
+ }
+}
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Extensions.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Extensions.kt
new file mode 100644
index 00000000..25a32a5a
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/Extensions.kt
@@ -0,0 +1,16 @@
+package io.numaproj.numaflow.kt.sinker
+
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.toList
+
+suspend fun Flow.processEach(block: suspend (SinkDatum) -> SinkResponse): List =
+ map { block(it) }.toList()
+
+fun SinkDatum.ok(): SinkResponse.Ok = SinkResponse.Ok(id)
+
+fun SinkDatum.failure(error: String): SinkResponse.Failure = SinkResponse.Failure(id, error)
+
+fun SinkDatum.fallback(): SinkResponse.Fallback = SinkResponse.Fallback(id)
+
+fun SinkDatum.valueAsString(): String = String(value)
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapter.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapter.kt
new file mode 100644
index 00000000..d6d9bde0
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapter.kt
@@ -0,0 +1,14 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.sinker.DatumIterator
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.flowOn
+
+internal fun DatumIterator.asFlow(): Flow = flow {
+ while (true) {
+ val datum = next() ?: break
+ emit(SinkDatum.from(datum))
+ }
+}.flowOn(Dispatchers.IO)
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatum.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatum.kt
new file mode 100644
index 00000000..23913280
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatum.kt
@@ -0,0 +1,51 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.shared.SystemMetadata
+import io.numaproj.numaflow.shared.UserMetadata
+import io.numaproj.numaflow.sinker.Datum
+import java.time.Instant
+
+data class SinkDatum(
+ val id: String,
+ val keys: List,
+ val value: ByteArray,
+ val eventTime: Instant,
+ val watermark: Instant,
+ val headers: Map,
+ val userMetadata: UserMetadata?,
+ val systemMetadata: SystemMetadata?,
+) {
+ companion object {
+ fun from(datum: Datum): SinkDatum = SinkDatum(
+ id = datum.id ?: "",
+ keys = datum.keys?.toList() ?: emptyList(),
+ value = datum.value?.clone() ?: byteArrayOf(),
+ eventTime = datum.eventTime ?: Instant.EPOCH,
+ watermark = datum.watermark ?: Instant.EPOCH,
+ headers = datum.headers ?: emptyMap(),
+ userMetadata = datum.userMetadata,
+ systemMetadata = datum.systemMetadata,
+ )
+ }
+
+ override fun equals(other: Any?): Boolean {
+ if (this === other) return true
+ if (other !is SinkDatum) return false
+ return id == other.id &&
+ keys == other.keys &&
+ value.contentEquals(other.value) &&
+ eventTime == other.eventTime &&
+ watermark == other.watermark &&
+ headers == other.headers
+ }
+
+ override fun hashCode(): Int {
+ var result = id.hashCode()
+ result = 31 * result + keys.hashCode()
+ result = 31 * result + value.contentHashCode()
+ result = 31 * result + eventTime.hashCode()
+ result = 31 * result + watermark.hashCode()
+ result = 31 * result + headers.hashCode()
+ return result
+ }
+}
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkHandler.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkHandler.kt
new file mode 100644
index 00000000..57e34c74
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkHandler.kt
@@ -0,0 +1,7 @@
+package io.numaproj.numaflow.kt.sinker
+
+import kotlinx.coroutines.flow.Flow
+
+fun interface SinkHandler {
+ suspend fun processMessages(datums: Flow): List
+}
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponse.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponse.kt
new file mode 100644
index 00000000..9c38794c
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponse.kt
@@ -0,0 +1,45 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.shared.UserMetadata
+
+sealed interface SinkResponse {
+ val id: String
+
+ data class Ok(override val id: String) : SinkResponse
+ data class Failure(override val id: String, val error: String) : SinkResponse
+ data class Fallback(override val id: String) : SinkResponse
+
+ data class Serve(override val id: String, val data: ByteArray) : SinkResponse {
+ override fun equals(other: Any?): Boolean {
+ if (this === other) return true
+ if (other !is Serve) return false
+ return id == other.id && data.contentEquals(other.data)
+ }
+
+ override fun hashCode(): Int = 31 * id.hashCode() + data.contentHashCode()
+ }
+
+ data class OnSuccess(override val id: String, val message: SinkMessage? = null) : SinkResponse
+}
+
+data class SinkMessage(
+ val value: ByteArray,
+ val keys: List = emptyList(),
+ val userMetadata: UserMetadata? = null,
+) {
+ companion object {
+ fun fromDatum(datum: SinkDatum): SinkMessage = SinkMessage(
+ value = datum.value.clone(),
+ keys = datum.keys,
+ userMetadata = datum.userMetadata?.let { UserMetadata(it) },
+ )
+ }
+
+ override fun equals(other: Any?): Boolean {
+ if (this === other) return true
+ if (other !is SinkMessage) return false
+ return value.contentEquals(other.value) && keys == other.keys
+ }
+
+ override fun hashCode(): Int = 31 * value.contentHashCode() + keys.hashCode()
+}
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkServer.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkServer.kt
new file mode 100644
index 00000000..47748d9e
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkServer.kt
@@ -0,0 +1,51 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.sinker.GRPCConfig
+import io.numaproj.numaflow.sinker.Server
+import kotlinx.coroutines.flow.Flow
+
+class SinkServerConfig {
+ var socketPath: String? = null
+ var maxMessageSize: Int? = null
+ var port: Int? = null
+ var isLocal: Boolean? = null
+ var infoFilePath: String? = null
+
+ internal fun toGrpcConfig(): GRPCConfig {
+ val builder = GRPCConfig.newBuilder()
+ socketPath?.let { builder.socketPath(it) }
+ maxMessageSize?.let { builder.maxMessageSize(it) }
+ port?.let { builder.port(it) }
+ isLocal?.let { builder.isLocal(it) }
+ infoFilePath?.let { builder.infoFilePath(it) }
+ return builder.build()
+ }
+}
+
+class SinkServer internal constructor(private val javaServer: Server) {
+ fun start() { javaServer.start() }
+ fun awaitTermination() { javaServer.awaitTermination() }
+ fun stop() { javaServer.stop() }
+
+ fun run() {
+ start()
+ awaitTermination()
+ }
+}
+
+fun sinkServer(config: SinkServerConfig.() -> Unit = {}, handler: SinkHandler): SinkServer {
+ val cfg = SinkServerConfig().apply(config)
+ val bridge = SinkerBridge(handler)
+ val javaServer = if (cfg.socketPath != null || cfg.maxMessageSize != null ||
+ cfg.port != null || cfg.isLocal != null || cfg.infoFilePath != null
+ ) {
+ Server(bridge, cfg.toGrpcConfig())
+ } else {
+ Server(bridge)
+ }
+ return SinkServer(javaServer)
+}
+
+fun sinkServer(handler: suspend (Flow) -> List): SinkServer {
+ return sinkServer(handler = SinkHandler { datums -> handler(datums) })
+}
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKit.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKit.kt
new file mode 100644
index 00000000..c6109705
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKit.kt
@@ -0,0 +1,68 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.sinker.GRPCConfig
+import io.numaproj.numaflow.sinker.SinkerTestKit
+import kotlinx.coroutines.flow.asFlow
+import java.time.Instant
+
+object SinkTestKit {
+
+ fun datum(
+ id: String,
+ value: ByteArray = byteArrayOf(),
+ keys: List = emptyList(),
+ eventTime: Instant = Instant.now(),
+ watermark: Instant = Instant.now(),
+ headers: Map = emptyMap(),
+ ): SinkDatum = SinkDatum(
+ id = id,
+ keys = keys,
+ value = value,
+ eventTime = eventTime,
+ watermark = watermark,
+ headers = headers,
+ userMetadata = null,
+ systemMetadata = null,
+ )
+
+ suspend fun test(handler: SinkHandler, datums: List): List =
+ handler.processMessages(datums.asFlow())
+
+ fun grpcTest(handler: SinkHandler): GrpcTestHarness = GrpcTestHarness(handler)
+
+ class GrpcTestHarness(private val handler: SinkHandler) {
+ private val testKit = SinkerTestKit(
+ SinkerBridge(handler),
+ GRPCConfig.newBuilder().isLocal(true).build(),
+ )
+
+ fun start() { testKit.startServer() }
+ fun stop() { testKit.stopServer() }
+
+ fun client(): SinkerTestKit.Client = SinkerTestKit.Client()
+
+ fun sendRequest(datums: List): List {
+ val iterator = SinkerTestKit.TestListIterator()
+ for (datum in datums) {
+ iterator.addDatum(datum.toTestDatum())
+ }
+ val client = client()
+ try {
+ val responseList = client.sendRequest(iterator)
+ return responseList.responses.map { it.toKotlin() }
+ } finally {
+ client.close()
+ }
+ }
+
+ private fun SinkDatum.toTestDatum(): SinkerTestKit.TestDatum =
+ SinkerTestKit.TestDatum.builder()
+ .id(id)
+ .keys(keys.toTypedArray())
+ .value(value)
+ .eventTime(eventTime)
+ .watermark(watermark)
+ .headers(headers)
+ .build()
+ }
+}
diff --git a/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridge.kt b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridge.kt
new file mode 100644
index 00000000..dcf3c0c2
--- /dev/null
+++ b/numaflow-kotlin/src/main/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridge.kt
@@ -0,0 +1,19 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.sinker.DatumIterator
+import io.numaproj.numaflow.sinker.ResponseList
+import io.numaproj.numaflow.sinker.Sinker
+import kotlinx.coroutines.runBlocking
+
+internal class SinkerBridge(private val handler: SinkHandler) : Sinker() {
+ override fun processMessages(datumStream: DatumIterator): ResponseList {
+ val responses = runBlocking {
+ handler.processMessages(datumStream.asFlow())
+ }
+ val builder = ResponseList.newBuilder()
+ for (response in responses) {
+ builder.addResponse(response.toJava())
+ }
+ return builder.build()
+ }
+}
diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/ExtensionsTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/ExtensionsTest.kt
new file mode 100644
index 00000000..840d418d
--- /dev/null
+++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/ExtensionsTest.kt
@@ -0,0 +1,64 @@
+package io.numaproj.numaflow.kt.sinker
+
+import kotlinx.coroutines.flow.flowOf
+import kotlinx.coroutines.test.runTest
+import org.junit.jupiter.api.Test
+import java.time.Instant
+import kotlin.test.assertEquals
+import kotlin.test.assertIs
+
+class ExtensionsTest {
+
+ private val now = Instant.now()
+
+ private fun testDatum(id: String, value: String = "") = SinkDatum(
+ id = id, keys = listOf("k"), value = value.toByteArray(),
+ eventTime = now, watermark = now, headers = emptyMap(),
+ userMetadata = null, systemMetadata = null,
+ )
+
+ @Test
+ fun `processEach maps each datum to a response`() = runTest {
+ val flow = flowOf(testDatum("1"), testDatum("2"), testDatum("3"))
+ val results = flow.processEach { it.ok() }
+
+ assertEquals(3, results.size)
+ results.forEachIndexed { i, r ->
+ assertIs(r)
+ assertEquals("${i + 1}", r.id)
+ }
+ }
+
+ @Test
+ fun `ok extension creates Ok response`() {
+ val datum = testDatum("x")
+ val response = datum.ok()
+ assertEquals(SinkResponse.Ok("x"), response)
+ }
+
+ @Test
+ fun `failure extension creates Failure response`() {
+ val datum = testDatum("y")
+ val response = datum.failure("oops")
+ assertEquals(SinkResponse.Failure("y", "oops"), response)
+ }
+
+ @Test
+ fun `fallback extension creates Fallback response`() {
+ val datum = testDatum("z")
+ val response = datum.fallback()
+ assertEquals(SinkResponse.Fallback("z"), response)
+ }
+
+ @Test
+ fun `valueAsString decodes ByteArray`() {
+ val datum = testDatum("1", "hello world")
+ assertEquals("hello world", datum.valueAsString())
+ }
+
+ @Test
+ fun `processEach with empty flow`() = runTest {
+ val results = flowOf().processEach { it.ok() }
+ assertEquals(0, results.size)
+ }
+}
diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapterTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapterTest.kt
new file mode 100644
index 00000000..713556ec
--- /dev/null
+++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/FlowAdapterTest.kt
@@ -0,0 +1,75 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.sinker.DatumIterator
+import io.numaproj.numaflow.sinker.SinkerTestKit
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.test.runTest
+import org.junit.jupiter.api.Test
+import java.time.Instant
+import kotlin.test.assertEquals
+
+class FlowAdapterTest {
+
+ @Test
+ fun `asFlow converts iterator elements to SinkDatum`() = runTest {
+ val now = Instant.now()
+ val iterator = SinkerTestKit.TestListIterator()
+ iterator.addDatum(
+ SinkerTestKit.TestDatum.builder()
+ .id("1")
+ .value("a".toByteArray())
+ .keys(arrayOf("k"))
+ .eventTime(now)
+ .watermark(now)
+ .headers(mapOf("h" to "v"))
+ .build()
+ )
+ iterator.addDatum(
+ SinkerTestKit.TestDatum.builder()
+ .id("2")
+ .value("b".toByteArray())
+ .keys(emptyArray())
+ .eventTime(now)
+ .watermark(now)
+ .headers(emptyMap())
+ .build()
+ )
+
+ val results = iterator.asFlow().toList()
+
+ assertEquals(2, results.size)
+ assertEquals("1", results[0].id)
+ assertEquals("a", String(results[0].value))
+ assertEquals(listOf("k"), results[0].keys)
+ assertEquals("2", results[1].id)
+ }
+
+ @Test
+ fun `asFlow handles empty iterator`() = runTest {
+ val iterator = SinkerTestKit.TestListIterator()
+
+ val results = iterator.asFlow().toList()
+
+ assertEquals(0, results.size)
+ }
+
+ @Test
+ fun `asFlow stops at null (EOF)`() = runTest {
+ val iterator = object : DatumIterator {
+ private var count = 0
+ override fun next() = if (count++ < 3) {
+ SinkerTestKit.TestDatum.builder()
+ .id("msg-$count")
+ .value(byteArrayOf())
+ .keys(emptyArray())
+ .eventTime(Instant.now())
+ .watermark(Instant.now())
+ .headers(emptyMap())
+ .build()
+ } else null
+ }
+
+ val results = iterator.asFlow().toList()
+ assertEquals(3, results.size)
+ }
+}
diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatumTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatumTest.kt
new file mode 100644
index 00000000..92382438
--- /dev/null
+++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkDatumTest.kt
@@ -0,0 +1,81 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.shared.UserMetadata
+import io.numaproj.numaflow.sinker.Datum
+import io.numaproj.numaflow.shared.SystemMetadata
+import org.junit.jupiter.api.Test
+import java.time.Instant
+import kotlin.test.assertEquals
+import kotlin.test.assertNotEquals
+
+class SinkDatumTest {
+
+ @Test
+ fun `from converts Java Datum correctly`() {
+ val now = Instant.now()
+ val javaDatum = object : Datum {
+ override fun getKeys() = arrayOf("key1", "key2")
+ override fun getValue() = "hello".toByteArray()
+ override fun getEventTime() = now
+ override fun getWatermark() = now
+ override fun getId() = "msg-1"
+ override fun getHeaders() = mapOf("h1" to "v1")
+ override fun getUserMetadata(): UserMetadata? = null
+ override fun getSystemMetadata(): SystemMetadata? = null
+ }
+
+ val datum = SinkDatum.from(javaDatum)
+
+ assertEquals("msg-1", datum.id)
+ assertEquals(listOf("key1", "key2"), datum.keys)
+ assertEquals("hello", String(datum.value))
+ assertEquals(now, datum.eventTime)
+ assertEquals(now, datum.watermark)
+ assertEquals(mapOf("h1" to "v1"), datum.headers)
+ }
+
+ @Test
+ fun `from handles null fields gracefully`() {
+ val javaDatum = object : Datum {
+ override fun getKeys(): Array? = null
+ override fun getValue(): ByteArray? = null
+ override fun getEventTime(): Instant? = null
+ override fun getWatermark(): Instant? = null
+ override fun getId(): String? = null
+ override fun getHeaders(): Map? = null
+ override fun getUserMetadata(): UserMetadata? = null
+ override fun getSystemMetadata(): SystemMetadata? = null
+ }
+
+ val datum = SinkDatum.from(javaDatum)
+
+ assertEquals("", datum.id)
+ assertEquals(emptyList(), datum.keys)
+ assertEquals(0, datum.value.size)
+ assertEquals(Instant.EPOCH, datum.eventTime)
+ }
+
+ @Test
+ fun `equals and hashCode handle ByteArray correctly`() {
+ val now = Instant.now()
+ val datum1 = SinkDatum(
+ id = "1", keys = listOf("k"), value = "abc".toByteArray(),
+ eventTime = now, watermark = now, headers = emptyMap(),
+ userMetadata = null, systemMetadata = null,
+ )
+ val datum2 = SinkDatum(
+ id = "1", keys = listOf("k"), value = "abc".toByteArray(),
+ eventTime = now, watermark = now, headers = emptyMap(),
+ userMetadata = null, systemMetadata = null,
+ )
+ val datum3 = SinkDatum(
+ id = "1", keys = listOf("k"), value = "xyz".toByteArray(),
+ eventTime = now, watermark = now, headers = emptyMap(),
+ userMetadata = null, systemMetadata = null,
+ )
+
+ assertEquals(datum1, datum2)
+ assertEquals(datum1.hashCode(), datum2.hashCode())
+ assertNotEquals(datum1, datum3)
+ }
+}
diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponseTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponseTest.kt
new file mode 100644
index 00000000..241384f8
--- /dev/null
+++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkResponseTest.kt
@@ -0,0 +1,98 @@
+package io.numaproj.numaflow.kt.sinker
+
+import org.junit.jupiter.api.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertIs
+import kotlin.test.assertNotEquals
+
+class SinkResponseTest {
+
+ @Test
+ fun `Ok variant properties`() {
+ val ok = SinkResponse.Ok("id-1")
+ assertEquals("id-1", ok.id)
+ assertIs(ok)
+ }
+
+ @Test
+ fun `Failure variant properties`() {
+ val fail = SinkResponse.Failure("id-2", "bad data")
+ assertEquals("id-2", fail.id)
+ assertEquals("bad data", fail.error)
+ }
+
+ @Test
+ fun `Fallback variant properties`() {
+ val fb = SinkResponse.Fallback("id-3")
+ assertEquals("id-3", fb.id)
+ }
+
+ @Test
+ fun `Serve variant equals handles ByteArray`() {
+ val s1 = SinkResponse.Serve("id-4", "data".toByteArray())
+ val s2 = SinkResponse.Serve("id-4", "data".toByteArray())
+ val s3 = SinkResponse.Serve("id-4", "other".toByteArray())
+ assertEquals(s1, s2)
+ assertEquals(s1.hashCode(), s2.hashCode())
+ assertNotEquals(s1, s3)
+ }
+
+ @Test
+ fun `OnSuccess variant with and without message`() {
+ val os1 = SinkResponse.OnSuccess("id-5")
+ assertEquals(null, os1.message)
+
+ val msg = SinkMessage(value = "v".toByteArray(), keys = listOf("k"))
+ val os2 = SinkResponse.OnSuccess("id-6", msg)
+ assertEquals(msg, os2.message)
+ }
+
+ @Test
+ fun `toJava round-trip for Ok`() {
+ val ok = SinkResponse.Ok("id-1")
+ val java = ok.toJava()
+ assertEquals("id-1", java.id)
+ assertEquals(true, java.success)
+ }
+
+ @Test
+ fun `toJava round-trip for Failure`() {
+ val fail = SinkResponse.Failure("id-2", "err")
+ val java = fail.toJava()
+ assertEquals("id-2", java.id)
+ assertEquals(false, java.success)
+ assertEquals("err", java.err)
+ }
+
+ @Test
+ fun `toJava round-trip for Fallback`() {
+ val fb = SinkResponse.Fallback("id-3")
+ val java = fb.toJava()
+ assertEquals(true, java.fallback)
+ }
+
+ @Test
+ fun `toJava round-trip for Serve`() {
+ val serve = SinkResponse.Serve("id-4", "data".toByteArray())
+ val java = serve.toJava()
+ assertEquals(true, java.serve)
+ assertEquals("data", String(java.serveResponse))
+ }
+
+ @Test
+ fun `toJava round-trip for OnSuccess`() {
+ val os = SinkResponse.OnSuccess("id-5")
+ val java = os.toJava()
+ assertEquals(true, java.onSuccess)
+ }
+
+ @Test
+ fun `toKotlin conversions`() {
+ assertEquals(SinkResponse.Ok("1"), io.numaproj.numaflow.sinker.Response.responseOK("1").toKotlin())
+ assertEquals(SinkResponse.Fallback("2"), io.numaproj.numaflow.sinker.Response.responseFallback("2").toKotlin())
+
+ val fail = io.numaproj.numaflow.sinker.Response.responseFailure("3", "oops").toKotlin()
+ assertIs(fail)
+ assertEquals("oops", fail.error)
+ }
+}
diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkServerTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkServerTest.kt
new file mode 100644
index 00000000..1172d05b
--- /dev/null
+++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkServerTest.kt
@@ -0,0 +1,52 @@
+package io.numaproj.numaflow.kt.sinker
+
+import kotlinx.coroutines.flow.toList
+import org.junit.jupiter.api.Test
+import kotlin.test.assertNotNull
+
+class SinkServerTest {
+
+ @Test
+ fun `sinkServer with handler creates server`() {
+ val server = sinkServer(handler = SinkHandler { datums ->
+ datums.toList().map { SinkResponse.Ok(it.id) }
+ })
+ assertNotNull(server)
+ }
+
+ @Test
+ fun `sinkServer with lambda creates server`() {
+ val server = sinkServer { datums ->
+ datums.toList().map { SinkResponse.Ok(it.id) }
+ }
+ assertNotNull(server)
+ }
+
+ @Test
+ fun `sinkServer with config creates server`() {
+ val server = sinkServer(
+ config = {
+ isLocal = true
+ port = 50052
+ maxMessageSize = 1024 * 1024 * 8
+ },
+ handler = SinkHandler { datums ->
+ datums.toList().map { SinkResponse.Ok(it.id) }
+ },
+ )
+ assertNotNull(server)
+ }
+
+ @Test
+ fun `SinkServerConfig toGrpcConfig applies values`() {
+ val cfg = SinkServerConfig().apply {
+ socketPath = "/tmp/test.sock"
+ maxMessageSize = 999
+ port = 12345
+ isLocal = true
+ infoFilePath = "/tmp/info"
+ }
+ val grpc = cfg.toGrpcConfig()
+ assertNotNull(grpc)
+ }
+}
diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKitTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKitTest.kt
new file mode 100644
index 00000000..63b44032
--- /dev/null
+++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkTestKitTest.kt
@@ -0,0 +1,78 @@
+package io.numaproj.numaflow.kt.sinker
+
+import kotlinx.coroutines.test.runTest
+import org.junit.jupiter.api.Test
+import java.time.Instant
+import kotlin.test.assertEquals
+import kotlin.test.assertIs
+
+class SinkTestKitTest {
+
+ @Test
+ fun `datum factory creates SinkDatum with defaults`() {
+ val datum = SinkTestKit.datum(id = "1")
+ assertEquals("1", datum.id)
+ assertEquals(emptyList(), datum.keys)
+ assertEquals(0, datum.value.size)
+ assertEquals(emptyMap(), datum.headers)
+ }
+
+ @Test
+ fun `datum factory creates SinkDatum with custom values`() {
+ val now = Instant.now()
+ val datum = SinkTestKit.datum(
+ id = "2",
+ value = "payload".toByteArray(),
+ keys = listOf("k1", "k2"),
+ eventTime = now,
+ watermark = now,
+ headers = mapOf("header" to "value"),
+ )
+ assertEquals("2", datum.id)
+ assertEquals(listOf("k1", "k2"), datum.keys)
+ assertEquals("payload", String(datum.value))
+ assertEquals(now, datum.eventTime)
+ assertEquals(mapOf("header" to "value"), datum.headers)
+ }
+
+ @Test
+ fun `test invokes handler directly with Flow`() = runTest {
+ val handler = SinkHandler { datums ->
+ datums.processEach { it.ok() }
+ }
+
+ val results = SinkTestKit.test(
+ handler,
+ listOf(
+ SinkTestKit.datum(id = "1", value = "hello".toByteArray()),
+ SinkTestKit.datum(id = "2", value = "world".toByteArray()),
+ ),
+ )
+
+ assertEquals(2, results.size)
+ assertEquals(SinkResponse.Ok("1"), results[0])
+ assertEquals(SinkResponse.Ok("2"), results[1])
+ }
+
+ @Test
+ fun `test handles mixed response types`() = runTest {
+ val handler = SinkHandler { datums ->
+ datums.processEach { datum ->
+ if (datum.valueAsString() == "good") datum.ok()
+ else datum.failure("bad data")
+ }
+ }
+
+ val results = SinkTestKit.test(
+ handler,
+ listOf(
+ SinkTestKit.datum(id = "1", value = "good".toByteArray()),
+ SinkTestKit.datum(id = "2", value = "bad".toByteArray()),
+ ),
+ )
+
+ assertIs(results[0])
+ assertIs(results[1])
+ assertEquals("bad data", (results[1] as SinkResponse.Failure).error)
+ }
+}
diff --git a/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridgeTest.kt b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridgeTest.kt
new file mode 100644
index 00000000..6d82b05c
--- /dev/null
+++ b/numaflow-kotlin/src/test/kotlin/io/numaproj/numaflow/kt/sinker/SinkerBridgeTest.kt
@@ -0,0 +1,74 @@
+package io.numaproj.numaflow.kt.sinker
+
+import io.numaproj.numaflow.sinker.DatumIterator
+import io.numaproj.numaflow.sinker.SinkerTestKit
+import kotlinx.coroutines.flow.toList
+import org.junit.jupiter.api.Test
+import java.time.Instant
+import kotlin.test.assertEquals
+
+class SinkerBridgeTest {
+
+ @Test
+ fun `bridge delegates to handler and returns correct ResponseList`() {
+ val handler = SinkHandler { datums ->
+ datums.toList().map { SinkResponse.Ok(it.id) }
+ }
+ val bridge = SinkerBridge(handler)
+
+ val iterator = SinkerTestKit.TestListIterator()
+ iterator.addDatum(
+ SinkerTestKit.TestDatum.builder()
+ .id("msg-1")
+ .value("hello".toByteArray())
+ .keys(arrayOf("k1"))
+ .eventTime(Instant.now())
+ .watermark(Instant.now())
+ .headers(emptyMap())
+ .build()
+ )
+ iterator.addDatum(
+ SinkerTestKit.TestDatum.builder()
+ .id("msg-2")
+ .value("world".toByteArray())
+ .keys(arrayOf("k2"))
+ .eventTime(Instant.now())
+ .watermark(Instant.now())
+ .headers(emptyMap())
+ .build()
+ )
+
+ val result = bridge.processMessages(iterator)
+
+ assertEquals(2, result.responses.size)
+ assertEquals("msg-1", result.responses[0].id)
+ assertEquals(true, result.responses[0].success)
+ assertEquals("msg-2", result.responses[1].id)
+ }
+
+ @Test
+ fun `bridge handles failure responses`() {
+ val handler = SinkHandler { datums ->
+ datums.toList().map { SinkResponse.Failure(it.id, "failed") }
+ }
+ val bridge = SinkerBridge(handler)
+
+ val iterator = SinkerTestKit.TestListIterator()
+ iterator.addDatum(
+ SinkerTestKit.TestDatum.builder()
+ .id("msg-1")
+ .value("data".toByteArray())
+ .keys(emptyArray())
+ .eventTime(Instant.now())
+ .watermark(Instant.now())
+ .headers(emptyMap())
+ .build()
+ )
+
+ val result = bridge.processMessages(iterator)
+
+ assertEquals(1, result.responses.size)
+ assertEquals(false, result.responses[0].success)
+ assertEquals("failed", result.responses[0].err)
+ }
+}
diff --git a/releases.md b/releases.md
index 481546b1..99e3abe0 100644
--- a/releases.md
+++ b/releases.md
@@ -1,6 +1,12 @@
# Release Guide
-This document explains the release process for the Java SDK. You can find the most recent version under [Github Releases](https://github.com/numaproj/numaflow-java/releases).
+This document explains the release process for the Java SDK and Kotlin SDK. You can find the most recent versions under [Github Releases](https://github.com/numaproj/numaflow-java/releases).
+
+The two SDKs are versioned and released independently. The Kotlin SDK depends on the Java SDK, so a Java SDK release may need to happen first if the Kotlin SDK needs new Java SDK features.
+
+---
+
+## Java SDK
### Before Release
@@ -8,7 +14,7 @@ This document explains the release process for the Java SDK. You can find the mo
For example, the version in the `README.md`, as well as the `pom.xml` in the root and example directories should be updated (for [reference
](https://github.com/numaproj/numaflow-java/pull/89/files#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8)). The specified version should follow the [semantic versioning](https://semver.org/) specification
2. If version to be released has backwards incompatible changes, i.e. it does not support older versions of the Numaflow platform,
-you must update the `MINIMUM_NUMAFLOW_VERSION` constant in the `src/main/java/io/numaproj/numaflow/info/ServerInfo.java` file to the minimum Numaflow version
+you must update the `MINIMUM_NUMAFLOW_VERSION` constant in the `src/main/java/io/numaproj/numaflow/info/ServerInfo.java` file to the minimum Numaflow version
that is supported by your new SDK version
3. After making these changes, create a PR. Once merged, it will trigger the `Docker Publish` workflow, and should be included in the release.
As a result, the correct SDK version will always be printed in the server information logs, and the example images will
@@ -16,10 +22,10 @@ always be using the latest changes (due to pointing to the local maven repositor
### How to Release
-This can be done via the Github UI.
+This can be done via the Github UI.
1. In the `Releases` section of the Java SDK repo, click `Draft a new release`
2. Create a tag that has the same name as the version that you specified in the root
-`pom.xml`, prefix it with a `'v'`, and select it
+`pom.xml`, prefix it with a `'v'`, and select it (e.g. `v0.11.0`)
3. Make the title the same as the tag
4. Click `Generate release notes` so that all the changes made since the last release are documented. If there are any major features or breaking
changes that you would like to highlight as part of the release, add those to the description as well
@@ -28,4 +34,28 @@ changes that you would like to highlight as part of the release, add those to th
### After Release
-After your release, a Github Actions workflow, `Publish to Maven Central and Github Packages`, will be triggered. Monitor the workflow run and ensure that it succeeds.
+After your release, the `Publish Java SDK to Maven Central and Github Packages` workflow will be triggered. Monitor the workflow run and ensure that it succeeds.
+
+---
+
+## Kotlin SDK
+
+The Kotlin SDK lives in `numaflow-kotlin/` and has its own version in `numaflow-kotlin/pom.xml`. It depends on a published version of the Java SDK.
+
+### Before Release
+
+1. Update the version in `numaflow-kotlin/pom.xml`
+2. If needed, update the `numaflow-java` dependency version in `numaflow-kotlin/pom.xml` to the Java SDK version it should depend on
+3. Create a PR and merge
+
+### How to Release
+
+1. In the `Releases` section, click `Draft a new release`
+2. Create a tag prefixed with `kotlin-v` (e.g. `kotlin-v0.1.0`)
+3. Make the title the same as the tag
+4. Click `Generate release notes` and add any highlights
+5. Click `Publish release`
+
+### After Release
+
+The `Publish Kotlin SDK to Maven Central and Github Packages` workflow will be triggered. Monitor the workflow run and ensure that it succeeds.