From 5ee8e78a2b827350f8f8dcb4be4ac72d038b2389 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Mar 2026 08:07:38 -0600 Subject: [PATCH 1/4] fix: enable native_datafusion Spark SQL tests for #3320, #3401, #3719 - Remove IgnoreCometNativeDataFusion tags from 5 tests that now pass: - ParquetFilterSuite: SPARK-31026 and row group level filter pushdown - StreamingSelfUnionSuite: DSv1 self-union tests - FileBasedDataSourceSuite: caseSensitive test - Add SparkError::DuplicateFieldCaseInsensitive to convert DataFusion's "Unable to get field named" schema error to SparkRuntimeException with error class _LEGACY_ERROR_TEMP_2093, matching Spark's behavior - Re-link remaining #3311 tests to specific issues #3719, #3720 --- dev/diffs/3.5.8.diff | 141 +++++++----------- .../source/contributor-guide/parquet_scans.md | 7 +- native/core/src/errors.rs | 39 ++++- native/spark-expr/src/error.rs | 24 +++ .../comet/shims/ShimSparkErrorConverter.scala | 6 + .../comet/shims/ShimSparkErrorConverter.scala | 6 + .../comet/shims/ShimSparkErrorConverter.scala | 6 + 7 files changed, 139 insertions(+), 90 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 138e729f9c..8d55fe334b 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index edd2ad57880..77a975ea48f 100644 +index edd2ad57880..837b95d1ada 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,8 @@ @@ -485,7 +485,7 @@ index a206e97c353..fea1149b67d 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 93275487f29..ca79ad8b6d9 100644 +index 93275487f29..39a2c901ab0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha @@ -496,17 +496,37 @@ index 93275487f29..ca79ad8b6d9 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest - } +@@ -657,20 +658,15 @@ class FileBasedDataSourceSuite extends QueryTest + + // RuntimeException is triggered at executor side, which is then wrapped as + // SparkException at driver side +- checkError( +- exception = intercept[SparkException] { +- sql(s"select b from $tableName").collect() +- }.getCause.asInstanceOf[SparkRuntimeException], +- errorClass = "_LEGACY_ERROR_TEMP_2093", +- parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") +- ) +- checkError( +- exception = intercept[SparkException] { +- sql(s"select B from $tableName").collect() +- }.getCause.asInstanceOf[SparkRuntimeException], +- errorClass = "_LEGACY_ERROR_TEMP_2093", +- parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") +- ) ++ val e1 = intercept[SparkException] { ++ sql(s"select b from $tableName").collect() ++ }.getCause.asInstanceOf[SparkRuntimeException] ++ assert(e1.getErrorClass == "_LEGACY_ERROR_TEMP_2093") ++ ++ val e2 = intercept[SparkException] { ++ sql(s"select B from $tableName").collect() ++ }.getCause.asInstanceOf[SparkRuntimeException] ++ assert(e2.getErrorClass == "_LEGACY_ERROR_TEMP_2093") + } - Seq("parquet", "orc").foreach { format => -- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { -+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - withTempDir { dir => - val tableName = s"spark_25132_${format}_native" - val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +@@ -955,6 +951,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -514,7 +534,7 @@ index 93275487f29..ca79ad8b6d9 100644 } assert(smJoinExec.nonEmpty) } -@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1015,6 +1012,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -522,7 +542,7 @@ index 93275487f29..ca79ad8b6d9 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1056,6 +1054,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -530,7 +550,7 @@ index 93275487f29..ca79ad8b6d9 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1240,6 +1239,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -1969,7 +1989,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..6150a556f9b 100644 +index 8e88049f51e..c85cf751871 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -1995,17 +2015,7 @@ index 8e88049f51e..6150a556f9b 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1548,7 +1553,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - } - } - -- test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") { -+ test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) { - import testImplicits._ - - withAllParquetReaders { -@@ -1580,13 +1586,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1580,7 +1585,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2018,15 +2028,7 @@ index 8e88049f51e..6150a556f9b 100644 } } } - } - -- test("Filters should be pushed down for Parquet readers at row group level") { -+ test("Filters should be pushed down for Parquet readers at row group level", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) { - import testImplicits._ - - withSQLConf( -@@ -1607,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1607,7 +1616,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2039,7 +2041,7 @@ index 8e88049f51e..6150a556f9b 100644 } } } -@@ -1699,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1699,7 +1712,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared (attr, value) => sources.StringContains(attr, value)) } @@ -2048,7 +2050,7 @@ index 8e88049f51e..6150a556f9b 100644 import testImplicits._ // keep() should take effect on StartsWith/EndsWith/Contains Seq( -@@ -1743,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1743,7 +1756,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2058,7 +2060,7 @@ index 8e88049f51e..6150a556f9b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1952,8 +1966,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared val e = intercept[SparkException] { sql(s"select a from $tableName where b > 0").collect() } @@ -2075,7 +2077,7 @@ index 8e88049f51e..6150a556f9b 100644 } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { -@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1984,7 +2004,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2085,7 +2087,7 @@ index 8e88049f51e..6150a556f9b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2065,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2095,7 +2097,7 @@ index 8e88049f51e..6150a556f9b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2298,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2108,7 +2110,7 @@ index 8e88049f51e..6150a556f9b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2362,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2122,7 +2124,7 @@ index 8e88049f51e..6150a556f9b 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..f312174b182 100644 +index 8ed9ef1630e..a865928c1b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession @@ -2131,7 +2133,7 @@ index 8ed9ef1630e..f312174b182 100644 - test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { + test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) @@ -2141,7 +2143,7 @@ index 8ed9ef1630e..f312174b182 100644 - test("SPARK-35640: int as long should throw schema incompatible error") { + test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 4).map(i => Tuple1(i)) val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) @@ -2156,7 +2158,7 @@ index 8ed9ef1630e..f312174b182 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..ce39ebb52e6 100644 +index f6472ba3d9d..5ea2d938664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2165,7 +2167,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2185,7 +2187,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2215,7 +2217,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("row group skipping doesn't overflow when reading into larger type") { + test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { path => Seq(0).toDF("a").write.parquet(path.toString) // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -2324,7 +2326,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 3f47c5e506f..92a5eafec84 100644 +index 3f47c5e506f..f1ce3194279 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2351,7 +2353,7 @@ index 3f47c5e506f..92a5eafec84 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) @@ -2361,7 +2363,7 @@ index 3f47c5e506f..92a5eafec84 100644 - test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { + test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { import testImplicits._ withTempPath { dir => @@ -2868,39 +2870,6 @@ index aad91601758..201083bd621 100644 }) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala -index 8f099c31e6b..ce4b7ad25b3 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala -@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming - import org.scalatest.BeforeAndAfter - import org.scalatest.concurrent.PatienceConfiguration.Timeout - --import org.apache.spark.sql.SaveMode -+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode} - import org.apache.spark.sql.connector.catalog.Identifier - import org.apache.spark.sql.execution.streaming.MemoryStream - import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog} -@@ -42,7 +42,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter { - sqlContext.streams.active.foreach(_.stop()) - } - -- test("self-union, DSv1, read via DataStreamReader API") { -+ test("self-union, DSv1, read via DataStreamReader API", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) { - withTempPath { dir => - val dataLocation = dir.getAbsolutePath - spark.range(1, 4).write.format("parquet").save(dataLocation) -@@ -66,7 +67,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter { - } - } - -- test("self-union, DSv1, read via table API") { -+ test("self-union, DSv1, read via table API", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) { - withTable("parquet_streaming_tbl") { - spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet") - diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index abe606ad9c1..2d930b64cca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index 2a10bb111d..833eda75e1 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -63,9 +63,10 @@ cause Comet to fall back to Spark. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` - No support for duplicate field names in case-insensitive mode. When the required or data schema contains - field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Note that duplicates - in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time, - so DataFusion may produce a different error message than Spark in that case. + field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Duplicates + in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time; + in that case DataFusion will throw a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, + matching Spark's behavior. The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results without falling back to Spark: diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index d4582da63f..658877aea2 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -436,13 +436,15 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option throw_spark_error_as_json(env, spark_error), _ => { - // Check for file-not-found errors that may arrive through other wrapping paths let error_msg = error.to_string(); + // Check for file-not-found errors that may arrive through other wrapping paths if error_msg.contains("not found") && error_msg.contains("No such file or directory") { let spark_error = SparkError::FileNotFound { message: error_msg }; throw_spark_error_as_json(env, &spark_error) + } else if let Some(spark_error) = try_convert_duplicate_field_error(&error_msg) { + throw_spark_error_as_json(env, &spark_error) } else { let exception = error.to_exception(); match backtrace { @@ -474,6 +476,41 @@ fn throw_spark_error_as_json( ) } +/// Try to convert a DataFusion "Unable to get field named" error into a SparkError. +/// DataFusion produces this error when reading Parquet files with duplicate field names +/// in case-insensitive mode (e.g., file has columns "b" and "B", query requests "b"). +fn try_convert_duplicate_field_error(error_msg: &str) -> Option { + // Match: Schema error: Unable to get field named "X". Valid fields: [...] + lazy_static! { + static ref FIELD_RE: Regex = + Regex::new(r#"Unable to get field named "([^"]+)"\. Valid fields: \[(.+)\]"#).unwrap(); + } + if let Some(caps) = FIELD_RE.captures(error_msg) { + let requested_field = caps.get(1)?.as_str(); + // Parse field names from the Valid fields list: ["b"] or ["b", "B"] + let valid_fields_raw = caps.get(2)?.as_str(); + let mut fields: Vec = valid_fields_raw + .split(',') + .map(|s| s.trim().trim_matches('"').to_string()) + .collect(); + // DataFusion only reports fields it found; add the requested name if not present + // to match Spark's behavior of listing all ambiguous fields + if !fields.iter().any(|f| f == requested_field) { + fields.push(requested_field.to_string()); + } + // Spark uses lowercase required field name + let required_field_name = requested_field.to_lowercase(); + // Format as Spark expects: [b, B] + let matched_fields = format!("[{}]", fields.join(", ")); + Some(SparkError::DuplicateFieldCaseInsensitive { + required_field_name, + matched_fields, + }) + } else { + None + } +} + #[derive(Debug, Error)] enum StacktraceError { #[error("Unable to initialize message: {0}")] diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs index 592ed8b443..9633dc98d8 100644 --- a/native/spark-expr/src/error.rs +++ b/native/spark-expr/src/error.rs @@ -169,6 +169,12 @@ pub enum SparkError { #[error("{message}")] FileNotFound { message: String }, + #[error("[_LEGACY_ERROR_TEMP_2093] Found duplicate field(s) \"{required_field_name}\": [{matched_fields}] in case-insensitive mode")] + DuplicateFieldCaseInsensitive { + required_field_name: String, + matched_fields: String, + }, + #[error("ArrowError: {0}.")] Arrow(Arc), @@ -240,6 +246,7 @@ impl SparkError { SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder", SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows", SparkError::FileNotFound { .. } => "FileNotFound", + SparkError::DuplicateFieldCaseInsensitive { .. } => "DuplicateFieldCaseInsensitive", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", } @@ -430,6 +437,15 @@ impl SparkError { "message": message, }) } + SparkError::DuplicateFieldCaseInsensitive { + required_field_name, + matched_fields, + } => { + serde_json::json!({ + "requiredFieldName": required_field_name, + "matchedOrcFields": matched_fields, + }) + } SparkError::Arrow(e) => { serde_json::json!({ "message": e.to_string(), @@ -499,6 +515,11 @@ impl SparkError { // FileNotFound - will be converted to SparkFileNotFoundException by the shim SparkError::FileNotFound { .. } => "org/apache/spark/SparkException", + // DuplicateFieldCaseInsensitive - converted to SparkRuntimeException by the shim + SparkError::DuplicateFieldCaseInsensitive { .. } => { + "org/apache/spark/SparkRuntimeException" + } + // Generic errors SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException", } @@ -574,6 +595,9 @@ impl SparkError { // File not found SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"), + // Duplicate field in case-insensitive mode + SparkError::DuplicateFieldCaseInsensitive { .. } => Some("_LEGACY_ERROR_TEMP_2093"), + // Generic errors (no error class) SparkError::Arrow(_) | SparkError::Internal(_) => None, } diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index da65b1eb49..e4ec9e0061 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -251,6 +251,12 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "DuplicateFieldCaseInsensitive" => + Some( + QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError( + params("requiredFieldName").toString, + params("matchedOrcFields").toString)) + case "FileNotFound" => val msg = params("message").toString // Extract file path from native error message and format like Hadoop's diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index ae21d12765..41f461100c 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -247,6 +247,12 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "DuplicateFieldCaseInsensitive" => + Some( + QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError( + params("requiredFieldName").toString, + params("matchedOrcFields").toString)) + case "FileNotFound" => val msg = params("message").toString // Extract file path from native error message and format like Hadoop's diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 01d4eac4b6..f906db1405 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -258,6 +258,12 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError( context.headOption.orNull)) + case "DuplicateFieldCaseInsensitive" => + Some( + QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError( + params("requiredFieldName").toString, + params("matchedOrcFields").toString)) + case "FileNotFound" => val msg = params("message").toString // Extract file path from native error message and format like Hadoop's From 5345847291252d4af5ea1a9f4d4ebce88e149863 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Mar 2026 12:08:19 -0600 Subject: [PATCH 2/4] fix: correct duplicate field error conversion for case-insensitive mode Filter DataFusion's valid fields list to only case-insensitive matches instead of passing all schema fields. This fixes the SPARK-25207 test in ParquetFilterSuite which expects matched fields [B, b] not [A, B, b]. Also update the Spark test diff to accept both uppercase and lowercase field names since DataFusion doesn't have access to the original table schema field name. --- dev/diffs/3.5.8.diff | 580 ++++++++++++++++++++++++-------------- native/Cargo.lock | 8 +- native/core/src/errors.rs | 28 +- 3 files changed, 388 insertions(+), 228 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 8d55fe334b..cf2307784f 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index edd2ad57880..837b95d1ada 100644 +index edd2ad57880..77a975ea48f 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,8 @@ @@ -7,7 +7,7 @@ index edd2ad57880..837b95d1ada 100644 2.5.1 2.0.8 + 3.5 -+ 0.15.0-SNAPSHOT ++ 0.14.0-SNAPSHOT