diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 138e729f9c..db495f1e23 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index edd2ad57880..77a975ea48f 100644 +index edd2ad57880..837b95d1ada 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,8 @@ @@ -93,23 +93,22 @@ index 27ae10b3d59..78e69902dfd 100644 + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala -index db587dd9868..33802f29253 100644 +index db587dd9868..aac7295a53d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} ++import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec -@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo { +@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo { // dump the file scan metadata (e.g file path) to event log val metadata = plan match { case fileScan: FileSourceScanExec => fileScan.metadata + case cometScan: CometScanExec => cometScan.metadata -+ case nativeScan: CometNativeScanExec => nativeScan.metadata case _ => Map[String, String]() } new SparkPlanInfo( @@ -239,6 +238,20 @@ index e5494726695..00937f025c2 100644 } test("A cached table preserves the partitioning and ordering of its cached SparkPlan") { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +index 9e8d77c53f3..855e3ada7d1 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession { + } + } + +- test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") { ++ test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { + withTempPath { dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 6f3090d8908..c08a60fb0c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -397,14 +410,14 @@ index c4fb4fa943c..a04b23870a8 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..4acdf7e9cfb 100644 +index f33432ddb6f..42eb9fd1cb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.plans.ExistenceJoin -+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} ++import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ @@ -448,22 +461,40 @@ index f33432ddb6f..4acdf7e9cfb 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat + * Check the static scan metrics with and without DPP + */ + test("static scan metrics", +- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { ++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { +@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) + case s: CometScanExec => -+ s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid"))) -+ case s: CometNativeScanExec => + s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid"))) case _ => false } assert(scanOption.isDefined) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -index a206e97c353..fea1149b67d 100644 +index a206e97c353..79813d8e259 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite +@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite + } + } + +- test("explain formatted - check presence of subquery in case of DPP") { ++ test("explain formatted - check presence of subquery in case of DPP", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { + withTable("df1", "df2") { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", +@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } @@ -473,7 +504,7 @@ index a206e97c353..fea1149b67d 100644 withTempDir { dir => Seq("parquet", "orc", "csv", "json").foreach { fmt => val basePath = dir.getCanonicalPath + "/" + fmt -@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite +@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } @@ -485,10 +516,18 @@ index a206e97c353..fea1149b67d 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 93275487f29..ca79ad8b6d9 100644 +index 93275487f29..510e3087e0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha +@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption} + + import scala.collection.mutable + ++import org.apache.comet.CometConf + import org.apache.hadoop.conf.Configuration + import org.apache.hadoop.fs.{LocalFileSystem, Path} + +@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.types.DataTypeUtils @@ -496,7 +535,16 @@ index 93275487f29..ca79ad8b6d9 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest + case "" => "_LEGACY_ERROR_TEMP_2062" + case _ => "_LEGACY_ERROR_TEMP_2055" + } ++ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException ++ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + checkErrorMatchPVals( + exception = intercept[SparkException] { + testIgnoreMissingFiles(options) +@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -506,7 +554,7 @@ index 93275487f29..ca79ad8b6d9 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -514,7 +562,7 @@ index 93275487f29..ca79ad8b6d9 100644 } assert(smJoinExec.nonEmpty) } -@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -522,7 +570,7 @@ index 93275487f29..ca79ad8b6d9 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -530,7 +578,7 @@ index 93275487f29..ca79ad8b6d9 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -930,73 +978,6 @@ index 3cf2bfd17ab..49728c35c42 100644 withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index fa1a64460fc..134f0db1fb8 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -@@ -17,6 +17,8 @@ - - package org.apache.spark.sql - -+import org.apache.comet.CometConf -+ - import org.apache.spark.{SPARK_DOC_ROOT, SparkIllegalArgumentException, SparkRuntimeException} - import org.apache.spark.sql.catalyst.expressions.Cast._ - import org.apache.spark.sql.execution.FormattedMode -@@ -178,29 +180,31 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession { - } - - test("string regex_replace / regex_extract") { -- val df = Seq( -- ("100-200", "(\\d+)-(\\d+)", "300"), -- ("100-200", "(\\d+)-(\\d+)", "400"), -- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c") -+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { -+ val df = Seq( -+ ("100-200", "(\\d+)-(\\d+)", "300"), -+ ("100-200", "(\\d+)-(\\d+)", "400"), -+ ("100-200", "(\\d+)", "400")).toDF("a", "b", "c") - -- checkAnswer( -- df.select( -- regexp_replace($"a", "(\\d+)", "num"), -- regexp_replace($"a", $"b", $"c"), -- regexp_extract($"a", "(\\d+)-(\\d+)", 1)), -- Row("num-num", "300", "100") :: Row("num-num", "400", "100") :: -- Row("num-num", "400-400", "100") :: Nil) -- -- // for testing the mutable state of the expression in code gen. -- // This is a hack way to enable the codegen, thus the codegen is enable by default, -- // it will still use the interpretProjection if projection followed by a LocalRelation, -- // hence we add a filter operator. -- // See the optimizer rule `ConvertToLocalRelation` -- checkAnswer( -- df.filter("isnotnull(a)").selectExpr( -- "regexp_replace(a, b, c)", -- "regexp_extract(a, b, 1)"), -- Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") :: Nil) -+ checkAnswer( -+ df.select( -+ regexp_replace($"a", "(\\d+)", "num"), -+ regexp_replace($"a", $"b", $"c"), -+ regexp_extract($"a", "(\\d+)-(\\d+)", 1)), -+ Row("num-num", "300", "100") :: Row("num-num", "400", "100") :: -+ Row("num-num", "400-400", "100") :: Nil) -+ -+ // for testing the mutable state of the expression in code gen. -+ // This is a hack way to enable the codegen, thus the codegen is enable by default, -+ // it will still use the interpretProjection if projection followed by a LocalRelation, -+ // hence we add a filter operator. -+ // See the optimizer rule `ConvertToLocalRelation` -+ checkAnswer( -+ df.filter("isnotnull(a)").selectExpr( -+ "regexp_replace(a, b, c)", -+ "regexp_extract(a, b, 1)"), -+ Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") :: Nil) -+ } - } - - test("non-matching optional group") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 04702201f82..5ee11f83ecf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1036,6 +1017,20 @@ index 04702201f82..5ee11f83ecf 100644 } assert(exchanges.size === 1) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +index 9f8e979e3fb..3bc9dab8023 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession { + spark.catalog.dropTempView("tmp_table") + } + +- test("SPARK-8005 input_file_name") { ++ test("SPARK-8005 input_file_name", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { + withTempPath { dir => + val data = sparkContext.parallelize(0 to 10, 2).toDF("id") + data.write.parquet(dir.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index d269290e616..13726a31e07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -1100,18 +1095,31 @@ index d269290e616..13726a31e07 100644 } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -index cfc8b2cc845..c4be7eb3731 100644 +index cfc8b2cc845..b7c234e1437 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer +@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector + import scala.collection.mutable.ArrayBuffer + import org.apache.spark.SparkConf - import org.apache.spark.sql.{AnalysisException, QueryTest} +-import org.apache.spark.sql.{AnalysisException, QueryTest} ++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { +@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { + } + } + +- test("Fallback Parquet V2 to V1") { ++ test("Fallback Parquet V2 to V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { + Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format => + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { + val commands = ArrayBuffer.empty[(String, LogicalPlan)] +@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { val df = spark.read.format(format).load(path.getCanonicalPath) checkAnswer(df, inputData.toDF()) assert( @@ -1375,6 +1383,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +index a1147c16cc8..c7a29496328 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution + + import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.TableIdentifier + import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide} + import org.apache.spark.sql.catalyst.parser.ParseException +@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { + } + } + +- test("alter temporary view should follow current storeAnalyzedPlanForView config") { ++ test("alter temporary view should follow current storeAnalyzedPlanForView config", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { + withTable("t") { + Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") + withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index eec396b2e39..bf3f1c769d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -1969,7 +1999,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..6150a556f9b 100644 +index 8e88049f51e..b713ccddfcb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2058,7 +2088,7 @@ index 8e88049f51e..6150a556f9b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1952,8 +1968,17 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared val e = intercept[SparkException] { sql(s"select a from $tableName where b > 0").collect() } @@ -2066,16 +2096,19 @@ index 8e88049f51e..6150a556f9b 100644 - """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) + assert(e.getCause.isInstanceOf[RuntimeException]) + val msg = e.getCause.getMessage -+ // native_datafusion produces a different error message for duplicate fields ++ // native_datafusion converts DataFusion's "Unable to get field named" error ++ // to _LEGACY_ERROR_TEMP_2093 but with a lowercase field name ("b" vs "B") ++ // because DataFusion resolves field names case-insensitively + assert( + msg.contains( + """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") || -+ msg.contains("Unable to get field named"), ++ msg.contains( ++ """Found duplicate field(s) "b": [B, b] in case-insensitive mode"""), + s"Unexpected error message: $msg") } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { -@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1984,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2085,7 +2118,7 @@ index 8e88049f51e..6150a556f9b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2095,7 +2128,7 @@ index 8e88049f51e..6150a556f9b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2303,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2108,7 +2141,7 @@ index 8e88049f51e..6150a556f9b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2367,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2122,7 +2155,7 @@ index 8e88049f51e..6150a556f9b 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..f312174b182 100644 +index 8ed9ef1630e..a865928c1b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession @@ -2131,7 +2164,7 @@ index 8ed9ef1630e..f312174b182 100644 - test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { + test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) @@ -2141,7 +2174,7 @@ index 8ed9ef1630e..f312174b182 100644 - test("SPARK-35640: int as long should throw schema incompatible error") { + test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 4).map(i => Tuple1(i)) val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) @@ -2156,7 +2189,7 @@ index 8ed9ef1630e..f312174b182 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..ce39ebb52e6 100644 +index f6472ba3d9d..7f00caf5063 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2165,7 +2198,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2215,7 +2248,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("row group skipping doesn't overflow when reading into larger type") { + test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { path => Seq(0).toDF("a").write.parquet(path.toString) // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -2414,32 +2447,42 @@ index 5cdbdc27b32..307fba16578 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..b18a5bea944 100644 +index 0ab8691801d..7b81f3a8f6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,7 @@ +@@ -17,7 +17,9 @@ + package org.apache.spark.sql.execution.python ++import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { + assert(arrowEvalNodes.size == 2) + } + +- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { ++ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) +@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan -+ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan -+ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) // $"a" is not null and $"a" > 1 @@ -2448,14 +2491,13 @@ index 0ab8691801d..b18a5bea944 100644 + val dataFilters = scanNodes.head match { + case scan: FileSourceScanExec => scan.dataFilters + case scan: CometScanExec => scan.dataFilters -+ case scan: CometNativeScanExec => scan.dataFilters + } + assert(dataFilters.length == 2) + assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a")) } } } -@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2463,7 +2505,7 @@ index 0ab8691801d..b18a5bea944 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2488,7 +2530,7 @@ index d083cac48ff..3c11bcde807 100644 import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala -index 746f289c393..7a6a88a9fce 100644 +index 746f289c393..5b9e31c1fa6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -19,16 +19,19 @@ package org.apache.spark.sql.sources @@ -2513,7 +2555,7 @@ index 746f289c393..7a6a88a9fce 100644 import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -@@ -102,12 +105,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } @@ -2523,7 +2565,6 @@ index 746f289c393..7a6a88a9fce 100644 + val fileScan = collect(plan) { + case f: FileSourceScanExec => f + case f: CometScanExec => f -+ case f: CometNativeScanExec => f + } assert(fileScan.nonEmpty, plan) fileScan.head @@ -2532,13 +2573,12 @@ index 746f289c393..7a6a88a9fce 100644 + private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match { + case fs: FileSourceScanExec => fs.bucketedScan + case bs: CometScanExec => bs.bucketedScan -+ case ns: CometNativeScanExec => ns.bucketedScan + } + // To verify if the bucket pruning works, this function checks two conditions: // 1) Check if the pruned buckets (before filtering) are empty. // 2) Verify the final result is the same as the expected one -@@ -156,7 +169,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition) .queryExecution.executedPlan val fileScan = getFileScan(planWithoutBucketedScan) @@ -2548,7 +2588,7 @@ index 746f289c393..7a6a88a9fce 100644 val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType val rowsWithInvalidBuckets = fileScan.execute().filter(row => { -@@ -452,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) { val executedPlan = joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan @@ -2611,7 +2651,14 @@ index 746f289c393..7a6a88a9fce 100644 s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") // check the output partitioning -@@ -836,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti + } + } + +- test("disable bucketing when the output doesn't contain all bucketing columns") { ++ test("disable bucketing when the output doesn't contain all bucketing columns", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { + withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") val scanDF = spark.table("bucketed_table").select("j") @@ -2625,7 +2672,7 @@ index 746f289c393..7a6a88a9fce 100644 checkAnswer(aggDF, df1.groupBy("j").agg(max("k"))) } } -@@ -895,7 +935,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -895,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") { @@ -2636,7 +2683,7 @@ index 746f289c393..7a6a88a9fce 100644 SQLConf.SHUFFLE_PARTITIONS.key -> "5", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil)) -@@ -914,7 +957,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -914,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") { @@ -2647,7 +2694,7 @@ index 746f289c393..7a6a88a9fce 100644 SQLConf.SHUFFLE_PARTITIONS.key -> "9", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") { -@@ -944,7 +990,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -944,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } test("bucket coalescing eliminates shuffle") { @@ -2658,7 +2705,17 @@ index 746f289c393..7a6a88a9fce 100644 SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions. -@@ -1029,15 +1078,24 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti + } + } + +- test("bucket coalescing is applied when join expressions match with partitioning expressions") { ++ test("bucket coalescing is applied when join expressions match with partitioning expressions", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { + withTable("t1", "t2", "t3") { + df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") + df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2") +@@ -1029,15 +1078,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti Seq(true, false).foreach { aqeEnabled => withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) { val plan = sql(query).queryExecution.executedPlan @@ -2669,7 +2726,6 @@ index 746f289c393..7a6a88a9fce 100644 val scans = collect(plan) { case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f + case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b -+ case b: CometNativeScanExec if b.optionalNumCoalescedBuckets.isDefined => b } if (expectedCoalescedNumBuckets.isDefined) { assert(scans.length == 1) @@ -2679,8 +2735,6 @@ index 746f289c393..7a6a88a9fce 100644 + assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + case b: CometScanExec => + assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) -+ case b: CometNativeScanExec => -+ assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + } } else { assert(scans.isEmpty) @@ -2710,18 +2764,20 @@ index 6f897a9c0b7..b0723634f68 100644 protected override lazy val sql = spark.sql _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala -index d675503a8ba..f220892396e 100644 +index d675503a8ba..c386a8cb686 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala -@@ -18,6 +18,7 @@ +@@ -17,7 +17,8 @@ + package org.apache.spark.sql.sources - import org.apache.spark.sql.QueryTest -+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} +-import org.apache.spark.sql.QueryTest ++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest} ++import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.internal.SQLConf -@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite +@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = { val plan = sql(query).queryExecution.executedPlan @@ -2729,11 +2785,60 @@ index d675503a8ba..f220892396e 100644 + val bucketedScan = collect(plan) { + case s: FileSourceScanExec if s.bucketedScan => s + case s: CometScanExec if s.bucketedScan => s -+ case s: CometNativeScanExec if s.bucketedScan => s + } assert(bucketedScan.length == expectedNumBucketedScan) } +@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite + } + } + +- test("SPARK-32859: disable unnecessary bucketed table scan - basic test") { ++ test("SPARK-32859: disable unnecessary bucketed table scan - basic test", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { + withTable("t1", "t2", "t3") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") + df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2") +@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite + } + } + +- test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test") { ++ test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { + withTable("t1", "t2", "t3") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") + df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2") +@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite + } + } + +- test("SPARK-32859: disable unnecessary bucketed table scan - multiple bucketed columns test") { ++ test("SPARK-32859: disable unnecessary bucketed table scan - multiple bucketed columns test", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { + withTable("t1", "t2", "t3") { + df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") + df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2") +@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite + } + } + +- test("SPARK-32859: disable unnecessary bucketed table scan - other operators test") { ++ test("SPARK-32859: disable unnecessary bucketed table scan - other operators test", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { + withTable("t1", "t2", "t3") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") + df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2") +@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite + } + } + +- test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows") { ++ test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { + withTable("t1") { + withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") { + sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 7f6fa2a123e..c778b4e2c48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -2868,6 +2973,39 @@ index aad91601758..201083bd621 100644 }) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +index b5cf13a9c12..ac17603fb7f 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar + + import org.apache.spark.{SparkException, TestUtils} + import org.apache.spark.internal.Logging +-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} ++import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode} + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} + import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} +@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi + ) + } + +- test("SPARK-41198: input row calculation with CTE") { ++ test("SPARK-41198: input row calculation with CTE", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { + withTable("parquet_tbl", "parquet_streaming_tbl") { + spark.range(0, 10).selectExpr("id AS col1", "id AS col2") + .write.format("parquet").saveAsTable("parquet_tbl") +@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi + } + } + +- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") { ++ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { + withTable("parquet_streaming_tbl") { + val streamInput = MemoryStream[Int] + val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala index 8f099c31e6b..ce4b7ad25b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala @@ -3111,6 +3249,29 @@ index de3b1ffccf0..2a76d127093 100644 override def beforeEach(): Unit = { super.beforeEach() +diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +index f3be79f9022..b4b1ea8dbc4 100644 +--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn + import org.apache.hadoop.io.{LongWritable, Writable} + + import org.apache.spark.{SparkException, SparkFiles, TestUtils} +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest, Row} + import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode + import org.apache.spark.sql.catalyst.plans.logical.Project + import org.apache.spark.sql.execution.WholeStageCodegenExec +@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { + } + } + +- test("SPARK-11522 select input_file_name from non-parquet table") { ++ test("SPARK-11522 select input_file_name from non-parquet table", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { + + withTempDir { tempDir => + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6160c3e5f6c..0956d7d9edc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index 2a10bb111d..833eda75e1 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -63,9 +63,10 @@ cause Comet to fall back to Spark. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` - No support for duplicate field names in case-insensitive mode. When the required or data schema contains - field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Note that duplicates - in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time, - so DataFusion may produce a different error message than Spark in that case. + field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Duplicates + in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time; + in that case DataFusion will throw a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, + matching Spark's behavior. The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results without falling back to Spark: diff --git a/native/Cargo.lock b/native/Cargo.lock index 05b673346e..230fc2a535 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1826,7 +1826,7 @@ dependencies = [ [[package]] name = "datafusion-comet" -version = "0.14.0" +version = "0.15.0" dependencies = [ "arrow", "assertables", @@ -1901,7 +1901,7 @@ dependencies = [ [[package]] name = "datafusion-comet-objectstore-hdfs" -version = "0.14.0" +version = "0.15.0" dependencies = [ "async-trait", "bytes", @@ -1915,7 +1915,7 @@ dependencies = [ [[package]] name = "datafusion-comet-proto" -version = "0.14.0" +version = "0.15.0" dependencies = [ "prost", "prost-build", @@ -1923,7 +1923,7 @@ dependencies = [ [[package]] name = "datafusion-comet-spark-expr" -version = "0.14.0" +version = "0.15.0" dependencies = [ "arrow", "base64", diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index d4582da63f..2886df29d2 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -436,13 +436,15 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option throw_spark_error_as_json(env, spark_error), _ => { - // Check for file-not-found errors that may arrive through other wrapping paths let error_msg = error.to_string(); + // Check for file-not-found errors that may arrive through other wrapping paths if error_msg.contains("not found") && error_msg.contains("No such file or directory") { let spark_error = SparkError::FileNotFound { message: error_msg }; throw_spark_error_as_json(env, &spark_error) + } else if let Some(spark_error) = try_convert_duplicate_field_error(&error_msg) { + throw_spark_error_as_json(env, &spark_error) } else { let exception = error.to_exception(); match backtrace { @@ -474,6 +476,54 @@ fn throw_spark_error_as_json( ) } +/// Try to convert a DataFusion "Unable to get field named" error into a SparkError. +/// DataFusion produces this error when reading Parquet files with duplicate field names +/// in case-insensitive mode. For example, if a Parquet file has columns "B" and "b", +/// DataFusion may deduplicate them and report: Unable to get field named "b". Valid +/// fields: ["A", "B"]. When the requested field has a case-insensitive match among the +/// valid fields, we convert this to Spark's _LEGACY_ERROR_TEMP_2093 error. +fn try_convert_duplicate_field_error(error_msg: &str) -> Option { + // Match: Schema error: Unable to get field named "X". Valid fields: [...] + lazy_static! { + static ref FIELD_RE: Regex = + Regex::new(r#"Unable to get field named "([^"]+)"\. Valid fields: \[(.+)\]"#).unwrap(); + } + if let Some(caps) = FIELD_RE.captures(error_msg) { + let requested_field = caps.get(1)?.as_str(); + let requested_lower = requested_field.to_lowercase(); + // Parse field names from the Valid fields list: ["A", "B"] or [A, B, b] + let valid_fields_raw = caps.get(2)?.as_str(); + let all_fields: Vec = valid_fields_raw + .split(',') + .map(|s| s.trim().trim_matches('"').to_string()) + .collect(); + // Find fields that match case-insensitively + let mut matched: Vec = all_fields + .into_iter() + .filter(|f| f.to_lowercase() == requested_lower) + .collect(); + // Need at least one case-insensitive match to treat this as a duplicate field error. + // DataFusion may deduplicate columns case-insensitively, so the valid fields list + // might contain only one variant (e.g. "B" when file has both "B" and "b"). + // If requested field differs from the match, both existed in the original file. + if matched.is_empty() { + return None; + } + // Add the requested field name if it's not already in the list (different case) + if !matched.iter().any(|f| f == requested_field) { + matched.push(requested_field.to_string()); + } + let required_field_name = requested_field.to_string(); + let matched_fields = format!("[{}]", matched.join(", ")); + Some(SparkError::DuplicateFieldCaseInsensitive { + required_field_name, + matched_fields, + }) + } else { + None + } +} + #[derive(Debug, Error)] enum StacktraceError { #[error("Unable to initialize message: {0}")] diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs index 592ed8b443..9633dc98d8 100644 --- a/native/spark-expr/src/error.rs +++ b/native/spark-expr/src/error.rs @@ -169,6 +169,12 @@ pub enum SparkError { #[error("{message}")] FileNotFound { message: String }, + #[error("[_LEGACY_ERROR_TEMP_2093] Found duplicate field(s) \"{required_field_name}\": [{matched_fields}] in case-insensitive mode")] + DuplicateFieldCaseInsensitive { + required_field_name: String, + matched_fields: String, + }, + #[error("ArrowError: {0}.")] Arrow(Arc), @@ -240,6 +246,7 @@ impl SparkError { SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder", SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows", SparkError::FileNotFound { .. } => "FileNotFound", + SparkError::DuplicateFieldCaseInsensitive { .. } => "DuplicateFieldCaseInsensitive", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", } @@ -430,6 +437,15 @@ impl SparkError { "message": message, }) } + SparkError::DuplicateFieldCaseInsensitive { + required_field_name, + matched_fields, + } => { + serde_json::json!({ + "requiredFieldName": required_field_name, + "matchedOrcFields": matched_fields, + }) + } SparkError::Arrow(e) => { serde_json::json!({ "message": e.to_string(), @@ -499,6 +515,11 @@ impl SparkError { // FileNotFound - will be converted to SparkFileNotFoundException by the shim SparkError::FileNotFound { .. } => "org/apache/spark/SparkException", + // DuplicateFieldCaseInsensitive - converted to SparkRuntimeException by the shim + SparkError::DuplicateFieldCaseInsensitive { .. } => { + "org/apache/spark/SparkRuntimeException" + } + // Generic errors SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException", } @@ -574,6 +595,9 @@ impl SparkError { // File not found SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"), + // Duplicate field in case-insensitive mode + SparkError::DuplicateFieldCaseInsensitive { .. } => Some("_LEGACY_ERROR_TEMP_2093"), + // Generic errors (no error class) SparkError::Arrow(_) | SparkError::Internal(_) => None, } diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index da65b1eb49..e4ec9e0061 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -251,6 +251,12 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "DuplicateFieldCaseInsensitive" => + Some( + QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError( + params("requiredFieldName").toString, + params("matchedOrcFields").toString)) + case "FileNotFound" => val msg = params("message").toString // Extract file path from native error message and format like Hadoop's diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index ae21d12765..41f461100c 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -247,6 +247,12 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "DuplicateFieldCaseInsensitive" => + Some( + QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError( + params("requiredFieldName").toString, + params("matchedOrcFields").toString)) + case "FileNotFound" => val msg = params("message").toString // Extract file path from native error message and format like Hadoop's diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 01d4eac4b6..f906db1405 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -258,6 +258,12 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError( context.headOption.orNull)) + case "DuplicateFieldCaseInsensitive" => + Some( + QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError( + params("requiredFieldName").toString, + params("matchedOrcFields").toString)) + case "FileNotFound" => val msg = params("message").toString // Extract file path from native error message and format like Hadoop's