chore: upgrade object store version #1541

Merged · 4 commits · Aug 19, 2024
Changes from all commits
369 changes: 304 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 0 additions & 25 deletions src/analytic_engine/src/setup.rs
@@ -27,13 +27,11 @@ use object_store::{
disk_cache::DiskCacheStore,
mem_cache::{MemCache, MemCacheStore},
metrics::StoreWithMetrics,
obkv,
prefix::StoreWithPrefix,
s3, LocalFileSystem, ObjectStoreRef,
};
use snafu::{ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
use table_kv::obkv::ObkvImpl;
use wal::manager::{OpenedWals, WalManagerRef};

use crate::{
@@ -55,9 +53,6 @@ pub enum Error {
source: crate::instance::engine::Error,
},

#[snafu(display("Failed to open obkv, err:{}", source))]
OpenObkv { source: table_kv::obkv::Error },

#[snafu(display("Failed to execute in runtime, err:{}", source))]
RuntimeExec { source: runtime::Error },

@@ -214,26 +209,6 @@ fn open_storage(
let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, oss);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
ObjectStoreOptions::Obkv(obkv_opts) => {
let obkv_config = obkv_opts.client;
let obkv = engine_runtimes
.write_runtime
.spawn_blocking(move || ObkvImpl::new(obkv_config).context(OpenObkv))
.await
.context(RuntimeExec)??;

let oss: ObjectStoreRef = Arc::new(
obkv::ObkvObjectStore::try_new(
Arc::new(obkv),
obkv_opts.shard_num,
obkv_opts.part_size.0 as usize,
obkv_opts.max_object_size.0 as usize,
obkv_opts.upload_parallelism,
)
.context(OpenObjectStore)?,
);
Arc::new(StoreWithPrefix::new(obkv_opts.prefix, oss).context(OpenObjectStore)?) as _
}
ObjectStoreOptions::S3(s3_option) => {
let oss: ObjectStoreRef =
Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
2 changes: 1 addition & 1 deletion src/analytic_engine/src/sst/meta_data/cache.rs
@@ -290,7 +290,7 @@ mod tests {

let bytes = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
let meta_path = object_store::Path::from(meta_path);
store.put(&meta_path, bytes).await.unwrap();
store.put(&meta_path, bytes.into()).await.unwrap();
}

#[tokio::test]
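
Note on the one-line change above: upstream `object_store` 0.10 changed `ObjectStore::put` to accept a `PutPayload` instead of `Bytes`, which is why the existing `Bytes` value is now passed through `.into()`. A minimal sketch against the upstream 0.10 API, using the in-memory store and a made-up path purely for illustration:

```rust
use bytes::Bytes; // `bytes` crate, already a dependency of `object_store`
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

async fn put_sketch() -> object_store::Result<()> {
    // Any ObjectStore backend behaves the same; InMemory keeps the sketch self-contained.
    let store = InMemory::new();
    let location = Path::from("sst/1.metadata"); // hypothetical path

    // In 0.10, `put` takes a `PutPayload`; a `Bytes` value converts via `Into`,
    // which is the same `.into()` call as in the test above.
    let bytes = Bytes::from_static(b"encoded sst meta data");
    store.put(&location, bytes.into()).await?;

    // The payload can also be built explicitly.
    store
        .put(&location, PutPayload::from(Bytes::from_static(b"v2")))
        .await?;
    Ok(())
}
```
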
100 changes: 30 additions & 70 deletions src/analytic_engine/src/sst/parquet/writer.rs
@@ -28,10 +28,9 @@ use datafusion::parquet::basic::Compression;
use futures::StreamExt;
use generic_error::BoxError;
use logger::{debug, error};
use object_store::{ObjectStoreRef, Path};
use parquet::data_type::AsBytes;
use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path, WriteMultipartRef};
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::io::AsyncWrite;

use crate::{
sst::{
@@ -45,8 +44,8 @@ use crate::{
},
},
writer::{
self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io,
MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, MetaData,
PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
},
},
table::sst_util,
@@ -405,67 +404,24 @@ impl<'a> RecordBatchGroupWriter<'a> {
}
}

struct ObjectStoreMultiUploadAborter<'a> {
location: &'a Path,
session_id: String,
object_store: &'a ObjectStoreRef,
}

impl<'a> ObjectStoreMultiUploadAborter<'a> {
async fn initialize_upload(
object_store: &'a ObjectStoreRef,
location: &'a Path,
) -> Result<(
ObjectStoreMultiUploadAborter<'a>,
Box<dyn AsyncWrite + Unpin + Send>,
)> {
let (session_id, upload_writer) = object_store
.put_multipart(location)
.await
.context(Storage)?;
let aborter = Self {
location,
session_id,
object_store,
};
Ok((aborter, upload_writer))
}

async fn abort(self) -> Result<()> {
self.object_store
.abort_multipart(self.location, &self.session_id)
.await
.context(Storage)
}
}

async fn write_metadata<W>(
mut meta_sink: W,
async fn write_metadata(
meta_sink: MultiUploadWriter,
parquet_metadata: ParquetMetaData,
meta_path: &object_store::Path,
) -> writer::Result<usize>
where
W: AsyncWrite + Send + Unpin,
{
) -> Result<usize> {
let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
let bytes = buf.as_bytes();
let bytes_size = bytes.len();
meta_sink.write_all(bytes).await.with_context(|| Io {
file: meta_path.clone(),
})?;

meta_sink.shutdown().await.with_context(|| Io {
file: meta_path.clone(),
})?;
let buf_size = buf.len();
let mut uploader = meta_sink.multi_upload.lock().await;
uploader.put(buf);
uploader.finish().await.context(Storage)?;

Ok(bytes_size)
Ok(buf_size)
}

async fn multi_upload_abort(path: &Path, aborter: ObjectStoreMultiUploadAborter<'_>) {
// The uploading file will be leaked if failed to abort. A repair command will
// be provided to clean up the leaked files.
if let Err(e) = aborter.abort().await {
error!("Failed to abort multi-upload for sst:{}, err:{}", path, e);
async fn multi_upload_abort(aborter: WriteMultipartRef) {
// The uploading file will be leaked if failed to abort. A repair command
// will be provided to clean up the leaked files.
if let Err(e) = aborter.lock().await.abort().await {
error!("Failed to abort multi-upload sst, err:{}", e);
}
}

@@ -476,7 +432,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
request_id: RequestId,
meta: &MetaData,
input: RecordBatchStream,
) -> writer::Result<SstInfo> {
) -> Result<SstInfo> {
debug!(
"Build parquet file, request_id:{}, meta:{:?}, num_rows_per_row_group:{}",
request_id, meta, self.options.num_rows_per_row_group
@@ -491,28 +447,32 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
};
let group_writer = RecordBatchGroupWriter::new(request_id, input, meta, write_options);

let (aborter, sink) =
ObjectStoreMultiUploadAborter::initialize_upload(self.store, self.path).await?;
let sink = MultiUploadWriter::new(self.store, self.path)
.await
.context(Storage)?;
let aborter = sink.aborter();

let meta_path = Path::from(sst_util::new_metadata_path(self.path.as_ref()));

let (total_num_rows, parquet_metadata, mut data_encoder) =
match group_writer.write_all(sink, &meta_path).await {
Ok(v) => v,
Err(e) => {
multi_upload_abort(self.path, aborter).await;
multi_upload_abort(aborter).await;
return Err(e);
}
};
let time_range = parquet_metadata.time_range;

let (meta_aborter, meta_sink) =
ObjectStoreMultiUploadAborter::initialize_upload(self.store, &meta_path).await?;
let meta_size = match write_metadata(meta_sink, parquet_metadata, &meta_path).await {
let meta_sink = MultiUploadWriter::new(self.store, &meta_path)
.await
.context(Storage)?;
let meta_aborter = meta_sink.aborter();
let meta_size = match write_metadata(meta_sink, parquet_metadata).await {
Ok(v) => v,
Err(e) => {
multi_upload_abort(self.path, aborter).await;
multi_upload_abort(&meta_path, meta_aborter).await;
multi_upload_abort(aborter).await;
multi_upload_abort(meta_aborter).await;
return Err(e);
}
};
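
Note on the multipart rewrite above: in `object_store` 0.5, `put_multipart` returned a session id plus an `AsyncWrite` sink, and aborting went through a separate `abort_multipart(location, session_id)` call, which is what the deleted `ObjectStoreMultiUploadAborter` wrapped. In 0.10, `put_multipart` returns a single `Box<dyn MultipartUpload>` handle exposing `put_part`, `complete`, and `abort`; the `MultiUploadWriter` and `WriteMultipartRef` types used above appear to be this repository's own wrappers around that handle (they come from the local `object_store` component, not the upstream crate). A rough sketch of the raw 0.10 flow, with an in-memory store and dummy payloads standing in for real SST data:

```rust
use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, MultipartUpload, ObjectStore, PutPayload};

async fn multipart_sketch() -> object_store::Result<()> {
    let store = InMemory::new();
    let location = Path::from("sst/1.sst"); // hypothetical path

    // One handle now drives the whole multipart upload.
    let mut upload = store.put_multipart(&location).await?;

    // Each put_part returns a future that resolves once that part is uploaded.
    upload
        .put_part(PutPayload::from(Bytes::from(vec![0u8; 1024])))
        .await?;
    upload
        .put_part(PutPayload::from(Bytes::from(vec![1u8; 1024])))
        .await?;

    // Finish the object...
    upload.complete().await?;
    // ...or, on failure, abort so the uploaded parts are not leaked. This is
    // the role the deleted ObjectStoreMultiUploadAborter played against 0.5:
    // upload.abort().await?;
    Ok(())
}
```

The `aborter.lock().await.abort()` call in the diff suggests `WriteMultipartRef` holds this handle behind an `Arc<Mutex<_>>`, so the error path can abort an upload it no longer owns.
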
2 changes: 1 addition & 1 deletion src/components/object_store/Cargo.toml
@@ -59,7 +59,7 @@ table_kv = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
twox-hash = "1.6"
upstream = { package = "object_store", version = "0.5.6", features = [ "aws" ] }
upstream = { package = "object_store", version = "0.10.1", features = [ "aws" ] }
uuid = { version = "1.3.3", features = ["v4"] }

[dev-dependencies]
35 changes: 0 additions & 35 deletions src/components/object_store/src/config.rs
@@ -19,7 +19,6 @@ use std::time::Duration;

use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use table_kv::config::ObkvConfig;
use time_ext::ReadableDuration;

#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -63,7 +62,6 @@ impl Default for StorageOptions {
pub enum ObjectStoreOptions {
Local(LocalOptions),
Aliyun(AliyunOptions),
Obkv(ObkvOptions),
S3(S3Options),
}

@@ -85,39 +83,6 @@ pub struct AliyunOptions {
pub retry: RetryOptions,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObkvOptions {
pub prefix: String,
#[serde(default = "ObkvOptions::default_shard_num")]
pub shard_num: usize,
#[serde(default = "ObkvOptions::default_part_size")]
pub part_size: ReadableSize,
#[serde(default = "ObkvOptions::default_max_object_size")]
pub max_object_size: ReadableSize,
#[serde(default = "ObkvOptions::default_upload_parallelism")]
pub upload_parallelism: usize,
/// Obkv client config
pub client: ObkvConfig,
}

impl ObkvOptions {
fn default_max_object_size() -> ReadableSize {
ReadableSize::gb(1)
}

fn default_part_size() -> ReadableSize {
ReadableSize::mb(1)
}

fn default_shard_num() -> usize {
512
}

fn default_upload_parallelism() -> usize {
8
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct S3Options {
pub region: String,