Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ arrow-json = "57.1.0"
arrow-schema = { version = "57.1.0", features = ["serde"] }
arrow-select = "57.1.0"
datafusion = "51.0.0"
datafusion-proto = "51.0.0"
object_store = { version = "0.12.4", features = [
"cloud",
"aws",
"azure",
"gcp",
] }
parquet = "57.1.0"
# ballista-core = "51.0.0"
ballista-scheduler = "51.0.0"
ballista-executor = "51.0.0"
ballista = {version = "51.0.0"}

# Web server and HTTP-related
actix-cors = "0.7.0"
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ use std::time::Duration;

// Public re-exports of crates being used in enterprise
pub use datafusion;
pub use datafusion_proto;
pub use handlers::http::modal::{
ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server,
};
// pub use ballista_core;
pub use ballista_executor;
pub use ballista_scheduler;
pub use ballista;

use once_cell::sync::Lazy;
pub use opentelemetry_proto;
use parseable::PARSEABLE;
Expand Down
5 changes: 3 additions & 2 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use datafusion::{
};
use futures_util::TryFutureExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::{
catalog::{
Expand Down Expand Up @@ -99,8 +100,8 @@ impl SchemaProvider for GlobalSchemaProvider {
}
}

#[derive(Debug)]
struct StandardTableProvider {
#[derive(Debug, Serialize, Deserialize)]
pub struct StandardTableProvider {
schema: SchemaRef,
// prefix under which to find snapshot
stream: String,
Expand Down
31 changes: 28 additions & 3 deletions src/storage/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use datafusion::{
datasource::listing::ListingTableUrl,
execution::{
config::{ConfigExtension, ExtensionOptions}, datasource::listing::ListingTableUrl, execution::{
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
runtime_env::RuntimeEnvBuilder,
},
}
};
use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered};
use object_store::{
Expand Down Expand Up @@ -201,6 +200,32 @@ impl ObjectStorageProvider for AzureBlobConfig {
}
}

impl ConfigExtension for AzureBlobConfig {
const PREFIX: &'static str = "blob-store";
}

impl ExtensionOptions for AzureBlobConfig {
fn as_any(&self) -> &dyn std::any::Any {
todo!()
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
todo!()
}

fn cloned(&self) -> Box<dyn ExtensionOptions> {
todo!()
}

fn set(&mut self, _key: &str, _value: &str) -> datafusion::error::Result<()> {
todo!()
}

fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
todo!()
}
}

// ObjStoreClient is generic client to enable interactions with different cloudprovider's
// object store such as S3 and Azure Blob
#[derive(Debug)]
Expand Down
31 changes: 28 additions & 3 deletions src/storage/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use datafusion::{
datasource::listing::ListingTableUrl,
execution::{
config::{ConfigExtension, ExtensionOptions}, datasource::listing::ListingTableUrl, execution::{
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
runtime_env::RuntimeEnvBuilder,
},
}
};
use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered};
use object_store::{
Expand Down Expand Up @@ -126,6 +125,32 @@ impl GcsConfig {
}
}

impl ConfigExtension for GcsConfig {
const PREFIX: &'static str = "gcs";
}

impl ExtensionOptions for GcsConfig {
fn as_any(&self) -> &dyn std::any::Any {
todo!()
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
todo!()
}

fn cloned(&self) -> Box<dyn ExtensionOptions> {
todo!()
}

fn set(&mut self, _key: &str, _value: &str) -> datafusion::error::Result<()> {
todo!()
}

fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
todo!()
}
}

impl ObjectStorageProvider for GcsConfig {
fn name(&self) -> &'static str {
"gcs"
Expand Down
28 changes: 27 additions & 1 deletion src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use datafusion::{config::{ConfigExtension, ExtensionOptions}, datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use fs_extra::file::CopyOptions;
use futures::{TryStreamExt, stream::FuturesUnordered};
use object_store::{ListResult, ObjectMeta, buffered::BufReader};
Expand Down Expand Up @@ -70,6 +70,32 @@ pub struct FSConfig {
pub root: PathBuf,
}

impl ConfigExtension for FSConfig {
const PREFIX: &'static str = "drive";
}

impl ExtensionOptions for FSConfig {
fn as_any(&self) -> &dyn std::any::Any {
todo!()
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
todo!()
}

fn cloned(&self) -> Box<dyn ExtensionOptions> {
todo!()
}

fn set(&mut self, _key: &str, _value: &str) -> datafusion::error::Result<()> {
todo!()
}

fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
todo!()
}
}

impl ObjectStorageProvider for FSConfig {
fn name(&self) -> &'static str {
"drive"
Expand Down
3 changes: 2 additions & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::config::ExtensionOptions;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use object_store::ListResult;
use object_store::ObjectMeta;
Expand Down Expand Up @@ -248,7 +249,7 @@ async fn validate_uploaded_parquet_file(
}
}

pub trait ObjectStorageProvider: std::fmt::Debug + Send + Sync {
pub trait ObjectStorageProvider: std::fmt::Debug + Send + Sync + ExtensionOptions {
fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder;
fn construct_client(&self) -> Arc<dyn ObjectStorage>;
fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
Expand Down
69 changes: 66 additions & 3 deletions src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use std::{
path::Path,
str::FromStr,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
Arc, atomic::{AtomicU64, Ordering},
},
time::Duration,
};
Expand All @@ -32,6 +31,8 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use datafusion::{
common::config_err,
config::{ConfigExtension, ExtensionOptions},
datasource::listing::ListingTableUrl,
execution::{
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
Expand Down Expand Up @@ -232,7 +233,7 @@ impl Display for ObjectEncryptionAlgorithm {
}

impl S3Config {
fn get_default_builder(&self) -> AmazonS3Builder {
pub fn get_default_builder(&self) -> AmazonS3Builder {
let mut client_options = ClientOptions::default()
.with_allow_http(true)
.with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
Expand Down Expand Up @@ -331,6 +332,68 @@ impl ObjectStorageProvider for S3Config {
}
}

impl ConfigExtension for S3Config {
const PREFIX: &'static str = "s3";
}

impl ExtensionOptions for S3Config {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}

fn cloned(&self) -> Box<dyn ExtensionOptions> {
Box::new(self.clone())
}

fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
tracing::warn!("set config, key:{key}, value:{value}");
match key {
"access_key_id" => {
self.access_key_id = Some(value.into());
// let mut c = self.config.write();
// c.access_key_id.set(key, value)?;
}
"secret_access_key" => {
self.secret_key = Some(value.into());
// let mut c = self.config.write();
// c.secret_access_key.set(key, value)?;
}
"session_token" => {
// let mut c = self.config.write();
// c.session_token.set(key, value)?;
}
"region" => {
self.region = value.into();
// let mut c = self.config.write();
// c.region.set(key, value)?;
}
"endpoint" => {
self.endpoint_url = value.into();
// let mut c = self.config.write();
// c.endpoint.set(key, value)?;
}
"allow_http" => {
self.skip_tls = true;
// let mut c = self.config.write();
// c.allow_http.set(key, value)?;
}
_ => {
tracing::warn!("Config value {key} cant be set to {value}");
return config_err!("Config value \"{}\" not found in S3Options", key);
}
}
Ok(())
}

fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
vec![]
}
}

#[derive(Debug)]
pub struct S3 {
client: AmazonS3,
Expand Down
Loading