Skip to content

Commit

Permalink
bug(blob): fix support for AWS environment variables (tensorlakeai#1003)
Browse files Browse the repository at this point in the history
* bug(blob): fix support for AWS environment variables

* setting the AWS creds from env if available

* lint

* adding in a smaple config

* removing unwraps

---------

Co-authored-by: Diptanu Gon Choudhury <[email protected]>
  • Loading branch information
seriousben and diptanu authored Nov 5, 2024
1 parent 9b44fe0 commit a3c23fb
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 34 deletions.
76 changes: 68 additions & 8 deletions server/blob_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ use std::{env, fmt::Debug, sync::Arc};
use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut};
use futures::{stream::BoxStream, StreamExt};
use object_store::{parse_url, path::Path, ObjectStore, WriteMultipart};
use object_store::{
aws::{AmazonS3Builder, AmazonS3ConfigKey, DynamoCommit, S3ConditionalPut},
parse_url,
parse_url_opts,
path::Path,
ObjectStore,
ObjectStoreScheme,
WriteMultipart,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tokio::sync::mpsc;
Expand All @@ -13,13 +21,15 @@ use url::Url;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobStorageConfig {
pub path: Option<String>,
pub path: String,
pub dynamodb_table: Option<String>,
}

impl BlobStorageConfig {
pub fn new(path: &str) -> Self {
pub fn new(path: &str, dynamodb_table: Option<String>) -> Self {
BlobStorageConfig {
path: Some(format!("file://{}", path)),
path: format!("file://{}", path),
dynamodb_table,
}
}
}
Expand All @@ -29,14 +39,15 @@ impl Default for BlobStorageConfig {
let blob_store_path = format!(
"file://{}",
env::current_dir()
.unwrap()
.expect("unable to get current directory")
.join("indexify_storage/blobs")
.to_str()
.unwrap()
.expect("unable to get path as string")
);
info!("using blob store path: {}", blob_store_path);
BlobStorageConfig {
path: Some(blob_store_path),
path: blob_store_path,
dynamodb_table: None,
}
}
}
Expand All @@ -56,13 +67,62 @@ pub struct BlobStorage {

impl BlobStorage {
pub fn new(config: BlobStorageConfig) -> Result<Self> {
let (object_store, path) = parse_url(&config.path.clone().unwrap().parse::<Url>()?)?;
let url = &config.path.clone();
let (object_store, path) = Self::build_object_store(url, config.dynamodb_table)?;
Ok(Self {
object_store: Arc::new(object_store),
path,
})
}

pub fn build_object_store(
url_str: &str,
ddb_table: Option<String>,
) -> Result<(Box<dyn ObjectStore>, Path)> {
let url = &url_str.parse::<Url>()?;
let (scheme, _) = ObjectStoreScheme::parse(url)?;
match scheme {
ObjectStoreScheme::AmazonS3 => {
let ddb_table =
ddb_table.ok_or(anyhow!("dynamodb_table is required for AmazonS3"))?;
// inject AWS environment variables to prioritize keys over instance metadata
// credentials.
let opts: Vec<(AmazonS3ConfigKey, String)> = std::env::vars_os()
.filter_map(|(os_key, os_value)| {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if key.starts_with("AWS_") {
if let Ok(config_key) = key.to_ascii_lowercase().parse() {
return Some((config_key, String::from(value)));
}
}
}
None
})
.collect();

let mut s3_builder = AmazonS3Builder::new().with_url(url_str);
for (key, value) in opts.iter() {
s3_builder = s3_builder.with_config(*key, value.clone());
}
let s3_builder = s3_builder
.with_conditional_put(S3ConditionalPut::Dynamo(DynamoCommit::new(ddb_table)))
.build()
.expect("failed to create object store");
let (_, path) = parse_url_opts(url, opts)?;
Ok((Box::new(s3_builder), path))
}
_ => Ok(parse_url(url)?),
}
}

pub fn get_object_store(&self) -> Arc<dyn ObjectStore> {
self.object_store.clone()
}

pub fn get_path(&self) -> Path {
self.path.clone()
}

pub async fn put(
&self,
key: &str,
Expand Down
7 changes: 7 additions & 0 deletions server/sample_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
state_store_path: indexify_server_state
listen_addr: 0.0.0.0:8900
blob_storage:
path: "s3://indexifyblobs"
dynamodb_table: kvs_cas_table


6 changes: 4 additions & 2 deletions server/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ mod tests {
let state = IndexifyState::new(temp_dir.path().join("state"))
.await
.unwrap();
let config =
blob_store::BlobStorageConfig::new(temp_dir.path().join("blob").to_str().unwrap());
let config = blob_store::BlobStorageConfig::new(
temp_dir.path().join("blob").to_str().unwrap(),
None,
);
let storage = Arc::new(BlobStorage::new(config)?);
let (tx, rx) = watch::channel(());
let mut gc = Gc::new(state.clone(), storage.clone(), rx);
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ async fn main() {
};
let service = Service::new(config);
if let Err(err) = service.start().await {
error!("Error starting service: {}", err);
error!("Error starting service: {:?}", err);
}
}
30 changes: 14 additions & 16 deletions server/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{net::SocketAddr, path::Path, sync::Arc};
use std::{net::SocketAddr, sync::Arc};

use anyhow::{anyhow, Result};
use anyhow::{Context, Result};
use axum_server::Handle;
use blob_store::BlobStorage;
use state_store::{
Expand Down Expand Up @@ -34,20 +34,14 @@ impl Service {
pub async fn start(&self) -> Result<()> {
let (shutdown_tx, shutdown_rx) = watch::channel(());
let indexify_state = IndexifyState::new(self.config.state_store_path.parse()?).await?;
let blob_storage = Arc::new(BlobStorage::new(self.config.blob_storage.clone())?);
let blob_storage = Arc::new(
BlobStorage::new(self.config.blob_storage.clone())
.context("error initializing BlobStorage")?,
);
let executor_manager = Arc::new(ExecutorManager::new(indexify_state.clone()).await);
let blob_storage_path = self
.config
.blob_storage
.path
.clone()
.unwrap_or_default()
.clone();
let kvs_manifest_path = Path::new(&blob_storage_path).join("graph_ctx_state");
let kvs_manifest_path = kvs_manifest_path
.to_str()
.ok_or(anyhow!(format!("unable to create kv store path")))?;
let kvs = KVS::new(&kvs_manifest_path.to_string()).await?;
let kvs = KVS::new(blob_storage.clone(), "graph_ctx_state")
.await
.context("error initializing KVS")?;
let route_state = RouteState {
indexify_state: indexify_state.clone(),
kvs: Arc::new(kvs),
Expand All @@ -59,7 +53,11 @@ impl Service {
let handle_sh = handle.clone();
let scheduler = Scheduler::new(indexify_state.clone());

let mut gc = Gc::new(indexify_state.clone(), blob_storage, shutdown_rx.clone());
let mut gc = Gc::new(
indexify_state.clone(),
blob_storage.clone(),
shutdown_rx.clone(),
);
let mut system_tasks_executor =
SystemTasksExecutor::new(indexify_state.clone(), shutdown_rx.clone());

Expand Down
17 changes: 10 additions & 7 deletions server/state_store/src/kv.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use anyhow::Result;
use anyhow::{Context, Result};
use blob_store::BlobStorage;
use bytes::Bytes;
use object_store::{parse_url, path::Path};
use slatedb::{config::DbOptions, db::Db};

pub struct WriteContextData {
Expand All @@ -12,17 +12,20 @@ pub struct WriteContextData {
pub key: String,
pub value: Vec<u8>,
}
use url::Url;
pub struct KVS {
kv_store: Arc<Db>,
}

impl KVS {
pub async fn new(path: &str) -> Result<Self> {
let (object_store, path) = parse_url(&path.parse::<Url>()?)?;
pub async fn new(blob_store: Arc<BlobStorage>, prefix: &str) -> Result<Self> {
let options = DbOptions::default();
let kv_store =
Db::open_with_opts(Path::from(path), options, Arc::new(object_store)).await?;
let kv_store = Db::open_with_opts(
blob_store.get_path().child(prefix),
options,
Arc::new(blob_store.get_object_store()),
)
.await
.context("error opening kv store")?;
Ok(KVS {
kv_store: Arc::new(kv_store),
})
Expand Down

0 comments on commit a3c23fb

Please sign in to comment.