diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 4d777cda87..3a763d3219 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -126,6 +126,7 @@ jobs: # - native_iceberg_compat: Spark 3.5 only config: - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto'} + - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'} diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 8738b3813a..6ef58a750a 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index d3544881af1..9c16099090c 100644 +index d3544881af1..377683b10c5 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -417,18 +417,19 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..1925aac8d97 100644 +index f33432ddb6f..881eab7417c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen +@@ -22,6 +22,8 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.plans.ExistenceJoin ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ -@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -262,6 +264,9 @@ abstract class DynamicPartitionPruningSuiteBase case s: BatchScanExec => s.runtimeFilters.collect { case d: DynamicPruningExpression => d.child } @@ -438,7 +439,7 @@ index f33432ddb6f..1925aac8d97 100644 case _ => Nil } } -@@ -755,7 +759,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -755,7 +760,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -448,7 +449,7 @@ index f33432ddb6f..1925aac8d97 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1033,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -458,7 +459,7 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -468,7 +469,7 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1423,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -478,7 +479,17 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1698,7 +1707,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat + * Check the static scan metrics with and without DPP + */ + test("static scan metrics", +- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { ++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { +@@ -1729,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -513,18 +524,29 @@ index a6b295578d6..91acca4306f 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 2796b1cf154..52438178a0e 100644 +index 2796b1cf154..f5e2fdfe450 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} +@@ -33,6 +33,8 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -815,6 +816,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -499,7 +501,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + + Seq("parquet", "orc").foreach { format => +- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { ++ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempDir { dir => + val tableName = s"spark_25132_${format}_native" + val tableDir = dir.getCanonicalPath + s"/$tableName" +@@ -815,6 +818,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -532,7 +554,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(smJoinExec.nonEmpty) } -@@ -875,6 +877,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -875,6 +879,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -540,7 +562,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -916,6 +919,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -916,6 +921,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -548,7 +570,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1100,6 +1104,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1100,6 +1106,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -981,7 +1003,7 @@ index 48ad10992c5..51d1ee65422 100644 extensions.injectColumnar(session => MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index 18123a4d6ec..fbe4c766eee 100644 +index 18123a4d6ec..0fe185baa33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ @@ -1983,10 +2005,18 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..37ea65081e4 100644 +index 104b4e416cd..8e618094d91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -@@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType + + import org.apache.spark.{SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan} + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.expressions._ + import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints +@@ -1096,7 +1097,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. @@ -1999,7 +2029,7 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1499,7 +1503,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1499,7 +1504,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2009,7 +2039,7 @@ index 104b4e416cd..37ea65081e4 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1581,7 +1586,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1581,7 +1587,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2022,7 +2052,7 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1608,7 +1617,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1608,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2035,7 +2065,16 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1744,7 +1757,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1700,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + (attr, value) => sources.StringContains(attr, value)) + } + +- test("filter pushdown - StringPredicate") { ++ test("filter pushdown - StringPredicate", IgnoreCometNativeScan("cannot be pushed down")) { + import testImplicits._ + // keep() should take effect on StartsWith/EndsWith/Contains + Seq( +@@ -1744,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2045,7 +2084,17 @@ index 104b4e416cd..37ea65081e4 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1985,7 +1999,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1934,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + } + } + +- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { ++ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val count = 10 + val tableName = "spark_25207" +@@ -1985,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2055,7 +2104,7 @@ index 104b4e416cd..37ea65081e4 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2045,7 +2060,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2045,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2065,7 +2114,7 @@ index 104b4e416cd..37ea65081e4 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2277,7 +2293,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2277,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2078,7 +2127,7 @@ index 104b4e416cd..37ea65081e4 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2337,7 +2357,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2337,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2092,10 +2141,38 @@ index 104b4e416cd..37ea65081e4 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..b624c3811dd 100644 +index 8670d95c65e..45669c0b664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} + + import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} + import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} + import org.apache.spark.sql.catalyst.util.DateTimeUtils +@@ -1064,7 +1065,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { ++ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) + +@@ -1075,7 +1077,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: int as long should throw schema incompatible error") { ++ test("SPARK-35640: int as long should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) + +@@ -1335,7 +1338,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2106,10 +2183,28 @@ index 8670d95c65e..b624c3811dd 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index 29cb224c878..44837aa953b 100644 +index 29cb224c878..d16c22b73a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -978,7 +978,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat + + import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} + import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow + import org.apache.spark.sql.catalyst.util.ArrayData +@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { ++ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 1000).map { i => + val ts = new java.sql.Timestamp(i) + Row(ts) +@@ -978,7 +980,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2119,7 +2214,17 @@ index 29cb224c878..44837aa953b 100644 withAllParquetReaders { withTempPath { path => // Repeated values for dictionary encoding. -@@ -1047,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1031,7 +1034,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") + } + +- test("SPARK-34212 Parquet should read decimals correctly") { ++ test("SPARK-34212 Parquet should read decimals correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + def readParquet(schema: String, path: File): DataFrame = { + spark.read.schema(schema).parquet(path.toString) + } +@@ -1047,7 +1051,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema, path), df) } @@ -2129,7 +2234,7 @@ index 29cb224c878..44837aa953b 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1069,7 +1071,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1069,7 +1074,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2139,7 +2244,17 @@ index 29cb224c878..44837aa953b 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1128,7 +1131,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("row group skipping doesn't overflow when reading into larger type") { ++ test("row group skipping doesn't overflow when reading into larger type", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { path => + Seq(0).toDF("a").write.parquet(path.toString) + // The vectorized and non-vectorized readers will produce different exceptions, we don't need +@@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } @@ -2244,14 +2359,14 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index bf5c51b89bb..ca22370ca3b 100644 +index bf5c51b89bb..6ff4d1ba18b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.IgnoreComet ++import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2265,6 +2380,26 @@ index bf5c51b89bb..ca22370ca3b 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" +@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("schema mismatch failure error message for parquet vectorized reader") { ++ test("schema mismatch failure error message for parquet vectorized reader", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) + assert(e.getCause.isInstanceOf[SparkException]) +@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { ++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + import testImplicits._ + + withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 3a0bd35cb70..b28f06a757f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -2314,18 +2449,29 @@ index 26e61c6b58d..cb09d7e116a 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..d9125f658ad 100644 +index 0ab8691801d..f1c4b3d92b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,7 @@ +@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { + assert(arrowEvalNodes.size == 2) + } + +- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { ++ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) +@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2333,7 +2479,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2352,7 +2498,7 @@ index 0ab8691801d..d9125f658ad 100644 } } } -@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2360,7 +2506,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan