diff --git a/Cargo.lock b/Cargo.lock
index 3a9f80f58..76661988d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5995,6 +5995,7 @@ dependencies = [
  "glaredb",
  "glob",
  "logutil",
+ "metastore",
  "num_cpus",
  "object_store",
  "openssh",
diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs
index 4e1c9aeff..bc9aa0df4 100644
--- a/crates/datasources/src/native/access.rs
+++ b/crates/datasources/src/native/access.rs
@@ -126,12 +126,7 @@ impl NativeTableStorage {
     pub async fn delete_table(&self, table: &TableEntry) -> Result<()> {
         let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);
 
-        let path: ObjectStorePath = match &self.conf {
-            StorageConfig::Gcs { bucket, .. } => format!("gs://{}/{}", bucket, prefix).into(),
-            StorageConfig::Memory => format!("memory://{}", prefix).into(),
-            _ => prefix.into(),
-        };
-        let mut x = self.store.list(Some(&path)).await?;
+        let mut x = self.store.list(Some(&prefix.into())).await?;
         while let Some(meta) = x.next().await {
             let meta = meta?;
             self.store.delete(&meta.location).await?
@@ -154,9 +149,21 @@ impl NativeTableStorage {
         let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);
 
         let url = match &self.conf {
-            StorageConfig::Gcs { bucket, .. } => {
-                Url::parse(&format!("gs://{}/{}", bucket, prefix.clone()))?
+            StorageConfig::S3 {
+                endpoint, bucket, ..
+            } => {
+                let mut s3_url = if let Some(endpoint) = endpoint {
+                    endpoint.clone()
+                } else {
+                    "s3://".to_string()
+                };
+
+                if let Some(bucket) = bucket {
+                    s3_url = format!("{s3_url}/{bucket}");
+                }
+                Url::parse(&format!("{s3_url}/{prefix}"))?
             }
+            StorageConfig::Gcs { bucket, .. } => Url::parse(&format!("gs://{bucket}/{prefix}"))?,
             StorageConfig::Local { path } => {
                 let path = fs::canonicalize(path)
@@ -169,7 +176,7 @@ impl NativeTableStorage {
                 Url::from_file_path(path).map_err(|_| NativeError::Static("Path not absolute"))?
             }
             StorageConfig::Memory => {
-                let s = format!("memory://{}", prefix.clone());
+                let s = format!("memory://{prefix}");
                 Url::parse(&s)?
             }
         };
diff --git a/crates/glaredb/src/args/local.rs b/crates/glaredb/src/args/local.rs
index a7182642d..59864e4e8 100644
--- a/crates/glaredb/src/args/local.rs
+++ b/crates/glaredb/src/args/local.rs
@@ -35,6 +35,9 @@ pub struct LocalClientOpts {
     #[clap(short = 'c', long, value_parser)]
     pub cloud_url: Option<Url>,
 
+    #[clap(flatten)]
+    pub storage_config: StorageConfigArgs,
+
     /// Ignores the proxy and directly goes to the server for remote execution.
     ///
     /// (Internal)
diff --git a/crates/glaredb/src/args/mod.rs b/crates/glaredb/src/args/mod.rs
index 9305d5bd5..b844dfd17 100644
--- a/crates/glaredb/src/args/mod.rs
+++ b/crates/glaredb/src/args/mod.rs
@@ -1,3 +1,4 @@
+use anyhow::anyhow;
 use anyhow::Result;
 use clap::{Parser, ValueEnum};
 use std::fmt::Write as _;
@@ -81,3 +82,23 @@ pub struct PgProxyArgs {
     #[clap(long)]
     pub cloud_auth_code: String,
 }
+
+#[derive(Debug, Clone, Parser)]
+pub struct StorageConfigArgs {
+    /// URL of the object store in which to keep the data.
+    #[clap(short, long)]
+    pub location: Option<String>,
+
+    /// Storage options for building the object store.
+    #[clap(short = 'o', long = "option", requires = "location", value_parser=parse_key_value_pair)]
+    pub storage_options: Vec<(String, String)>,
+}
+
+fn parse_key_value_pair(key_value_pair: &str) -> Result<(String, String)> {
+    key_value_pair
+        .split_once('=')
+        .map(|(key, value)| (key.to_string(), value.to_string()))
+        .ok_or(anyhow!(
+            "Expected a key-value pair delimited by an equals sign, got '{key_value_pair}'"
+        ))
+}
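For illustration, the two flags defined above would be combined like this when launching the local client (the bucket name and key path are placeholders, and the exact way the binary is invoked may differ):

    glaredb --location gs://my-bucket -o service_account_path=/path/to/gcp-key.json

Repeated `-o`/`--option` pairs are parsed by `parse_key_value_pair` into the `storage_options` vector, and `requires = "location"` makes them valid only alongside `--location`.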
diff --git a/crates/glaredb/src/lib.rs b/crates/glaredb/src/lib.rs
index 832949a9e..6bddbb0db 100644
--- a/crates/glaredb/src/lib.rs
+++ b/crates/glaredb/src/lib.rs
@@ -1,11 +1,9 @@
+pub mod args;
 pub mod commands;
+mod highlighter;
 pub mod local;
 pub mod metastore;
 pub mod pg_proxy;
+mod prompt;
 pub mod rpc_proxy;
 pub mod server;
-pub mod util;
-
-pub mod args;
-mod highlighter;
-mod prompt;
diff --git a/crates/glaredb/src/local.rs b/crates/glaredb/src/local.rs
index 653682b96..92cd7eb49 100644
--- a/crates/glaredb/src/local.rs
+++ b/crates/glaredb/src/local.rs
@@ -1,7 +1,6 @@
-use crate::args::{LocalClientOpts, OutputMode};
+use crate::args::{LocalClientOpts, OutputMode, StorageConfigArgs};
 use crate::highlighter::{SQLHighlighter, SQLHinter, SQLValidator};
 use crate::prompt::SQLPrompt;
-use crate::util::MetastoreClientMode;
 use anyhow::{anyhow, Result};
 use arrow_util::pretty::pretty_format_batches;
 use clap::ValueEnum;
@@ -16,9 +15,9 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use pgrepr::format::Format;
 use reedline::{FileBackedHistory, Reedline, Signal};
+use std::collections::HashMap;
 
 use datafusion_ext::vars::SessionVars;
-use sqlexec::engine::EngineStorageConfig;
 use sqlexec::engine::{Engine, SessionStorageConfig, TrackedSession};
 use sqlexec::parser;
 use sqlexec::remote::client::RemoteClient;
@@ -26,8 +25,6 @@ use sqlexec::session::ExecutionResult;
 use std::env;
 use std::io::Write;
 use std::path::PathBuf;
-use std::sync::Arc;
-use telemetry::Tracker;
 use tracing::error;
 use url::Url;
@@ -48,22 +45,19 @@ pub struct LocalSession {
 impl LocalSession {
     pub async fn connect(opts: LocalClientOpts) -> Result<Self> {
         // Connect to metastore.
-        let mode = MetastoreClientMode::new_local(opts.data_dir.clone())?;
-        let metastore_client = mode.into_client().await?;
-        let tracker = Arc::new(Tracker::Nop);
-
-        let storage_conf = match &opts.data_dir {
-            Some(path) => EngineStorageConfig::Local { path: path.clone() },
-            None => EngineStorageConfig::Memory,
+        let mut engine = if let StorageConfigArgs {
+            location: Some(location),
+            storage_options,
+        } = &opts.storage_config
+        {
+            // TODO: try to consolidate with --data-dir option
+            Engine::from_storage_options(location, &HashMap::from_iter(storage_options.clone()))
+                .await?
+        } else {
+            Engine::from_data_dir(&opts.data_dir).await?
         };
-        let engine = Engine::new(
-            metastore_client,
-            storage_conf,
-            tracker,
-            opts.spill_path.clone(),
-        )
-        .await?;
+        engine = engine.with_spill_path(opts.spill_path.clone());
 
         let sess = if let Some(url) = opts.cloud_url.clone() {
             let (exec_client, info_msg) = if opts.ignore_rpc_auth {
@@ -131,9 +125,16 @@ impl LocalSession {
     }
 
     async fn run_interactive(&mut self) -> Result<()> {
-        let info = match &self.opts.data_dir {
-            Some(path) => format!("Persisting database at path: {}", path.display()),
-            None => "Using in-memory catalog".to_string(),
+        let info = match (&self.opts.storage_config, &self.opts.data_dir) {
+            (
+                StorageConfigArgs {
+                    location: Some(location),
+                    ..
+                },
+                _,
+            ) => format!("Persisting database at location: {location}"),
+            (_, Some(path)) => format!("Persisting database at path: {}", path.display()),
+            (_, None) => "Using in-memory catalog".to_string(),
         };
         println!("{info}");
diff --git a/crates/glaredb/src/server.rs b/crates/glaredb/src/server.rs
index 194d5b5ad..cda23bd29 100644
--- a/crates/glaredb/src/server.rs
+++ b/crates/glaredb/src/server.rs
@@ -1,5 +1,5 @@
-use crate::util::MetastoreClientMode;
 use anyhow::{anyhow, Result};
+use metastore::util::MetastoreClientMode;
 use pgsrv::auth::LocalAuthenticator;
 use pgsrv::handler::{ProtocolHandler, ProtocolHandlerConfig};
 use protogen::gen::rpcsrv::service::execution_service_server::ExecutionServiceServer;
@@ -55,7 +55,15 @@ impl ComputeServer {
         fs::create_dir_all(&env_tmp)?;
 
         // Connect to metastore.
-        let mode = MetastoreClientMode::new_from_options(metastore_addr, data_dir.clone())?;
+        let mode = match (metastore_addr, &data_dir) {
+            (Some(_), Some(_)) => {
+                return Err(anyhow!(
+                    "Only one of metastore address or metastore path may be provided."
+                ))
+            }
+            (Some(addr), None) => MetastoreClientMode::Remote { addr },
+            _ => MetastoreClientMode::new_local(data_dir.clone()),
+        };
         let metastore_client = mode.into_client().await?;
 
         let tracker = match segment_key {
@@ -78,6 +86,7 @@ impl ComputeServer {
         let storage_conf = match (data_dir, service_account_key) {
             (None, Some(key)) => EngineStorageConfig::Gcs {
                 service_account_key: key,
+                bucket: None,
             },
             (Some(dir), None) => EngineStorageConfig::Local { path: dir },
             (None, None) => EngineStorageConfig::Memory,
diff --git a/crates/metastore/src/errors.rs b/crates/metastore/src/errors.rs
index cca1ea8f1..a459b3e4d 100644
--- a/crates/metastore/src/errors.rs
+++ b/crates/metastore/src/errors.rs
@@ -87,6 +87,9 @@ pub enum MetastoreError {
 
     #[error(transparent)]
     Validation(#[from] sqlbuiltins::validation::ValidationError),
+
+    #[error(transparent)]
+    TonicTransportError(#[from] tonic::transport::Error),
 }
 
 pub type Result<T, E = MetastoreError> = std::result::Result<T, E>;
diff --git a/crates/metastore/src/lib.rs b/crates/metastore/src/lib.rs
index 12eb5beb1..60952c948 100644
--- a/crates/metastore/src/lib.rs
+++ b/crates/metastore/src/lib.rs
@@ -5,3 +5,4 @@ pub mod srv;
 
 mod database;
 mod storage;
+pub mod util;
diff --git a/crates/metastore/src/local.rs b/crates/metastore/src/local.rs
index 73d8c9a7e..df4cc2f68 100644
--- a/crates/metastore/src/local.rs
+++ b/crates/metastore/src/local.rs
@@ -27,8 +27,7 @@ pub async fn start_inprocess_local(
 
 /// Starts an in-process metastore service, returning a client for the service.
 ///
-/// Useful for some tests, as well as when running GlareDB locally for testing.
-/// This should never be used in production.
+/// Useful for tests, as well as when running GlareDB locally.
 pub async fn start_inprocess(
     store: Arc<dyn ObjectStore>,
 ) -> Result<MetastoreServiceClient<Channel>> {
diff --git a/crates/glaredb/src/util.rs b/crates/metastore/src/util.rs
similarity index 59%
rename from crates/glaredb/src/util.rs
rename to crates/metastore/src/util.rs
index 724074840..19911fe41 100644
--- a/crates/glaredb/src/util.rs
+++ b/crates/metastore/src/util.rs
@@ -1,5 +1,5 @@
-use anyhow::{anyhow, Result};
-use metastore::local::{start_inprocess_inmemory, start_inprocess_local};
+use crate::errors::{MetastoreError, Result};
+use crate::local::{start_inprocess_inmemory, start_inprocess_local};
 use protogen::gen::metastore::service::metastore_service_client::MetastoreServiceClient;
 use std::path::PathBuf;
 use std::{fs, time::Duration};
@@ -18,19 +18,10 @@ pub enum MetastoreClientMode {
 }
 
 impl MetastoreClientMode {
-    pub fn new_local(local_path: Option<PathBuf>) -> Result<Self> {
+    pub fn new_local(local_path: Option<PathBuf>) -> Self {
         match local_path {
-            Some(path) => Ok(MetastoreClientMode::LocalDisk { path }),
-            None => Ok(MetastoreClientMode::LocalInMemory),
-        }
-    }
-    pub fn new_from_options(addr: Option<String>, local_path: Option<PathBuf>) -> Result<Self> {
-        match (addr, &local_path) {
-            (Some(_), Some(_)) => Err(anyhow!(
-                "Only one of metastore address or metastore path may be provided."
-            )),
-            (Some(addr), None) => Ok(MetastoreClientMode::Remote { addr }),
-            _ => Self::new_local(local_path),
+            Some(path) => MetastoreClientMode::LocalDisk { path },
+            None => MetastoreClientMode::LocalInMemory,
         }
     }
 
@@ -49,14 +40,22 @@ impl MetastoreClientMode {
             }
             Self::LocalDisk { path } => {
                 if !path.exists() {
-                    fs::create_dir_all(&path)?;
+                    fs::create_dir_all(&path).map_err(|e| {
+                        MetastoreError::FailedInProcessStartup(format!(
+                            "Failed creating directory at path {}: {e}",
+                            path.to_string_lossy()
+                        ))
+                    })?;
                 }
                 if path.exists() && !path.is_dir() {
-                    return Err(anyhow!("Path is not a valid directory"));
+                    return Err(MetastoreError::FailedInProcessStartup(format!(
+                        "Error creating metastore client, path {} is not a valid directory",
+                        path.to_string_lossy()
+                    )));
                 }
-                Ok(start_inprocess_local(path).await?)
+                start_inprocess_local(path).await
             }
-            Self::LocalInMemory => Ok(start_inprocess_inmemory().await?),
+            Self::LocalInMemory => start_inprocess_inmemory().await,
         }
     }
 }
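A minimal sketch of the reworked client-mode API, assuming a `dir: PathBuf` is in scope: `new_local` is now infallible, and the fallible work happens in `into_client`, which now surfaces `MetastoreError` instead of `anyhow` errors.

    // Hypothetical call site mirroring `Engine::from_data_dir` below.
    let client = MetastoreClientMode::new_local(Some(dir)).into_client().await?;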
diff --git a/crates/object_store_util/src/conf.rs b/crates/object_store_util/src/conf.rs
index 84c517470..82445eea6 100644
--- a/crates/object_store_util/src/conf.rs
+++ b/crates/object_store_util/src/conf.rs
@@ -1,3 +1,4 @@
+use object_store::aws::{AmazonS3Builder, S3CopyIfNotExists};
 use object_store::{
     gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, memory::InMemory,
     Error as ObjectStoreError, ObjectStore,
@@ -11,6 +12,13 @@ static IN_MEMORY_STORE: Lazy<Arc<InMemory>> = Lazy::new(|| Arc::new(InMemory::ne
 /// Configuration options for various types of storage we support.
 #[derive(Debug, Clone)]
 pub enum StorageConfig {
+    S3 {
+        access_key_id: String,
+        secret_access_key: String,
+        region: Option<String>,
+        endpoint: Option<String>,
+        bucket: Option<String>,
+    },
     Gcs {
         service_account_key: String,
         bucket: String,
@@ -25,6 +33,39 @@ impl StorageConfig {
     /// Create a new object store using this config.
     pub fn new_object_store(&self) -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
         Ok(match self {
+            StorageConfig::S3 {
+                access_key_id,
+                secret_access_key,
+                region,
+                endpoint,
+                bucket,
+            } => {
+                let mut builder = AmazonS3Builder::new()
+                    .with_access_key_id(access_key_id)
+                    .with_secret_access_key(secret_access_key)
+                    .with_region(region.clone().unwrap_or_default());
+
+                if let Some(endpoint) = endpoint {
+                    if endpoint.starts_with("http://") {
+                        builder = builder.with_allow_http(true);
+                    }
+                    builder = builder.with_endpoint(endpoint);
+                    if endpoint.contains("r2.cloudflarestorage.com") {
+                        // Ensure `ObjectStore::copy_if_not_exists` is enabled on the S3 client for
+                        // Cloudflare R2 with the appropriate header
+                        builder = builder.with_copy_if_not_exists(S3CopyIfNotExists::Header(
+                            "cf-copy-destination-if-none-match".to_string(),
+                            "*".to_string(),
+                        ))
+                    }
+                }
+
+                if let Some(bucket) = bucket {
+                    builder = builder.with_bucket_name(bucket);
+                }
+
+                Arc::new(builder.build()?)
+            }
             StorageConfig::Gcs {
                 service_account_key,
                 bucket,
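As a usage sketch of the new variant, a hypothetical configuration pointing at a local MinIO endpoint would exercise the `with_allow_http(true)` branch above (credentials, port, and bucket name are placeholders):

    let conf = StorageConfig::S3 {
        access_key_id: "minioadmin".to_string(),
        secret_access_key: "minioadmin".to_string(),
        region: None,
        endpoint: Some("http://localhost:9000".to_string()),
        bucket: Some("glaredb".to_string()),
    };
    // Builds an AmazonS3 object store from the config above.
    let store = conf.new_object_store()?;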
diff --git a/crates/object_store_util/src/shared.rs b/crates/object_store_util/src/shared.rs
index d75b07774..16b277514 100644
--- a/crates/object_store_util/src/shared.rs
+++ b/crates/object_store_util/src/shared.rs
@@ -1,7 +1,9 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
-use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use object_store::{
+    path::Path, Error as ObjectStoreError, GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
 use object_store::{GetOptions, MultipartId};
 use std::ops::Range;
 use std::sync::Arc;
@@ -85,15 +87,28 @@ impl ObjectStore for SharedObjectStore {
         self.inner.copy(from, to).await
     }
 
+    /// Tries a copy-if-not-exists; if the inner store doesn't support it, falls back to a plain copy when the destination path doesn't already exist
     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.inner.copy_if_not_exists(from, to).await
+        match self.inner.copy_if_not_exists(from, to).await {
+            Ok(_) => Ok(()),
+            Err(ObjectStoreError::NotSupported { source }) => {
+                // Go with the poor man's copy-if-not-exists: try a regular copy if the destination doesn't exist
+                match self.head(to).await {
+                    Ok(_) => {
+                        return Err(ObjectStoreError::AlreadyExists {
+                            path: to.to_string(),
+                            source,
+                        })
+                    }
+                    Err(ObjectStoreError::NotFound { .. }) => self.copy(from, to).await,
+                    Err(e) => Err(e),
+                }
+            }
+            Err(e) => Err(e),
+        }
     }
 
     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
-        self.rename(from, to).await
-    }
-
-    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.inner.rename_if_not_exists(from, to).await
+        self.inner.rename(from, to).await
     }
 }
diff --git a/crates/sqlexec/Cargo.toml b/crates/sqlexec/Cargo.toml
index 7f242363e..a1d0a44e4 100644
--- a/crates/sqlexec/Cargo.toml
+++ b/crates/sqlexec/Cargo.toml
@@ -15,6 +15,7 @@ sqlbuiltins = { path = "../sqlbuiltins" }
 datasources = { path = "../datasources" }
 datafusion_ext = { path = "../datafusion_ext" }
 object_store_util = { path = "../object_store_util" }
+metastore = { path = "../metastore" }
 thiserror.workspace = true
 tokio = { version = "1", features = ["full"] }
 async-trait = "0.1.72"
diff --git a/crates/sqlexec/src/engine.rs b/crates/sqlexec/src/engine.rs
index a01b1ef5e..f1308c446 100644
--- a/crates/sqlexec/src/engine.rs
+++ b/crates/sqlexec/src/engine.rs
@@ -3,7 +3,9 @@ use crate::context::remote::RemoteSessionContext;
 use crate::errors::{ExecError, Result};
 use crate::metastore::client::{MetastoreClientSupervisor, DEFAULT_METASTORE_CLIENT_CONFIG};
 use crate::session::Session;
+use std::collections::HashMap;
 
+use object_store::aws::AmazonS3ConfigKey;
 use std::fs;
 use std::ops::{Deref, DerefMut};
 use std::path::Path;
@@ -13,8 +15,12 @@ use std::sync::Arc;
 
 use crate::metastore::catalog::SessionCatalog;
 use datafusion_ext::vars::SessionVars;
+use datasources::common::url::{DatasourceUrl, DatasourceUrlType};
 use datasources::native::access::NativeTableStorage;
+use metastore::local::start_inprocess;
+use metastore::util::MetastoreClientMode;
 use object_store_util::conf::StorageConfig;
+use object_store_util::shared::SharedObjectStore;
 use protogen::gen::metastore::service::metastore_service_client::MetastoreServiceClient;
 use telemetry::Tracker;
 use tonic::transport::Channel;
@@ -30,30 +36,125 @@ pub struct SessionStorageConfig {
     pub gcs_bucket: Option<String>,
 }
 
+// TODO: There's a significant amount of overlap with `StorageConfig`; it would be good to
+// consider consolidating them into one.
 /// Storage configuration for the compute engine.
 ///
 /// The configuration defined here alongside the configuration passed in through
 /// the proxy will be used to connect to database storage.
 #[derive(Debug, Clone)]
 pub enum EngineStorageConfig {
-    Gcs { service_account_key: String },
-    Local { path: PathBuf },
+    S3 {
+        access_key_id: String,
+        secret_access_key: String,
+        region: Option<String>,
+        endpoint: Option<String>,
+        bucket: Option<String>,
+    },
+    Gcs {
+        service_account_key: String,
+        bucket: Option<String>,
+    },
+    Local {
+        path: PathBuf,
+    },
     Memory,
 }
 
 impl EngineStorageConfig {
+    pub fn try_from_options(location: &String, opts: HashMap<String, String>) -> Result<Self> {
+        if location.starts_with("memory://") {
+            return Ok(EngineStorageConfig::Memory);
+        }
+
+        let datasource_url = DatasourceUrl::try_new(location)?;
+        Ok(match datasource_url {
+            DatasourceUrl::File(path) => EngineStorageConfig::Local { path },
+            DatasourceUrl::Url(ref url) => {
+                let url_type = datasource_url.datasource_url_type();
+                match url_type {
+                    DatasourceUrlType::Gcs => {
+                        let service_account_path =
+                            opts.get("service_account_path").cloned().unwrap_or_else(|| {
+                                std::env::var("GOOGLE_APPLICATION_CREDENTIALS")
+                                    .expect(
+                                        "'service_account_path' in provided storage options or 'GOOGLE_APPLICATION_CREDENTIALS' as env var",
+                                    )
+                            });
+
+                        let service_account_key = fs::read_to_string(service_account_path)?;
+
+                        // Bucket potentially provided as a part of the location URL, try to extract it.
+                        let bucket = opts
+                            .get("bucket")
+                            .cloned()
+                            .or(url.host_str().map(|h| h.to_string()));
+                        EngineStorageConfig::Gcs {
+                            service_account_key,
+                            bucket,
+                        }
+                    }
+                    DatasourceUrlType::S3 | DatasourceUrlType::Http => {
+                        let access_key_id = opts.get("access_key_id").cloned().unwrap_or_else(|| {
+                            std::env::var(AmazonS3ConfigKey::AccessKeyId.as_ref().to_uppercase())
+                                .expect("'access_key_id' in provided storage options or 'AWS_ACCESS_KEY_ID' as env var")
+                        });
+                        let secret_access_key =
+                            opts.get("secret_access_key").cloned().unwrap_or_else(|| {
+                                std::env::var(AmazonS3ConfigKey::SecretAccessKey.as_ref().to_uppercase())
+                                    .expect("'secret_access_key' in provided storage options or 'AWS_SECRET_ACCESS_KEY' as env var")
+                            });
+
+                        let mut endpoint = opts.get("endpoint").cloned();
+                        let region = opts.get("region").cloned();
+
+                        let bucket = if url_type != DatasourceUrlType::S3
+                            && !location.contains("amazonaws.com")
+                        {
+                            // For now we don't allow proper HTTP object stores as storage locations,
+                            // so interpret this case as either Cloudflare R2 or a MinIO instance
+                            endpoint = Some(location.clone());
+                            opts.get("bucket").cloned()
+                        } else {
+                            opts.get("bucket")
+                                .cloned()
+                                .or(url.host_str().map(|h| h.to_string()))
+                        };
+
+                        EngineStorageConfig::S3 {
+                            access_key_id,
+                            secret_access_key,
+                            region,
+                            endpoint,
+                            bucket,
+                        }
+                    }
+                    _ => unreachable!(),
+                }
+            }
+        })
+    }
+
     /// Create a native table storage config from values in the engine and
     /// session configs.
     ///
     /// Errors if the engine config is incompatible with the session config.
-    fn storage_config(&self, session_conf: &SessionStorageConfig) -> Result<StorageConfig> {
+    pub fn storage_config(&self, session_conf: &SessionStorageConfig) -> Result<StorageConfig> {
         Ok(match (self.clone(), session_conf.gcs_bucket.clone()) {
-            // GCS bucket storage.
+            // GCS bucket defined via session config or at the engine config level
             (
                 EngineStorageConfig::Gcs {
                     service_account_key,
+                    ..
                 },
                 Some(bucket),
+            )
+            | (
+                EngineStorageConfig::Gcs {
+                    service_account_key,
+                    bucket: Some(bucket),
+                },
+                None,
             ) => StorageConfig::Gcs {
                 service_account_key,
                 bucket,
@@ -64,6 +165,22 @@ impl EngineStorageConfig {
                     "Missing bucket on session configuration",
                 ))
             }
+            (
+                EngineStorageConfig::S3 {
+                    access_key_id,
+                    secret_access_key,
+                    region,
+                    endpoint,
+                    bucket,
+                },
+                _,
+            ) => StorageConfig::S3 {
+                access_key_id,
+                secret_access_key,
+                region,
+                endpoint,
+                bucket,
+            },
             // Local disk storage.
             (EngineStorageConfig::Local { path }, None) => StorageConfig::Local { path },
             // In-memory storage.
@@ -119,6 +236,49 @@ impl Engine {
         })
     }
 
+    /// Create a new `Engine` instance from the provided storage configuration with an in-process metastore.
+    pub async fn from_storage_options(
+        location: &String,
+        opts: &HashMap<String, String>,
+    ) -> Result<Engine> {
+        let conf = EngineStorageConfig::try_from_options(location, opts.clone())?;
+        let store = conf
+            .storage_config(&SessionStorageConfig::default())?
+            .new_object_store()?;
+
+        // Wrap the store in a shared one, so that we get to use the non-atomic
+        // copy-if-not-exists defined there when initializing the lease.
+        let store = SharedObjectStore::new(store);
+        let client = start_inprocess(Arc::new(store)).await.map_err(|e| {
+            ExecError::String(format!("Failed to start an in-process metastore: {e}"))
+        })?;
+
+        Engine::new(client, conf, Arc::new(Tracker::Nop), None).await
+    }
+
+    /// Create a new `Engine` instance from the provided data directory. This can be removed once the
+    /// `--data-dir` option gets consolidated into the storage options above.
+    pub async fn from_data_dir(data_dir: &Option<PathBuf>) -> Result<Engine> {
+        let conf = match data_dir {
+            Some(path) => EngineStorageConfig::Local { path: path.clone() },
+            None => EngineStorageConfig::Memory,
+        };
+
+        let mode = MetastoreClientMode::new_local(data_dir.clone());
+        let client = mode.into_client().await.map_err(|e| {
+            ExecError::String(format!(
+                "Failed creating a local metastore client for data dir {data_dir:?}: {e}"
+            ))
+        })?;
+
+        Engine::new(client, conf, Arc::new(Tracker::Nop), None).await
+    }
+
+    pub fn with_spill_path(mut self, spill_path: Option<PathBuf>) -> Engine {
+        self.spill_path = spill_path;
+        self
+    }
+
     /// Attempts to shutdown the engine gracefully.
     pub async fn shutdown(&self) -> Result<()> {
         self.background_jobs.close().await?;
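Putting the new constructors together, an embedder could build an engine roughly as follows; the bucket, credentials, and region are placeholders, and `from_storage_options` resolves them through `EngineStorageConfig::try_from_options` as shown above:

    use std::collections::HashMap;

    // Hypothetical storage options for an S3 bucket; any missing keys fall back to env vars.
    let opts = HashMap::from([
        ("access_key_id".to_string(), "my-key-id".to_string()),
        ("secret_access_key".to_string(), "my-secret".to_string()),
        ("region".to_string(), "us-east-1".to_string()),
    ]);
    let engine = Engine::from_storage_options(&"s3://my-bucket".to_string(), &opts)
        .await?
        .with_spill_path(Some(std::env::temp_dir()));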
diff --git a/crates/testing/Cargo.toml b/crates/testing/Cargo.toml
index b92aa68dc..25424f3c3 100644
--- a/crates/testing/Cargo.toml
+++ b/crates/testing/Cargo.toml
@@ -27,6 +27,7 @@ sqlexec = { path = "../sqlexec" }
 pgrepr = { path = "../pgrepr" }
 telemetry = { path = "../telemetry" }
 datafusion_ext = { path = "../datafusion_ext" }
+metastore = { path = "../metastore" }
 
 [[test]]
 harness = false
diff --git a/crates/testing/src/slt/test.rs b/crates/testing/src/slt/test.rs
index c453e9986..592570155 100644
--- a/crates/testing/src/slt/test.rs
+++ b/crates/testing/src/slt/test.rs
@@ -2,8 +2,8 @@ use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use datafusion_ext::vars::SessionVars;
 use futures::StreamExt;
-use glaredb::util::MetastoreClientMode;
 use glob::Pattern;
+use metastore::util::MetastoreClientMode;
 use pgrepr::format::Format;
 use pgrepr::scalar::Scalar;
 use pgrepr::types::arrow_to_pg_type;
diff --git a/py-glaredb/src/lib.rs b/py-glaredb/src/lib.rs
index ed9321d16..ba78ee878 100644
--- a/py-glaredb/src/lib.rs
+++ b/py-glaredb/src/lib.rs
@@ -5,12 +5,12 @@ mod runtime;
 mod session;
 mod util;
 
-use ::glaredb::util::MetastoreClientMode; // Refers to `glaredb` crate.
 use environment::PyEnvironmentReader;
 use error::PyGlareDbError;
 use futures::lock::Mutex;
 use runtime::{wait_for_future, TokioRuntime};
 use session::LocalSession;
+use std::collections::HashMap;
 use std::{
     fs,
     path::{Path, PathBuf},
@@ -25,12 +25,10 @@ use url::Url;
 
 use datafusion_ext::vars::SessionVars;
 use pyo3::{exceptions::PyRuntimeError, prelude::*};
 use sqlexec::{
-    engine::{Engine, EngineStorageConfig, SessionStorageConfig},
+    engine::{Engine, SessionStorageConfig},
     remote::client::RemoteClient,
 };
-use telemetry::Tracker;
-
 /// Ensure that a directory at the given path exists. Errors if the path exists
 /// and isn't a directory.
 fn ensure_dir(path: impl AsRef<Path>) -> PyResult<()> {
@@ -80,27 +78,30 @@ impl From<Option<String>> for PythonSessionConf {
 }
 
 #[pyfunction]
-#[pyo3(signature = (data_dir_or_cloud_url = None, /, *, spill_path = None, disable_tls = true, cloud_addr = String::from("https://console.glaredb.com")))]
+#[pyo3(signature = (data_dir_or_cloud_url = None, /, *, spill_path = None, disable_tls = true, cloud_addr = String::from("https://console.glaredb.com"), location = None, storage_options = None))]
 fn connect(
     py: Python,
     data_dir_or_cloud_url: Option<String>,
     spill_path: Option<String>,
     disable_tls: bool,
     cloud_addr: String,
+    location: Option<String>,
+    storage_options: Option<HashMap<String, String>>,
 ) -> PyResult<LocalSession> {
     wait_for_future(py, async move {
         let conf = PythonSessionConf::from(data_dir_or_cloud_url);
 
-        // If data dir is provided, then both table storage and metastore
-        // storage will reside at that path. Otherwise everything is in memory.
-        let mode = MetastoreClientMode::new_from_options(None, conf.data_dir.clone())
-            .map_err(PyGlareDbError::from)?;
-        let metastore_client = mode.into_client().await.map_err(PyGlareDbError::from)?;
-        let tracker = Arc::new(Tracker::Nop);
-
-        let storage_conf = match &conf.data_dir {
-            Some(path) => EngineStorageConfig::Local { path: path.clone() },
-            None => EngineStorageConfig::Memory,
+        let mut engine = if let Some(location) = location {
+            // TODO: try to consolidate with --data-dir option
+            Engine::from_storage_options(&location, &storage_options.unwrap_or_default())
+                .await
+                .map_err(PyGlareDbError::from)?
+        } else {
+            // If data dir is provided, then both table storage and metastore
+            // storage will reside at that path. Otherwise everything is in memory.
+            Engine::from_data_dir(&conf.data_dir)
+                .await
+                .map_err(PyGlareDbError::from)?
         };
 
         // If spill path not provided, default to some tmp dir.
@@ -117,10 +118,7 @@ fn connect(
                 ensure_dir(&path).ok().map(|_| path)
             }
         };
-
-        let engine = Engine::new(metastore_client, storage_conf, tracker, spill_path)
-            .await
-            .map_err(PyGlareDbError::from)?;
+        engine = engine.with_spill_path(spill_path);
 
         let mut session = if let Some(url) = conf.cloud_url.clone() {
             let exec_client = RemoteClient::connect_with_proxy_destination(