diff --git a/Cargo.lock b/Cargo.lock index becafa4d..754ce071 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4967,6 +4967,7 @@ dependencies = [ "sqd-storage", "tempfile", "tikv-jemallocator", + "tracing", ] [[package]] diff --git a/crates/query/Cargo.toml b/crates/query/Cargo.toml index 37bb19c3..8e5bfbdf 100644 --- a/crates/query/Cargo.toml +++ b/crates/query/Cargo.toml @@ -24,6 +24,7 @@ sqd-bloom-filter = { path = "../bloom-filter" } sqd-polars = { path = "../polars" } sqd-primitives = { path = "../primitives", features = ["range"] } sqd-storage = { path = "../storage", optional = true } +tracing = "0.1.40" [dev-dependencies] glob = "0.3.1" @@ -37,6 +38,9 @@ sqd-dataset = { path = "../dataset" } tikv-jemallocator = "0.6" [features] +max_level_trace = ["tracing/max_level_trace"] +max_level_debug = ["tracing/max_level_debug"] +max_level_info = ["tracing/max_level_info"] parquet = [ "dep:parquet", "dep:memmap2" diff --git a/crates/query/fixtures/fuel/queries/missing_block_fields/README.md b/crates/query/fixtures/fuel/queries/missing_block_fields/README.md new file mode 100644 index 00000000..ac5eef8b --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_block_fields/README.md @@ -0,0 +1,8 @@ +This test validates that when nullable columns are missing entirely from the parquet file, such as: + +- eventInboxRoot +- consensusParametersVersion +- stateTransitionBytecodeVersion +- messageOutboxRoot + +The output will include these columns with null values diff --git a/crates/query/fixtures/fuel/queries/missing_block_fields/query.json b/crates/query/fixtures/fuel/queries/missing_block_fields/query.json new file mode 100644 index 00000000..b8e5d40e --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_block_fields/query.json @@ -0,0 +1,16 @@ +{ + "type": "fuel", + "fromBlock": 9000000, + "toBlock": 9000003, + "fields": { + "block": { + "number": true, + "hash": true, + "eventInboxRoot": true, + "consensusParametersVersion": true, + 
"stateTransitionBytecodeVersion": true, + "messageOutboxRoot": true + } + }, + "includeAllBlocks": false +} diff --git a/crates/query/fixtures/fuel/queries/missing_block_fields/result.json b/crates/query/fixtures/fuel/queries/missing_block_fields/result.json new file mode 100644 index 00000000..13404a7f --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_block_fields/result.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:33134c5ff181dbcc4ef7aecda446521e58c291b5a639eef18d2fa900c1bc16d9 +size 583 diff --git a/crates/query/fixtures/fuel/queries/missing_field_with_join/README.md b/crates/query/fixtures/fuel/queries/missing_field_with_join/README.md new file mode 100644 index 00000000..ad89c663 --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_field_with_join/README.md @@ -0,0 +1,3 @@ +This test validates that when nullable columns are missing intirely from the parquet file, such as: +- upgrade_purpose +The output will include these columnds with null values diff --git a/crates/query/fixtures/fuel/queries/missing_field_with_join/query.json b/crates/query/fixtures/fuel/queries/missing_field_with_join/query.json new file mode 100644 index 00000000..44074a42 --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_field_with_join/query.json @@ -0,0 +1,28 @@ +{ + "type": "fuel", + "fromBlock": 9025510, + "toBlock": 9025549, + "fields": { + "block": { + "number": true, + "hash": true + }, + "receipt": { + "receiptType": true, + "contract": true, + "index": true, + "transactionIndex": true + }, + "transaction": { + "index": true, + "hash": true, + "upgradePurpose": true + } + }, + "receipts": [ + { + "type": ["LOG_DATA"], + "transaction": true + } + ] +} diff --git a/crates/query/fixtures/fuel/queries/missing_field_with_join/result.json b/crates/query/fixtures/fuel/queries/missing_field_with_join/result.json new file mode 100644 index 00000000..ec36baac --- /dev/null +++ 
b/crates/query/fixtures/fuel/queries/missing_field_with_join/result.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4305c10b962392df8c44de367922fcbbcee89cad10de5718e388a64d3f45cb2c +size 1154 diff --git a/crates/query/src/json/exp.rs b/crates/query/src/json/exp.rs index bbb06adc..efa4f789 100644 --- a/crates/query/src/json/exp.rs +++ b/crates/query/src/json/exp.rs @@ -218,6 +218,16 @@ fn eval_object(array: &dyn Array, props: &Vec<(Name, Exp)>) -> Result) { + out.extend_from_slice(b"null") + } +} + + fn eval_prop(array: &dyn Array, name: Name, exp: &Exp) -> Result { let array: &StructArray = array.as_any().downcast_ref().ok_or_else(|| { schema_error!("expected a StructArray, but got {}", array.data_type()) @@ -227,6 +237,15 @@ fn eval_prop(array: &dyn Array, name: Name, exp: &Exp) -> Result, relations: Vec, - output: Option + output: Option, } - struct Output { table: Name, key: &'static [Name], @@ -31,27 +28,30 @@ struct Output { weight_per_row: RowWeight, weight_columns: Vec, exp: Exp, - item_name: Name + item_name: Name, } - pub struct Plan { + tables: &'static TableSet, scans: Vec, relations: Vec, outputs: Vec, include_all_blocks: bool, parent_block_hash: Option, first_block: Option, - last_block: Option + last_block: Option, } - impl Plan { pub fn execute(&self, data_chunk: &dyn Chunk) -> anyhow::Result> { PlanExecution { - data_chunk, + chunk: ChunkWithDefaults { + chunk: data_chunk, + tables: self.tables, + }, plan: self, - }.execute() + } + .execute() } pub fn set_parent_block_hash(&mut self, hash: impl Into>) { @@ -67,24 +67,45 @@ impl Plan { } } +/// A wrapper around `Chunk` that automatically attaches default-null columns +/// from the `TableSet` schema when creating scans. This ensures that columns +/// with a null default that are missing from the underlying data source are +/// treated as all-null rather than causing errors. 
+struct ChunkWithDefaults<'a> { + chunk: &'a dyn Chunk, + tables: &'static TableSet, +} + +impl Chunk for ChunkWithDefaults<'_> { + fn scan_table(&self, name: Name) -> anyhow::Result> { + let mut scan = self.chunk.scan_table(name)?; + let default_null = self.tables.get(name).default_null_columns(); + if !default_null.is_empty() { + scan = scan.with_default_null_columns(default_null); + } + Ok(scan) + } +} struct PlanExecution<'a> { - data_chunk: &'a dyn Chunk, + chunk: ChunkWithDefaults<'a>, plan: &'a Plan, } - -impl <'a> PlanExecution<'a> { +impl<'a> PlanExecution<'a> { + /// Executes the full query plan against a data chunk. + /// + /// The plan operates across multiple tables (one per output). + /// Execution proceeds in three phases: + /// 1. Scans - find matching row indexes in each table (parallel per scan) + /// 2. Relations - propagate row selections across related tables (e.g. join, children) + /// 3. Output - read actual data for selected rows from each table and build the result fn execute(&self) -> anyhow::Result> { self.check_parent_block()?; - let relation_inputs = self.plan.relations.iter() - .map(|_| RowList::new()) - .collect(); + let relation_inputs = self.plan.relations.iter().map(|_| RowList::new()).collect(); - let output_inputs = self.plan.outputs.iter() - .map(|_| RowList::new()) - .collect(); + let output_inputs = self.plan.outputs.iter().map(|_| RowList::new()).collect(); self.execute_scans(&relation_inputs, &output_inputs)?; self.execute_relations(relation_inputs, &output_inputs)?; @@ -94,145 +115,170 @@ impl <'a> PlanExecution<'a> { fn check_parent_block(&self) -> anyhow::Result<()> { let parent_hash = match self.plan.parent_block_hash.as_ref() { Some(s) => s.as_str(), - None => return Ok(()) + None => return Ok(()), }; let block_number = match self.plan.first_block { Some(bn) => bn, - None => bail!("invalid plan: parent block hash is specified, but block number is not available") + None => bail!( + "invalid plan: parent block hash is 
specified, but block number is not available" + ), }; - let block_scan = self.data_chunk.scan_table(self.plan.outputs[0].table)?; - - let mut refs: Vec<_> = if block_scan.schema().column_with_name("parent_number").is_some() { - // this is Solana with possible gaps in block numbers - let df = block_scan - .with_column("parent_number") - .with_column("parent_hash") - .with_predicate( - col_between("parent_number", block_number.saturating_sub(100), block_number.saturating_sub(1)) - ) - .to_lazy_df()? - .collect()?; + let block_scan = self.chunk.scan_table(self.plan.outputs[0].table)?; - let numbers = df.column("parent_number")?.cast(&sqd_polars::prelude::DataType::UInt64)?; - let numbers = numbers.u64()?; + let has_parent_number = block_scan + .schema() + .column_with_name("parent_number") + .is_some(); - let hashes = df.column("parent_hash")?; - let hashes = hashes.str()?; - - (0..df.shape().0).map(|i| { - let number = numbers.get(i).expect("block number can't be null according to the predicate applied"); - let hash = hashes.get(i).unwrap_or(""); - BlockRef { - number, - hash: hash.to_string() - } - }).collect() + let (number_col, predicate_upper) = if has_parent_number { + ("parent_number", block_number.saturating_sub(1)) } else { - let df = block_scan - .with_column("number") - .with_column("parent_hash") - .with_predicate( - col_between("number", block_number.saturating_sub(100), block_number) - ) - .to_lazy_df()? - .collect()?; - - let numbers = df.column("number")?.cast(&sqd_polars::prelude::DataType::UInt64)?; - let numbers = numbers.u64()?; + ("number", block_number) + }; - let hashes = df.column("parent_hash")?; - let hashes = hashes.str()?; + let df = block_scan + .with_column(number_col) + .with_column("parent_hash") + .with_predicate(col_between( + number_col, + block_number.saturating_sub(100), + predicate_upper, + )) + .to_lazy_df()? 
+ .collect()?; - (0..df.shape().0).map(|i| { - let number = numbers.get(i).expect("block number can't be null according to the predicate applied"); + let numbers = df + .column(number_col)? + .cast(&sqd_polars::prelude::DataType::UInt64)?; + let numbers = numbers.u64()?; + + let hashes = df.column("parent_hash")?; + let hashes = hashes.str()?; + + let mut refs: Vec<_> = (0..df.shape().0) + .map(|i| { + let number = numbers + .get(i) + .expect("block number can't be null according to the predicate applied"); let hash = hashes.get(i).unwrap_or(""); BlockRef { - number: number.saturating_sub(1), - hash: hash.to_string() + number: if has_parent_number { number } else { number.saturating_sub(1) }, + hash: hash.to_string(), } - }).collect() - }; + }) + .collect(); refs.sort_by(|a, b| a.number.cmp(&b.number)); - let parent_block = refs.last().ok_or_else(|| { - anyhow!("block {} is not present in the chunk", block_number) - })?; - + let parent_block = refs + .last() + .ok_or_else(|| anyhow!("block {} is not present in the chunk", block_number))?; + if parent_block.hash == parent_hash { Ok(()) } else { Err(anyhow!(UnexpectedBaseBlock { prev_blocks: refs, expected_hash: parent_hash.to_string() - })) + })) } } + /// Execute predicate scans to collect matching row indexes. + /// + /// Each scan targets a single table and collects row indexes that match + /// its predicate. These row indexes are distributed to relation inputs + /// (for cross-table joins in `execute_relations`) and/or output inputs (for direct + /// data reading in `execute_output`). Scans run in parallel. fn execute_scans( &self, relation_inputs: &Vec, - output_inputs: &Vec - ) -> anyhow::Result<()> - { - self.plan.scans.par_iter().try_for_each(|scan| -> anyhow::Result<()> { - let rows = self.data_chunk - .scan_table(scan.table)? 
- .with_row_index(true) - .with_columns([]) - .with_predicate(scan.predicate.clone()) - .execute()?; - - for rel_idx in scan.relations.iter() { - relation_inputs[*rel_idx].extend_from_record_batch_vec(&rows); - } + output_inputs: &Vec, + ) -> anyhow::Result<()> { + self.plan + .scans + .par_iter() + .try_for_each(|scan| -> anyhow::Result<()> { + let rows = self + .chunk + .scan_table(scan.table)? + .with_row_index(true) + .with_columns([]) + .with_predicate(scan.predicate.clone()) + .execute()?; - if let Some(idx) = &scan.output { - output_inputs[*idx].extend_from_record_batch_vec(&rows) - } + for rel_idx in scan.relations.iter() { + relation_inputs[*rel_idx].extend_from_record_batch_vec(&rows); + } - Ok(()) - }) + if let Some(idx) = &scan.output { + output_inputs[*idx].extend_from_record_batch_vec(&rows) + } + + Ok(()) + }) } + /// Propagate row selections through relations. + /// + /// Relations link rows between tables (e.g. join, children, parents). + /// For each relation, the matched rows from `execute_scans` are used to find + /// corresponding rows in the relation's output table, adding them + /// to that table's output inputs. Relations run in parallel. 
fn execute_relations( &self, relation_inputs: Vec, - output_inputs: &Vec - ) -> anyhow::Result<()> - { - relation_inputs.into_par_iter().enumerate().try_for_each(|(idx, row_list)| -> anyhow::Result<()> { - let input = row_list.into_inner(); - if input.is_empty() { - return Ok(()) - } - let rel = &self.plan.relations[idx]; - let output = &output_inputs[self.get_output_index(rel.output_table())]; - rel.eval(self.data_chunk, &input, output) - }) + output_inputs: &Vec, + ) -> anyhow::Result<()> { + relation_inputs.into_par_iter().enumerate().try_for_each( + |(idx, row_list)| -> anyhow::Result<()> { + let input = row_list.into_inner(); + if input.is_empty() { + return Ok(()); + } + let rel = &self.plan.relations[idx]; + let output = &output_inputs[self.get_output_index(rel.output_table())]; + rel.eval(&self.chunk, &input, output) + }, + ) } + /// Read actual column data and build the output. + /// + /// Each output corresponds to a single table (outputs[0] is always the + /// block header table). For each table independently: + /// 1. First pass: read key + weight columns to compute per-block weights + /// and determine which blocks fit in the ~20MB output budget + /// 2. Second pass: read the full projection for selected rows and build + /// DataItem encoders for JSON serialization + /// + /// Columns missing from the parquet file are handled gracefully if they + /// have a default value of Null (via default_null_columns) — they are + /// replaced with NullArrays so the encoder outputs "null" for them. 
fn execute_output(&self, output_inputs: Vec) -> anyhow::Result> { use sqd_polars::prelude::*; - let rows = output_inputs.into_par_iter() + let rows = output_inputs + .into_par_iter() .enumerate() .map(|(idx, row_list)| -> anyhow::Result { let output = &self.plan.outputs[idx]; - let maybe_row_selection = if idx == 0 { // block header + let maybe_row_selection = if idx == 0 { + // block header None } else { let row_indexes = row_list.into_inner(); if row_indexes.is_empty() { - return Ok(DataFrame::empty()) + return Ok(DataFrame::empty()); } Some(RowRangeList::from_sorted_indexes(row_indexes)) }; - let record_batches = self.data_chunk + let record_batches = self + .chunk .scan_table(output.table)? .with_row_selection(maybe_row_selection) .with_predicate(self.get_block_number_predicate(idx)) @@ -242,7 +288,7 @@ impl <'a> PlanExecution<'a> { .execute()?; let df = if record_batches.len() == 0 { - return Ok(DataFrame::empty()) + return Ok(DataFrame::empty()); } else { record_batch_vec_to_lazy_polars_df(&record_batches)? 
}; @@ -253,48 +299,48 @@ impl <'a> PlanExecution<'a> { } weight_exp = weight_exp.alias("weight"); - let rows = df.select([ - col("row_index"), - col(output.key[0]).alias("block_number"), - weight_exp - ]).collect()?; + let rows = df + .select([ + col("row_index"), + col(output.key[0]).alias("block_number"), + weight_exp, + ]) + .collect()?; Ok(rows) - }).collect::>>()?; + }) + .collect::>>()?; let header_rows = &rows[0]; if header_rows.is_empty() { - return Ok(None) + return Ok(None); } let mut item_union = Vec::with_capacity(self.plan.outputs.len() + 1); for df in rows.iter().skip(1).filter(|df| !df.is_empty()) { - item_union.push( - df.clone().lazy().select([ - col("block_number"), - col("weight").strict_cast(RowWeightPolarsType::get_dtype()) - ]) - ) + item_union.push(df.clone().lazy().select([ + col("block_number"), + col("weight").strict_cast(RowWeightPolarsType::get_dtype()), + ])) } let block_weights = if self.plan.include_all_blocks { - item_union.push( - header_rows.clone().lazy().select([ - col("block_number"), - col("weight").strict_cast(RowWeightPolarsType::get_dtype()) - ]) - ); + item_union.push(header_rows.clone().lazy().select([ + col("block_number"), + col("weight").strict_cast(RowWeightPolarsType::get_dtype()), + ])); concat(item_union, UnionArgs::default())? .group_by([col("block_number")]) .agg([sum("weight").alias("weight")]) } else { - let agg = header_rows.clone() + let agg = header_rows + .clone() .lazy() .select([ min("block_number").alias("first_block"), - max("block_number").alias("last_block") + max("block_number").alias("last_block"), ]) .collect()?; @@ -305,33 +351,32 @@ impl <'a> PlanExecution<'a> { item_union.push( DataFrame::new(vec![ block_numbers, - Series::new("weight".into(), &[0 as RowWeight, 0 as RowWeight]) - ])?.lazy() + Series::new("weight".into(), &[0 as RowWeight, 0 as RowWeight]), + ])? + .lazy(), ); let item_stats = concat(item_union, UnionArgs::default())? 
.group_by([col("block_number")]) .agg([sum("weight").alias("weight")]); - header_rows.clone().lazy().left_join( - item_stats, - col("block_number"), - col("block_number") - ).select([ - col("block_number"), - (col("weight") + col("weight_right")).alias("weight") - ]) + header_rows + .clone() + .lazy() + .left_join(item_stats, col("block_number"), col("block_number")) + .select([ + col("block_number"), + (col("weight") + col("weight_right")).alias("weight"), + ]) }; - let package_weight = block_weights.sort( - ["block_number"], - SortMultipleOptions::default() - ).select([ - col("block_number"), - col("weight").cum_sum(false) - ]).collect()?; + let package_weight = block_weights + .sort(["block_number"], SortMultipleOptions::default()) + .select([col("block_number"), col("weight").cum_sum(false)]) + .collect()?; - let mut selected_blocks = package_weight.clone() + let mut selected_blocks = package_weight + .clone() .lazy() .filter(col("weight").lt_eq(lit(20 * 1024 * 1024))) .select([col("block_number")]) @@ -341,7 +386,8 @@ impl <'a> PlanExecution<'a> { selected_blocks = package_weight.head(Some(1)) } - let last_block: BlockNumber = selected_blocks.column("block_number") + let last_block: BlockNumber = selected_blocks + .column("block_number") .unwrap() .tail(Some(1)) .cast(&DataType::UInt64)? 
@@ -352,65 +398,66 @@ impl <'a> PlanExecution<'a> { let data_items_mutex = parking_lot::Mutex::new( std::iter::repeat_with(|| None) .take(self.plan.outputs.len()) - .collect::>() + .collect::>(), ); - rows.into_par_iter().enumerate().try_for_each(|(idx, rows)| -> anyhow::Result<()> { - if rows.is_empty() { - return Ok(()); - } + rows.into_par_iter() + .enumerate() + .try_for_each(|(idx, rows)| -> anyhow::Result<()> { + if rows.is_empty() { + return Ok(()); + } - let output = &self.plan.outputs[idx]; + let output = &self.plan.outputs[idx]; - let row_index = if idx == 0 && !self.plan.include_all_blocks { - rows.lazy() - .semi_join( + let row_index = if idx == 0 && !self.plan.include_all_blocks { + rows.lazy().semi_join( selected_blocks.clone().lazy(), col("block_number"), - col("block_number") - ) - } else { - rows.lazy().filter( - col("block_number").lt_eq( - lit(last_block) + col("block_number"), ) - ) - }.select([ - col("row_index") - ]).collect()?; + } else { + rows.lazy() + .filter(col("block_number").lt_eq(lit(last_block))) + } + .select([col("row_index")]) + .collect()?; - let row_selection = RowRangeList::from_sorted_indexes( - row_index.column("row_index").unwrap().u32()?.into_no_null_iter() - ); + let row_selection = RowRangeList::from_sorted_indexes( + row_index + .column("row_index") + .unwrap() + .u32()? + .into_no_null_iter(), + ); - let records = self.data_chunk - .scan_table(output.table)? - .with_row_selection(row_selection) - .with_projection(output.projection.clone()) - .execute()?; + let records = self + .chunk + .scan_table(output.table)? 
+ .with_row_selection(row_selection) + .with_projection(output.projection.clone()) + .execute()?; - let data_item = DataItem::new( - output.item_name, - output.key, - records, - &output.exp - )?; + let data_item = DataItem::new(output.item_name, output.key, records, &output.exp)?; - data_items_mutex.lock()[idx] = Some(data_item); + data_items_mutex.lock()[idx] = Some(data_item); - Ok(()) - })?; + Ok(()) + })?; - Ok(Some(BlockWriter::new(data_items_mutex - .into_inner() - .into_iter() - .flatten() - .collect() + Ok(Some(BlockWriter::new( + data_items_mutex + .into_inner() + .into_iter() + .flatten() + .collect(), ))) } fn get_output_index(&self, table: Name) -> usize { - self.plan.outputs.iter() + self.plan + .outputs + .iter() .position(|o| o.table == table) .unwrap() } @@ -421,12 +468,11 @@ impl <'a> PlanExecution<'a> { (Some(fst), Some(lst)) => Some(col_between(column, fst, lst)), (None, Some(lst)) => Some(col_lt_eq(column, lst)), (Some(fst), None) => Some(col_gt_eq(column, fst)), - (None, None) => None + (None, None) => None, } } } - pub struct PlanBuilder { tables: &'static TableSet, scans: Vec, @@ -435,31 +481,31 @@ pub struct PlanBuilder { include_all_blocks: bool, parent_block_hash: Option, first_block: Option, - last_block: Option + last_block: Option, } - -impl PlanBuilder{ +impl PlanBuilder { pub fn new(tables: &'static TableSet) -> Self { PlanBuilder { tables, scans: Vec::new(), relations: Vec::new(), - outputs: tables.iter().map(|table| { - Output { + outputs: tables + .iter() + .map(|table| Output { table: table.name, key: &table.primary_key, projection: HashSet::new(), weight_per_row: 0, weight_columns: Vec::new(), exp: Exp::Object(vec![]), - item_name: table.result_item_name - } - }).collect(), + item_name: table.result_item_name, + }) + .collect(), include_all_blocks: false, parent_block_hash: None, first_block: None, - last_block: None + last_block: None, } } @@ -474,7 +520,7 @@ impl PlanBuilder{ self.scans.push(scan); ScanBuilder { plan: self, - 
scan_idx + scan_idx, } } @@ -508,13 +554,14 @@ impl PlanBuilder{ self.simplify(); self.set_output_weights(); Plan { + tables: self.tables, scans: self.scans, relations: self.relations, outputs: self.outputs, include_all_blocks: self.include_all_blocks, parent_block_hash: self.parent_block_hash, first_block: self.first_block, - last_block: self.last_block + last_block: self.last_block, } } @@ -532,7 +579,11 @@ impl PlanBuilder{ let mut weight_columns = Vec::new(); for col in output.projection.iter() { - match table.column_weights.get(col).unwrap_or(&ColumnWeight::Fixed(32)) { + match table + .column_weights + .get(col) + .unwrap_or(&ColumnWeight::Fixed(32)) + { ColumnWeight::Fixed(weight) => { per_row += weight; } @@ -547,10 +598,13 @@ impl PlanBuilder{ } fn add_rel(&mut self, rel: Rel) -> usize { - self.relations.iter().position(|r| r == &rel).unwrap_or_else(|| { - self.relations.push(rel); - self.relations.len() - 1 - }) + self.relations + .iter() + .position(|r| r == &rel) + .unwrap_or_else(|| { + self.relations.push(rel); + self.relations.len() - 1 + }) } /// Lame plan optimization procedure @@ -624,9 +678,11 @@ impl PlanBuilder{ // Remove relations, that point to fully populated outputs. // We'll make sure, that those outputs remain populated later. - let relations_remove_mask = self.relations.iter().map(|rel| { - is_full[self.tables.get_index(rel.output_table())] - }).collect::>(); + let relations_remove_mask = self + .relations + .iter() + .map(|rel| is_full[self.tables.get_index(rel.output_table())]) + .collect::>(); remove_elements(&mut is_full_rel, &relations_remove_mask); self.remove_relations(&relations_remove_mask); @@ -647,25 +703,35 @@ impl PlanBuilder{ // Introduce new scans to populate that. 
let mut new_scans: HashMap = HashMap::new(); - for out_idx in is_full.iter().enumerate().filter_map(|(idx, full)| full.then_some(idx)) { + for out_idx in is_full + .iter() + .enumerate() + .filter_map(|(idx, full)| full.then_some(idx)) + { let table = self.outputs[out_idx].table; - new_scans.insert(table, Scan { + new_scans.insert( table, - predicate: None, - relations: vec![], - output: Some(out_idx) - }); - } - - for (idx, rel) in self.relations.iter().enumerate().filter(|(idx, _)| is_full_rel[*idx]) { - let table = rel.output_table(); - let scan = new_scans.entry(table).or_insert_with(|| { Scan { table, predicate: None, relations: vec![], - output: None - } + output: Some(out_idx), + }, + ); + } + + for (idx, rel) in self + .relations + .iter() + .enumerate() + .filter(|(idx, _)| is_full_rel[*idx]) + { + let table = rel.output_table(); + let scan = new_scans.entry(table).or_insert_with(|| Scan { + table, + predicate: None, + relations: vec![], + output: None, }); scan.relations.push(idx); } @@ -677,15 +743,18 @@ impl PlanBuilder{ remove_elements(&mut self.relations, remove_mask); let mut idx = 0; - let index_map = remove_mask.iter().map(|&remove| { - if remove { - None - } else { - let i = idx; - idx += 1; - Some(i) - } - }).collect::>(); + let index_map = remove_mask + .iter() + .map(|&remove| { + if remove { + None + } else { + let i = idx; + idx += 1; + Some(i) + } + }) + .collect::>(); for scan in self.scans.iter_mut() { scan.relations.retain_mut(|rel_idx| { @@ -702,19 +771,23 @@ impl PlanBuilder{ /// checks whether full input implies full output fn is_full_rel(&self, rel: &Rel) -> bool { match rel { - Rel::Join { input_table, input_key, output_table, output_key } => { + Rel::Join { + input_table, + input_key, + output_table, + output_key, + } => { let input_desc = self.tables.get(input_table); input_key == &input_desc.primary_key && input_desc.children.get(output_table) == Some(output_key) - }, + } Rel::Children { .. } => true, Rel::Parents { .. 
} => true, - _ => false + _ => false, } } } - fn remove_elements(vec: &mut Vec, remove_mask: &[bool]) { let mut idx = 0; vec.retain(|_| { @@ -724,14 +797,12 @@ fn remove_elements(vec: &mut Vec, remove_mask: &[bool]) { }); } - pub struct ScanBuilder<'a> { plan: &'a mut PlanBuilder, - scan_idx: usize + scan_idx: usize, } - -impl <'a> ScanBuilder<'a> { +impl<'a> ScanBuilder<'a> { pub fn with_predicate(&mut self, predicate: RowPredicateRef) -> &mut Self { self.scan_mut().predicate = Some(predicate); self @@ -741,13 +812,13 @@ impl <'a> ScanBuilder<'a> { &mut self, output_table: Name, output_key: Vec, - scan_key: Vec + scan_key: Vec, ) -> &mut Self { let rel_idx = self.plan.add_rel(Rel::Join { input_table: self.plan.scans[self.scan_idx].table, input_key: scan_key, output_table, - output_key + output_key, }); self.scan_mut().relations.push(rel_idx); self @@ -756,10 +827,7 @@ impl <'a> ScanBuilder<'a> { pub fn include_children(&mut self) -> &mut Self { let table = &self.plan.scans[self.scan_idx].table; let key = self.plan.tables.get(table).primary_key.clone(); - let rel_idx = self.plan.add_rel(Rel::Children { - table, - key - }); + let rel_idx = self.plan.add_rel(Rel::Children { table, key }); self.scan_mut().relations.push(rel_idx); self } @@ -767,10 +835,7 @@ impl <'a> ScanBuilder<'a> { pub fn include_parents(&mut self) -> &mut Self { let table = &self.plan.scans[self.scan_idx].table; let key = self.plan.tables.get(table).primary_key.clone(); - let rel_idx = self.plan.add_rel(Rel::Parents { - table, - key - }); + let rel_idx = self.plan.add_rel(Rel::Parents { table, key }); self.scan_mut().relations.push(rel_idx); self } @@ -779,13 +844,13 @@ impl <'a> ScanBuilder<'a> { &mut self, output_table: Name, output_key: Vec, - scan_key: Vec + scan_key: Vec, ) -> &mut Self { let rel_idx = self.plan.add_rel(Rel::ForeignChildren { input_table: self.plan.scans[self.scan_idx].table, input_key: scan_key, output_table, - output_key + output_key, }); 
self.scan_mut().relations.push(rel_idx); self @@ -795,13 +860,13 @@ impl <'a> ScanBuilder<'a> { &mut self, output_table: Name, output_key: Vec, - scan_key: Vec + scan_key: Vec, ) -> &mut Self { let rel_idx = self.plan.add_rel(Rel::ForeignParents { input_table: self.plan.scans[self.scan_idx].table, input_key: scan_key, output_table, - output_key + output_key, }); self.scan_mut().relations.push(rel_idx); self @@ -815,4 +880,4 @@ impl <'a> ScanBuilder<'a> { fn scan_mut(&mut self) -> &mut Scan { &mut self.plan.scans[self.scan_idx] } -} \ No newline at end of file +} diff --git a/crates/query/src/plan/table.rs b/crates/query/src/plan/table.rs index 90920f8f..9dd6238d 100644 --- a/crates/query/src/plan/table.rs +++ b/crates/query/src/plan/table.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use crate::primitives::{Name, RowWeight}; @@ -8,10 +8,16 @@ pub enum ColumnWeight { } +pub enum ColumnDefault { + Null +} + + pub struct Table { pub name: Name, pub primary_key: Vec, pub column_weights: HashMap, + pub column_defaults: HashMap, pub result_item_name: Name, pub children: HashMap> } @@ -33,6 +39,26 @@ impl Table { self } + pub fn set_column_default(&mut self, column: Name, default: ColumnDefault) -> &mut Self { + self.column_defaults.insert(column, default); + self + } + + pub fn set_nullable(&mut self, columns: &[Name]) -> &mut Self { + for col in columns { + self.column_defaults.insert(col, ColumnDefault::Null); + } + self + } + + pub fn default_null_columns(&self) -> HashSet { + self.column_defaults.iter() + .filter_map(|(name, default)| match default { + ColumnDefault::Null => Some(*name) + }) + .collect() + } + pub fn add_child(&mut self, name: Name, key: Vec) -> &mut Self { assert_eq!(key.len(), self.primary_key.len()); self.children.insert(name, key); @@ -88,6 +114,7 @@ impl TableSet { name, primary_key: pk, column_weights: HashMap::new(), + column_defaults: HashMap::new(), result_item_name: name, children: 
HashMap::new() }); diff --git a/crates/query/src/query/eth.rs b/crates/query/src/query/eth.rs index 6b328f09..cb874d98 100644 --- a/crates/query/src/query/eth.rs +++ b/crates/query/src/query/eth.rs @@ -665,4 +665,4 @@ impl EthQuery { statediffs, ) } -} \ No newline at end of file +} diff --git a/crates/query/src/query/fuel.rs b/crates/query/src/query/fuel.rs index f220d59a..4c9cb950 100644 --- a/crates/query/src/query/fuel.rs +++ b/crates/query/src/query/fuel.rs @@ -641,4 +641,4 @@ impl FuelQuery { outputs, ) } -} \ No newline at end of file +} diff --git a/crates/query/src/query/solana.rs b/crates/query/src/query/solana.rs index 6cde99e7..64ec6df5 100644 --- a/crates/query/src/query/solana.rs +++ b/crates/query/src/query/solana.rs @@ -15,7 +15,7 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]); + ]); tables.add_table("transactions", vec![ "block_number", @@ -806,4 +806,4 @@ impl SolanaQuery { rewards, ) } -} \ No newline at end of file +} diff --git a/crates/query/src/query/substrate.rs b/crates/query/src/query/substrate.rs index 7887d957..84e2a118 100644 --- a/crates/query/src/query/substrate.rs +++ b/crates/query/src/query/substrate.rs @@ -572,4 +572,4 @@ impl SubstrateQuery { , ) } -} \ No newline at end of file +} diff --git a/crates/query/src/query/util.rs b/crates/query/src/query/util.rs index 339b42af..9e3b970b 100644 --- a/crates/query/src/query/util.rs +++ b/crates/query/src/query/util.rs @@ -24,6 +24,10 @@ macro_rules! 
item_field_selection { let $fields = self; $projection.into() } + + pub fn columns() -> &'static [crate::primitives::Name] { + &[$(stringify!($field)),*] + } } }; } diff --git a/crates/query/src/scan/chunk.rs b/crates/query/src/scan/chunk.rs index de44d3c7..7cd4e482 100644 --- a/crates/query/src/scan/chunk.rs +++ b/crates/query/src/scan/chunk.rs @@ -5,28 +5,3 @@ use crate::scan::scan::Scan; pub trait Chunk: Send + Sync { fn scan_table(&self, name: Name) -> anyhow::Result>; } - - -#[derive(Debug)] -pub struct TableDoesNotExist { - pub table_name: Name -} - - -impl TableDoesNotExist { - pub fn new(table_name: Name) -> Self { - Self { - table_name - } - } -} - - -impl std::fmt::Display for TableDoesNotExist { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "table '{}' does not exist", self.table_name) - } -} - - -impl std::error::Error for TableDoesNotExist {} \ No newline at end of file diff --git a/crates/query/src/scan/errors.rs b/crates/query/src/scan/errors.rs new file mode 100644 index 00000000..ccc975d8 --- /dev/null +++ b/crates/query/src/scan/errors.rs @@ -0,0 +1,50 @@ +use crate::primitives::Name; + +#[derive(Debug)] +pub struct TableDoesNotExist { + pub table_name: Name +} + + +impl TableDoesNotExist { + pub fn new(table_name: Name) -> Self { + Self { + table_name + } + } +} + + +impl std::fmt::Display for TableDoesNotExist { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "table '{}' does not exist", self.table_name) + } +} + + +impl std::error::Error for TableDoesNotExist {} + +#[derive(Debug)] +pub struct ColumnDoesNotExist { + pub column_name: Name, + pub table_name: String, +} + + +impl ColumnDoesNotExist { + pub fn new(table_name: String, column_name: Name) -> Self { + Self { + table_name, + column_name + } + } +} + + +impl std::fmt::Display for ColumnDoesNotExist { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "column '{}' is not found in {}", 
self.column_name, self.table_name) + } +} + +impl std::error::Error for ColumnDoesNotExist {} diff --git a/crates/query/src/scan/mod.rs b/crates/query/src/scan/mod.rs index d0d1d506..0f33b477 100644 --- a/crates/query/src/scan/mod.rs +++ b/crates/query/src/scan/mod.rs @@ -6,13 +6,15 @@ pub mod parquet; mod reader; mod row_predicate; mod row_predicate_dsl; -mod scan; +pub(crate) mod scan; #[cfg(feature = "storage")] mod storage; +mod errors; mod util; pub use arrow::*; pub use chunk::*; +pub use errors::*; pub use row_predicate::RowPredicateRef; pub use row_predicate_dsl::*; diff --git a/crates/query/src/scan/parquet/file.rs b/crates/query/src/scan/parquet/file.rs index 2f1dfcf5..6c1615e0 100644 --- a/crates/query/src/scan/parquet/file.rs +++ b/crates/query/src/scan/parquet/file.rs @@ -4,8 +4,13 @@ use crate::scan::parquet::metadata::ParquetMetadata; use crate::scan::reader::TableReader; use crate::scan::row_predicate::{RowPredicate, RowPredicateRef}; use crate::scan::util::{add_row_index, build_row_index_array}; -use arrow::array::RecordBatch; -use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector}; +use crate::ColumnDoesNotExist; +use arrow::array::{new_null_array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, + RowSelector, +}; use parquet::arrow::ProjectionMask; use parquet::file::metadata::RowGroupMetaData; use rayon::prelude::*; @@ -13,46 +18,69 @@ use std::cmp::Ordering; use std::collections::HashSet; use std::ops::Not; use std::sync::Arc; -use arrow::datatypes::SchemaRef; - #[derive(Clone)] pub struct ParquetFile { io: MmapIO, metadata: Arc, - filename: Arc + filename: Arc, } - impl ParquetFile { pub fn open>(file: P) -> anyhow::Result { let filename = file.into(); let io = MmapIO::open(&filename)?; - let metadata = 
ArrowReaderMetadata::load( - &io, - ArrowReaderOptions::new().with_page_index(true) - )?; + let metadata = + ArrowReaderMetadata::load(&io, ArrowReaderOptions::new().with_page_index(true))?; Ok(Self { io, metadata: Arc::new(ParquetMetadata::new(metadata)), - filename: Arc::new(filename) + filename: Arc::new(filename), }) } } - impl TableReader for ParquetFile { + /// Reads record batches from the parquet file with optional filtering and projection. + /// + /// The read proceeds in several stages: + /// + /// 1. **Row group stats pruning**: If the predicate supports stats evaluation, + /// row group-level min/max statistics are checked to skip entire row groups + /// that can't contain matching rows. The result is intersected with any + /// existing row_selection. + /// + /// 2. **Page stats pruning**: Within selected row groups, page-level statistics + /// are checked to further narrow down which pages need to be read. + /// + /// 3. **Projection resolution**: The requested `projection` columns and any + /// additional columns needed by the `predicate` are resolved to parquet + /// schema field indices. Predicate-only columns (not in the projection) + /// are tracked separately so they can be stripped from the output after + /// predicate evaluation. + /// + /// Columns listed in `default_null_columns` that are missing from the + /// parquet schema are skipped during projection resolution. After reading, + /// NullArray columns are injected for them so that predicates and callers + /// see them as all-null. + /// + /// 4. **Parallel row group reading**: Selected row groups are read in parallel. + /// Each row group read applies the projection mask, row selection, and + /// optionally adds a row_index column. After reading, the predicate is + /// evaluated and predicate-only columns are removed from the output. 
fn read( &self, predicate: Option, projection: Option<&HashSet>, row_selection: Option<&RowRangeList>, - with_row_index: bool + with_row_index: bool, + default_null_columns: Option<&HashSet> ) -> anyhow::Result> { + // Stage 1: Row group stats pruning let mut maybe_new_row_selection = None; if let Some(predicate) = predicate.as_ref() { @@ -88,6 +116,7 @@ impl TableReader for ParquetFile { .collect() }; + // Stage 2: Page stats pruning if let Some(predicate) = predicate.as_ref() { if predicate.can_evaluate_stats() { for (row_group_idx, sel_ptr) in row_groups.iter_mut() { @@ -102,72 +131,117 @@ impl TableReader for ParquetFile { } } row_groups.retain(|rg| { - rg.1.as_ref().map(|ranges| !ranges.is_empty()).unwrap_or(true) + rg.1.as_ref() + .map(|ranges| !ranges.is_empty()) + .unwrap_or(true) }) } } + // Stage 3: Projection resolution — map column names to parquet field indices. + // Predicate columns not already in the projection are tracked separately + // so they can be stripped from the output after predicate evaluation. + // Columns in default_null_columns that are missing from the parquet schema + // are skipped here and injected as NullArrays after reading. 
+ let mut missing_null_columns: Vec = Vec::new(); + let (projection_mask, predicate_columns) = if let Some(columns) = projection { - let predicate_columns = predicate.as_ref() + let predicate_columns = predicate + .as_ref() .map_or([].as_slice(), |p| p.projection()) .iter() - .filter_map(|col| { - columns.contains(col).not().then(|| *col) - }) + .filter_map(|col| columns.contains(col).not().then(|| *col)) .collect::>(); - let mut indices = Vec::with_capacity( - columns.len() + predicate_columns.len() - ); + let mut indices = Vec::with_capacity(columns.len() + predicate_columns.len()); - let fields = self.metadata.metadata().parquet_schema().root_schema().get_fields(); + let fields = self + .metadata + .metadata() + .parquet_schema() + .root_schema() + .get_fields(); for name in columns.iter().chain(predicate_columns.iter()).copied() { - let idx = fields.iter() - .position(|f| f.name() == name) - .ok_or_else(|| { - anyhow::format_err!("column '{}' is not found in {}", name, self.filename) - })?; - indices.push(idx); + match fields.iter().position(|f| f.name() == name) { + Some(idx) => { + indices.push(idx); + } + None => { + if default_null_columns.map_or(false, |dnc| dnc.contains(name)) { + missing_null_columns.push(name); + } else { + tracing::error!("column '{}' is not found in {}", name, self.filename); + anyhow::bail!(ColumnDoesNotExist::new(self.filename.to_string(), name)); + } + } + } } - let projection_mask = ProjectionMask::roots( - parquet_metadata.file_metadata().schema_descr(), - indices - ); + let projection_mask = + ProjectionMask::roots(parquet_metadata.file_metadata().schema_descr(), indices); (projection_mask, predicate_columns) } else { (ProjectionMask::all(), HashSet::new()) }; + tracing::trace!("missing columns: {missing_null_columns:?}"); + let maybe_row_index_offsets = with_row_index.then(|| { - maybe_row_group_offsets.unwrap_or_else(|| { - build_row_group_offsets(parquet_metadata.row_groups()) - }) + maybe_row_group_offsets + 
.unwrap_or_else(|| build_row_group_offsets(parquet_metadata.row_groups())) }); - let results: Vec<_> = row_groups.into_par_iter().map(|(row_group_idx, maybe_row_selection)| { - read_row_group( - self.io.clone(), - self.metadata.metadata(), - row_group_idx, - projection_mask.clone(), - predicate.clone().map(|p| (p, &predicate_columns)), - maybe_row_selection, - maybe_row_index_offsets.as_ref().map(|offsets| offsets[row_group_idx]), - 1_000_000_000 - ) - }).collect(); + // Stage 4: Parallel row group reading + let results: Vec<_> = row_groups + .into_par_iter() + .map(|(row_group_idx, maybe_row_selection)| { + read_row_group( + self.io.clone(), + self.metadata.metadata(), + row_group_idx, + projection_mask.clone(), + predicate.clone().map(|p| (p, &predicate_columns)), + maybe_row_selection, + maybe_row_index_offsets + .as_ref() + .map(|offsets| offsets[row_group_idx]), + 1_000_000_000, + ) + }) + .collect(); let mut record_batches = Vec::with_capacity( - results.iter().map(|r| r.as_ref().map_or(0, |bs| bs.len())).sum() + results + .iter() + .map(|r| r.as_ref().map_or(0, |bs| bs.len())) + .sum(), ); for r in results { record_batches.extend(r?) 
} - + + // Inject NullArray columns for default-null columns missing from parquet + if !missing_null_columns.is_empty() { + record_batches = record_batches + .into_iter() + .map(|batch| { + let num_rows = batch.num_rows(); + let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); + let mut arrays: Vec<_> = batch.columns().to_vec(); + + for col_name in missing_null_columns.iter() { + fields.push(Arc::new(Field::new(*col_name, DataType::Null, true))); + arrays.push(new_null_array(&DataType::Null, num_rows)); + } + + RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays) + }) + .collect::, _>>()?; + } + Ok(record_batches) } @@ -176,7 +250,6 @@ impl TableReader for ParquetFile { } } - fn read_row_group( io: MmapIO, metadata: &ArrowReaderMetadata, @@ -185,13 +258,9 @@ fn read_row_group( maybe_predicate: Option<(RowPredicateRef, &HashSet)>, maybe_row_selection: Option, maybe_row_index_offset: Option, - record_batch_size: usize + record_batch_size: usize, ) -> anyhow::Result> { - - let mut reader = ParquetRecordBatchReaderBuilder::new_with_metadata( - io, - metadata.clone() - ); + let mut reader = ParquetRecordBatchReaderBuilder::new_with_metadata(io, metadata.clone()); reader = reader.with_row_groups(vec![row_group_idx]); reader = reader.with_batch_size(record_batch_size); @@ -232,7 +301,6 @@ fn read_row_group( Ok(result) } - fn build_row_group_offsets(row_groups: &[RowGroupMetaData]) -> Vec { let mut offsets = Vec::with_capacity(row_groups.len() + 1); offsets.push(0); @@ -244,17 +312,16 @@ fn build_row_group_offsets(row_groups: &[RowGroupMetaData]) -> Vec { offsets } - fn apply_predicate( mut batch: RecordBatch, predicate: &dyn RowPredicate, - predicate_columns: &HashSet -) -> anyhow::Result -{ + predicate_columns: &HashSet, +) -> anyhow::Result { let mask = predicate.evaluate(&batch)?; if predicate_columns.len() > 0 { - let projection: Vec = batch.schema() + let projection: Vec = batch + .schema() .fields() .iter() .enumerate() @@ -264,7 +331,8 
@@ fn apply_predicate( } else { Some(idx) } - }).collect(); + }) + .collect(); batch = batch.project(&projection)?; } @@ -274,7 +342,6 @@ fn apply_predicate( Ok(batch) } - fn to_parquet_row_selection(ranges: &RowRangeList) -> RowSelection { let mut selectors: Vec = Vec::new(); let mut last_end = 0u32; @@ -296,4 +363,4 @@ fn to_parquet_row_selection(ranges: &RowRangeList) -> RowSelection { } RowSelection::from(selectors) -} \ No newline at end of file +} diff --git a/crates/query/src/scan/reader.rs b/crates/query/src/scan/reader.rs index 7bd058fc..69be7583 100644 --- a/crates/query/src/scan/reader.rs +++ b/crates/query/src/scan/reader.rs @@ -6,13 +6,20 @@ use std::collections::HashSet; pub trait TableReader { + /// Reads record batches with optional filtering, projection, and row selection. + /// + /// When `default_null_columns` is provided, columns listed there that are + /// missing from the underlying data source should be treated as all-null + /// columns rather than causing an error. This applies to both projection + /// and predicate columns. 
fn read( &self, predicate: Option, projection: Option<&HashSet>, row_selection: Option<&RowRangeList>, - with_row_index: bool + with_row_index: bool, + default_null_columns: Option<&HashSet> ) -> anyhow::Result>; - + fn schema(&self) -> SchemaRef; } diff --git a/crates/query/src/scan/scan.rs b/crates/query/src/scan/scan.rs index 82a2716c..e2140f61 100644 --- a/crates/query/src/scan/scan.rs +++ b/crates/query/src/scan/scan.rs @@ -13,7 +13,8 @@ pub struct Scan<'a> { predicate: Option, projection: Option>, row_selection: Option, - row_index: bool + row_index: bool, + default_null_columns: Option> } @@ -24,10 +25,11 @@ impl <'a> Scan<'a> { predicate: None, projection: None, row_selection: None, - row_index: false + row_index: false, + default_null_columns: None } } - + pub fn schema(&self) -> SchemaRef { self.reader.schema() } @@ -67,12 +69,18 @@ impl <'a> Scan<'a> { self } + pub fn with_default_null_columns(mut self, columns: HashSet) -> Self { + self.default_null_columns = Some(columns); + self + } + pub fn execute(&self) -> anyhow::Result> { self.reader.read( self.predicate.clone(), self.projection.as_ref(), self.row_selection.as_ref(), - self.row_index + self.row_index, + self.default_null_columns.as_ref() ) } @@ -80,4 +88,4 @@ impl <'a> Scan<'a> { let batches = self.execute()?; record_batch_vec_to_lazy_polars_df(&batches) } -} \ No newline at end of file +} diff --git a/crates/query/src/scan/storage/reader.rs b/crates/query/src/scan/storage/reader.rs index c0747896..e06b9544 100644 --- a/crates/query/src/scan/storage/reader.rs +++ b/crates/query/src/scan/storage/reader.rs @@ -15,7 +15,8 @@ impl <'a> TableReader for SnapshotTableReader<'a> { predicate: Option, projection: Option<&HashSet>, row_selection: Option<&RowRangeList>, - with_row_index: bool + with_row_index: bool, + _default_null_columns: Option<&HashSet> ) -> anyhow::Result> { let mut maybe_new_row_selection = None;