Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,110 @@ pub fn vortex_array::aggregate_fn::fns::mean::Mean::serialize(&self, options: &S

pub fn vortex_array::aggregate_fn::fns::mean::Mean::state_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub mod vortex_array::aggregate_fn::fns::min_max

pub struct vortex_array::aggregate_fn::fns::min_max::Max

impl core::clone::Clone for vortex_array::aggregate_fn::fns::min_max::Max

pub fn vortex_array::aggregate_fn::fns::min_max::Max::clone(&self) -> vortex_array::aggregate_fn::fns::min_max::Max

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::min_max::Max

pub type vortex_array::aggregate_fn::fns::min_max::Max::Options = vortex_array::scalar_fn::EmptyOptions

pub fn vortex_array::aggregate_fn::fns::min_max::Max::accumulator(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::min_max::Max::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::state_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub struct vortex_array::aggregate_fn::fns::min_max::Min

impl core::clone::Clone for vortex_array::aggregate_fn::fns::min_max::Min

pub fn vortex_array::aggregate_fn::fns::min_max::Min::clone(&self) -> vortex_array::aggregate_fn::fns::min_max::Min

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::min_max::Min

pub type vortex_array::aggregate_fn::fns::min_max::Min::Options = vortex_array::scalar_fn::EmptyOptions

pub fn vortex_array::aggregate_fn::fns::min_max::Min::accumulator(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::min_max::Min::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::state_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub mod vortex_array::aggregate_fn::fns::sum

pub struct vortex_array::aggregate_fn::fns::sum::Sum

impl core::clone::Clone for vortex_array::aggregate_fn::fns::sum::Sum

pub fn vortex_array::aggregate_fn::fns::sum::Sum::clone(&self) -> vortex_array::aggregate_fn::fns::sum::Sum

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::sum::Sum

pub type vortex_array::aggregate_fn::fns::sum::Sum::Options = vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::Sum::accumulator(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::sum::Sum::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::state_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub struct vortex_array::aggregate_fn::fns::sum::SumOptions

pub vortex_array::aggregate_fn::fns::sum::SumOptions::checked: bool

impl core::clone::Clone for vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::SumOptions::clone(&self) -> vortex_array::aggregate_fn::fns::sum::SumOptions

impl core::cmp::Eq for vortex_array::aggregate_fn::fns::sum::SumOptions

impl core::cmp::PartialEq for vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::SumOptions::eq(&self, other: &vortex_array::aggregate_fn::fns::sum::SumOptions) -> bool

impl core::default::Default for vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::SumOptions::default() -> vortex_array::aggregate_fn::fns::sum::SumOptions

impl core::fmt::Debug for vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::SumOptions::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::fmt::Display for vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::SumOptions::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::hash::Hash for vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::SumOptions::hash<__H: core::hash::Hasher>(&self, state: &mut __H)

impl core::marker::Copy for vortex_array::aggregate_fn::fns::sum::SumOptions

impl core::marker::StructuralPartialEq for vortex_array::aggregate_fn::fns::sum::SumOptions

pub mod vortex_array::aggregate_fn::session

pub struct vortex_array::aggregate_fn::session::AggregateFnSession
Expand Down Expand Up @@ -226,6 +330,54 @@ pub fn vortex_array::aggregate_fn::fns::mean::Mean::serialize(&self, options: &S

pub fn vortex_array::aggregate_fn::fns::mean::Mean::state_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::min_max::Max

pub type vortex_array::aggregate_fn::fns::min_max::Max::Options = vortex_array::scalar_fn::EmptyOptions

pub fn vortex_array::aggregate_fn::fns::min_max::Max::accumulator(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::min_max::Max::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::min_max::Max::state_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::min_max::Min

pub type vortex_array::aggregate_fn::fns::min_max::Min::Options = vortex_array::scalar_fn::EmptyOptions

pub fn vortex_array::aggregate_fn::fns::min_max::Min::accumulator(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::min_max::Min::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::min_max::Min::state_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::sum::Sum

pub type vortex_array::aggregate_fn::fns::sum::Sum::Options = vortex_array::aggregate_fn::fns::sum::SumOptions

pub fn vortex_array::aggregate_fn::fns::sum::Sum::accumulator(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::sum::Sum::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::sum::Sum::state_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub trait vortex_array::aggregate_fn::AggregateFnVTableExt: vortex_array::aggregate_fn::AggregateFnVTable

pub fn vortex_array::aggregate_fn::AggregateFnVTableExt::bind(&self, options: Self::Options) -> vortex_array::aggregate_fn::AggregateFnRef
Expand Down
124 changes: 124 additions & 0 deletions vortex-array/src/aggregate_fn/fns/min_max/bool_accumulator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::marker::PhantomData;
use std::ops::BitAnd;

use vortex_error::VortexResult;
use vortex_mask::Mask;

use super::Direction;
use crate::ArrayRef;
use crate::IntoArray;
use crate::aggregate_fn::accumulator::Accumulator;
use crate::arrays::BoolArray;
use crate::canonical::ToCanonical;
use crate::scalar::Scalar;

/// Accumulator for boolean min/max.
///
/// - Min is saturated as soon as `false` is seen (since false < true).
/// - Max is saturated as soon as `true` is seen.
pub(super) struct BoolExtremumAccumulator<D> {
current: Option<bool>,
results: Vec<Option<bool>>,
_direction: PhantomData<D>,
}

impl<D: Direction> BoolExtremumAccumulator<D> {
pub(super) fn new() -> Self {
Self {
current: None,
results: Vec::new(),
_direction: PhantomData,
}
}

#[inline]
fn consider(&mut self, candidate: bool) {
match self.current {
None => self.current = Some(candidate),
Some(cur) => {
if D::should_replace_bool(cur, candidate) {
self.current = Some(candidate);
}
}
}
}
}

/// Count of true and false values in a boolean array, considering validity.
struct BoolCounts {
true_count: u64,
false_count: u64,
}

fn bool_counts(bool_array: &BoolArray) -> VortexResult<BoolCounts> {
let validity = bool_array.validity_mask()?;
let bits = bool_array.to_bit_buffer();

match &validity {
Mask::AllTrue(_) => {
let true_count = bits.true_count() as u64;
let false_count = bool_array.len() as u64 - true_count;
Ok(BoolCounts {
true_count,
false_count,
})
}
Mask::AllFalse(_) => Ok(BoolCounts {
true_count: 0,
false_count: 0,
}),
Mask::Values(v) => {
let valid_bits = bits.bitand(v.bit_buffer());
let true_count = valid_bits.true_count() as u64;
let valid_count = v.bit_buffer().true_count() as u64;
let false_count = valid_count - true_count;
Ok(BoolCounts {
true_count,
false_count,
})
}
}
}

impl<D: Direction> Accumulator for BoolExtremumAccumulator<D> {
fn accumulate(&mut self, batch: &ArrayRef) -> VortexResult<()> {
let bool_array = batch.to_bool();
let counts = bool_counts(&bool_array)?;

if counts.true_count > 0 {
self.consider(true);
}
if counts.false_count > 0 {
self.consider(false);
}

Ok(())
}

fn merge(&mut self, state: &Scalar) -> VortexResult<()> {
if state.is_null() {
return Ok(());
}
if let Some(v) = state.as_bool().value() {
self.consider(v);
}
Ok(())
}

fn is_saturated(&self) -> bool {
self.current.is_some_and(D::is_saturated_bool)
}

fn flush(&mut self) -> VortexResult<()> {
self.results.push(self.current);
self.current = None;
Ok(())
}

fn finish(self: Box<Self>) -> VortexResult<ArrayRef> {
Ok(BoolArray::from_iter(self.results).into_array())
}
}
Loading
Loading