Kafka Connect: Support VARIANT when record convert#15283

Open
seokyun-ha-toss wants to merge 12 commits into apache:main from seokyun-ha-toss:support-variant-for-sink-connector

@seokyun-ha-toss

Summary

Add support for converting arbitrary Java objects (e.g. Map<String, Object>, lists, primitives) into the Iceberg Variant type in the Kafka Connect RecordConverter. Nested maps and lists are converted recursively, so structures like {"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}} are represented correctly as a single Variant.

Motivation

Kafka Connect payloads often come as schema-less or JSON-like maps. To write them into Iceberg tables with a Variant column, the connector must convert these Java objects into the Variant format (metadata + value) and support nested maps/arrays without losing structure or key names.

Behaviour

| Input | Result |
| --- | --- |
| Primitives (String, int, long, boolean, etc.) | Single metadata (empty) + corresponding Variant primitive. |
| Flat map, e.g. {"a": 1, "b": "x"} | One metadata with keys ["a", "b"], one ShreddedObject with two fields. |
| Nested map, e.g. {"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}} | One shared metadata for all keys; root and nested objects as ShreddedObjects with consistent field IDs. |
| Lists | Converted to VariantArray with elements converted recursively. |
| Already Variant / ByteBuffer | Pass-through or Variant.from(ByteBuffer) where appropriate. |
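
To illustrate the "one shared metadata, consistent field IDs" idea for nested maps, here is a minimal sketch in plain Java. It is not the code in this PR and does not use the Iceberg Variant classes: `VariantKeySketch`, `collectKeys`, `buildDictionary`, and `encode` are hypothetical names, and the sorted key list is only a simplified stand-in for Variant metadata, assuming field IDs are positions in a sorted key dictionary.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration only: models shared metadata as a sorted key list.
class VariantKeySketch {

  // Pass 1: walk maps and lists recursively, gathering every map key into one set.
  static void collectKeys(Object value, TreeSet<String> keys) {
    if (value instanceof Map) {
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
        keys.add(String.valueOf(entry.getKey()));
        collectKeys(entry.getValue(), keys);
      }
    } else if (value instanceof List) {
      for (Object element : (List<?>) value) {
        collectKeys(element, keys);
      }
    }
  }

  // The shared dictionary: a key's index in this sorted list serves as its field ID.
  static List<String> buildDictionary(Object root) {
    TreeSet<String> keys = new TreeSet<>();
    collectKeys(root, keys);
    return new ArrayList<>(keys);
  }

  // Pass 2: rewrite every map so it is keyed by field ID from the one dictionary.
  // Lists are rebuilt element by element; all other values pass through unchanged.
  static Object encode(Object value, List<String> dictionary) {
    if (value instanceof Map) {
      Map<Integer, Object> fields = new TreeMap<>();
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
        int fieldId = dictionary.indexOf(String.valueOf(entry.getKey()));
        fields.put(fieldId, encode(entry.getValue(), dictionary));
      }
      return fields;
    } else if (value instanceof List) {
      List<Object> elements = new ArrayList<>();
      for (Object element : (List<?>) value) {
        elements.add(encode(element, dictionary));
      }
      return elements;
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, Object> address = new LinkedHashMap<>();
    address.put("city", "Seoul");
    address.put("zip", "12345");
    Map<String, Object> user = new LinkedHashMap<>();
    user.put("name", "alice");
    user.put("address", address);
    Map<String, Object> root = new LinkedHashMap<>();
    root.put("user", user);

    List<String> dictionary = buildDictionary(root);
    System.out.println(dictionary);
    System.out.println(encode(root, dictionary));
  }
}
```

The point of the two passes is that keys must be known for the whole tree before any object is built; otherwise a nested object could not refer to the same dictionary as its parent. For the nested example above, the dictionary in this sketch is [address, city, name, user, zip], and both the root and the nested objects resolve their keys against it.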

Relates

Thanks, Good Day!

assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
}

@Test
Contributor

I think it might be more maintainable to write tests against convertVariantValue instead of the static helpers.

Also it seems like there should be tests for convertVariantValue specifically?

@alexkot1394

Hi @seokyun-ha-toss,
I've opened a PR that's trying to solve the same issue: #15498
I'm happy to help with your PR to get this change into main. Let me know if you want to collaborate on your PR or where you're at with the review comments above.

@seokyun-ha-toss
Author

> Hi @seokyun-ha-toss,
> I've opened a PR that's trying to solve the same issue: #15498
> I'm happy to help with your PR to get this change into main. Let me know if you want to collaborate on your PR or where you're at with the review comments above.

Hi @alexkot1394, thanks for reaching out.

I stepped away for a bit to validate my changes in our production environment.

I took a look at your PR as well. From what I see, it treats Variant as a string primitive, whereas I believe it should be handled recursively to properly construct the Variant structure.

I’d prefer to continue working on this PR and incorporate the review comments as soon as possible.

Thanks again!

@seokyun-ha-toss
Author

Hello, @danielcweeks, @emkornfield, and @alexkot1394, I'm ready to get a review on this. I've tested this for a month in production, and it works well! I can query the output Iceberg tables with Variant columns using Spark and Snowflake.

Please take a look at the latest updates. Thanks!
