429 changes: 295 additions & 134 deletions dev/diffs/3.5.8.diff


7 changes: 4 additions & 3 deletions docs/source/contributor-guide/parquet_scans.md
@@ -63,9 +63,10 @@ cause Comet to fall back to Spark.
The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true`
- No support for duplicate field names in case-insensitive mode. When the required or data schema contains
field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Note that duplicates
in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time,
so DataFusion may produce a different error message than Spark in that case.
field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Duplicates
in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time;
in that case DataFusion will throw a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
matching Spark's behavior.

The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
without falling back to Spark:
8 changes: 4 additions & 4 deletions native/Cargo.lock


52 changes: 51 additions & 1 deletion native/core/src/errors.rs
@@ -436,13 +436,15 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
// Handle direct SparkError - serialize to JSON
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
_ => {
// Check for file-not-found errors that may arrive through other wrapping paths
let error_msg = error.to_string();
// Check for file-not-found errors that may arrive through other wrapping paths
if error_msg.contains("not found")
&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message: error_msg };
throw_spark_error_as_json(env, &spark_error)
} else if let Some(spark_error) = try_convert_duplicate_field_error(&error_msg) {
throw_spark_error_as_json(env, &spark_error)
} else {
let exception = error.to_exception();
match backtrace {
@@ -474,6 +476,54 @@ fn throw_spark_error_as_json(
)
}

/// Try to convert a DataFusion "Unable to get field named" error into a SparkError.
/// DataFusion produces this error when reading Parquet files with duplicate field names
/// in case-insensitive mode. For example, if a Parquet file has columns "B" and "b",
/// DataFusion may deduplicate them and report: Unable to get field named "b". Valid
/// fields: ["A", "B"]. When the requested field has a case-insensitive match among the
/// valid fields, we convert this to Spark's _LEGACY_ERROR_TEMP_2093 error.
fn try_convert_duplicate_field_error(error_msg: &str) -> Option<SparkError> {
Review comment (Member Author):
I'm not sure if this is overkill or not. We could just update the Spark SQL test to allow for different exceptions with Comet

// Match: Schema error: Unable to get field named "X". Valid fields: [...]
lazy_static! {
static ref FIELD_RE: Regex =
Regex::new(r#"Unable to get field named "([^"]+)"\. Valid fields: \[(.+)\]"#).unwrap();
}
if let Some(caps) = FIELD_RE.captures(error_msg) {
let requested_field = caps.get(1)?.as_str();
let requested_lower = requested_field.to_lowercase();
// Parse field names from the Valid fields list: ["A", "B"] or [A, B, b]
let valid_fields_raw = caps.get(2)?.as_str();
let all_fields: Vec<String> = valid_fields_raw
.split(',')
.map(|s| s.trim().trim_matches('"').to_string())
.collect();
// Find fields that match case-insensitively
let mut matched: Vec<String> = all_fields
.into_iter()
.filter(|f| f.to_lowercase() == requested_lower)
.collect();
// Need at least one case-insensitive match to treat this as a duplicate field error.
// DataFusion may deduplicate columns case-insensitively, so the valid fields list
// might contain only one variant (e.g. "B" when file has both "B" and "b").
// If requested field differs from the match, both existed in the original file.
if matched.is_empty() {
return None;
}
// Add the requested field name if it's not already in the list (different case)
if !matched.iter().any(|f| f == requested_field) {
matched.push(requested_field.to_string());
}
let required_field_name = requested_field.to_string();
let matched_fields = format!("[{}]", matched.join(", "));
Some(SparkError::DuplicateFieldCaseInsensitive {
required_field_name,
matched_fields,
})
} else {
None
}
}
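
The conversion above hinges on pattern-matching DataFusion's error text. As a rough, stdlib-only sketch of the same matching logic (the PR itself uses the `regex` crate with `lazy_static`; `find_case_insensitive_match` is a hypothetical helper name, not code from this change):

```rust
/// Illustrative sketch: extract the requested field and its case-insensitive
/// matches from a DataFusion "Unable to get field named" error message.
fn find_case_insensitive_match(error_msg: &str) -> Option<(String, Vec<String>)> {
    // Locate the quoted field name in DataFusion's message.
    let rest = error_msg.split("Unable to get field named \"").nth(1)?;
    let (requested, rest) = rest.split_once('"')?;
    // Extract the bracketed list of valid fields that follows.
    let fields = rest.split("Valid fields: [").nth(1)?.split(']').next()?;
    let requested_lower = requested.to_lowercase();
    // Keep only the fields that match the requested name ignoring case.
    let matched: Vec<String> = fields
        .split(',')
        .map(|s| s.trim().trim_matches('"').to_string())
        .filter(|f| f.to_lowercase() == requested_lower)
        .collect();
    if matched.is_empty() {
        None
    } else {
        Some((requested.to_string(), matched))
    }
}

fn main() {
    let msg = r#"Schema error: Unable to get field named "b". Valid fields: ["A", "B"]"#;
    assert_eq!(
        find_case_insensitive_match(msg),
        Some(("b".to_string(), vec!["B".to_string()]))
    );
    // Unrelated errors fall through to the generic path.
    assert_eq!(find_case_insensitive_match("some other error"), None);
}
```

The real implementation additionally appends the requested field name to the matched list when it was deduplicated away, as shown in the diff above.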

#[derive(Debug, Error)]
enum StacktraceError {
#[error("Unable to initialize message: {0}")]
24 changes: 24 additions & 0 deletions native/spark-expr/src/error.rs
@@ -169,6 +169,12 @@ pub enum SparkError {
#[error("{message}")]
FileNotFound { message: String },

#[error("[_LEGACY_ERROR_TEMP_2093] Found duplicate field(s) \"{required_field_name}\": {matched_fields} in case-insensitive mode")]
DuplicateFieldCaseInsensitive {
required_field_name: String,
matched_fields: String,
},

#[error("ArrowError: {0}.")]
Arrow(Arc<ArrowError>),

@@ -240,6 +246,7 @@ impl SparkError {
SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows",
SparkError::FileNotFound { .. } => "FileNotFound",
SparkError::DuplicateFieldCaseInsensitive { .. } => "DuplicateFieldCaseInsensitive",
SparkError::Arrow(_) => "Arrow",
SparkError::Internal(_) => "Internal",
}
@@ -430,6 +437,15 @@ impl SparkError {
"message": message,
})
}
SparkError::DuplicateFieldCaseInsensitive {
required_field_name,
matched_fields,
} => {
serde_json::json!({
"requiredFieldName": required_field_name,
"matchedOrcFields": matched_fields,
})
}
SparkError::Arrow(e) => {
serde_json::json!({
"message": e.to_string(),
@@ -499,6 +515,11 @@ impl SparkError {
// FileNotFound - will be converted to SparkFileNotFoundException by the shim
SparkError::FileNotFound { .. } => "org/apache/spark/SparkException",

// DuplicateFieldCaseInsensitive - converted to SparkRuntimeException by the shim
SparkError::DuplicateFieldCaseInsensitive { .. } => {
"org/apache/spark/SparkRuntimeException"
}

// Generic errors
SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
}
@@ -574,6 +595,9 @@ impl SparkError {
// File not found
SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),

// Duplicate field in case-insensitive mode
SparkError::DuplicateFieldCaseInsensitive { .. } => Some("_LEGACY_ERROR_TEMP_2093"),

// Generic errors (no error class)
SparkError::Arrow(_) | SparkError::Internal(_) => None,
}
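
For reference, the JSON payload the JVM-side shims consume for this variant would look roughly like the fragment below (values are illustrative). Note that the key is `matchedOrcFields` even though the source here is Parquet: the PR reuses the parameter name that Spark's `foundDuplicateFieldInCaseInsensitiveModeError` (originally written for ORC) expects, and the bracketed list is pre-formatted on the Rust side.

```json
{
  "requiredFieldName": "b",
  "matchedOrcFields": "[B, b]"
}
```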
@@ -251,6 +251,12 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))

case "DuplicateFieldCaseInsensitive" =>
Some(
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
params("requiredFieldName").toString,
params("matchedOrcFields").toString))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
@@ -247,6 +247,12 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))

case "DuplicateFieldCaseInsensitive" =>
Some(
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
params("requiredFieldName").toString,
params("matchedOrcFields").toString))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
@@ -258,6 +258,12 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
context.headOption.orNull))

case "DuplicateFieldCaseInsensitive" =>
Some(
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
params("requiredFieldName").toString,
params("matchedOrcFields").toString))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's