Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
520 changes: 211 additions & 309 deletions native/Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ edition = "2021"
rust-version = "1.88"

[workspace.dependencies]
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow = { version = "58.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "57.3.0", default-features = false, features = ["experimental"] }
datafusion = { version = "52.3.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "52.3.0" }
datafusion-physical-expr-adapter = { version = "52.3.0" }
datafusion-spark = { version = "52.3.0" }
parquet = { version = "58.0.0", default-features = false, features = ["experimental"] }
datafusion = { git = "https://github.com/apache/datafusion", tag = "53.0.0-rc2", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { git = "https://github.com/apache/datafusion", tag = "53.0.0-rc2" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", tag = "53.0.0-rc2" }
datafusion-spark = { git = "https://github.com/apache/datafusion", tag = "53.0.0-rc2", features = ["core"] }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-proto = { path = "proto" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Expand All @@ -51,12 +51,12 @@ num = "0.4"
rand = "0.10"
regex = "1.12.3"
thiserror = "2"
object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"] }
object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] }
url = "2.2"
aws-config = "1.8.14"
aws-credential-types = "1.2.13"
iceberg = { git = "https://github.com/apache/iceberg-rust", tag = "v0.9.0-rc.1" }
iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", tag = "v0.9.0-rc.1", features = ["opendal-all"] }
iceberg = { git = "https://github.com/mbutrovich/iceberg-rust", branch = "df53-upgrade" }
iceberg-storage-opendal = { git = "https://github.com/mbutrovich/iceberg-rust", branch = "df53-upgrade", features = ["opendal-all"] }

[profile.release]
debug = true
Expand Down
4 changes: 2 additions & 2 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
datafusion-functions-nested = { version = "52.3.0" }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", tag = "53.0.0-rc2" }

[features]
backtrace = ["datafusion/backtrace"]
default = ["hdfs-opendal"]
default = []
hdfs = ["datafusion-comet-objectstore-hdfs"]
hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion::{
prelude::{SessionConfig, SessionContext},
};
use datafusion_comet_proto::spark_operator::Operator;
use datafusion_spark::function::array::repeat::SparkArrayRepeat;
use datafusion_spark::function::bitwise::bit_count::SparkBitCount;
use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot;
Expand Down Expand Up @@ -389,6 +390,7 @@ fn prepare_datafusion_session_context(

// register UDFs from datafusion-spark crate
fn register_datafusion_spark_function(session_ctx: &SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkArrayRepeat::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha2::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(CharFunc::default()));
Expand Down
28 changes: 18 additions & 10 deletions native/core/src/execution/memory_pools/fair_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,21 @@ impl MemoryPool for CometFairMemoryPool {
.expect("unexpected amount of unregister happened");
}

fn grow(&self, reservation: &MemoryReservation, additional: usize) {
self.try_grow(reservation, additional).unwrap();
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
self.try_grow(_reservation, additional).unwrap();
}

fn shrink(&self, reservation: &MemoryReservation, subtractive: usize) {
fn shrink(&self, _reservation: &MemoryReservation, subtractive: usize) {
if subtractive > 0 {
let mut state = self.state.lock();
let size = reservation.size();
if size < subtractive {
panic!("Failed to release {subtractive} bytes where only {size} bytes reserved")
// We don't use reservation.size() here because DataFusion 53+ decrements
// the reservation's atomic size before calling pool.shrink(), so it would
// reflect the post-shrink value rather than the pre-shrink value.
if state.used < subtractive {
panic!(
"Failed to release {subtractive} bytes where only {} bytes tracked by pool",
state.used
)
}
self.release(subtractive)
.unwrap_or_else(|_| panic!("Failed to release {subtractive} bytes"));
Expand All @@ -127,7 +132,7 @@ impl MemoryPool for CometFairMemoryPool {

fn try_grow(
&self,
reservation: &MemoryReservation,
_reservation: &MemoryReservation,
additional: usize,
) -> Result<(), DataFusionError> {
if additional > 0 {
Expand All @@ -137,10 +142,13 @@ impl MemoryPool for CometFairMemoryPool {
.pool_size
.checked_div(num)
.expect("overflow in checked_div");
let size = reservation.size();
if limit < size + additional {
// We use state.used instead of reservation.size() because DataFusion 53+
// calls pool.try_grow() before incrementing the reservation's atomic size,
// so reservation.size() would not include prior grows.
let used = state.used;
if limit < used + additional {
return resources_err!(
"Failed to acquire {additional} bytes where {size} bytes already reserved and the fair limit is {limit} bytes, {num} registered"
"Failed to acquire {additional} bytes where {used} bytes already reserved and the fair limit is {limit} bytes, {num} registered"
);
}

Expand Down
8 changes: 4 additions & 4 deletions native/core/src/execution/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct ExpandExec {
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl ExpandExec {
Expand All @@ -52,12 +52,12 @@ impl ExpandExec {
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> Self {
let cache = PlanProperties::new(
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
);
));

Self {
projections,
Expand Down Expand Up @@ -129,7 +129,7 @@ impl ExecutionPlan for ExpandExec {
Ok(Box::pin(expand_stream))
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
12 changes: 6 additions & 6 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct IcebergScanExec {
/// Output schema after projection
output_schema: SchemaRef,
/// Cached execution plan properties
plan_properties: PlanProperties,
plan_properties: Arc<PlanProperties>,
/// Catalog-specific configuration for FileIO
catalog_properties: HashMap<String, String>,
/// Pre-planned file scan tasks
Expand Down Expand Up @@ -93,13 +93,13 @@ impl IcebergScanExec {
})
}

fn compute_properties(schema: SchemaRef, num_partitions: usize) -> PlanProperties {
PlanProperties::new(
fn compute_properties(schema: SchemaRef, num_partitions: usize) -> Arc<PlanProperties> {
Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(num_partitions),
EmissionType::Incremental,
Boundedness::Bounded,
)
))
}
}

Expand All @@ -116,7 +116,7 @@ impl ExecutionPlan for IcebergScanExec {
Arc::clone(&self.output_schema)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}

Expand Down Expand Up @@ -288,7 +288,7 @@ where
_ => {
let adapter = self
.adapter_factory
.create(Arc::clone(&self.schema), Arc::clone(&file_schema));
.create(Arc::clone(&self.schema), Arc::clone(&file_schema))?;
let exprs =
build_projection_expressions(&self.schema, &adapter).map_err(|e| {
DataFusionError::Execution(format!(
Expand Down
40 changes: 24 additions & 16 deletions native/core/src/execution/operators/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ use std::{
fmt,
fmt::{Debug, Formatter},
fs::File,
io::Cursor,
sync::Arc,
};

#[cfg(feature = "hdfs-opendal")]
use opendal::Operator;
#[cfg(feature = "hdfs-opendal")]
use std::io::Cursor;

use crate::execution::shuffle::CompressionCodec;
use crate::parquet::parquet_support::{
create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
};
use crate::parquet::parquet_support::is_hdfs_scheme;
#[cfg(feature = "hdfs-opendal")]
use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
Expand All @@ -45,7 +47,7 @@ use datafusion::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream,
},
};
use futures::TryStreamExt;
Expand All @@ -64,6 +66,7 @@ enum ParquetWriter {
/// Contains the arrow writer, HDFS operator, and destination path
/// an Arrow writer writes to in-memory buffer the data converted to Parquet format
/// The opendal::Writer is created lazily on first write
#[cfg(feature = "hdfs-opendal")]
Remote(
ArrowWriter<Cursor<Vec<u8>>>,
Option<opendal::Writer>,
Expand All @@ -80,6 +83,7 @@ impl ParquetWriter {
) -> std::result::Result<(), parquet::errors::ParquetError> {
match self {
ParquetWriter::LocalFile(writer) => writer.write(batch),
#[cfg(feature = "hdfs-opendal")]
ParquetWriter::Remote(
arrow_parquet_buffer_writer,
hdfs_writer_opt,
Expand Down Expand Up @@ -134,6 +138,7 @@ impl ParquetWriter {
writer.close()?;
Ok(())
}
#[cfg(feature = "hdfs-opendal")]
ParquetWriter::Remote(
arrow_parquet_buffer_writer,
mut hdfs_writer_opt,
Expand Down Expand Up @@ -208,7 +213,7 @@ pub struct ParquetWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for plan properties
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl ParquetWriterExec {
Expand All @@ -228,12 +233,12 @@ impl ParquetWriterExec {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();

let cache = PlanProperties::new(
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
input_partitioning,
EmissionType::Final,
Boundedness::Bounded,
);
));

Ok(ParquetWriterExec {
input,
Expand Down Expand Up @@ -275,7 +280,7 @@ impl ParquetWriterExec {
output_file_path: &str,
schema: SchemaRef,
props: WriterProperties,
runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
_runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
object_store_options: &HashMap<String, String>,
) -> Result<ParquetWriter> {
// Parse URL and match on storage scheme directly
Expand All @@ -284,11 +289,11 @@ impl ParquetWriterExec {
})?;

if is_hdfs_scheme(&url, object_store_options) {
// HDFS storage
#[cfg(feature = "hdfs-opendal")]
{
// Use prepare_object_store_with_configs to create and register the object store
let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
runtime_env,
_runtime_env,
output_file_path.to_string(),
object_store_options,
)
Expand Down Expand Up @@ -324,6 +329,12 @@ impl ParquetWriterExec {
object_store_path.to_string(),
))
}
#[cfg(not(feature = "hdfs-opendal"))]
{
Err(DataFusionError::Execution(
"HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(),
))
}
} else if output_file_path.starts_with("file://")
|| output_file_path.starts_with("file:")
|| !output_file_path.contains("://")
Expand Down Expand Up @@ -405,11 +416,7 @@ impl ExecutionPlan for ParquetWriterExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down Expand Up @@ -576,6 +583,7 @@ mod tests {

/// Helper function to create a test RecordBatch with 1000 rows of (int, string) data
/// Example batch_id 1 -> 0..1000, 2 -> 1001..2000
#[allow(dead_code)]
fn create_test_record_batch(batch_id: i32) -> Result<RecordBatch> {
assert!(batch_id > 0, "batch_id must be greater than 0");
let num_rows = batch_id * 1000;
Expand Down
8 changes: 4 additions & 4 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct ScanExec {
/// It is also used in unit test to mock the input data from JVM.
pub batch: Arc<Mutex<Option<InputBatch>>>,
/// Cache of expensive-to-compute plan properties
cache: PlanProperties,
cache: Arc<PlanProperties>,
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
Expand All @@ -95,14 +95,14 @@ impl ScanExec {
// Build schema directly from data types since get_next now always unpacks dictionaries
let schema = schema_from_data_types(&data_types);

let cache = PlanProperties::new(
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
// The partitioning is not important because we are not using DataFusion's
// query planner or optimizer
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
);
));

Ok(Self {
exec_context_id,
Expand Down Expand Up @@ -417,7 +417,7 @@ impl ExecutionPlan for ScanExec {
)))
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
Loading
Loading