Kafka Connect: Support VARIANT when record convert#15283
Kafka Connect: Support VARIANT when record convert#15283seokyun-ha-toss wants to merge 12 commits intoapache:mainfrom
Conversation
…ersion methods for nested structures
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
| assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class); | ||
| } | ||
|
|
||
| @Test |
There was a problem hiding this comment.
I think it might be more maintainable to write tests against convertVariantValue instead of the static helpers.
Also it seems like there should be tests for convertVariantValue specifically?
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
Outdated
Show resolved
Hide resolved
|
Hi @seokyun-ha-toss, |
Hi @alexkot1394, thanks for reaching out. I stepped away for a bit to validate my changes in our production environment. I took a look at your PR as well. From what I see, it treats Variant as a string primitive, whereas I believe it should be handled recursively to properly construct the Variant structure. I’d prefer to continue working on this PR and incorporate the review comments as soon as possible. Thanks again! |
…Set for uniqueness
… for null, primitive types, lists, maps, and mixed types
|
Hello, @danielcweeks, @emkornfield, and @alexkot1394, I'm ready to get a review on this. I've tested this for a month in production, and it works well! I can query the output Iceberg tables with Variant columns using Spark and Snowflake. Please take a look at the latest updates. Thanks! |
Summary
Add support for converting arbitrary Java objects (e.g.
Map<String, Object>, lists, primitives) into Iceberg Variant type in the Kafka Connect RecordConverter. Nested maps and lists are converted recursively so that structures like{"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}}are correctly represented as a single Variant.Motivation
Kafka Connect payloads often come as schema-less or JSON-like maps. To write them into Iceberg tables with a Variant column, the connector must convert these Java objects into the Variant format (metadata + value) and support nested maps/arrays without losing structure or key names.
Behaviour
{"a": 1, "b": "x"}["a", "b"], one ShreddedObject with two fields.{"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}}Variant.from(ByteBuffer)where appropriate.Relates
Thanks, Good Day!