feat: Enable GCS, S3, R2 and MinIO as object stores for local runs #1843

Merged 17 commits on Oct 3, 2023
Changes from 3 commits
20 changes: 14 additions & 6 deletions crates/datasources/src/native/access.rs
@@ -125,12 +125,7 @@ impl NativeTableStorage {
 
     pub async fn delete_table(&self, table: &TableEntry) -> Result<()> {
         let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);
-        let path: ObjectStorePath = match &self.conf {
-            StorageConfig::Gcs { bucket, .. } => format!("gs://{}/{}", bucket, prefix).into(),
-            StorageConfig::Memory => format!("memory://{}", prefix).into(),
-            _ => prefix.into(),
-        };
-        let mut x = self.store.list(Some(&path)).await?;
+        let mut x = self.store.list(Some(&prefix.into())).await?;
         while let Some(meta) = x.next().await {
             let meta = meta?;
             self.store.delete(&meta.location).await?
Expand All @@ -153,6 +148,19 @@ impl NativeTableStorage {
let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);

let url = match &self.conf {
StorageConfig::S3 {
endpoint, bucket, ..
} => {
if let Some(endpoint) = endpoint {
Url::parse(endpoint)?
} else if let Some(bucket) = bucket {
Url::parse(&format!("s3://{}", bucket))?
} else {
return Err(NativeError::Static(
"Can't generate root URL for the native table storage, misconfigured S3 bucket",
));
}
}
StorageConfig::Gcs { bucket, .. } => {
Url::parse(&format!("gs://{}/{}", bucket, prefix.clone()))?
}
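As an aside, a minimal sketch, not part of the diff, of the two root-URL shapes the new S3 arm produces; the endpoint and bucket values here are hypothetical:

```rust
use url::Url;

fn main() -> Result<(), url::ParseError> {
    // With an explicit endpoint configured (the MinIO/R2 case), the endpoint
    // itself becomes the root URL for the native table storage:
    assert_eq!(
        Url::parse("http://localhost:9000")?.as_str(),
        "http://localhost:9000/"
    );
    // With only a bucket configured, an s3:// URL is synthesized instead:
    assert_eq!(Url::parse("s3://my-bucket")?.as_str(), "s3://my-bucket");
    Ok(())
}
```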
3 changes: 3 additions & 0 deletions crates/glaredb/src/args/local.rs
@@ -35,6 +35,9 @@ pub struct LocalClientOpts {
#[clap(short = 'c', long, value_parser)]
pub cloud_url: Option<Url>,

#[clap(flatten)]
Collaborator: I kept wanting there to be something like "flatten with prefix" so we could still get strict parsing that we didn't have to write ourselves, but it seems like this isn't to be. I also sort of wanted ArgGroup to be able to get us more parsing, but again, no such luck.

pub storage_options: Option<StorageOptionsArgs>,

/// Ignores the proxy and directly goes to the server for remote execution.
///
/// (Internal)
21 changes: 21 additions & 0 deletions crates/glaredb/src/args/mod.rs
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use anyhow::Result;
use clap::{Parser, ValueEnum};
use std::fmt::Write as _;
@@ -81,3 +82,23 @@ pub struct PgProxyArgs {
#[clap(long)]
pub cloud_auth_code: String,
}

#[derive(Debug, Clone, Parser)]
pub struct StorageOptionsArgs {
/// URL of the object store in which to keep the data.
#[clap(short, long)]
pub location: String,

/// Storage options for the object store.
#[clap(short, long, requires = "location", value_parser=parse_key_value_pair)]
pub opts: Vec<(String, String)>,
}

fn parse_key_value_pair(key_value_pair: &str) -> Result<(String, String)> {
Collaborator: What if you specify options which aren't supported, or options that are specified more than once? Will order matter (should we sort by key?)

While I think it's good to have some kind of flexible configs here (given that not all backing storage is going to have the same arguments), it's my assumption that we'll (eventually) have all this specified by a config file, or that most people will specify the options interactively via SQL/Python.

Contributor (author):

> what if you specify options which aren't supported

Unsupported options will be ignored, but the initialization/setup will error out with a clear message (either from us, or courtesy of the object store crate itself) if some of the required keys are missing.

> or options that are specified more than once? will order matter (should we sort by key?)

The last option value will be used in this case.

key_value_pair
.split_once('=')
.map(|(key, value)| (key.to_string(), value.to_string()))
.ok_or(anyhow!(
"Expected key-value pair delimited by an equals sign, got '{key_value_pair}'"
))
}
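To make the last-value-wins behavior above concrete, a minimal sketch, not from the PR, of how the parser composes with `HashMap::from_iter`; it assumes `parse_key_value_pair` is in scope and that the flags are passed roughly as `glaredb -l s3://my-bucket -o access_key_id=... -o secret_access_key=...` (flag spelling inferred from the derive above):

```rust
use std::collections::HashMap;

fn main() -> anyhow::Result<()> {
    // Duplicate keys are accepted at parse time...
    let pairs: Vec<(String, String)> = ["region=us-east-1", "region=eu-west-1"]
        .into_iter()
        .map(parse_key_value_pair)
        .collect::<anyhow::Result<_>>()?;

    // ...and `HashMap::from_iter` (as used in local.rs below) keeps the value
    // seen last, so a repeated option silently wins over earlier ones.
    let opts: HashMap<String, String> = HashMap::from_iter(pairs);
    assert_eq!(opts.get("region").map(String::as_str), Some("eu-west-1"));

    // Anything without an '=' delimiter is rejected with a descriptive error.
    assert!(parse_key_value_pair("no-delimiter").is_err());
    Ok(())
}
```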
31 changes: 24 additions & 7 deletions crates/glaredb/src/local.rs
@@ -1,4 +1,4 @@
-use crate::args::{LocalClientOpts, OutputMode};
+use crate::args::{LocalClientOpts, OutputMode, StorageOptionsArgs};
 use crate::highlighter::{SQLHighlighter, SQLHinter, SQLValidator};
 use crate::prompt::SQLPrompt;
 use crate::util::MetastoreClientMode;
@@ -16,8 +16,10 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use pgrepr::format::Format;
use reedline::{FileBackedHistory, Reedline, Signal};
use std::collections::HashMap;

use datafusion_ext::vars::SessionVars;
use metastore::local::start_inprocess;
use sqlexec::engine::EngineStorageConfig;
use sqlexec::engine::{Engine, SessionStorageConfig, TrackedSession};
use sqlexec::parser;
@@ -48,15 +50,30 @@ pub struct LocalSession {
 impl LocalSession {
     pub async fn connect(opts: LocalClientOpts) -> Result<Self> {
         // Connect to metastore.
-        let mode = MetastoreClientMode::new_local(opts.data_dir.clone())?;
-        let metastore_client = mode.into_client().await?;
-        let tracker = Arc::new(Tracker::Nop);
+        let (storage_conf, metastore_client) = if let Some(StorageOptionsArgs { location, opts }) =
+            &opts.storage_options
+        {
+            // TODO: try to consolidate with --data-dir option
+            let conf =
+                EngineStorageConfig::try_from_options(location, HashMap::from_iter(opts.clone()))?;
+            let store = conf
+                .storage_config(&SessionStorageConfig::default())?
+                .new_object_store()?;
+            let client = start_inprocess(store).await?;
+            (conf, client)
+        } else {
+            let conf = match &opts.data_dir {
+                Some(path) => EngineStorageConfig::Local { path: path.clone() },
+                None => EngineStorageConfig::Memory,
+            };
 
-        let storage_conf = match &opts.data_dir {
-            Some(path) => EngineStorageConfig::Local { path: path.clone() },
-            None => EngineStorageConfig::Memory,
+            let mode = MetastoreClientMode::new_local(opts.data_dir.clone())?;
+            let client = mode.into_client().await?;
+            (conf, client)
         };
+
+        let tracker = Arc::new(Tracker::Nop);
 
         let engine = Engine::new(
             metastore_client,
             storage_conf,
1 change: 1 addition & 0 deletions crates/glaredb/src/server.rs
@@ -78,6 +78,7 @@ impl ComputeServer {
let storage_conf = match (data_dir, service_account_key) {
(None, Some(key)) => EngineStorageConfig::Gcs {
service_account_key: key,
bucket: None,
},
(Some(dir), None) => EngineStorageConfig::Local { path: dir },
(None, None) => EngineStorageConfig::Memory,
3 changes: 1 addition & 2 deletions crates/metastore/src/local.rs
@@ -27,8 +27,7 @@ pub async fn start_inprocess_local(
 
 /// Starts an in-process metastore service, returning a client for the service.
 ///
-/// Useful for some tests, as well as when running GlareDB locally for testing.
-/// This should never be used in production.
+/// Useful for tests, as well as when running GlareDB locally.
 pub async fn start_inprocess(
     store: Arc<dyn ObjectStore>,
 ) -> Result<MetastoreServiceClient<Channel>> {

Contributor: Nice! 🫣
33 changes: 33 additions & 0 deletions crates/object_store_util/src/conf.rs
@@ -1,3 +1,4 @@
use object_store::aws::AmazonS3Builder;
use object_store::{
gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, memory::InMemory,
Error as ObjectStoreError, ObjectStore,
@@ -11,6 +12,13 @@ static IN_MEMORY_STORE: Lazy<Arc<InMemory>> = Lazy::new(|| Arc::new(InMemory::ne
/// Configuration options for various types of storage we support.
#[derive(Debug, Clone)]
pub enum StorageConfig {
S3 {
access_key_id: String,
secret_access_key: String,
region: Option<String>,
endpoint: Option<String>,
bucket: Option<String>,
},
Gcs {
service_account_key: String,
bucket: String,
@@ -25,6 +33,31 @@ impl StorageConfig {
/// Create a new object store using this config.
pub fn new_object_store(&self) -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
Ok(match self {
StorageConfig::S3 {
access_key_id,
secret_access_key,
region,
endpoint,
bucket,
} => {
let mut builder = AmazonS3Builder::new()
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key)
.with_region(region.clone().unwrap_or_default());

if let Some(endpoint) = endpoint {
if endpoint.starts_with("http://") {
builder = builder.with_allow_http(true);
}
builder = builder.with_endpoint(endpoint);
}

if let Some(bucket) = bucket {
builder = builder.with_bucket_name(bucket);
}

Arc::new(builder.build()?)
}
StorageConfig::Gcs {
service_account_key,
bucket,
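A usage sketch for the new S3 arm, not part of the diff; the MinIO-style endpoint and credentials are hypothetical placeholders:

```rust
use object_store_util::conf::StorageConfig;

fn example() -> Result<(), object_store::Error> {
    // A local MinIO instance served over plain HTTP: because the endpoint
    // starts with "http://", the builder above switches on `allow_http`.
    let conf = StorageConfig::S3 {
        access_key_id: "minioadmin".to_string(),
        secret_access_key: "minioadmin".to_string(),
        region: None,
        endpoint: Some("http://localhost:9000".to_string()),
        bucket: Some("glaredb".to_string()),
    };
    let _store = conf.new_object_store()?;
    Ok(())
}
```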
114 changes: 110 additions & 4 deletions crates/sqlexec/src/engine.rs
@@ -3,7 +3,10 @@ use crate::context::remote::RemoteSessionContext;
use crate::errors::{ExecError, Result};
use crate::metastore::client::{MetastoreClientSupervisor, DEFAULT_METASTORE_CLIENT_CONFIG};
use crate::session::Session;
use std::collections::HashMap;

use object_store::aws::AmazonS3ConfigKey;
use object_store::gcp::GoogleConfigKey;
use std::fs;
use std::ops::{Deref, DerefMut};
use std::path::Path;
@@ -13,6 +16,7 @@ use std::sync::Arc;

use crate::metastore::catalog::SessionCatalog;
use datafusion_ext::vars::SessionVars;
use datasources::common::url::{DatasourceUrl, DatasourceUrlType};
use datasources::native::access::NativeTableStorage;
use object_store_util::conf::StorageConfig;
use protogen::gen::metastore::service::metastore_service_client::MetastoreServiceClient;
@@ -30,30 +34,116 @@ pub struct SessionStorageConfig {
pub gcs_bucket: Option<String>,
}

// TODO: There's a significant amount of overlap with `StorageConfig`, would be good to consider
// consolidating them into one
/// Storage configuration for the compute engine.
///
/// The configuration defined here alongside the configuration passed in through
/// the proxy will be used to connect to database storage.
#[derive(Debug, Clone)]
 pub enum EngineStorageConfig {
-    Gcs { service_account_key: String },
-    Local { path: PathBuf },
+    S3 {
Collaborator: Does it make sense to split the storage config into authentication components, which are provider specific, and general options (e.g. bucket name, path prefix, etc.)?

Also, does it make sense to have a "path prefix" so that you could theoretically store multiple engines on one bucket?

Contributor (author):

> does it make sense to split the storage config into authentication components

Fwiw, I did try this initially; will try to change to that setup now as well.

> also does it make sense to have a "path prefix"

Good question; usually in these cases the bucket parameter can hold an arbitrary path as well, so I figured that would work, but after a quick test now I see it doesn't, so I'll need to investigate/fix that.

Collaborator: I think we should get that into main before cutting a release, but can merge this PR and then add prefixes in, if you want.

Contributor (author): Ok, will go ahead with merge, as I have other follow-up work as well (i.e. functional tests for this feature). Opened #1873 to track those two.

+        access_key_id: String,
+        secret_access_key: String,
+        region: Option<String>,
+        endpoint: Option<String>,
+        bucket: Option<String>,
+    },
+    Gcs {
+        service_account_key: String,
+        bucket: Option<String>,
+    },
+    Local {
+        path: PathBuf,
+    },
     Memory,
 }

impl EngineStorageConfig {
pub fn try_from_options(location: &String, opts: HashMap<String, String>) -> Result<Self> {
if location.starts_with("memory://") {
return Ok(EngineStorageConfig::Memory);
}

let datasource_url = DatasourceUrl::try_new(location)?;
Ok(match datasource_url {
DatasourceUrl::File(path) => EngineStorageConfig::Local { path },
DatasourceUrl::Url(ref url) => {
// Bucket potentially provided as a part of the location URL, try to extract it.
let bucket = url.host_str().map(|h| h.to_string());

match datasource_url.datasource_url_type() {
DatasourceUrlType::Gcs => {
let service_account_path =
opts.get("service_account_path").cloned().unwrap_or_else(|| {
std::env::var(GoogleConfigKey::ServiceAccount.as_ref().to_uppercase())
.expect(
"'service_account_path' in provided storage options or 'GOOGLE_SERVICE_ACCOUNT' as env var",
)
});

let service_account_key = fs::read_to_string(service_account_path)?;

let bucket = bucket.or(opts.get("bucket").cloned());
EngineStorageConfig::Gcs {
service_account_key,
bucket,
}
}
DatasourceUrlType::S3 | DatasourceUrlType::Http => {
let access_key_id = opts.get("access_key_id").cloned().unwrap_or_else(|| {
std::env::var(AmazonS3ConfigKey::AccessKeyId.as_ref().to_uppercase())
.expect("'access_key_id' in provided storage options or 'AWS_ACCESS_KEY_ID' as env var")
});
let secret_access_key =
opts.get("secret_access_key").cloned().unwrap_or_else(|| {
std::env::var(AmazonS3ConfigKey::SecretAccessKey.as_ref().to_uppercase())
.expect("'secret_access_key' in provided storage options or 'AWS_SECRET_ACCESS_KEY' as env var")
});

let mut endpoint = opts.get("endpoint").cloned();
let region = opts.get("region").cloned();
let bucket = bucket.or(opts.get("bucket").cloned());
if !location.starts_with("s3") && !location.contains("amazonaws.com") {
// For now we don't allow proper HTTP object stores as storage locations, so
// interpret this case as either Cloudflare R2 or a MinIO instance
endpoint = Some(location.clone());
}

EngineStorageConfig::S3 {
access_key_id,
secret_access_key,
region,
endpoint,
bucket,
}
}
_ => unreachable!(),
}
}
})
}

/// Create a native table storage config from values in the engine and
/// session configs.
///
/// Errors if the engine config is incompatible with the session config.
-    fn storage_config(&self, session_conf: &SessionStorageConfig) -> Result<StorageConfig> {
+    pub fn storage_config(&self, session_conf: &SessionStorageConfig) -> Result<StorageConfig> {
         Ok(match (self.clone(), session_conf.gcs_bucket.clone()) {
-            // GCS bucket storage.
+            // GCS bucket defined via session config or at the engine config level
(
EngineStorageConfig::Gcs {
service_account_key,
..
},
Some(bucket),
)
| (
EngineStorageConfig::Gcs {
service_account_key,
bucket: Some(bucket),
},
None,
) => StorageConfig::Gcs {
service_account_key,
bucket,
@@ -64,6 +154,22 @@ impl EngineStorageConfig {
"Missing bucket on session configuration",
))
}
(
EngineStorageConfig::S3 {
access_key_id,
secret_access_key,
region,
endpoint,
bucket,
},
_,
) => StorageConfig::S3 {
access_key_id,
secret_access_key,
region,
endpoint,
bucket,
},
// Local disk storage.
(EngineStorageConfig::Local { path }, None) => StorageConfig::Local { path },
// In-memory storage.
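To close, a hedged sketch, not part of the diff, of how `try_from_options` classifies the supported locations; the bucket name, endpoint, and credentials are hypothetical, and the sketch assumes the crate's `errors` module is reachable as written:

```rust
use std::collections::HashMap;

use sqlexec::engine::EngineStorageConfig;
use sqlexec::errors::Result;

fn example() -> Result<()> {
    let opts = HashMap::from([
        ("access_key_id".to_string(), "my-key-id".to_string()),
        ("secret_access_key".to_string(), "my-secret".to_string()),
    ]);

    // Plain S3: the bucket is picked out of the URL host.
    let _s3 = EngineStorageConfig::try_from_options(&"s3://my-bucket".to_string(), opts.clone())?;

    // MinIO or Cloudflare R2: a non-s3://, non-amazonaws.com location is
    // interpreted as a custom endpoint, per the branch above.
    let _minio =
        EngineStorageConfig::try_from_options(&"http://localhost:9000".to_string(), opts)?;

    // In-memory storage needs no options at all.
    let _mem = EngineStorageConfig::try_from_options(&"memory://".to_string(), HashMap::new())?;

    // GCS would instead take a "service_account_path" opt (or the
    // GOOGLE_SERVICE_ACCOUNT env var) pointing at a key file on disk.
    Ok(())
}
```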