From 2c84355f6c99712c64efb87455589ce5aa6627b5 Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Mon, 9 Mar 2026 18:25:05 +0100 Subject: [PATCH 1/6] Add null default columns support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **`TableReader` trait** (`reader.rs`) — Added `default_null_columns: Option<&HashSet>` parameter to `read()`. **`Scan`** (`scan.rs`) — Added `default_null_columns` field and `with_default_null_columns()` builder method. Passes it through to `reader.read()`. **`ParquetFile::read()`** (`parquet/file.rs`) — In Stage 3, columns in `default_null_columns` that are missing from the parquet schema are skipped instead of erroring. After reading (Stage 4), `NullArray` columns are injected for them into every record batch. This handles both projection and predicate columns — predicates will see NullArrays and naturally evaluate to false/null for comparisons. **`SnapshotTableReader::read()`** (`storage/reader.rs`) — Accepts the new parameter (unused for now since storage tables are expected to always have all columns). **`execute_output`** (`plan.rs`) — Simplified to use `scan.with_default_null_columns()` instead of manual missing-column detection and null-array injection. 
--- crates/query/src/json/exp.rs | 19 ++++++ crates/query/src/plan/plan.rs | 46 +++++++++++++- crates/query/src/plan/table.rs | 22 ++++++- crates/query/src/scan/parquet/file.rs | 80 +++++++++++++++++++++---- crates/query/src/scan/reader.rs | 11 +++- crates/query/src/scan/scan.rs | 18 ++++-- crates/query/src/scan/storage/reader.rs | 3 +- 7 files changed, 177 insertions(+), 22 deletions(-) diff --git a/crates/query/src/json/exp.rs b/crates/query/src/json/exp.rs index bbb06adc..efa4f789 100644 --- a/crates/query/src/json/exp.rs +++ b/crates/query/src/json/exp.rs @@ -218,6 +218,16 @@ fn eval_object(array: &dyn Array, props: &Vec<(Name, Exp)>) -> Result) { + out.extend_from_slice(b"null") + } +} + + fn eval_prop(array: &dyn Array, name: Name, exp: &Exp) -> Result { let array: &StructArray = array.as_any().downcast_ref().ok_or_else(|| { schema_error!("expected a StructArray, but got {}", array.data_type()) @@ -227,6 +237,15 @@ fn eval_prop(array: &dyn Array, name: Name, exp: &Exp) -> Result, + default_null_columns: HashSet, weight_per_row: RowWeight, weight_columns: Vec, exp: Exp, @@ -75,6 +76,13 @@ struct PlanExecution<'a> { impl <'a> PlanExecution<'a> { + /// Executes the full query plan against a data chunk. + /// + /// The plan operates across multiple tables (one per output). + /// Execution proceeds in three phases: + /// 1. Scans - find matching row indexes in each table (parallel per scan) + /// 2. Relations - propagate row selections across related tables (e.g. join, children) + /// 3. Output - read actual data for selected rows from each table and build the result fn execute(&self) -> anyhow::Result> { self.check_parent_block()?; @@ -171,6 +179,12 @@ impl <'a> PlanExecution<'a> { } } + /// Phase 1: Execute predicate scans to collect matching row indexes. + /// + /// Each scan targets a single table and collects row indexes that match + /// its predicate. 
These row indexes are distributed to relation inputs + /// (for cross-table joins in phase 2) and/or output inputs (for direct + /// data reading in phase 3). Scans run in parallel. fn execute_scans( &self, relation_inputs: &Vec, @@ -197,6 +211,12 @@ impl <'a> PlanExecution<'a> { }) } + /// Phase 2: Propagate row selections through relations. + /// + /// Relations link rows between tables (e.g. join, children, parents). + /// For each relation, the matched rows from phase 1 are used to find + /// corresponding rows in the relation's output table, adding them + /// to that table's output inputs. Relations run in parallel. fn execute_relations( &self, relation_inputs: Vec, @@ -214,6 +234,18 @@ impl <'a> PlanExecution<'a> { }) } + /// Phase 3: Read actual column data and build the output. + /// + /// Each output corresponds to a single table (outputs[0] is always the + /// block header table). For each table independently: + /// 1. First pass: read key + weight columns to compute per-block weights + /// and determine which blocks fit in the ~20MB output budget + /// 2. Second pass: read the full projection for selected rows and build + /// DataItem encoders for JSON serialization + /// + /// Columns missing from the parquet file are handled gracefully if they + /// have a default value of Null (via default_null_columns) — they are + /// replaced with NullArrays so the encoder outputs "null" for them. fn execute_output(&self, output_inputs: Vec) -> anyhow::Result> { use sqd_polars::prelude::*; @@ -383,11 +415,16 @@ impl <'a> PlanExecution<'a> { row_index.column("row_index").unwrap().u32()?.into_no_null_iter() ); - let records = self.data_chunk + let mut scan = self.data_chunk .scan_table(output.table)? 
.with_row_selection(row_selection) - .with_projection(output.projection.clone()) - .execute()?; + .with_projection(output.projection.clone()); + + if !output.default_null_columns.is_empty() { + scan = scan.with_default_null_columns(output.default_null_columns.clone()); + } + + let records = scan.execute()?; let data_item = DataItem::new( output.item_name, @@ -450,6 +487,7 @@ impl PlanBuilder{ table: table.name, key: &table.primary_key, projection: HashSet::new(), + default_null_columns: HashSet::new(), weight_per_row: 0, weight_columns: Vec::new(), exp: Exp::Object(vec![]), @@ -528,6 +566,8 @@ impl PlanBuilder{ output.projection.insert(name); }); + output.default_null_columns = table.default_null_columns(); + let mut per_row = 0; let mut weight_columns = Vec::new(); diff --git a/crates/query/src/plan/table.rs b/crates/query/src/plan/table.rs index 90920f8f..a9f15490 100644 --- a/crates/query/src/plan/table.rs +++ b/crates/query/src/plan/table.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use crate::primitives::{Name, RowWeight}; @@ -8,10 +8,16 @@ pub enum ColumnWeight { } +pub enum ColumnDefault { + Null +} + + pub struct Table { pub name: Name, pub primary_key: Vec, pub column_weights: HashMap, + pub column_defaults: HashMap, pub result_item_name: Name, pub children: HashMap> } @@ -33,6 +39,19 @@ impl Table { self } + pub fn set_column_default(&mut self, column: Name, default: ColumnDefault) -> &mut Self { + self.column_defaults.insert(column, default); + self + } + + pub fn default_null_columns(&self) -> HashSet { + self.column_defaults.iter() + .filter_map(|(name, default)| match default { + ColumnDefault::Null => Some(*name) + }) + .collect() + } + pub fn add_child(&mut self, name: Name, key: Vec) -> &mut Self { assert_eq!(key.len(), self.primary_key.len()); self.children.insert(name, key); @@ -88,6 +107,7 @@ impl TableSet { name, primary_key: pk, column_weights: HashMap::new(), + column_defaults: HashMap::new(), 
result_item_name: name, children: HashMap::new() }); diff --git a/crates/query/src/scan/parquet/file.rs b/crates/query/src/scan/parquet/file.rs index 2f1dfcf5..2dbb090c 100644 --- a/crates/query/src/scan/parquet/file.rs +++ b/crates/query/src/scan/parquet/file.rs @@ -4,7 +4,8 @@ use crate::scan::parquet::metadata::ParquetMetadata; use crate::scan::reader::TableReader; use crate::scan::row_predicate::{RowPredicate, RowPredicateRef}; use crate::scan::util::{add_row_index, build_row_index_array}; -use arrow::array::RecordBatch; +use arrow::array::{new_null_array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector}; use parquet::arrow::ProjectionMask; use parquet::file::metadata::RowGroupMetaData; @@ -13,7 +14,6 @@ use std::cmp::Ordering; use std::collections::HashSet; use std::ops::Not; use std::sync::Arc; -use arrow::datatypes::SchemaRef; #[derive(Clone)] @@ -45,14 +45,43 @@ impl ParquetFile { impl TableReader for ParquetFile { + /// Reads record batches from the parquet file with optional filtering and projection. + /// + /// The read proceeds in several stages: + /// + /// 1. **Row group stats pruning**: If the predicate supports stats evaluation, + /// row group-level min/max statistics are checked to skip entire row groups + /// that can't contain matching rows. The result is intersected with any + /// existing row_selection. + /// + /// 2. **Page stats pruning**: Within selected row groups, page-level statistics + /// are checked to further narrow down which pages need to be read. + /// + /// 3. **Projection resolution**: The requested `projection` columns and any + /// additional columns needed by the `predicate` are resolved to parquet + /// schema field indices. 
Predicate-only columns (not in the projection) + /// are tracked separately so they can be stripped from the output after + /// predicate evaluation. + /// + /// Columns listed in `default_null_columns` that are missing from the + /// parquet schema are skipped during projection resolution. After reading, + /// NullArray columns are injected for them so that predicates and callers + /// see them as all-null. + /// + /// 4. **Parallel row group reading**: Selected row groups are read in parallel. + /// Each row group read applies the projection mask, row selection, and + /// optionally adds a row_index column. After reading, the predicate is + /// evaluated and predicate-only columns are removed from the output. fn read( &self, predicate: Option, projection: Option<&HashSet>, row_selection: Option<&RowRangeList>, - with_row_index: bool + with_row_index: bool, + default_null_columns: Option<&HashSet> ) -> anyhow::Result> { + // Stage 1: Row group stats pruning let mut maybe_new_row_selection = None; if let Some(predicate) = predicate.as_ref() { @@ -88,6 +117,7 @@ impl TableReader for ParquetFile { .collect() }; + // Stage 2: Page stats pruning if let Some(predicate) = predicate.as_ref() { if predicate.can_evaluate_stats() { for (row_group_idx, sel_ptr) in row_groups.iter_mut() { @@ -107,6 +137,13 @@ impl TableReader for ParquetFile { } } + // Stage 3: Projection resolution — map column names to parquet field indices. + // Predicate columns not already in the projection are tracked separately + // so they can be stripped from the output after predicate evaluation. + // Columns in default_null_columns that are missing from the parquet schema + // are skipped here and injected as NullArrays after reading. 
+ let mut missing_null_columns: Vec = Vec::new(); + let (projection_mask, predicate_columns) = if let Some(columns) = projection { let predicate_columns = predicate.as_ref() .map_or([].as_slice(), |p| p.projection()) @@ -123,12 +160,18 @@ impl TableReader for ParquetFile { let fields = self.metadata.metadata().parquet_schema().root_schema().get_fields(); for name in columns.iter().chain(predicate_columns.iter()).copied() { - let idx = fields.iter() - .position(|f| f.name() == name) - .ok_or_else(|| { - anyhow::format_err!("column '{}' is not found in {}", name, self.filename) - })?; - indices.push(idx); + match fields.iter().position(|f| f.name() == name) { + Some(idx) => { + indices.push(idx); + } + None => { + if default_null_columns.map_or(false, |dnc| dnc.contains(name)) { + missing_null_columns.push(name); + } else { + anyhow::bail!("column '{}' is not found in {}", name, self.filename); + } + } + } } let projection_mask = ProjectionMask::roots( @@ -147,6 +190,7 @@ impl TableReader for ParquetFile { }) }); + // Stage 4: Parallel row group reading let results: Vec<_> = row_groups.into_par_iter().map(|(row_group_idx, maybe_row_selection)| { read_row_group( self.io.clone(), @@ -167,7 +211,23 @@ impl TableReader for ParquetFile { for r in results { record_batches.extend(r?) 
} - + + // Inject NullArray columns for default-null columns missing from parquet + if !missing_null_columns.is_empty() { + record_batches = record_batches.into_iter().map(|batch| { + let num_rows = batch.num_rows(); + let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); + let mut arrays: Vec<_> = batch.columns().to_vec(); + + for col_name in missing_null_columns.iter() { + fields.push(Arc::new(Field::new(*col_name, DataType::Null, true))); + arrays.push(new_null_array(&DataType::Null, num_rows)); + } + + RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays) + }).collect::, _>>()?; + } + Ok(record_batches) } diff --git a/crates/query/src/scan/reader.rs b/crates/query/src/scan/reader.rs index 7bd058fc..69be7583 100644 --- a/crates/query/src/scan/reader.rs +++ b/crates/query/src/scan/reader.rs @@ -6,13 +6,20 @@ use std::collections::HashSet; pub trait TableReader { + /// Reads record batches with optional filtering, projection, and row selection. + /// + /// When `default_null_columns` is provided, columns listed there that are + /// missing from the underlying data source should be treated as all-null + /// columns rather than causing an error. This applies to both projection + /// and predicate columns. 
fn read( &self, predicate: Option, projection: Option<&HashSet>, row_selection: Option<&RowRangeList>, - with_row_index: bool + with_row_index: bool, + default_null_columns: Option<&HashSet> ) -> anyhow::Result>; - + fn schema(&self) -> SchemaRef; } diff --git a/crates/query/src/scan/scan.rs b/crates/query/src/scan/scan.rs index 82a2716c..e2140f61 100644 --- a/crates/query/src/scan/scan.rs +++ b/crates/query/src/scan/scan.rs @@ -13,7 +13,8 @@ pub struct Scan<'a> { predicate: Option, projection: Option>, row_selection: Option, - row_index: bool + row_index: bool, + default_null_columns: Option> } @@ -24,10 +25,11 @@ impl <'a> Scan<'a> { predicate: None, projection: None, row_selection: None, - row_index: false + row_index: false, + default_null_columns: None } } - + pub fn schema(&self) -> SchemaRef { self.reader.schema() } @@ -67,12 +69,18 @@ impl <'a> Scan<'a> { self } + pub fn with_default_null_columns(mut self, columns: HashSet) -> Self { + self.default_null_columns = Some(columns); + self + } + pub fn execute(&self) -> anyhow::Result> { self.reader.read( self.predicate.clone(), self.projection.as_ref(), self.row_selection.as_ref(), - self.row_index + self.row_index, + self.default_null_columns.as_ref() ) } @@ -80,4 +88,4 @@ impl <'a> Scan<'a> { let batches = self.execute()?; record_batch_vec_to_lazy_polars_df(&batches) } -} \ No newline at end of file +} diff --git a/crates/query/src/scan/storage/reader.rs b/crates/query/src/scan/storage/reader.rs index c0747896..e06b9544 100644 --- a/crates/query/src/scan/storage/reader.rs +++ b/crates/query/src/scan/storage/reader.rs @@ -15,7 +15,8 @@ impl <'a> TableReader for SnapshotTableReader<'a> { predicate: Option, projection: Option<&HashSet>, row_selection: Option<&RowRangeList>, - with_row_index: bool + with_row_index: bool, + _default_null_columns: Option<&HashSet> ) -> anyhow::Result> { let mut maybe_new_row_selection = None; From 17b7b4fe51d95e7d6cac0e25baa4ae380dab3d94 Mon Sep 17 00:00:00 2001 From: Defnull 
<879658+define-null@users.noreply.github.com> Date: Tue, 10 Mar 2026 10:01:43 +0100 Subject: [PATCH 2/6] Add ChunkWithDefaults that wrap raw Chunk and is used everywhere to support default-null columns automatically across all phases --- crates/query/src/plan/plan.rs | 557 +++++++++++++++++++--------------- crates/query/src/scan/mod.rs | 2 +- 2 files changed, 306 insertions(+), 253 deletions(-) diff --git a/crates/query/src/plan/plan.rs b/crates/query/src/plan/plan.rs index 988c5fd3..758b3622 100644 --- a/crates/query/src/plan/plan.rs +++ b/crates/query/src/plan/plan.rs @@ -12,47 +12,46 @@ use sqd_polars::arrow::record_batch_vec_to_lazy_polars_df; use sqd_primitives::BlockRef; use std::collections::{HashMap, HashSet}; - type Idx = usize; - struct Scan { table: Name, predicate: Option, relations: Vec, - output: Option + output: Option, } - struct Output { table: Name, key: &'static [Name], projection: HashSet, - default_null_columns: HashSet, weight_per_row: RowWeight, weight_columns: Vec, exp: Exp, - item_name: Name + item_name: Name, } - pub struct Plan { + tables: &'static TableSet, scans: Vec, relations: Vec, outputs: Vec, include_all_blocks: bool, parent_block_hash: Option, first_block: Option, - last_block: Option + last_block: Option, } - impl Plan { pub fn execute(&self, data_chunk: &dyn Chunk) -> anyhow::Result> { PlanExecution { - data_chunk, + chunk: ChunkWithDefaults { + chunk: data_chunk, + tables: self.tables, + }, plan: self, - }.execute() + } + .execute() } pub fn set_parent_block_hash(&mut self, hash: impl Into>) { @@ -68,14 +67,32 @@ impl Plan { } } +/// A wrapper around `Chunk` that automatically attaches default-null columns +/// from the `TableSet` schema when creating scans. This ensures that columns +/// with a null default that are missing from the underlying data source are +/// treated as all-null rather than causing errors. 
+struct ChunkWithDefaults<'a> { + chunk: &'a dyn Chunk, + tables: &'static TableSet, +} + +impl Chunk for ChunkWithDefaults<'_> { + fn scan_table(&self, name: Name) -> anyhow::Result> { + let mut scan = self.chunk.scan_table(name)?; + let default_null = self.tables.get(name).default_null_columns(); + if !default_null.is_empty() { + scan = scan.with_default_null_columns(default_null); + } + Ok(scan) + } +} struct PlanExecution<'a> { - data_chunk: &'a dyn Chunk, + chunk: ChunkWithDefaults<'a>, plan: &'a Plan, } - -impl <'a> PlanExecution<'a> { +impl<'a> PlanExecution<'a> { /// Executes the full query plan against a data chunk. /// /// The plan operates across multiple tables (one per output). @@ -86,13 +103,9 @@ impl <'a> PlanExecution<'a> { fn execute(&self) -> anyhow::Result> { self.check_parent_block()?; - let relation_inputs = self.plan.relations.iter() - .map(|_| RowList::new()) - .collect(); + let relation_inputs = self.plan.relations.iter().map(|_| RowList::new()).collect(); - let output_inputs = self.plan.outputs.iter() - .map(|_| RowList::new()) - .collect(); + let output_inputs = self.plan.outputs.iter().map(|_| RowList::new()).collect(); self.execute_scans(&relation_inputs, &output_inputs)?; self.execute_relations(relation_inputs, &output_inputs)?; @@ -102,139 +115,165 @@ impl <'a> PlanExecution<'a> { fn check_parent_block(&self) -> anyhow::Result<()> { let parent_hash = match self.plan.parent_block_hash.as_ref() { Some(s) => s.as_str(), - None => return Ok(()) + None => return Ok(()), }; let block_number = match self.plan.first_block { Some(bn) => bn, - None => bail!("invalid plan: parent block hash is specified, but block number is not available") + None => bail!( + "invalid plan: parent block hash is specified, but block number is not available" + ), }; - let block_scan = self.data_chunk.scan_table(self.plan.outputs[0].table)?; + let block_scan = self.chunk.scan_table(self.plan.outputs[0].table)?; - let mut refs: Vec<_> = if 
block_scan.schema().column_with_name("parent_number").is_some() { + let mut refs: Vec<_> = if block_scan + .schema() + .column_with_name("parent_number") + .is_some() + { // this is Solana with possible gaps in block numbers let df = block_scan .with_column("parent_number") .with_column("parent_hash") - .with_predicate( - col_between("parent_number", block_number.saturating_sub(100), block_number.saturating_sub(1)) - ) + .with_predicate(col_between( + "parent_number", + block_number.saturating_sub(100), + block_number.saturating_sub(1), + )) .to_lazy_df()? .collect()?; - let numbers = df.column("parent_number")?.cast(&sqd_polars::prelude::DataType::UInt64)?; + let numbers = df + .column("parent_number")? + .cast(&sqd_polars::prelude::DataType::UInt64)?; let numbers = numbers.u64()?; let hashes = df.column("parent_hash")?; let hashes = hashes.str()?; - (0..df.shape().0).map(|i| { - let number = numbers.get(i).expect("block number can't be null according to the predicate applied"); - let hash = hashes.get(i).unwrap_or(""); - BlockRef { - number, - hash: hash.to_string() - } - }).collect() + (0..df.shape().0) + .map(|i| { + let number = numbers + .get(i) + .expect("block number can't be null according to the predicate applied"); + let hash = hashes.get(i).unwrap_or(""); + BlockRef { + number, + hash: hash.to_string(), + } + }) + .collect() } else { let df = block_scan .with_column("number") .with_column("parent_hash") - .with_predicate( - col_between("number", block_number.saturating_sub(100), block_number) - ) + .with_predicate(col_between( + "number", + block_number.saturating_sub(100), + block_number, + )) .to_lazy_df()? .collect()?; - - let numbers = df.column("number")?.cast(&sqd_polars::prelude::DataType::UInt64)?; + + let numbers = df + .column("number")? 
+ .cast(&sqd_polars::prelude::DataType::UInt64)?; let numbers = numbers.u64()?; let hashes = df.column("parent_hash")?; let hashes = hashes.str()?; - (0..df.shape().0).map(|i| { - let number = numbers.get(i).expect("block number can't be null according to the predicate applied"); - let hash = hashes.get(i).unwrap_or(""); - BlockRef { - number: number.saturating_sub(1), - hash: hash.to_string() - } - }).collect() + (0..df.shape().0) + .map(|i| { + let number = numbers + .get(i) + .expect("block number can't be null according to the predicate applied"); + let hash = hashes.get(i).unwrap_or(""); + BlockRef { + number: number.saturating_sub(1), + hash: hash.to_string(), + } + }) + .collect() }; refs.sort_by(|a, b| a.number.cmp(&b.number)); - let parent_block = refs.last().ok_or_else(|| { - anyhow!("block {} is not present in the chunk", block_number) - })?; - + let parent_block = refs + .last() + .ok_or_else(|| anyhow!("block {} is not present in the chunk", block_number))?; + if parent_block.hash == parent_hash { Ok(()) } else { Err(anyhow!(UnexpectedBaseBlock { prev_blocks: refs, expected_hash: parent_hash.to_string() - })) + })) } } - /// Phase 1: Execute predicate scans to collect matching row indexes. + /// Execute predicate scans to collect matching row indexes. /// /// Each scan targets a single table and collects row indexes that match /// its predicate. These row indexes are distributed to relation inputs - /// (for cross-table joins in phase 2) and/or output inputs (for direct - /// data reading in phase 3). Scans run in parallel. + /// (for cross-table joins in `execute_relations`) and/or output inputs (for direct + /// data reading in `execute_output`). Scans run in parallel. fn execute_scans( &self, relation_inputs: &Vec, - output_inputs: &Vec - ) -> anyhow::Result<()> - { - self.plan.scans.par_iter().try_for_each(|scan| -> anyhow::Result<()> { - let rows = self.data_chunk - .scan_table(scan.table)? 
- .with_row_index(true) - .with_columns([]) - .with_predicate(scan.predicate.clone()) - .execute()?; - - for rel_idx in scan.relations.iter() { - relation_inputs[*rel_idx].extend_from_record_batch_vec(&rows); - } + output_inputs: &Vec, + ) -> anyhow::Result<()> { + self.plan + .scans + .par_iter() + .try_for_each(|scan| -> anyhow::Result<()> { + let rows = self + .chunk + .scan_table(scan.table)? + .with_row_index(true) + .with_columns([]) + .with_predicate(scan.predicate.clone()) + .execute()?; - if let Some(idx) = &scan.output { - output_inputs[*idx].extend_from_record_batch_vec(&rows) - } + for rel_idx in scan.relations.iter() { + relation_inputs[*rel_idx].extend_from_record_batch_vec(&rows); + } - Ok(()) - }) + if let Some(idx) = &scan.output { + output_inputs[*idx].extend_from_record_batch_vec(&rows) + } + + Ok(()) + }) } - /// Phase 2: Propagate row selections through relations. + /// Propagate row selections through relations. /// /// Relations link rows between tables (e.g. join, children, parents). - /// For each relation, the matched rows from phase 1 are used to find + /// For each relation, the matched rows from `execute_scans` are used to find /// corresponding rows in the relation's output table, adding them /// to that table's output inputs. Relations run in parallel. 
fn execute_relations( &self, relation_inputs: Vec, - output_inputs: &Vec - ) -> anyhow::Result<()> - { - relation_inputs.into_par_iter().enumerate().try_for_each(|(idx, row_list)| -> anyhow::Result<()> { - let input = row_list.into_inner(); - if input.is_empty() { - return Ok(()) - } - let rel = &self.plan.relations[idx]; - let output = &output_inputs[self.get_output_index(rel.output_table())]; - rel.eval(self.data_chunk, &input, output) - }) + output_inputs: &Vec, + ) -> anyhow::Result<()> { + relation_inputs.into_par_iter().enumerate().try_for_each( + |(idx, row_list)| -> anyhow::Result<()> { + let input = row_list.into_inner(); + if input.is_empty() { + return Ok(()); + } + let rel = &self.plan.relations[idx]; + let output = &output_inputs[self.get_output_index(rel.output_table())]; + rel.eval(&self.chunk, &input, output) + }, + ) } - /// Phase 3: Read actual column data and build the output. + /// Read actual column data and build the output. /// /// Each output corresponds to a single table (outputs[0] is always the /// block header table). For each table independently: @@ -249,22 +288,25 @@ impl <'a> PlanExecution<'a> { fn execute_output(&self, output_inputs: Vec) -> anyhow::Result> { use sqd_polars::prelude::*; - let rows = output_inputs.into_par_iter() + let rows = output_inputs + .into_par_iter() .enumerate() .map(|(idx, row_list)| -> anyhow::Result { let output = &self.plan.outputs[idx]; - let maybe_row_selection = if idx == 0 { // block header + let maybe_row_selection = if idx == 0 { + // block header None } else { let row_indexes = row_list.into_inner(); if row_indexes.is_empty() { - return Ok(DataFrame::empty()) + return Ok(DataFrame::empty()); } Some(RowRangeList::from_sorted_indexes(row_indexes)) }; - let record_batches = self.data_chunk + let record_batches = self + .chunk .scan_table(output.table)? 
.with_row_selection(maybe_row_selection) .with_predicate(self.get_block_number_predicate(idx)) @@ -274,7 +316,7 @@ impl <'a> PlanExecution<'a> { .execute()?; let df = if record_batches.len() == 0 { - return Ok(DataFrame::empty()) + return Ok(DataFrame::empty()); } else { record_batch_vec_to_lazy_polars_df(&record_batches)? }; @@ -285,48 +327,48 @@ impl <'a> PlanExecution<'a> { } weight_exp = weight_exp.alias("weight"); - let rows = df.select([ - col("row_index"), - col(output.key[0]).alias("block_number"), - weight_exp - ]).collect()?; + let rows = df + .select([ + col("row_index"), + col(output.key[0]).alias("block_number"), + weight_exp, + ]) + .collect()?; Ok(rows) - }).collect::>>()?; + }) + .collect::>>()?; let header_rows = &rows[0]; if header_rows.is_empty() { - return Ok(None) + return Ok(None); } let mut item_union = Vec::with_capacity(self.plan.outputs.len() + 1); for df in rows.iter().skip(1).filter(|df| !df.is_empty()) { - item_union.push( - df.clone().lazy().select([ - col("block_number"), - col("weight").strict_cast(RowWeightPolarsType::get_dtype()) - ]) - ) + item_union.push(df.clone().lazy().select([ + col("block_number"), + col("weight").strict_cast(RowWeightPolarsType::get_dtype()), + ])) } let block_weights = if self.plan.include_all_blocks { - item_union.push( - header_rows.clone().lazy().select([ - col("block_number"), - col("weight").strict_cast(RowWeightPolarsType::get_dtype()) - ]) - ); + item_union.push(header_rows.clone().lazy().select([ + col("block_number"), + col("weight").strict_cast(RowWeightPolarsType::get_dtype()), + ])); concat(item_union, UnionArgs::default())? 
.group_by([col("block_number")]) .agg([sum("weight").alias("weight")]) } else { - let agg = header_rows.clone() + let agg = header_rows + .clone() .lazy() .select([ min("block_number").alias("first_block"), - max("block_number").alias("last_block") + max("block_number").alias("last_block"), ]) .collect()?; @@ -337,33 +379,32 @@ impl <'a> PlanExecution<'a> { item_union.push( DataFrame::new(vec![ block_numbers, - Series::new("weight".into(), &[0 as RowWeight, 0 as RowWeight]) - ])?.lazy() + Series::new("weight".into(), &[0 as RowWeight, 0 as RowWeight]), + ])? + .lazy(), ); let item_stats = concat(item_union, UnionArgs::default())? .group_by([col("block_number")]) .agg([sum("weight").alias("weight")]); - header_rows.clone().lazy().left_join( - item_stats, - col("block_number"), - col("block_number") - ).select([ - col("block_number"), - (col("weight") + col("weight_right")).alias("weight") - ]) + header_rows + .clone() + .lazy() + .left_join(item_stats, col("block_number"), col("block_number")) + .select([ + col("block_number"), + (col("weight") + col("weight_right")).alias("weight"), + ]) }; - let package_weight = block_weights.sort( - ["block_number"], - SortMultipleOptions::default() - ).select([ - col("block_number"), - col("weight").cum_sum(false) - ]).collect()?; + let package_weight = block_weights + .sort(["block_number"], SortMultipleOptions::default()) + .select([col("block_number"), col("weight").cum_sum(false)]) + .collect()?; - let mut selected_blocks = package_weight.clone() + let mut selected_blocks = package_weight + .clone() .lazy() .filter(col("weight").lt_eq(lit(20 * 1024 * 1024))) .select([col("block_number")]) @@ -373,7 +414,8 @@ impl <'a> PlanExecution<'a> { selected_blocks = package_weight.head(Some(1)) } - let last_block: BlockNumber = selected_blocks.column("block_number") + let last_block: BlockNumber = selected_blocks + .column("block_number") .unwrap() .tail(Some(1)) .cast(&DataType::UInt64)? 
@@ -384,70 +426,66 @@ impl <'a> PlanExecution<'a> { let data_items_mutex = parking_lot::Mutex::new( std::iter::repeat_with(|| None) .take(self.plan.outputs.len()) - .collect::>() + .collect::>(), ); - rows.into_par_iter().enumerate().try_for_each(|(idx, rows)| -> anyhow::Result<()> { - if rows.is_empty() { - return Ok(()); - } + rows.into_par_iter() + .enumerate() + .try_for_each(|(idx, rows)| -> anyhow::Result<()> { + if rows.is_empty() { + return Ok(()); + } - let output = &self.plan.outputs[idx]; + let output = &self.plan.outputs[idx]; - let row_index = if idx == 0 && !self.plan.include_all_blocks { - rows.lazy() - .semi_join( + let row_index = if idx == 0 && !self.plan.include_all_blocks { + rows.lazy().semi_join( selected_blocks.clone().lazy(), col("block_number"), - col("block_number") - ) - } else { - rows.lazy().filter( - col("block_number").lt_eq( - lit(last_block) + col("block_number"), ) - ) - }.select([ - col("row_index") - ]).collect()?; - - let row_selection = RowRangeList::from_sorted_indexes( - row_index.column("row_index").unwrap().u32()?.into_no_null_iter() - ); - - let mut scan = self.data_chunk - .scan_table(output.table)? - .with_row_selection(row_selection) - .with_projection(output.projection.clone()); + } else { + rows.lazy() + .filter(col("block_number").lt_eq(lit(last_block))) + } + .select([col("row_index")]) + .collect()?; - if !output.default_null_columns.is_empty() { - scan = scan.with_default_null_columns(output.default_null_columns.clone()); - } + let row_selection = RowRangeList::from_sorted_indexes( + row_index + .column("row_index") + .unwrap() + .u32()? + .into_no_null_iter(), + ); - let records = scan.execute()?; + let records = self + .chunk + .scan_table(output.table)? 
+ .with_row_selection(row_selection) + .with_projection(output.projection.clone()) + .execute()?; - let data_item = DataItem::new( - output.item_name, - output.key, - records, - &output.exp - )?; + let data_item = DataItem::new(output.item_name, output.key, records, &output.exp)?; - data_items_mutex.lock()[idx] = Some(data_item); + data_items_mutex.lock()[idx] = Some(data_item); - Ok(()) - })?; + Ok(()) + })?; - Ok(Some(BlockWriter::new(data_items_mutex - .into_inner() - .into_iter() - .flatten() - .collect() + Ok(Some(BlockWriter::new( + data_items_mutex + .into_inner() + .into_iter() + .flatten() + .collect(), ))) } fn get_output_index(&self, table: Name) -> usize { - self.plan.outputs.iter() + self.plan + .outputs + .iter() .position(|o| o.table == table) .unwrap() } @@ -458,12 +496,11 @@ impl <'a> PlanExecution<'a> { (Some(fst), Some(lst)) => Some(col_between(column, fst, lst)), (None, Some(lst)) => Some(col_lt_eq(column, lst)), (Some(fst), None) => Some(col_gt_eq(column, fst)), - (None, None) => None + (None, None) => None, } } } - pub struct PlanBuilder { tables: &'static TableSet, scans: Vec, @@ -472,32 +509,31 @@ pub struct PlanBuilder { include_all_blocks: bool, parent_block_hash: Option, first_block: Option, - last_block: Option + last_block: Option, } - -impl PlanBuilder{ +impl PlanBuilder { pub fn new(tables: &'static TableSet) -> Self { PlanBuilder { tables, scans: Vec::new(), relations: Vec::new(), - outputs: tables.iter().map(|table| { - Output { + outputs: tables + .iter() + .map(|table| Output { table: table.name, key: &table.primary_key, projection: HashSet::new(), - default_null_columns: HashSet::new(), weight_per_row: 0, weight_columns: Vec::new(), exp: Exp::Object(vec![]), - item_name: table.result_item_name - } - }).collect(), + item_name: table.result_item_name, + }) + .collect(), include_all_blocks: false, parent_block_hash: None, first_block: None, - last_block: None + last_block: None, } } @@ -512,7 +548,7 @@ impl PlanBuilder{ 
self.scans.push(scan); ScanBuilder { plan: self, - scan_idx + scan_idx, } } @@ -546,13 +582,14 @@ impl PlanBuilder{ self.simplify(); self.set_output_weights(); Plan { + tables: self.tables, scans: self.scans, relations: self.relations, outputs: self.outputs, include_all_blocks: self.include_all_blocks, parent_block_hash: self.parent_block_hash, first_block: self.first_block, - last_block: self.last_block + last_block: self.last_block, } } @@ -566,13 +603,15 @@ impl PlanBuilder{ output.projection.insert(name); }); - output.default_null_columns = table.default_null_columns(); - let mut per_row = 0; let mut weight_columns = Vec::new(); for col in output.projection.iter() { - match table.column_weights.get(col).unwrap_or(&ColumnWeight::Fixed(32)) { + match table + .column_weights + .get(col) + .unwrap_or(&ColumnWeight::Fixed(32)) + { ColumnWeight::Fixed(weight) => { per_row += weight; } @@ -587,10 +626,13 @@ impl PlanBuilder{ } fn add_rel(&mut self, rel: Rel) -> usize { - self.relations.iter().position(|r| r == &rel).unwrap_or_else(|| { - self.relations.push(rel); - self.relations.len() - 1 - }) + self.relations + .iter() + .position(|r| r == &rel) + .unwrap_or_else(|| { + self.relations.push(rel); + self.relations.len() - 1 + }) } /// Lame plan optimization procedure @@ -664,9 +706,11 @@ impl PlanBuilder{ // Remove relations, that point to fully populated outputs. // We'll make sure, that those outputs remain populated later. - let relations_remove_mask = self.relations.iter().map(|rel| { - is_full[self.tables.get_index(rel.output_table())] - }).collect::>(); + let relations_remove_mask = self + .relations + .iter() + .map(|rel| is_full[self.tables.get_index(rel.output_table())]) + .collect::>(); remove_elements(&mut is_full_rel, &relations_remove_mask); self.remove_relations(&relations_remove_mask); @@ -687,25 +731,35 @@ impl PlanBuilder{ // Introduce new scans to populate that. 
let mut new_scans: HashMap = HashMap::new(); - for out_idx in is_full.iter().enumerate().filter_map(|(idx, full)| full.then_some(idx)) { + for out_idx in is_full + .iter() + .enumerate() + .filter_map(|(idx, full)| full.then_some(idx)) + { let table = self.outputs[out_idx].table; - new_scans.insert(table, Scan { + new_scans.insert( table, - predicate: None, - relations: vec![], - output: Some(out_idx) - }); - } - - for (idx, rel) in self.relations.iter().enumerate().filter(|(idx, _)| is_full_rel[*idx]) { - let table = rel.output_table(); - let scan = new_scans.entry(table).or_insert_with(|| { Scan { table, predicate: None, relations: vec![], - output: None - } + output: Some(out_idx), + }, + ); + } + + for (idx, rel) in self + .relations + .iter() + .enumerate() + .filter(|(idx, _)| is_full_rel[*idx]) + { + let table = rel.output_table(); + let scan = new_scans.entry(table).or_insert_with(|| Scan { + table, + predicate: None, + relations: vec![], + output: None, }); scan.relations.push(idx); } @@ -717,15 +771,18 @@ impl PlanBuilder{ remove_elements(&mut self.relations, remove_mask); let mut idx = 0; - let index_map = remove_mask.iter().map(|&remove| { - if remove { - None - } else { - let i = idx; - idx += 1; - Some(i) - } - }).collect::>(); + let index_map = remove_mask + .iter() + .map(|&remove| { + if remove { + None + } else { + let i = idx; + idx += 1; + Some(i) + } + }) + .collect::>(); for scan in self.scans.iter_mut() { scan.relations.retain_mut(|rel_idx| { @@ -742,19 +799,23 @@ impl PlanBuilder{ /// checks whether full input implies full output fn is_full_rel(&self, rel: &Rel) -> bool { match rel { - Rel::Join { input_table, input_key, output_table, output_key } => { + Rel::Join { + input_table, + input_key, + output_table, + output_key, + } => { let input_desc = self.tables.get(input_table); input_key == &input_desc.primary_key && input_desc.children.get(output_table) == Some(output_key) - }, + } Rel::Children { .. } => true, Rel::Parents { .. 
} => true, - _ => false + _ => false, } } } - fn remove_elements(vec: &mut Vec, remove_mask: &[bool]) { let mut idx = 0; vec.retain(|_| { @@ -764,14 +825,12 @@ fn remove_elements(vec: &mut Vec, remove_mask: &[bool]) { }); } - pub struct ScanBuilder<'a> { plan: &'a mut PlanBuilder, - scan_idx: usize + scan_idx: usize, } - -impl <'a> ScanBuilder<'a> { +impl<'a> ScanBuilder<'a> { pub fn with_predicate(&mut self, predicate: RowPredicateRef) -> &mut Self { self.scan_mut().predicate = Some(predicate); self @@ -781,13 +840,13 @@ impl <'a> ScanBuilder<'a> { &mut self, output_table: Name, output_key: Vec, - scan_key: Vec + scan_key: Vec, ) -> &mut Self { let rel_idx = self.plan.add_rel(Rel::Join { input_table: self.plan.scans[self.scan_idx].table, input_key: scan_key, output_table, - output_key + output_key, }); self.scan_mut().relations.push(rel_idx); self @@ -796,10 +855,7 @@ impl <'a> ScanBuilder<'a> { pub fn include_children(&mut self) -> &mut Self { let table = &self.plan.scans[self.scan_idx].table; let key = self.plan.tables.get(table).primary_key.clone(); - let rel_idx = self.plan.add_rel(Rel::Children { - table, - key - }); + let rel_idx = self.plan.add_rel(Rel::Children { table, key }); self.scan_mut().relations.push(rel_idx); self } @@ -807,10 +863,7 @@ impl <'a> ScanBuilder<'a> { pub fn include_parents(&mut self) -> &mut Self { let table = &self.plan.scans[self.scan_idx].table; let key = self.plan.tables.get(table).primary_key.clone(); - let rel_idx = self.plan.add_rel(Rel::Parents { - table, - key - }); + let rel_idx = self.plan.add_rel(Rel::Parents { table, key }); self.scan_mut().relations.push(rel_idx); self } @@ -819,13 +872,13 @@ impl <'a> ScanBuilder<'a> { &mut self, output_table: Name, output_key: Vec, - scan_key: Vec + scan_key: Vec, ) -> &mut Self { let rel_idx = self.plan.add_rel(Rel::ForeignChildren { input_table: self.plan.scans[self.scan_idx].table, input_key: scan_key, output_table, - output_key + output_key, }); 
self.scan_mut().relations.push(rel_idx); self @@ -835,13 +888,13 @@ impl <'a> ScanBuilder<'a> { &mut self, output_table: Name, output_key: Vec, - scan_key: Vec + scan_key: Vec, ) -> &mut Self { let rel_idx = self.plan.add_rel(Rel::ForeignParents { input_table: self.plan.scans[self.scan_idx].table, input_key: scan_key, output_table, - output_key + output_key, }); self.scan_mut().relations.push(rel_idx); self @@ -855,4 +908,4 @@ impl <'a> ScanBuilder<'a> { fn scan_mut(&mut self) -> &mut Scan { &mut self.plan.scans[self.scan_idx] } -} \ No newline at end of file +} diff --git a/crates/query/src/scan/mod.rs b/crates/query/src/scan/mod.rs index d0d1d506..fe01f675 100644 --- a/crates/query/src/scan/mod.rs +++ b/crates/query/src/scan/mod.rs @@ -6,7 +6,7 @@ pub mod parquet; mod reader; mod row_predicate; mod row_predicate_dsl; -mod scan; +pub(crate) mod scan; #[cfg(feature = "storage")] mod storage; mod util; From 52d63a3d53d744cdc1747699e7bc384b589129bb Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:31:29 +0100 Subject: [PATCH 3/6] Add tests for missing columnds. 
--- .../queries/missing_block_fields/README.md | 8 ++++++ .../queries/missing_block_fields/query.json | 16 +++++++++++ .../queries/missing_block_fields/result.json | 3 ++ .../queries/missing_field_with_join/README.md | 3 ++ .../missing_field_with_join/query.json | 28 +++++++++++++++++++ .../missing_field_with_join/result.json | 3 ++ crates/query/src/plan/table.rs | 7 +++++ crates/query/src/query/bitcoin.rs | 12 +++++--- crates/query/src/query/eth.rs | 15 ++++++---- crates/query/src/query/fuel.rs | 15 ++++++---- crates/query/src/query/hyperliquid_fills.rs | 6 ++-- .../src/query/hyperliquid_replica_cmds.rs | 6 ++-- crates/query/src/query/solana.rs | 21 +++++++++----- crates/query/src/query/substrate.rs | 12 +++++--- crates/query/src/query/util.rs | 4 +++ 15 files changed, 130 insertions(+), 29 deletions(-) create mode 100644 crates/query/fixtures/fuel/queries/missing_block_fields/README.md create mode 100644 crates/query/fixtures/fuel/queries/missing_block_fields/query.json create mode 100644 crates/query/fixtures/fuel/queries/missing_block_fields/result.json create mode 100644 crates/query/fixtures/fuel/queries/missing_field_with_join/README.md create mode 100644 crates/query/fixtures/fuel/queries/missing_field_with_join/query.json create mode 100644 crates/query/fixtures/fuel/queries/missing_field_with_join/result.json diff --git a/crates/query/fixtures/fuel/queries/missing_block_fields/README.md b/crates/query/fixtures/fuel/queries/missing_block_fields/README.md new file mode 100644 index 00000000..ac5eef8b --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_block_fields/README.md @@ -0,0 +1,8 @@ +This test validates that when nullable columns are missing intirely from the parquet file, such as: + +- eventInboxRoot +- consensusParametersVersion +- stateTransitionBytecodeVersion +- messageOutboxRoot + +The output will include these columnds with null values diff --git a/crates/query/fixtures/fuel/queries/missing_block_fields/query.json 
b/crates/query/fixtures/fuel/queries/missing_block_fields/query.json new file mode 100644 index 00000000..b8e5d40e --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_block_fields/query.json @@ -0,0 +1,16 @@ +{ + "type": "fuel", + "fromBlock": 9000000, + "toBlock": 9000003, + "fields": { + "block": { + "number": true, + "hash": true, + "eventInboxRoot": true, + "consensusParametersVersion": true, + "stateTransitionBytecodeVersion": true, + "messageOutboxRoot": true + } + }, + "includeAllBlocks": false +} diff --git a/crates/query/fixtures/fuel/queries/missing_block_fields/result.json b/crates/query/fixtures/fuel/queries/missing_block_fields/result.json new file mode 100644 index 00000000..13404a7f --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_block_fields/result.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:33134c5ff181dbcc4ef7aecda446521e58c291b5a639eef18d2fa900c1bc16d9 +size 583 diff --git a/crates/query/fixtures/fuel/queries/missing_field_with_join/README.md b/crates/query/fixtures/fuel/queries/missing_field_with_join/README.md new file mode 100644 index 00000000..ad89c663 --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_field_with_join/README.md @@ -0,0 +1,3 @@ +This test validates that when nullable columns are missing intirely from the parquet file, such as: +- upgrade_purpose +The output will include these columnds with null values diff --git a/crates/query/fixtures/fuel/queries/missing_field_with_join/query.json b/crates/query/fixtures/fuel/queries/missing_field_with_join/query.json new file mode 100644 index 00000000..44074a42 --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_field_with_join/query.json @@ -0,0 +1,28 @@ +{ + "type": "fuel", + "fromBlock": 9025510, + "toBlock": 9025549, + "fields": { + "block": { + "number": true, + "hash": true + }, + "receipt": { + "receiptType": true, + "contract": true, + "index": true, + "transactionIndex": true + }, + "transaction": { 
+ "index": true, + "hash": true, + "upgradePurpose": true + } + }, + "receipts": [ + { + "type": ["LOG_DATA"], + "transaction": true + } + ] +} diff --git a/crates/query/fixtures/fuel/queries/missing_field_with_join/result.json b/crates/query/fixtures/fuel/queries/missing_field_with_join/result.json new file mode 100644 index 00000000..ec36baac --- /dev/null +++ b/crates/query/fixtures/fuel/queries/missing_field_with_join/result.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4305c10b962392df8c44de367922fcbbcee89cad10de5718e388a64d3f45cb2c +size 1154 diff --git a/crates/query/src/plan/table.rs b/crates/query/src/plan/table.rs index a9f15490..9dd6238d 100644 --- a/crates/query/src/plan/table.rs +++ b/crates/query/src/plan/table.rs @@ -44,6 +44,13 @@ impl Table { self } + pub fn set_nullable(&mut self, columns: &[Name]) -> &mut Self { + for col in columns { + self.column_defaults.insert(col, ColumnDefault::Null); + } + self + } + pub fn default_null_columns(&self) -> HashSet { self.column_defaults.iter() .filter_map(|(name, default)| match default { diff --git a/crates/query/src/query/bitcoin.rs b/crates/query/src/query/bitcoin.rs index 80a95e36..bb4eb482 100644 --- a/crates/query/src/query/bitcoin.rs +++ b/crates/query/src/query/bitcoin.rs @@ -12,26 +12,30 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]); + ]) + .set_nullable(BlockFieldSelection::columns()); tables.add_table("transactions", vec![ "block_number", "transaction_index" ]) .add_child("inputs", vec!["block_number", "transaction_index"]) - .add_child("outputs", vec!["block_number", "transaction_index"]); + .add_child("outputs", vec!["block_number", "transaction_index"]) + .set_nullable(TransactionFieldSelection::columns()); tables.add_table("inputs", vec![ "block_number", "transaction_index", "input_index" - ]); + ]) + .set_nullable(InputFieldSelection::columns()); tables.add_table("outputs", vec![ "block_number", 
"transaction_index", "output_index" - ]); + ]) + .set_nullable(OutputFieldSelection::columns()); tables }); diff --git a/crates/query/src/query/eth.rs b/crates/query/src/query/eth.rs index 6b328f09..db3379d2 100644 --- a/crates/query/src/query/eth.rs +++ b/crates/query/src/query/eth.rs @@ -16,7 +16,8 @@ static TABLES: LazyLock = LazyLock::new(|| { "number" ]) .set_weight("logs_bloom", 512) - .set_weight_column("extra_data", "extra_data_size"); + .set_weight_column("extra_data", "extra_data_size") + .set_nullable(BlockFieldSelection::columns()); tables.add_table("transactions", vec![ "block_number", @@ -25,13 +26,15 @@ static TABLES: LazyLock = LazyLock::new(|| { .add_child("logs", vec!["block_number", "transaction_index"]) .add_child("traces", vec!["block_number", "transaction_index"]) .add_child("statediffs", vec!["block_number", "transaction_index"]) - .set_weight_column("input", "input_size"); + .set_weight_column("input", "input_size") + .set_nullable(TransactionFieldSelection::columns()); tables.add_table("logs", vec![ "block_number", "log_index" ]) - .set_weight_column("data", "data_size"); + .set_weight_column("data", "data_size") + .set_nullable(LogFieldSelection::columns()); tables.add_table("traces", vec![ "block_number", @@ -41,7 +44,8 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight_column("create_init", "create_init_size") .set_weight_column("create_result_code", "create_result_code_size") .set_weight_column("call_input", "call_input_size") - .set_weight_column("call_result_output", "call_result_output_size"); + .set_weight_column("call_result_output", "call_result_output_size") + .set_nullable(TraceFieldSelection::columns()); tables.add_table("statediffs", vec![ "block_number", @@ -51,7 +55,8 @@ static TABLES: LazyLock = LazyLock::new(|| { ]) .set_weight_column("prev", "prev_size") .set_weight_column("next", "next_size") - .set_result_item_name("stateDiffs"); + .set_result_item_name("stateDiffs") + 
.set_nullable(StateDiffFieldSelection::columns()); tables }); diff --git a/crates/query/src/query/fuel.rs b/crates/query/src/query/fuel.rs index f220d59a..8319443e 100644 --- a/crates/query/src/query/fuel.rs +++ b/crates/query/src/query/fuel.rs @@ -12,7 +12,8 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]); + ]) + .set_nullable(BlockFieldSelection::columns()); tables.add_table("transactions", vec![ "block_number", @@ -24,14 +25,16 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight_column("storage_slots", "storage_slots_size") .set_weight_column("proof_set", "proof_set_size") .set_weight_column("script_data", "script_data_size") - .set_weight_column("raw_payload", "raw_payload_size"); + .set_weight_column("raw_payload", "raw_payload_size") + .set_nullable(TransactionFieldSelection::columns()); tables.add_table("receipts", vec![ "block_number", "transaction_index", "index" ]) - .set_weight_column("data", "data_size"); + .set_weight_column("data", "data_size") + .set_nullable(ReceiptFieldSelection::columns()); tables.add_table("inputs", vec![ "block_number", @@ -39,13 +42,15 @@ static TABLES: LazyLock = LazyLock::new(|| { "index" ]) .set_weight_column("coin_predicate", "coin_predicate_size") - .set_weight_column("message_predicate", "message_predicate_size"); + .set_weight_column("message_predicate", "message_predicate_size") + .set_nullable(InputFieldSelection::columns()); tables.add_table("outputs", vec![ "block_number", "transaction_index", "index" - ]); + ]) + .set_nullable(OutputFieldSelection::columns()); tables }); diff --git a/crates/query/src/query/hyperliquid_fills.rs b/crates/query/src/query/hyperliquid_fills.rs index 0e1c2dc3..6f28420a 100644 --- a/crates/query/src/query/hyperliquid_fills.rs +++ b/crates/query/src/query/hyperliquid_fills.rs @@ -12,12 +12,14 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]); + ]) + 
.set_nullable(BlockFieldSelection::columns()); tables.add_table("fills", vec![ "block_number", "fill_index" - ]); + ]) + .set_nullable(FillFieldSelection::columns()); tables }); diff --git a/crates/query/src/query/hyperliquid_replica_cmds.rs b/crates/query/src/query/hyperliquid_replica_cmds.rs index 4165a3ba..9f18f054 100644 --- a/crates/query/src/query/hyperliquid_replica_cmds.rs +++ b/crates/query/src/query/hyperliquid_replica_cmds.rs @@ -13,14 +13,16 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]); + ]) + .set_nullable(BlockFieldSelection::columns()); tables.add_table("actions", vec![ "block_number", "action_index" ]) .set_weight_column("action", "action_size") - .set_weight_column("response", "response_size"); + .set_weight_column("response", "response_size") + .set_nullable(ActionFieldSelection::columns()); tables }); diff --git a/crates/query/src/query/solana.rs b/crates/query/src/query/solana.rs index 6cde99e7..1b28ba33 100644 --- a/crates/query/src/query/solana.rs +++ b/crates/query/src/query/solana.rs @@ -15,7 +15,8 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]); + ]) + .set_nullable(BlockFieldSelection::columns()); tables.add_table("transactions", vec![ "block_number", @@ -27,7 +28,8 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight_column("account_keys", "account_keys_size") .set_weight_column("address_table_lookups", "address_table_lookups_size") .set_weight_column("signatures", "signatures_size") - .set_weight_column("loaded_addresses", "loaded_addresses_size"); + .set_weight_column("loaded_addresses", "loaded_addresses_size") + .set_nullable(TransactionFieldSelection::columns()); tables.add_table("instructions", vec![ "block_number", @@ -51,32 +53,37 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight("a13", 0) .set_weight("a14", 0) .set_weight("a15", 0) - .set_weight("rest_accounts", 0); + .set_weight("rest_accounts", 0) + 
.set_nullable(InstructionFieldSelection::columns()); tables.add_table("logs", vec![ "block_number", "transaction_index", "log_index" ]) - .set_weight_column("message", "message_size"); + .set_weight_column("message", "message_size") + .set_nullable(LogFieldSelection::columns()); tables.add_table("balances", vec![ "block_number", "transaction_index", "account" - ]); + ]) + .set_nullable(BalanceFieldSelection::columns()); tables.add_table("token_balances", vec![ "block_number", "transaction_index", "account" - ]); + ]) + .set_nullable(TokenBalanceFieldSelection::columns()); tables.add_table("rewards", vec![ "block_number", "pubkey", "reward_type" - ]); + ]) + .set_nullable(RewardFieldSelection::columns()); tables }); diff --git a/crates/query/src/query/substrate.rs b/crates/query/src/query/substrate.rs index 7887d957..774b8993 100644 --- a/crates/query/src/query/substrate.rs +++ b/crates/query/src/query/substrate.rs @@ -13,27 +13,31 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" ]) - .set_weight("digest", 32 * 4); + .set_weight("digest", 32 * 4) + .set_nullable(BlockFieldSelection::columns()); tables.add_table("events", vec![ "block_number", "index" ]) - .set_weight_column("args", "args_size"); + .set_weight_column("args", "args_size") + .set_nullable(EventFieldSelection::columns()); tables.add_table("calls", vec![ "block_number", "extrinsic_index", "address" ]) - .set_weight_column("args", "args_size"); + .set_weight_column("args", "args_size") + .set_nullable(CallFieldSelection::columns()); tables.add_table("extrinsics", vec![ "block_number", "index" ]) .add_child("calls", vec!["block_number", "extrinsic_index"]) - .set_weight("signature", 4 * 32); + .set_weight("signature", 4 * 32) + .set_nullable(ExtrinsicFieldSelection::columns()); tables }); diff --git a/crates/query/src/query/util.rs b/crates/query/src/query/util.rs index 339b42af..9e3b970b 100644 --- a/crates/query/src/query/util.rs +++ 
b/crates/query/src/query/util.rs @@ -24,6 +24,10 @@ macro_rules! item_field_selection { let $fields = self; $projection.into() } + + pub fn columns() -> &'static [crate::primitives::Name] { + &[$(stringify!($field)),*] + } } }; } From 5ef5bbb1768e80f695d7ece0788a203881bec4b4 Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:50:52 +0100 Subject: [PATCH 4/6] Remove some code duplication --- crates/query/src/plan/plan.rs | 102 ++++++++++++---------------------- 1 file changed, 37 insertions(+), 65 deletions(-) diff --git a/crates/query/src/plan/plan.rs b/crates/query/src/plan/plan.rs index 758b3622..0391ab08 100644 --- a/crates/query/src/plan/plan.rs +++ b/crates/query/src/plan/plan.rs @@ -127,77 +127,49 @@ impl<'a> PlanExecution<'a> { let block_scan = self.chunk.scan_table(self.plan.outputs[0].table)?; - let mut refs: Vec<_> = if block_scan + let has_parent_number = block_scan .schema() .column_with_name("parent_number") - .is_some() - { - // this is Solana with possible gaps in block numbers - let df = block_scan - .with_column("parent_number") - .with_column("parent_hash") - .with_predicate(col_between( - "parent_number", - block_number.saturating_sub(100), - block_number.saturating_sub(1), - )) - .to_lazy_df()? - .collect()?; + .is_some(); - let numbers = df - .column("parent_number")? 
- .cast(&sqd_polars::prelude::DataType::UInt64)?; - let numbers = numbers.u64()?; - - let hashes = df.column("parent_hash")?; - let hashes = hashes.str()?; - - (0..df.shape().0) - .map(|i| { - let number = numbers - .get(i) - .expect("block number can't be null according to the predicate applied"); - let hash = hashes.get(i).unwrap_or(""); - BlockRef { - number, - hash: hash.to_string(), - } - }) - .collect() + let (number_col, predicate_upper) = if has_parent_number { + ("parent_number", block_number.saturating_sub(1)) } else { - let df = block_scan - .with_column("number") - .with_column("parent_hash") - .with_predicate(col_between( - "number", - block_number.saturating_sub(100), - block_number, - )) - .to_lazy_df()? - .collect()?; - - let numbers = df - .column("number")? - .cast(&sqd_polars::prelude::DataType::UInt64)?; - let numbers = numbers.u64()?; - - let hashes = df.column("parent_hash")?; - let hashes = hashes.str()?; - - (0..df.shape().0) - .map(|i| { - let number = numbers - .get(i) - .expect("block number can't be null according to the predicate applied"); - let hash = hashes.get(i).unwrap_or(""); - BlockRef { - number: number.saturating_sub(1), - hash: hash.to_string(), - } - }) - .collect() + ("number", block_number) }; + let df = block_scan + .with_column(number_col) + .with_column("parent_hash") + .with_predicate(col_between( + number_col, + block_number.saturating_sub(100), + predicate_upper, + )) + .to_lazy_df()? + .collect()?; + + let numbers = df + .column(number_col)? 
+ .cast(&sqd_polars::prelude::DataType::UInt64)?; + let numbers = numbers.u64()?; + + let hashes = df.column("parent_hash")?; + let hashes = hashes.str()?; + + let mut refs: Vec<_> = (0..df.shape().0) + .map(|i| { + let number = numbers + .get(i) + .expect("block number can't be null according to the predicate applied"); + let hash = hashes.get(i).unwrap_or(""); + BlockRef { + number: if has_parent_number { number } else { number.saturating_sub(1) }, + hash: hash.to_string(), + } + }) + .collect(); + refs.sort_by(|a, b| a.number.cmp(&b.number)); let parent_block = refs From ab4d4c73ab87e8d8d8c8e3b2712aca04a1a95a00 Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Wed, 11 Mar 2026 19:01:35 +0100 Subject: [PATCH 5/6] Add optional tracing to query crate. Treat missing authorization_list as an error --- Cargo.lock | 1 + crates/query/Cargo.toml | 4 + crates/query/src/query/eth.rs | 5 +- crates/query/src/scan/parquet/file.rs | 138 ++++++++++++++------------ 4 files changed, 81 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index becafa4d..754ce071 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4967,6 +4967,7 @@ dependencies = [ "sqd-storage", "tempfile", "tikv-jemallocator", + "tracing", ] [[package]] diff --git a/crates/query/Cargo.toml b/crates/query/Cargo.toml index 37bb19c3..8e5bfbdf 100644 --- a/crates/query/Cargo.toml +++ b/crates/query/Cargo.toml @@ -24,6 +24,7 @@ sqd-bloom-filter = { path = "../bloom-filter" } sqd-polars = { path = "../polars" } sqd-primitives = { path = "../primitives", features = ["range"] } sqd-storage = { path = "../storage", optional = true } +tracing = "0.1.40" [dev-dependencies] glob = "0.3.1" @@ -37,6 +38,9 @@ sqd-dataset = { path = "../dataset" } tikv-jemallocator = "0.6" [features] +max_level_trace = ["tracing/max_level_trace"] +max_level_debug = ["tracing/max_level_debug"] +max_level_info = ["tracing/max_level_info"] parquet = [ "dep:parquet", "dep:memmap2" diff --git 
a/crates/query/src/query/eth.rs b/crates/query/src/query/eth.rs index db3379d2..2ded8df9 100644 --- a/crates/query/src/query/eth.rs +++ b/crates/query/src/query/eth.rs @@ -27,7 +27,10 @@ static TABLES: LazyLock = LazyLock::new(|| { .add_child("traces", vec!["block_number", "transaction_index"]) .add_child("statediffs", vec!["block_number", "transaction_index"]) .set_weight_column("input", "input_size") - .set_nullable(TransactionFieldSelection::columns()); + .set_nullable(&TransactionFieldSelection::columns().iter() + .filter(|c| **c != "authorization_list") + .copied() + .collect::>()); tables.add_table("logs", vec![ "block_number", diff --git a/crates/query/src/scan/parquet/file.rs b/crates/query/src/scan/parquet/file.rs index 2dbb090c..904f0e8c 100644 --- a/crates/query/src/scan/parquet/file.rs +++ b/crates/query/src/scan/parquet/file.rs @@ -6,7 +6,10 @@ use crate::scan::row_predicate::{RowPredicate, RowPredicateRef}; use crate::scan::util::{add_row_index, build_row_index_array}; use arrow::array::{new_null_array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector}; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, + RowSelector, +}; use parquet::arrow::ProjectionMask; use parquet::file::metadata::RowGroupMetaData; use rayon::prelude::*; @@ -15,35 +18,30 @@ use std::collections::HashSet; use std::ops::Not; use std::sync::Arc; - #[derive(Clone)] pub struct ParquetFile { io: MmapIO, metadata: Arc, - filename: Arc + filename: Arc, } - impl ParquetFile { pub fn open>(file: P) -> anyhow::Result { let filename = file.into(); let io = MmapIO::open(&filename)?; - let metadata = ArrowReaderMetadata::load( - &io, - ArrowReaderOptions::new().with_page_index(true) - )?; + let metadata = + ArrowReaderMetadata::load(&io, 
ArrowReaderOptions::new().with_page_index(true))?; Ok(Self { io, metadata: Arc::new(ParquetMetadata::new(metadata)), - filename: Arc::new(filename) + filename: Arc::new(filename), }) } } - impl TableReader for ParquetFile { /// Reads record batches from the parquet file with optional filtering and projection. /// @@ -132,7 +130,9 @@ impl TableReader for ParquetFile { } } row_groups.retain(|rg| { - rg.1.as_ref().map(|ranges| !ranges.is_empty()).unwrap_or(true) + rg.1.as_ref() + .map(|ranges| !ranges.is_empty()) + .unwrap_or(true) }) } } @@ -145,19 +145,21 @@ impl TableReader for ParquetFile { let mut missing_null_columns: Vec = Vec::new(); let (projection_mask, predicate_columns) = if let Some(columns) = projection { - let predicate_columns = predicate.as_ref() + let predicate_columns = predicate + .as_ref() .map_or([].as_slice(), |p| p.projection()) .iter() - .filter_map(|col| { - columns.contains(col).not().then(|| *col) - }) + .filter_map(|col| columns.contains(col).not().then(|| *col)) .collect::>(); - let mut indices = Vec::with_capacity( - columns.len() + predicate_columns.len() - ); + let mut indices = Vec::with_capacity(columns.len() + predicate_columns.len()); - let fields = self.metadata.metadata().parquet_schema().root_schema().get_fields(); + let fields = self + .metadata + .metadata() + .parquet_schema() + .root_schema() + .get_fields(); for name in columns.iter().chain(predicate_columns.iter()).copied() { match fields.iter().position(|f| f.name() == name) { @@ -168,44 +170,52 @@ impl TableReader for ParquetFile { if default_null_columns.map_or(false, |dnc| dnc.contains(name)) { missing_null_columns.push(name); } else { + tracing::error!("column '{}' is not found in {}", name, self.filename); anyhow::bail!("column '{}' is not found in {}", name, self.filename); } } } } - let projection_mask = ProjectionMask::roots( - parquet_metadata.file_metadata().schema_descr(), - indices - ); + let projection_mask = + 
ProjectionMask::roots(parquet_metadata.file_metadata().schema_descr(), indices); (projection_mask, predicate_columns) } else { (ProjectionMask::all(), HashSet::new()) }; + tracing::trace!("missing columnds: {missing_null_columns:?}"); + let maybe_row_index_offsets = with_row_index.then(|| { - maybe_row_group_offsets.unwrap_or_else(|| { - build_row_group_offsets(parquet_metadata.row_groups()) - }) + maybe_row_group_offsets + .unwrap_or_else(|| build_row_group_offsets(parquet_metadata.row_groups())) }); // Stage 4: Parallel row group reading - let results: Vec<_> = row_groups.into_par_iter().map(|(row_group_idx, maybe_row_selection)| { - read_row_group( - self.io.clone(), - self.metadata.metadata(), - row_group_idx, - projection_mask.clone(), - predicate.clone().map(|p| (p, &predicate_columns)), - maybe_row_selection, - maybe_row_index_offsets.as_ref().map(|offsets| offsets[row_group_idx]), - 1_000_000_000 - ) - }).collect(); + let results: Vec<_> = row_groups + .into_par_iter() + .map(|(row_group_idx, maybe_row_selection)| { + read_row_group( + self.io.clone(), + self.metadata.metadata(), + row_group_idx, + projection_mask.clone(), + predicate.clone().map(|p| (p, &predicate_columns)), + maybe_row_selection, + maybe_row_index_offsets + .as_ref() + .map(|offsets| offsets[row_group_idx]), + 1_000_000_000, + ) + }) + .collect(); let mut record_batches = Vec::with_capacity( - results.iter().map(|r| r.as_ref().map_or(0, |bs| bs.len())).sum() + results + .iter() + .map(|r| r.as_ref().map_or(0, |bs| bs.len())) + .sum(), ); for r in results { @@ -214,18 +224,21 @@ impl TableReader for ParquetFile { // Inject NullArray columns for default-null columns missing from parquet if !missing_null_columns.is_empty() { - record_batches = record_batches.into_iter().map(|batch| { - let num_rows = batch.num_rows(); - let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); - let mut arrays: Vec<_> = batch.columns().to_vec(); - - for col_name in 
missing_null_columns.iter() { - fields.push(Arc::new(Field::new(*col_name, DataType::Null, true))); - arrays.push(new_null_array(&DataType::Null, num_rows)); - } + record_batches = record_batches + .into_iter() + .map(|batch| { + let num_rows = batch.num_rows(); + let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); + let mut arrays: Vec<_> = batch.columns().to_vec(); + + for col_name in missing_null_columns.iter() { + fields.push(Arc::new(Field::new(*col_name, DataType::Null, true))); + arrays.push(new_null_array(&DataType::Null, num_rows)); + } - RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays) - }).collect::, _>>()?; + RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays) + }) + .collect::, _>>()?; } Ok(record_batches) @@ -236,7 +249,6 @@ impl TableReader for ParquetFile { } } - fn read_row_group( io: MmapIO, metadata: &ArrowReaderMetadata, @@ -245,13 +257,9 @@ fn read_row_group( maybe_predicate: Option<(RowPredicateRef, &HashSet)>, maybe_row_selection: Option, maybe_row_index_offset: Option, - record_batch_size: usize + record_batch_size: usize, ) -> anyhow::Result> { - - let mut reader = ParquetRecordBatchReaderBuilder::new_with_metadata( - io, - metadata.clone() - ); + let mut reader = ParquetRecordBatchReaderBuilder::new_with_metadata(io, metadata.clone()); reader = reader.with_row_groups(vec![row_group_idx]); reader = reader.with_batch_size(record_batch_size); @@ -292,7 +300,6 @@ fn read_row_group( Ok(result) } - fn build_row_group_offsets(row_groups: &[RowGroupMetaData]) -> Vec { let mut offsets = Vec::with_capacity(row_groups.len() + 1); offsets.push(0); @@ -304,17 +311,16 @@ fn build_row_group_offsets(row_groups: &[RowGroupMetaData]) -> Vec { offsets } - fn apply_predicate( mut batch: RecordBatch, predicate: &dyn RowPredicate, - predicate_columns: &HashSet -) -> anyhow::Result -{ + predicate_columns: &HashSet, +) -> anyhow::Result { let mask = predicate.evaluate(&batch)?; if predicate_columns.len() > 0 { - let 
projection: Vec = batch.schema() + let projection: Vec = batch + .schema() .fields() .iter() .enumerate() @@ -324,7 +330,8 @@ fn apply_predicate( } else { Some(idx) } - }).collect(); + }) + .collect(); batch = batch.project(&projection)?; } @@ -334,7 +341,6 @@ fn apply_predicate( Ok(batch) } - fn to_parquet_row_selection(ranges: &RowRangeList) -> RowSelection { let mut selectors: Vec = Vec::new(); let mut last_end = 0u32; @@ -356,4 +362,4 @@ fn to_parquet_row_selection(ranges: &RowRangeList) -> RowSelection { } RowSelection::from(selectors) -} \ No newline at end of file +} From 284899d18643754cc7d058b7f2a2f4df726d04f5 Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Mon, 16 Mar 2026 09:11:58 +0100 Subject: [PATCH 6/6] Convert column does not exist to distinct error type, remove nullable defaults --- crates/query/src/lib.rs | 2 +- crates/query/src/query/bitcoin.rs | 12 ++--- crates/query/src/query/eth.rs | 20 +++----- crates/query/src/query/fuel.rs | 17 +++---- crates/query/src/query/hyperliquid_fills.rs | 6 +-- .../src/query/hyperliquid_replica_cmds.rs | 6 +-- crates/query/src/query/solana.rs | 23 +++------ crates/query/src/query/substrate.rs | 14 ++---- crates/query/src/scan/chunk.rs | 25 ---------- crates/query/src/scan/errors.rs | 50 +++++++++++++++++++ crates/query/src/scan/mod.rs | 2 + crates/query/src/scan/parquet/file.rs | 3 +- 12 files changed, 88 insertions(+), 92 deletions(-) create mode 100644 crates/query/src/scan/errors.rs diff --git a/crates/query/src/lib.rs b/crates/query/src/lib.rs index 9b67d80d..318e4c23 100644 --- a/crates/query/src/lib.rs +++ b/crates/query/src/lib.rs @@ -13,5 +13,5 @@ pub use primitives::BlockNumber; pub use query::*; #[cfg(feature = "parquet")] pub use scan::parquet::ParquetChunk; -pub use scan::{Chunk, TableDoesNotExist}; +pub use scan::{Chunk, TableDoesNotExist, ColumnDoesNotExist}; pub use sqd_polars::set_polars_thread_pool_size; diff --git a/crates/query/src/query/bitcoin.rs 
b/crates/query/src/query/bitcoin.rs index bb4eb482..80a95e36 100644 --- a/crates/query/src/query/bitcoin.rs +++ b/crates/query/src/query/bitcoin.rs @@ -12,30 +12,26 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]) - .set_nullable(BlockFieldSelection::columns()); + ]); tables.add_table("transactions", vec![ "block_number", "transaction_index" ]) .add_child("inputs", vec!["block_number", "transaction_index"]) - .add_child("outputs", vec!["block_number", "transaction_index"]) - .set_nullable(TransactionFieldSelection::columns()); + .add_child("outputs", vec!["block_number", "transaction_index"]); tables.add_table("inputs", vec![ "block_number", "transaction_index", "input_index" - ]) - .set_nullable(InputFieldSelection::columns()); + ]); tables.add_table("outputs", vec![ "block_number", "transaction_index", "output_index" - ]) - .set_nullable(OutputFieldSelection::columns()); + ]); tables }); diff --git a/crates/query/src/query/eth.rs b/crates/query/src/query/eth.rs index 2ded8df9..cb874d98 100644 --- a/crates/query/src/query/eth.rs +++ b/crates/query/src/query/eth.rs @@ -16,8 +16,7 @@ static TABLES: LazyLock = LazyLock::new(|| { "number" ]) .set_weight("logs_bloom", 512) - .set_weight_column("extra_data", "extra_data_size") - .set_nullable(BlockFieldSelection::columns()); + .set_weight_column("extra_data", "extra_data_size"); tables.add_table("transactions", vec![ "block_number", @@ -26,18 +25,13 @@ static TABLES: LazyLock = LazyLock::new(|| { .add_child("logs", vec!["block_number", "transaction_index"]) .add_child("traces", vec!["block_number", "transaction_index"]) .add_child("statediffs", vec!["block_number", "transaction_index"]) - .set_weight_column("input", "input_size") - .set_nullable(&TransactionFieldSelection::columns().iter() - .filter(|c| **c != "authorization_list") - .copied() - .collect::>()); + .set_weight_column("input", "input_size"); tables.add_table("logs", vec![ "block_number", "log_index" ]) - 
.set_weight_column("data", "data_size") - .set_nullable(LogFieldSelection::columns()); + .set_weight_column("data", "data_size"); tables.add_table("traces", vec![ "block_number", @@ -47,8 +41,7 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight_column("create_init", "create_init_size") .set_weight_column("create_result_code", "create_result_code_size") .set_weight_column("call_input", "call_input_size") - .set_weight_column("call_result_output", "call_result_output_size") - .set_nullable(TraceFieldSelection::columns()); + .set_weight_column("call_result_output", "call_result_output_size"); tables.add_table("statediffs", vec![ "block_number", @@ -58,8 +51,7 @@ static TABLES: LazyLock = LazyLock::new(|| { ]) .set_weight_column("prev", "prev_size") .set_weight_column("next", "next_size") - .set_result_item_name("stateDiffs") - .set_nullable(StateDiffFieldSelection::columns()); + .set_result_item_name("stateDiffs"); tables }); @@ -673,4 +665,4 @@ impl EthQuery { statediffs, ) } -} \ No newline at end of file +} diff --git a/crates/query/src/query/fuel.rs b/crates/query/src/query/fuel.rs index 8319443e..4c9cb950 100644 --- a/crates/query/src/query/fuel.rs +++ b/crates/query/src/query/fuel.rs @@ -12,8 +12,7 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]) - .set_nullable(BlockFieldSelection::columns()); + ]); tables.add_table("transactions", vec![ "block_number", @@ -25,16 +24,14 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight_column("storage_slots", "storage_slots_size") .set_weight_column("proof_set", "proof_set_size") .set_weight_column("script_data", "script_data_size") - .set_weight_column("raw_payload", "raw_payload_size") - .set_nullable(TransactionFieldSelection::columns()); + .set_weight_column("raw_payload", "raw_payload_size"); tables.add_table("receipts", vec![ "block_number", "transaction_index", "index" ]) - .set_weight_column("data", "data_size") - 
.set_nullable(ReceiptFieldSelection::columns()); + .set_weight_column("data", "data_size"); tables.add_table("inputs", vec![ "block_number", @@ -42,15 +39,13 @@ static TABLES: LazyLock = LazyLock::new(|| { "index" ]) .set_weight_column("coin_predicate", "coin_predicate_size") - .set_weight_column("message_predicate", "message_predicate_size") - .set_nullable(InputFieldSelection::columns()); + .set_weight_column("message_predicate", "message_predicate_size"); tables.add_table("outputs", vec![ "block_number", "transaction_index", "index" - ]) - .set_nullable(OutputFieldSelection::columns()); + ]); tables }); @@ -646,4 +641,4 @@ impl FuelQuery { outputs, ) } -} \ No newline at end of file +} diff --git a/crates/query/src/query/hyperliquid_fills.rs b/crates/query/src/query/hyperliquid_fills.rs index 6f28420a..0e1c2dc3 100644 --- a/crates/query/src/query/hyperliquid_fills.rs +++ b/crates/query/src/query/hyperliquid_fills.rs @@ -12,14 +12,12 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]) - .set_nullable(BlockFieldSelection::columns()); + ]); tables.add_table("fills", vec![ "block_number", "fill_index" - ]) - .set_nullable(FillFieldSelection::columns()); + ]); tables }); diff --git a/crates/query/src/query/hyperliquid_replica_cmds.rs b/crates/query/src/query/hyperliquid_replica_cmds.rs index 9f18f054..4165a3ba 100644 --- a/crates/query/src/query/hyperliquid_replica_cmds.rs +++ b/crates/query/src/query/hyperliquid_replica_cmds.rs @@ -13,16 +13,14 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]) - .set_nullable(BlockFieldSelection::columns()); + ]); tables.add_table("actions", vec![ "block_number", "action_index" ]) .set_weight_column("action", "action_size") - .set_weight_column("response", "response_size") - .set_nullable(ActionFieldSelection::columns()); + .set_weight_column("response", "response_size"); tables }); diff --git a/crates/query/src/query/solana.rs 
b/crates/query/src/query/solana.rs index 1b28ba33..64ec6df5 100644 --- a/crates/query/src/query/solana.rs +++ b/crates/query/src/query/solana.rs @@ -15,8 +15,7 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" - ]) - .set_nullable(BlockFieldSelection::columns()); + ]); tables.add_table("transactions", vec![ "block_number", @@ -28,8 +27,7 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight_column("account_keys", "account_keys_size") .set_weight_column("address_table_lookups", "address_table_lookups_size") .set_weight_column("signatures", "signatures_size") - .set_weight_column("loaded_addresses", "loaded_addresses_size") - .set_nullable(TransactionFieldSelection::columns()); + .set_weight_column("loaded_addresses", "loaded_addresses_size"); tables.add_table("instructions", vec![ "block_number", @@ -53,37 +51,32 @@ static TABLES: LazyLock = LazyLock::new(|| { .set_weight("a13", 0) .set_weight("a14", 0) .set_weight("a15", 0) - .set_weight("rest_accounts", 0) - .set_nullable(InstructionFieldSelection::columns()); + .set_weight("rest_accounts", 0); tables.add_table("logs", vec![ "block_number", "transaction_index", "log_index" ]) - .set_weight_column("message", "message_size") - .set_nullable(LogFieldSelection::columns()); + .set_weight_column("message", "message_size"); tables.add_table("balances", vec![ "block_number", "transaction_index", "account" - ]) - .set_nullable(BalanceFieldSelection::columns()); + ]); tables.add_table("token_balances", vec![ "block_number", "transaction_index", "account" - ]) - .set_nullable(TokenBalanceFieldSelection::columns()); + ]); tables.add_table("rewards", vec![ "block_number", "pubkey", "reward_type" - ]) - .set_nullable(RewardFieldSelection::columns()); + ]); tables }); @@ -813,4 +806,4 @@ impl SolanaQuery { rewards, ) } -} \ No newline at end of file +} diff --git a/crates/query/src/query/substrate.rs b/crates/query/src/query/substrate.rs index 774b8993..84e2a118 100644 --- 
a/crates/query/src/query/substrate.rs +++ b/crates/query/src/query/substrate.rs @@ -13,31 +13,27 @@ static TABLES: LazyLock = LazyLock::new(|| { tables.add_table("blocks", vec![ "number" ]) - .set_weight("digest", 32 * 4) - .set_nullable(BlockFieldSelection::columns()); + .set_weight("digest", 32 * 4); tables.add_table("events", vec![ "block_number", "index" ]) - .set_weight_column("args", "args_size") - .set_nullable(EventFieldSelection::columns()); + .set_weight_column("args", "args_size"); tables.add_table("calls", vec![ "block_number", "extrinsic_index", "address" ]) - .set_weight_column("args", "args_size") - .set_nullable(CallFieldSelection::columns()); + .set_weight_column("args", "args_size"); tables.add_table("extrinsics", vec![ "block_number", "index" ]) .add_child("calls", vec!["block_number", "extrinsic_index"]) - .set_weight("signature", 4 * 32) - .set_nullable(ExtrinsicFieldSelection::columns()); + .set_weight("signature", 4 * 32); tables }); @@ -576,4 +572,4 @@ impl SubstrateQuery { , ) } -} \ No newline at end of file +} diff --git a/crates/query/src/scan/chunk.rs b/crates/query/src/scan/chunk.rs index de44d3c7..7cd4e482 100644 --- a/crates/query/src/scan/chunk.rs +++ b/crates/query/src/scan/chunk.rs @@ -5,28 +5,3 @@ use crate::scan::scan::Scan; pub trait Chunk: Send + Sync { fn scan_table(&self, name: Name) -> anyhow::Result>; } - - -#[derive(Debug)] -pub struct TableDoesNotExist { - pub table_name: Name -} - - -impl TableDoesNotExist { - pub fn new(table_name: Name) -> Self { - Self { - table_name - } - } -} - - -impl std::fmt::Display for TableDoesNotExist { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "table '{}' does not exist", self.table_name) - } -} - - -impl std::error::Error for TableDoesNotExist {} \ No newline at end of file diff --git a/crates/query/src/scan/errors.rs b/crates/query/src/scan/errors.rs new file mode 100644 index 00000000..ccc975d8 --- /dev/null +++ b/crates/query/src/scan/errors.rs 
@@ -0,0 +1,50 @@ +use crate::primitives::Name; + +#[derive(Debug)] +pub struct TableDoesNotExist { + pub table_name: Name +} + + +impl TableDoesNotExist { + pub fn new(table_name: Name) -> Self { + Self { + table_name + } + } +} + + +impl std::fmt::Display for TableDoesNotExist { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "table '{}' does not exist", self.table_name) + } +} + + +impl std::error::Error for TableDoesNotExist {} + +#[derive(Debug)] +pub struct ColumnDoesNotExist { + pub column_name: Name, + pub table_name: String, +} + + +impl ColumnDoesNotExist { + pub fn new(table_name: String, column_name: Name) -> Self { + Self { + table_name, + column_name + } + } +} + + +impl std::fmt::Display for ColumnDoesNotExist { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "column '{}' is not found in {}", self.column_name, self.table_name) + } +} + +impl std::error::Error for ColumnDoesNotExist {} diff --git a/crates/query/src/scan/mod.rs b/crates/query/src/scan/mod.rs index fe01f675..0f33b477 100644 --- a/crates/query/src/scan/mod.rs +++ b/crates/query/src/scan/mod.rs @@ -9,10 +9,12 @@ mod row_predicate_dsl; pub(crate) mod scan; #[cfg(feature = "storage")] mod storage; +mod errors; mod util; pub use arrow::*; pub use chunk::*; +pub use errors::*; pub use row_predicate::RowPredicateRef; pub use row_predicate_dsl::*; diff --git a/crates/query/src/scan/parquet/file.rs b/crates/query/src/scan/parquet/file.rs index 904f0e8c..6c1615e0 100644 --- a/crates/query/src/scan/parquet/file.rs +++ b/crates/query/src/scan/parquet/file.rs @@ -4,6 +4,7 @@ use crate::scan::parquet::metadata::ParquetMetadata; use crate::scan::reader::TableReader; use crate::scan::row_predicate::{RowPredicate, RowPredicateRef}; use crate::scan::util::{add_row_index, build_row_index_array}; +use crate::ColumnDoesNotExist; use arrow::array::{new_null_array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use 
parquet::arrow::arrow_reader::{ @@ -171,7 +172,7 @@ impl TableReader for ParquetFile { missing_null_columns.push(name); } else { tracing::error!("column '{}' is not found in {}", name, self.filename); - anyhow::bail!("column '{}' is not found in {}", name, self.filename); + anyhow::bail!(ColumnDoesNotExist::new(self.filename.to_string(), name)); } } }