diff --git a/Cargo.toml b/Cargo.toml index c83b2749..4d387059 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,6 @@ hashbrown = { version = "0.12", features = ["raw"] } hex = ">=0.4.0" itertools = ">=0.10.0" log = "0.4" -mimalloc = { version = "*", default-features = false } moka = { version = "0.9.3", default_features = false, features = ["future", "atomic64", "quanta"] } object_store = "0.3.0" pretty_env_logger = "0.4" diff --git a/docs/static/logotype.svg b/docs/static/logotype.svg index f8135d30..feb12215 100755 --- a/docs/static/logotype.svg +++ b/docs/static/logotype.svg @@ -1,6 +1,6 @@ - - diff --git a/src/config/context.rs b/src/config/context.rs index adcc9bb8..08de2f0e 100644 --- a/src/config/context.rs +++ b/src/config/context.rs @@ -19,10 +19,11 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; use crate::repository::postgres::PostgresRepository; use crate::object_store::http::add_http_object_store; +use crate::object_store::wrapped::InternalObjectStore; #[cfg(feature = "object-store-s3")] use object_store::aws::new_s3; -use super::schema::{self, MEMORY_FRACTION, S3}; +use super::schema::{self, MEBIBYTES, MEMORY_FRACTION, S3}; async fn build_catalog( config: &schema::SeafowlConfig, @@ -87,8 +88,8 @@ pub async fn build_context( ) -> Result { let mut runtime_config = RuntimeConfig::new(); if let Some(max_memory) = cfg.runtime.max_memory { - runtime_config = - runtime_config.with_memory_limit(max_memory as usize, MEMORY_FRACTION); + runtime_config = runtime_config + .with_memory_limit((max_memory * MEBIBYTES) as usize, MEMORY_FRACTION); } if let Some(temp_dir) = &cfg.runtime.temp_dir { @@ -108,7 +109,7 @@ pub async fn build_context( context.runtime_env().register_object_store( INTERNAL_OBJECT_STORE_SCHEME, "", - object_store, + object_store.clone(), ); // Register the HTTP object store for external tables @@ -141,6 +142,10 @@ pub async fn build_context( table_catalog: tables, partition_catalog: partitions, function_catalog: functions, + internal_object_store: Arc::new(InternalObjectStore { + inner: object_store, + config: cfg.object_store.clone(), + }), database: DEFAULT_DB.to_string(), database_id: default_db, max_partition_size: cfg.misc.max_partition_size, diff --git a/src/context.rs b/src/context.rs index e51a3433..0f29c0e6 100644 --- a/src/context.rs +++ b/src/context.rs @@ -5,6 +5,8 @@ use base64::decode; use bytes::Bytes; use datafusion::datasource::TableProvider; use datafusion::sql::ResolvedTableReference; +use object_store::local::LocalFileSystem; +use tokio::io::AsyncReadExt; use std::fs::File; @@ -26,20 +28,19 @@ use crate::datafusion::utils::{ build_schema, compound_identifier_to_column, normalize_ident, }; use crate::object_store::http::try_prepare_http_url; +use crate::object_store::wrapped::InternalObjectStore; +use crate::utils::hash_file; use crate::wasm_udf::wasm::create_udf_from_wasm; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; -use hex::encode; + #[cfg(test)] use mockall::automock; -use object_store::memory::InMemory; use object_store::{path::Path, ObjectStore}; -use sha2::Digest; -use sha2::Sha256; + use sqlparser::ast::{ AlterTableOperation, ObjectType, Statement, TableFactor, TableWithJoins, }; -use std::io::Read; use std::iter::zip; use std::sync::Arc; @@ -68,7 +69,7 @@ use datafusion::{ }; use log::warn; use prost::Message; -use tempfile::NamedTempFile; +use tempfile::TempPath; use crate::catalog::{PartitionCatalog, DEFAULT_SCHEMA, STAGING_SCHEMA}; use crate::data_types::{TableId, TableVersionId}; @@ 
-86,6 +87,12 @@ use crate::{ // with DataFusion's object store registry. pub const INTERNAL_OBJECT_STORE_SCHEME: &str = "seafowl"; +// Max Parquet row group size, in rows. This is what the ArrowWriter uses to determine how many +// rows to buffer in memory before flushing them out to disk. The default for this is 1024^2, which +// means that we're effectively buffering a whole partition in memory, causing issues on RAM-limited +// environments. +const MAX_ROW_GROUP_SIZE: usize = 65536; + pub fn internal_object_store_url() -> ObjectStoreUrl { ObjectStoreUrl::parse(format!("{}://", INTERNAL_OBJECT_STORE_SCHEME)).unwrap() } @@ -105,7 +112,7 @@ fn reference_to_name(reference: &ResolvedTableReference) -> String { /// Load the Statistics for a Parquet file in memory async fn get_parquet_file_statistics_bytes( - data: Bytes, + path: &std::path::Path, schema: SchemaRef, ) -> Result { // DataFusion's methods for this are all private (see fetch_statistics / summarize_min_max) @@ -118,11 +125,18 @@ async fn get_parquet_file_statistics_bytes( // that serves as a write-through cache so that we can use it both when downloading and uploading // Parquet files. - // Create a dummy object store pointing to our temporary directory (we don't know if - // DiskManager will always put all files in the same dir) - let dummy_object_store: Arc = Arc::from(InMemory::new()); - let dummy_path = Path::from("data"); - dummy_object_store.put(&dummy_path, data).await.unwrap(); + let tmp_dir = path + .parent() + .expect("Temporary Parquet file in the FS root"); + let file_name = path + .file_name() + .expect("Temporary Parquet file pointing to a directory") + .to_string_lossy(); + + // Create a dummy object store pointing to our temporary directory + let dummy_object_store: Arc = + Arc::from(LocalFileSystem::new_with_prefix(tmp_dir)?); + let dummy_path = Path::from(file_name.to_string()); let parquet = ParquetFormat::default(); let meta = dummy_object_store @@ -192,6 +206,7 @@ pub struct DefaultSeafowlContext { pub table_catalog: Arc, pub partition_catalog: Arc, pub function_catalog: Arc, + pub internal_object_store: Arc, pub database: String, pub database_id: DatabaseId, pub max_partition_size: i64, @@ -208,19 +223,26 @@ fn make_dummy_exec() -> Arc { fn temp_partition_file_writer( disk_manager: Arc, arrow_schema: SchemaRef, -) -> Result<(File, ArrowWriter)> { +) -> Result<(TempPath, ArrowWriter)> { let partition_file = disk_manager.create_tmp_file()?; - // Maintain a second handle to the file (the first one is consumed by ArrowWriter) - // We'll close this handle (and hence drop the file) upon reading the partition contents, just - // prior to uploading them. - let partition_file_handle = partition_file.reopen().map_err(|_| { - DataFusionError::Execution("Error with temporary Parquet file".to_string()) - })?; - - let writer_properties = WriterProperties::builder().build(); + + // Hold on to the path of the file, in case we need to just move it instead of + // uploading the data to the object store. This can be a consistency/security issue, but the + // worst someone can do is swap out the file with something else if the original temporary + // file gets deleted and an attacker creates a temporary file with the same name. In that case, + // we can end up copying an arbitrary file to the object store, which requires access to the + // machine anyway (and at that point there's likely other things that the attacker can do, like + // change the write access control settings). 
+ let path = partition_file.into_temp_path(); + + let file_writer = File::options().write(true).open(&path)?; + + let writer_properties = WriterProperties::builder() + .set_max_row_group_size(MAX_ROW_GROUP_SIZE) + .build(); let writer = - ArrowWriter::try_new(partition_file, arrow_schema, Some(writer_properties))?; - Ok((partition_file_handle, writer)) + ArrowWriter::try_new(file_writer, arrow_schema, Some(writer_properties))?; + Ok((path, writer)) } /// Execute a plan and upload the results to object storage as Parquet files, indexing them. @@ -228,14 +250,14 @@ fn temp_partition_file_writer( pub async fn plan_to_object_store( state: &SessionState, plan: &Arc, - store: Arc, + store: Arc, disk_manager: Arc, max_partition_size: i64, ) -> Result> { let mut current_partition_size = 0; - let (mut current_partition_file_handle, mut writer) = + let (mut current_partition_file_path, mut writer) = temp_partition_file_writer(disk_manager.clone(), plan.schema())?; - let mut partition_file_handles = vec![current_partition_file_handle]; + let mut partition_file_paths = vec![current_partition_file_path]; let mut tasks = vec![]; // Iterate over Datafusion partitions and rechuhk them into Seafowl partitions, since we want to @@ -268,9 +290,9 @@ pub async fn plan_to_object_store( current_partition_size = 0; leftover_partition_capacity = max_partition_size as usize; - (current_partition_file_handle, writer) = + (current_partition_file_path, writer) = temp_partition_file_writer(disk_manager.clone(), plan.schema())?; - partition_file_handles.push(current_partition_file_handle); + partition_file_paths.push(current_partition_file_path); } current_partition_size += batch.num_rows() as i64; @@ -279,40 +301,53 @@ pub async fn plan_to_object_store( } writer.close().map_err(DataFusionError::from).map(|_| ())?; - for mut partition_file_handle in partition_file_handles { + for partition_file_path in partition_file_paths { let physical = plan.clone(); let store = store.clone(); let handle: tokio::task::JoinHandle> = tokio::task::spawn(async move { - // TODO: the object_store crate doesn't support multi-part uploads / uploading a file - // from a local path. This means we have to read the file back into memory in full. - // https://github.com/influxdata/object_store_rs/issues/9 - // - // Another implication is that we could just keep everything in memory (point ArrowWriter to a byte buffer, - // call get_parquet_file_statistics on that, upload the file) and run the output routine for each partition - // sequentially. - - let mut buf = Vec::new(); - partition_file_handle - .read_to_end(&mut buf) - .expect("Error reading the temporary file"); - let data = Bytes::from(buf); - // Index the Parquet file (get its min-max values) - let partition_stats = - get_parquet_file_statistics_bytes(data.clone(), physical.schema()) - .await?; + let partition_stats = get_parquet_file_statistics_bytes( + &partition_file_path, + physical.schema(), + ) + .await?; let columns = build_partition_columns(&partition_stats, physical.schema()); - let mut hasher = Sha256::new(); - hasher.update(&data); - let hash_str = encode(hasher.finalize()); - let object_storage_id = hash_str + ".parquet"; - store - .put(&Path::from(object_storage_id.clone()), data) - .await?; + let object_storage_id = + hash_file(&partition_file_path).await? 
+ ".parquet"; + + // For local FS stores, we can just move the file to the target location + if let Some(result) = store + .fast_upload( + &partition_file_path, + &Path::from(object_storage_id.clone()), + ) + .await + { + result?; + } else { + // TODO: the object_store crate doesn't support multi-part uploads / uploading a file + // from a local path. This means we have to read the file back into memory in full. + // https://github.com/influxdata/object_store_rs/issues/9 + // + // Another implication is that we could just keep everything in memory (point ArrowWriter to a byte buffer, + // call get_parquet_file_statistics on that, upload the file) and run the output routine for each partition + // sequentially. + + let mut buf = Vec::new(); + + let mut file = tokio::fs::File::open(&partition_file_path).await?; + file.read_to_end(&mut buf).await?; + let data = Bytes::from(buf); + + store + .inner + .put(&Path::from(object_storage_id.clone()), data) + .await?; + } let partition = SeafowlPartition { object_storage_id: Arc::from(object_storage_id), @@ -430,12 +465,8 @@ impl DefaultSeafowlContext { }) } - fn get_internal_object_store(&self) -> Arc { - let object_store_url = internal_object_store_url(); - self.inner - .runtime_env() - .object_store(object_store_url) - .unwrap() + fn get_internal_object_store(&self) -> Arc { + self.internal_object_store.clone() } /// Resolve a table reference into a Seafowl table @@ -1226,12 +1257,11 @@ pub mod test_utils { use crate::{ catalog::{ - DefaultCatalog, MockFunctionCatalog, MockPartitionCatalog, MockTableCatalog, - TableCatalog, DEFAULT_DB, DEFAULT_SCHEMA, + MockFunctionCatalog, MockPartitionCatalog, MockTableCatalog, TableCatalog, + DEFAULT_DB, DEFAULT_SCHEMA, }, object_store::http::add_http_object_store, provider::{SeafowlCollection, SeafowlDatabase}, - repository::sqlite::SqliteRepository, }; use datafusion::{ @@ -1242,6 +1272,9 @@ pub mod test_utils { prelude::SessionConfig, }; + use crate::config::context::build_context; + use crate::config::schema; + use crate::config::schema::{Catalog, SeafowlConfig, Sqlite}; use std::collections::HashMap as StdHashMap; use super::*; @@ -1267,27 +1300,16 @@ pub mod test_utils { /// Build a real (not mocked) in-memory context that uses SQLite pub async fn in_memory_context() -> DefaultSeafowlContext { - let session = make_session(); - - let repository = SqliteRepository::try_new("sqlite://:memory:".to_string()) - .await - .unwrap(); - let catalog = Arc::new(DefaultCatalog::new(Arc::new(repository))); - let default_db = catalog.create_database(DEFAULT_DB).await.unwrap(); - catalog - .create_collection(default_db, DEFAULT_SCHEMA) - .await - .unwrap(); - - DefaultSeafowlContext { - inner: session, - table_catalog: catalog.clone(), - partition_catalog: catalog.clone(), - function_catalog: catalog, - database: DEFAULT_DB.to_string(), - database_id: default_db, - max_partition_size: 1024 * 1024, - } + let config = SeafowlConfig { + object_store: schema::ObjectStore::InMemory(schema::InMemory {}), + catalog: Catalog::Sqlite(Sqlite { + dsn: "sqlite://:memory:".to_string(), + }), + frontend: Default::default(), + runtime: Default::default(), + misc: Default::default(), + }; + build_context(&config).await.unwrap() } pub async fn mock_context() -> DefaultSeafowlContext { @@ -1371,11 +1393,14 @@ pub mod test_utils { setup_table_catalog(&mut table_catalog); - let object_store = Arc::new(InMemory::new()); + let object_store = Arc::new(InternalObjectStore { + inner: Arc::new(InMemory::new()), + config: 
schema::ObjectStore::InMemory(schema::InMemory {}), + }); session.runtime_env().register_object_store( INTERNAL_OBJECT_STORE_SCHEME, "", - object_store, + object_store.inner.clone(), ); DefaultSeafowlContext { @@ -1383,6 +1408,7 @@ pub mod test_utils { table_catalog: Arc::new(table_catalog), partition_catalog: partition_catalog_ptr, function_catalog: Arc::new(function_catalog), + internal_object_store: object_store, database: "testdb".to_string(), database_id: 0, max_partition_size: 2, @@ -1394,6 +1420,8 @@ pub mod test_utils { mod tests { use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field}; + use tempfile::TempDir; + use std::sync::Arc; use datafusion::execution::disk_manager::DiskManagerConfig; @@ -1403,9 +1431,12 @@ mod tests { use crate::context::test_utils::mock_context_with_catalog_assertions; + use crate::config::schema; use datafusion::assert_batches_eq; use datafusion::from_slice::FromSlice; use datafusion::physical_plan::memory::MemoryExec; + use itertools::Itertools; + use object_store::local::LocalFileSystem; use super::*; @@ -1423,8 +1454,31 @@ mod tests { Arc::from(scalar_value_to_bytes(&value)) } + async fn assert_uploaded_objects( + object_store: Arc, + expected: Vec, + ) { + let actual = object_store + .inner + .list(None) + .await + .unwrap() + .map_ok(|meta| meta.location) + .try_collect::>() + .await + .map(|p| p.into_iter().sorted().collect_vec()) + .unwrap(); + assert_eq!(expected.into_iter().sorted().collect_vec(), actual); + } + + #[test_case( + false; "In-memory object store (standard)" + )] + #[test_case( + true; "Local object store (test renames)" + )] #[tokio::test] - async fn test_plan_to_object_storage() { + async fn test_plan_to_object_storage(is_local: bool) { let sf_context = mock_context().await; // Make a SELECT VALUES(...) 
query @@ -1441,12 +1495,37 @@ mod tests { .await .unwrap(); - let object_store = Arc::new(InMemory::new()); + let (object_store, _tmpdir) = if is_local { + // Return tmp_dir to the upper scope so that we don't delete the temporary directory + // until after the test is done + let tmp_dir = TempDir::new().unwrap(); + + ( + Arc::new(InternalObjectStore { + inner: Arc::new( + LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap(), + ), + config: schema::ObjectStore::Local(schema::Local { + data_dir: tmp_dir.path().to_string_lossy().to_string(), + }), + }), + Some(tmp_dir), + ) + } else { + ( + Arc::new(InternalObjectStore { + inner: Arc::new(InMemory::new()), + config: schema::ObjectStore::InMemory(schema::InMemory {}), + }), + None, + ) + }; + let disk_manager = DiskManager::try_new(DiskManagerConfig::new()).unwrap(); let partitions = plan_to_object_store( &sf_context.inner.state(), &execution_plan, - object_store, + object_store.clone(), disk_manager, 2, ) @@ -1523,6 +1602,15 @@ mod tests { }, ] ); + + assert_uploaded_objects( + object_store, + vec![ + Path::from(PARTITION_1_FILE_NAME.to_string()), + Path::from(PARTITION_2_FILE_NAME.to_string()), + ], + ) + .await; } #[test_case( @@ -1610,7 +1698,10 @@ mod tests { MemoryExec::try_new(df_partitions.as_slice(), schema, None).unwrap(), ); - let object_store = Arc::new(InMemory::new()); + let object_store = Arc::new(InternalObjectStore { + inner: Arc::new(InMemory::new()), + config: schema::ObjectStore::InMemory(schema::InMemory {}), + }); let disk_manager = DiskManager::try_new(DiskManagerConfig::new()).unwrap(); let partitions = plan_to_object_store( &sf_context.inner.state(), @@ -1803,18 +1894,11 @@ mod tests { .unwrap(); let store = sf_context.get_internal_object_store(); - let uploaded_objects = store - .list(None) - .await - .unwrap() - .map_ok(|meta| meta.location) - .try_collect::>() - .await - .unwrap(); - assert_eq!( - uploaded_objects, - vec![Path::from(EXPECTED_INSERT_FILE_NAME)] - ); + assert_uploaded_objects( + store, + vec![Path::from(EXPECTED_INSERT_FILE_NAME.to_string())], + ) + .await; } #[tokio::test] diff --git a/src/http_object_store.rs b/src/http_object_store.rs deleted file mode 100644 index 390c27aa..00000000 --- a/src/http_object_store.rs +++ /dev/null @@ -1,278 +0,0 @@ -/// ObjectStore implementation for HTTP/HTTPs for DataFusion's CREATE EXTERNAL TABLE -use async_trait::async_trait; -use bytes::{Buf, BufMut, Bytes}; -use chrono::Utc; -use futures::stream::BoxStream; -use futures::{stream, Stream, StreamExt}; -use log::warn; -use object_store::path::Path; -use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore}; - -use datafusion::prelude::SessionContext; -use reqwest::{header, Client, RequestBuilder, StatusCode}; -use std::error::Error; -use std::fmt::{Debug, Display, Formatter}; -use std::ops::Range; -use std::sync::Arc; - -#[derive(Debug)] -pub struct HttpObjectStore { - client: Client, - scheme: String, -} - -impl Display for HttpObjectStore { - fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } -} - -#[derive(Debug)] -enum HttpObjectStoreError { - WritesUnsupported, - NoContentLengthResponse, - ListingUnsupported, - InvalidRangeError, - HttpClientError(reqwest::Error), -} - -impl From for object_store::Error { - fn from(e: HttpObjectStoreError) -> Self { - object_store::Error::Generic { - store: "http", - source: Box::new(e), - } - } -} - -impl From for HttpObjectStoreError { - fn from(e: reqwest::Error) -> Self { - Self::HttpClientError(e) - } -} - -impl Display for 
HttpObjectStoreError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::WritesUnsupported => writeln!(f, "Writes to HTTP are unsupported"), - Self::NoContentLengthResponse => { - writeln!(f, "Server did not respond with a Content-Length header") - } - Self::ListingUnsupported => writeln!(f, "HTTP doesn't support listing"), - Self::InvalidRangeError => { - writeln!(f, "Invalid range passed to the HTTP store") - } - Self::HttpClientError(e) => writeln!(f, "HTTP error: {:?}", e), - } - } -} - -impl Error for HttpObjectStoreError {} - -impl HttpObjectStore { - pub fn new(scheme: String) -> Self { - Self { - client: Client::new(), - // DataFusion strips the URL scheme when passing it to us (e.g. http://), so we - // have to record it in the object in order to reconstruct the actual full URL. - scheme, - } - } - - fn get_uri(&self, path: &Path) -> String { - format!("{}://{}", &self.scheme, path) - } - - fn request_builder(&self, path: &Path) -> RequestBuilder { - self.client.get(self.get_uri(path)).header( - "User-Agent", - format!("Seafowl/{}", env!("VERGEN_GIT_SEMVER")), - ) - } -} - -async fn range_to_bytes( - body: S, - range: &Range, -) -> Result -where - E: Into, - S: Stream>, -{ - let mut vec = Vec::with_capacity(range.end - range.start); - let mut body = Box::pin(body); - - // Skip bytes until we reach the start - let mut pos: usize = 0; - - while pos < range.end { - let mut buf = body - .next() - .await - .ok_or(HttpObjectStoreError::InvalidRangeError)? - .map_err(|e| e.into())?; - let buf_len = buf.remaining(); - - if pos < range.start { - if buf_len > range.start - pos { - // This buffer spans the range start. Skip up to the start and - // add the rest to the buffer. - buf.advance(range.start - pos); - vec.put(buf); - } - } else if buf_len >= range.end - pos { - // This buffer spans the range end. Add what's missing to the buffer - // and break out of the loop. - vec.put(buf.take(range.end - pos)); - break; - } else { - vec.put(buf) - } - pos += buf_len; - } - - Ok(vec.into()) -} - -#[async_trait] -impl ObjectStore for HttpObjectStore { - async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> { - Err(object_store::Error::NotSupported { - source: Box::new(HttpObjectStoreError::WritesUnsupported), - }) - } - - async fn get(&self, location: &Path) -> object_store::Result { - let response = self - .request_builder(location) - .send() - .await - .map_err(HttpObjectStoreError::HttpClientError)?; - - let body = response.bytes_stream(); - - Ok(GetResult::Stream( - body.map(|c| c.map_err(|e| HttpObjectStoreError::HttpClientError(e).into())) - .boxed(), - )) - } - - async fn get_range( - &self, - location: &Path, - range: Range, - ) -> object_store::Result { - let response = self - .request_builder(location) - .header("Range", format!("bytes={}-{}", range.start, range.end)) - .send() - .await - .map_err(HttpObjectStoreError::HttpClientError)?; - - // If the server returned a 206: it understood our range query - if response.status() == StatusCode::PARTIAL_CONTENT { - Ok(response - .bytes() - .await - .map_err(HttpObjectStoreError::HttpClientError)?) - } else { - // Slice the range out ourselves - warn!( - "The server doesn't support Range requests. Complete object downloaded." - ); - Ok(range_to_bytes(response.bytes_stream(), &range).await?) 
- } - } - - async fn head(&self, location: &Path) -> object_store::Result { - let response = self - .request_builder(location) - .send() - .await - .map_err(HttpObjectStoreError::HttpClientError)?; - - let length = response - .headers() - .get(header::CONTENT_LENGTH) - .ok_or(HttpObjectStoreError::NoContentLengthResponse)? - .to_str() - .map_err(|_| HttpObjectStoreError::NoContentLengthResponse)? - .parse::() - .map_err(|_| HttpObjectStoreError::NoContentLengthResponse)?; - - Ok(ObjectMeta { - location: location.clone(), - // DF only uses this in `paths_to_batch` which constructs a fake batch for partition - // pruning (column _df_part_file_modified), but it doesn't look like partition pruning - // expressions can access this. - last_modified: Utc::now(), - size: usize::try_from(length).expect("unsupported size on this platform"), - }) - } - - async fn delete(&self, _location: &Path) -> object_store::Result<()> { - Err(object_store::Error::NotSupported { - source: Box::new(HttpObjectStoreError::WritesUnsupported), - }) - } - - async fn list( - &self, - prefix: Option<&Path>, - ) -> object_store::Result>> { - // DataFusion uses the HEAD request instead of it's listing a single file - // (path doesn't end with a slash). Since HTTP doesn't support listing anyway, - // this makes our job easier. - - match prefix { - None => Err(HttpObjectStoreError::ListingUnsupported.into()), - Some(p) => { - let p_str = p.to_string(); - if p_str.ends_with('/') { - Err(HttpObjectStoreError::ListingUnsupported.into()) - } else { - // Use the HEAD implementation instead - Ok(Box::pin(stream::iter(vec![self.head(p).await]))) - } - } - } - } - - async fn list_with_delimiter( - &self, - _prefix: Option<&Path>, - ) -> object_store::Result { - // Not used by DF, so we punt on implementing this - Err(object_store::Error::NotImplemented) - } - - async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { - Err(object_store::Error::NotSupported { - source: Box::new(HttpObjectStoreError::WritesUnsupported), - }) - } - - async fn copy_if_not_exists( - &self, - _from: &Path, - _to: &Path, - ) -> object_store::Result<()> { - Err(object_store::Error::NotSupported { - source: Box::new(HttpObjectStoreError::WritesUnsupported), - }) - } -} - -pub fn add_http_object_store(context: &SessionContext) { - context.runtime_env().register_object_store( - "http", - "anyhost", - Arc::new(HttpObjectStore::new("http".to_string())), - ); - - context.runtime_env().register_object_store( - "https", - "anyhost", - Arc::new(HttpObjectStore::new("https".to_string())), - ); -} diff --git a/src/main.rs b/src/main.rs index 441822c1..57901322 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,6 @@ use std::{ use clap::Parser; use futures::{future::join_all, Future, FutureExt}; -use mimalloc::MiMalloc; use pretty_env_logger::env_logger; use seafowl::{ @@ -30,9 +29,6 @@ use tokio::sync::broadcast::{channel, Sender}; #[cfg(feature = "frontend-postgres")] use seafowl::frontend::postgres::run_pg_server; -#[global_allocator] -static GLOBAL: MiMalloc = MiMalloc; - extern crate pretty_env_logger; #[macro_use] extern crate log; diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs index eb8f0bef..402451c5 100644 --- a/src/object_store/mod.rs +++ b/src/object_store/mod.rs @@ -1,5 +1,6 @@ pub mod cache; pub mod http; +pub mod wrapped; #[cfg(test)] pub(crate) mod testutils; diff --git a/src/object_store/wrapped.rs b/src/object_store/wrapped.rs new file mode 100644 index 00000000..bf0c946f --- /dev/null +++ 
b/src/object_store/wrapped.rs @@ -0,0 +1,65 @@ +use crate::config::schema; +use crate::config::schema::Local; +use futures::TryFutureExt; +use log::debug; +use object_store::ObjectStore; + +use tokio::fs::{copy, remove_file, rename}; + +use std::path::Path; +use std::sync::Arc; + +/// Wrapper around the object_store crate that holds on to the original config +/// in order to provide a more efficient "upload" for the local object store (since it's +/// stored on the local filesystem, we can just move the file to it instead). +pub struct InternalObjectStore { + pub inner: Arc, + pub config: schema::ObjectStore, +} + +impl InternalObjectStore { + /// For local filesystem object stores, try "uploading" by just moving the file. + /// Returns a None if the store isn't local. + pub async fn fast_upload( + &self, + from: &Path, + to: &object_store::path::Path, + ) -> Option> { + let object_store_path = match &self.config { + schema::ObjectStore::Local(Local { data_dir }) => data_dir, + _ => return None, + }; + + let target_path = + Path::new(&object_store_path).join(Path::new(to.to_string().as_str())); + + debug!( + "Moving temporary partition file from {} to {}", + from.display(), + target_path.display() + ); + + let result = rename(&from, &target_path).await; + + Some(if let Err(e) = result { + // Cross-device link (can't move files between filesystems) + // Copy and remove the old file + if e.raw_os_error() == Some(18) { + copy(from, target_path) + .and_then(|_| remove_file(from)) + .map_err(|e| object_store::Error::Generic { + store: "local", + source: Box::new(e), + }) + .await + } else { + Err(object_store::Error::Generic { + store: "local", + source: Box::new(e), + }) + } + } else { + Ok(()) + }) + } +} diff --git a/src/utils.rs b/src/utils.rs index 02e3ef7c..01bca93f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,15 @@ -use std::{io::Write, sync::Arc}; +use std::{ + io::{self, IoSlice, Write}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use arrow::json::LineDelimitedWriter; +use datafusion::error::Result; +use hex::encode; +use sha2::{Digest, Sha256}; +use tokio::{fs::File, io::AsyncWrite}; use crate::context::SeafowlContext; @@ -31,11 +40,80 @@ pub async fn run_one_off_command( } } +/// A Sha256 hasher that works as a Tokio async writer +struct AsyncSha256Hasher { + pub hasher: Sha256, +} + +impl AsyncSha256Hasher { + pub fn new() -> Self { + Self { + hasher: Sha256::new(), + } + } +} + +impl Write for AsyncSha256Hasher { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.hasher.update(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncWrite for AsyncSha256Hasher { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.get_mut().hasher.update(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Poll::Ready(io::Write::write_vectored(&mut *self, bufs)) + } + + fn is_write_vectored(&self) -> bool { + true + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +pub async fn hash_file(path: &std::path::Path) -> Result { + let mut hasher = AsyncSha256Hasher::new(); + + let mut file = File::open(path).await?; + tokio::io::copy(&mut file, &mut hasher).await?; + Ok(encode(hasher.hasher.finalize())) +} + 
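For context on the block above: `hash_file` wraps SHA-256 in an `AsyncWrite` adapter so that `tokio::io::copy` can stream the partition file through the hasher chunk by chunk instead of reading it fully into memory first. A minimal blocking sketch of the same idea (the name `hash_file_blocking` is hypothetical, and it assumes the `sha2` crate's default `std` feature, which lets `Sha256` act as an `std::io::Write` sink):

```rust
use std::{fs::File, io, path::Path};

use hex::encode;
use sha2::{Digest, Sha256};

/// Stream a file through SHA-256 without buffering it in memory
/// (blocking counterpart of the async `hash_file` above).
fn hash_file_blocking(path: &Path) -> io::Result<String> {
    let mut file = File::open(path)?;
    let mut hasher = Sha256::new();
    // `Sha256` implements `io::Write`, so `io::copy` feeds it the file in chunks.
    io::copy(&mut file, &mut hasher)?;
    Ok(encode(hasher.finalize()))
}
```

The async wrapper exists mainly so the hashing can run inside the Tokio runtime alongside the upload tasks; the digest is the same either way (see `test_hash_file` below).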
#[cfg(test)] mod tests { use std::sync::Arc; - use super::run_one_off_command; + use tempfile::NamedTempFile; + use tokio::{fs::OpenOptions, io::AsyncWriteExt}; + + use super::{hash_file, run_one_off_command}; use crate::context::test_utils::in_memory_context; #[tokio::test] @@ -50,4 +128,22 @@ mod tests { "{\"Int64(1)\":1}\n{\"Int64(1)\":1}\n" ); } + + #[tokio::test] + async fn test_hash_file() { + let file_path = NamedTempFile::new().unwrap().into_temp_path(); + let mut file = OpenOptions::new() + .write(true) + .open(&file_path) + .await + .unwrap(); + + file.write_all(b"test").await.unwrap(); + file.flush().await.unwrap(); + + assert_eq!( + hash_file(&file_path).await.unwrap(), + "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08" + ); + } }
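One closing note on the local "fast upload" path in `src/object_store/wrapped.rs`: `fast_upload` first attempts a `tokio::fs::rename`, and only when that fails with raw OS error 18 (`EXDEV` on Linux, i.e. the temporary file and the data directory sit on different filesystems) does it fall back to copy-then-delete. A standalone sketch of that pattern, with the hypothetical helper name `move_or_copy` (using `libc::EXDEV` instead of the hard-coded 18 would avoid the magic number, at the cost of a `libc` dependency):

```rust
use std::{io, path::Path};

use tokio::fs;

/// Move `from` to `to`; if the rename crosses a filesystem boundary
/// (EXDEV, raw OS error 18 on Linux), fall back to copy + remove.
async fn move_or_copy(from: &Path, to: &Path) -> io::Result<()> {
    match fs::rename(from, to).await {
        Ok(()) => Ok(()),
        Err(e) if e.raw_os_error() == Some(18) => {
            // Cross-device link: copy the contents, then delete the original.
            fs::copy(from, to).await?;
            fs::remove_file(from).await
        }
        Err(e) => Err(e),
    }
}
```

The errno check is Unix-specific; a rename across volumes on other platforms surfaces a different error code, so a production version would likely want to match additional cases before giving up.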