diff --git a/Cargo.toml b/Cargo.toml
index 002e3c12b..e19d70b31 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ arrow-json = "57.1.0"
 arrow-schema = { version = "57.1.0", features = ["serde"] }
 arrow-select = "57.1.0"
 datafusion = "51.0.0"
+datafusion-proto = "51.0.0"
 object_store = { version = "0.12.4", features = [
     "cloud",
     "aws",
@@ -27,6 +28,10 @@ object_store = { version = "0.12.4", features = [
     "gcp",
 ] }
 parquet = "57.1.0"
+# ballista-core = "51.0.0"
+ballista-scheduler = "51.0.0"
+ballista-executor = "51.0.0"
+ballista = {version = "51.0.0"}
 
 # Web server and HTTP-related
 actix-cors = "0.7.0"
diff --git a/src/lib.rs b/src/lib.rs
index 725bed5f9..4cfb8cb66 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -55,9 +55,15 @@ use std::time::Duration;
 
 // Public re-exports of crates being used in enterprise
 pub use datafusion;
+pub use datafusion_proto;
 pub use handlers::http::modal::{
     ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server,
 };
+// pub use ballista_core;
+pub use ballista_executor;
+pub use ballista_scheduler;
+pub use ballista;
+
 use once_cell::sync::Lazy;
 pub use opentelemetry_proto;
 use parseable::PARSEABLE;
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index d6c6eb1ff..f45880e2b 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -46,6 +46,7 @@ use datafusion::{
 };
 use futures_util::TryFutureExt;
 use itertools::Itertools;
+use serde::{Deserialize, Serialize};
 
 use crate::{
     catalog::{
@@ -99,8 +100,8 @@ impl SchemaProvider for GlobalSchemaProvider {
     }
 }
 
-#[derive(Debug)]
-struct StandardTableProvider {
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StandardTableProvider {
     schema: SchemaRef,
     // prefix under which to find snapshot
     stream: String,
diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index 4920c1094..4e413f075 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -30,6 +30,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Utc;
 use datafusion::{
+    config::{ConfigExtension, ExtensionOptions},
     datasource::listing::ListingTableUrl,
     execution::{
         object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
@@ -201,6 +202,38 @@ impl ObjectStorageProvider for AzureBlobConfig {
     }
 }
 
+impl ConfigExtension for AzureBlobConfig {
+    const PREFIX: &'static str = "blob-store";
+}
+
+/// Exposes the Azure Blob configuration as a DataFusion config extension.
+///
+/// No key can be changed through DataFusion yet, so `set` rejects every key
+/// and `entries` reports nothing.
+impl ExtensionOptions for AzureBlobConfig {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+
+    fn cloned(&self) -> Box<dyn ExtensionOptions> {
+        // NOTE(review): relies on `AzureBlobConfig: Clone` - confirm the derive.
+        Box::new(self.clone())
+    }
+
+    fn set(&mut self, key: &str, _value: &str) -> datafusion::error::Result<()> {
+        // Report unsupported keys as an error instead of panicking.
+        datafusion::common::config_err!("Config value \"{}\" not found in AzureBlobConfig", key)
+    }
+
+    fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
+        Vec::new()
+    }
+}
+
 // ObjStoreClient is generic client to enable interactions with different cloudprovider's
 // object store such as S3 and Azure Blob
 #[derive(Debug)]
diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs
index 6e6b7090b..fe3d68143 100644
--- a/src/storage/gcs.rs
+++ b/src/storage/gcs.rs
@@ -38,6 +38,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Utc;
 use datafusion::{
+    config::{ConfigExtension, ExtensionOptions},
     datasource::listing::ListingTableUrl,
     execution::{
         object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
@@ -126,6 +127,38 @@ impl GcsConfig {
     }
 }
 
+impl ConfigExtension for GcsConfig {
+    const PREFIX: &'static str = "gcs";
+}
+
+/// Exposes the GCS configuration as a DataFusion config extension.
+///
+/// No key can be changed through DataFusion yet, so `set` rejects every key
+/// and `entries` reports nothing.
+impl ExtensionOptions for GcsConfig {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+
+    fn cloned(&self) -> Box<dyn ExtensionOptions> {
+        // NOTE(review): relies on `GcsConfig: Clone` - confirm the derive.
+        Box::new(self.clone())
+    }
+
+    fn set(&mut self, key: &str, _value: &str) -> datafusion::error::Result<()> {
+        // Report unsupported keys as an error instead of panicking.
+        datafusion::common::config_err!("Config value \"{}\" not found in GcsConfig", key)
+    }
+
+    fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
+        Vec::new()
+    }
+}
+
 impl ObjectStorageProvider for GcsConfig {
     fn name(&self) -> &'static str {
         "gcs"
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 25f2b2c07..627cd3efa 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -25,7 +25,11 @@ use std::{
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Utc;
-use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
+use datafusion::{
+    config::{ConfigExtension, ExtensionOptions},
+    datasource::listing::ListingTableUrl,
+    execution::runtime_env::RuntimeEnvBuilder,
+};
 use fs_extra::file::CopyOptions;
 use futures::{TryStreamExt, stream::FuturesUnordered};
 use object_store::{ListResult, ObjectMeta, buffered::BufReader};
@@ -70,6 +74,38 @@ pub struct FSConfig {
     pub root: PathBuf,
 }
 
+impl ConfigExtension for FSConfig {
+    const PREFIX: &'static str = "drive";
+}
+
+/// Exposes the local filesystem configuration as a DataFusion config extension.
+///
+/// No key can be changed through DataFusion yet, so `set` rejects every key
+/// and `entries` reports nothing.
+impl ExtensionOptions for FSConfig {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+
+    fn cloned(&self) -> Box<dyn ExtensionOptions> {
+        // NOTE(review): relies on `FSConfig: Clone` - confirm the derive.
+        Box::new(self.clone())
+    }
+
+    fn set(&mut self, key: &str, _value: &str) -> datafusion::error::Result<()> {
+        // Report unsupported keys as an error instead of panicking.
+        datafusion::common::config_err!("Config value \"{}\" not found in FSConfig", key)
+    }
+
+    fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
+        Vec::new()
+    }
+}
+
 impl ObjectStorageProvider for FSConfig {
     fn name(&self) -> &'static str {
         "drive"
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 5204b7fca..bd72493ed 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -20,6 +20,7 @@ use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
+use datafusion::config::ExtensionOptions;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
 use object_store::ListResult;
 use object_store::ObjectMeta;
@@ -248,7 +249,7 @@ async fn validate_uploaded_parquet_file(
     }
 }
 
-pub trait ObjectStorageProvider: std::fmt::Debug + Send + Sync {
+pub trait ObjectStorageProvider: std::fmt::Debug + Send + Sync + ExtensionOptions {
     fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder;
     fn construct_client(&self) -> Arc<dyn ObjectStorage>;
     fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 17c360251..59c7320fe 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -32,6 +32,8 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Utc;
 use datafusion::{
+    common::config_err,
+    config::{ConfigExtension, ExtensionOptions},
     datasource::listing::ListingTableUrl,
     execution::{
         object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
@@ -232,7 +234,7 @@ impl Display for ObjectEncryptionAlgorithm {
 }
 
 impl S3Config {
-    fn get_default_builder(&self) -> AmazonS3Builder {
+    pub fn get_default_builder(&self) -> AmazonS3Builder {
         let mut client_options = ClientOptions::default()
             .with_allow_http(true)
             .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
@@ -331,6 +333,64 @@ impl ObjectStorageProvider for S3Config {
     }
 }
 
+impl ConfigExtension for S3Config {
+    const PREFIX: &'static str = "s3";
+}
+
+/// Exposes the S3 configuration as a DataFusion config extension so that its
+/// fields can be updated through `set`.
+impl ExtensionOptions for S3Config {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+
+    fn cloned(&self) -> Box<dyn ExtensionOptions> {
+        Box::new(self.clone())
+    }
+
+    /// Applies a single option to this configuration.
+    ///
+    /// # Errors
+    ///
+    /// Returns a configuration error for an unknown key, or when the
+    /// `allow_http` value is not a boolean.
+    fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
+        // Values can be credentials, so only the key is ever logged.
+        tracing::debug!("set config, key:{key}");
+        match key {
+            "access_key_id" => self.access_key_id = Some(value.into()),
+            "secret_access_key" => self.secret_key = Some(value.into()),
+            "session_token" => {
+                // Accepted, but nothing is stored for it here; say so in the log.
+                tracing::warn!("Config value {key} is ignored by S3Config");
+            }
+            "region" => self.region = value.into(),
+            "endpoint" => self.endpoint_url = value.into(),
+            "allow_http" => {
+                // NOTE(review): `allow_http` drives `skip_tls`, as in the first
+                // version of this patch - confirm that this is the intended field.
+                let Ok(enabled) = value.to_ascii_lowercase().parse::<bool>() else {
+                    return config_err!("Config value \"{}\" must be a boolean", key);
+                };
+                self.skip_tls = enabled;
+            }
+            _ => {
+                tracing::warn!("Config value {key} cant be set");
+                return config_err!("Config value \"{}\" not found in S3Options", key);
+            }
+        }
+        Ok(())
+    }
+
+    fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
+        vec![]
+    }
+}
+
 #[derive(Debug)]
 pub struct S3 {
     client: AmazonS3,