From 9d5cfcc64fb0ea592431eaeee18ab7ea95adc2a7 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Sun, 29 Sep 2024 23:25:32 -0700 Subject: [PATCH] refactor(cubestore): Move build_writer_props to CubestoreMetadataCacheFactory (#8756) --- rust/cubestore/Cargo.lock | 2 +- rust/cubestore/cubestore/src/config/mod.rs | 21 +-- rust/cubestore/cubestore/src/sql/mod.rs | 7 +- .../cubestore/src/store/compaction.rs | 67 ++++++--- rust/cubestore/cubestore/src/store/mod.rs | 37 +++-- rust/cubestore/cubestore/src/table/parquet.rs | 129 ++++++++++++++---- 6 files changed, 185 insertions(+), 78 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index fbc0671e17d5d..a2c007a67309e 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1324,7 +1324,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube#7e935118e556a31c35f5e5bb4ecb465cefb19ebb" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube#c29478ed9678ad1dcae7508dc0195fb94a689e6c" dependencies = [ "ahash", "arrow", diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index ed9d87caff2df..3a03bf2d0c374 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -2007,8 +2007,7 @@ impl Config { .register_typed::(async move |i| { let metadata_cache_factory = i .get_service_typed::() - .await - .cache_factory(); + .await; ChunkStore::new( i.get_service_typed().await, i.get_service_typed().await, @@ -2025,10 +2024,10 @@ impl Config { self.injector .register_typed::(async move |i| { let c = i.get_service_typed::().await; - let metadata_cache_factory = i + let cubestore_metadata_cache_factory = i .get_service_typed::() - .await - .cache_factory(); + .await; + let metadata_cache_factory: &_ = cubestore_metadata_cache_factory.cache_factory(); CubestoreParquetMetadataCacheImpl::new( match c.metadata_cache_max_capacity_bytes() { 0 => metadata_cache_factory.make_noop_cache(), @@ -2045,8 +2044,7 @@ impl Config { .register_typed::(async move |i| { let metadata_cache_factory = i .get_service_typed::() - .await - .cache_factory(); + .await; CompactionServiceImpl::new( i.get_service_typed().await, i.get_service_typed().await, @@ -2093,7 +2091,8 @@ impl Config { i.get_service_typed().await, i.get_service_typed::() .await - .cache_factory(), + .cache_factory() + .clone(), ) }) .await; @@ -2195,7 +2194,8 @@ impl Config { let metadata_cache_factory = i .get_service_typed::() .await - .cache_factory(); + .cache_factory() + .clone(); QueryPlannerImpl::new( i.get_service_typed().await, i.get_service_typed().await, @@ -2211,7 +2211,8 @@ impl Config { QueryExecutorImpl::new( i.get_service_typed::() .await - .cache_factory(), + .cache_factory() + .clone(), i.get_service_typed().await, i.get_service_typed().await, ) diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 8c53ccb6d9bb6..2ff2144db1037 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -1660,6 +1660,7 @@ mod tests { use crate::metastore::job::JobType; use crate::store::compaction::CompactionService; + use crate::table::parquet::CubestoreMetadataCacheFactoryImpl; use async_compression::tokio::write::GzipEncoder; use cuberockstore::rocksdb::{Options, DB}; use datafusion::physical_plan::parquet::BasicMetadataCacheFactory; @@ -1728,7 +1729,7 @@ mod tests { remote_fs.clone(), Arc::new(MockCluster::new()), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), rows_per_chunk, ); let limits = Arc::new(ConcurrencyLimits::new(4)); @@ -1807,7 +1808,7 @@ mod tests { remote_fs.clone(), Arc::new(MockCluster::new()), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), rows_per_chunk, ); let limits = Arc::new(ConcurrencyLimits::new(4)); @@ -1917,7 +1918,7 @@ mod tests { remote_fs.clone(), Arc::new(MockCluster::new()), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), rows_per_chunk, ); let limits = Arc::new(ConcurrencyLimits::new(4)); diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index e4fbfa2037ebf..cd224c44be09c 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -13,7 +13,7 @@ use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs}; use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE}; use crate::table::data::{cmp_min_rows, cmp_partition_key}; -use crate::table::parquet::{arrow_schema, ParquetTableStore}; +use crate::table::parquet::{arrow_schema, CubestoreMetadataCacheFactory, ParquetTableStore}; use crate::table::redistribute::redistribute; use crate::table::{Row, TableValue}; use crate::util::batch_memory::record_batch_buffer_size; @@ -75,7 +75,7 @@ pub struct CompactionServiceImpl { chunk_store: Arc, remote_fs: Arc, config: Arc, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, } crate::di_service!(CompactionServiceImpl, [CompactionService]); @@ -86,7 +86,7 @@ impl CompactionServiceImpl { chunk_store: Arc, remote_fs: Arc, config: Arc, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, ) -> Arc { Arc::new(CompactionServiceImpl { meta_store, @@ -658,7 +658,9 @@ impl CompactionService for CompactionServiceImpl { ROW_GROUP_SIZE, 1, None, - self.metadata_cache_factory.make_noop_cache(), + self.metadata_cache_factory + .cache_factory() + .make_noop_cache(), )?); Arc::new(TraceDataLoadedExec::new( @@ -680,8 +682,14 @@ impl CompactionService for CompactionServiceImpl { }; let records = merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?; - let count_and_min = - write_to_files(records, total_rows as usize, store, new_local_files2).await?; + let count_and_min = write_to_files( + records, + total_rows as usize, + store, + &table, + new_local_files2, + ) + .await?; if let Some(c) = &new_chunk { assert_eq!(new_local_files.len(), 1); @@ -862,7 +870,12 @@ impl CompactionService for CompactionServiceImpl { // TODO deactivate corrupt tables let files = download_files(&partitions, self.remote_fs.clone()).await?; let keys = find_partition_keys( - keys_with_counts(&files, self.metadata_cache_factory.as_ref(), key_len).await?, + keys_with_counts( + &files, + self.metadata_cache_factory.cache_factory().as_ref(), + key_len, + ) + .await?, key_len, // TODO should it respect table partition_split_threshold? self.config.partition_split_threshold() as usize, @@ -1108,6 +1121,7 @@ pub(crate) async fn write_to_files( records: SendableRecordBatchStream, num_rows: usize, store: ParquetTableStore, + table: &IdRow, files: Vec, ) -> Result, Vec)>, CubeError> { let rows_per_file = div_ceil(num_rows as usize, files.len()); @@ -1165,7 +1179,7 @@ pub(crate) async fn write_to_files( }; }; - write_to_files_impl(records, store, files, pick_writer).await?; + write_to_files_impl(records, store, files, table, pick_writer).await?; let mut stats = take(stats.lock().unwrap().deref_mut()); if stats.last().unwrap().0 == 0 { @@ -1185,10 +1199,11 @@ async fn write_to_files_impl( records: SendableRecordBatchStream, store: ParquetTableStore, files: Vec, + table: &IdRow
, mut pick_writer: impl FnMut(&RecordBatch) -> WriteBatchTo, ) -> Result<(), CubeError> { let schema = Arc::new(store.arrow_schema()); - let writer_props = store.writer_props()?; + let writer_props = store.writer_props(table).await?; let mut writers = files.into_iter().map(move |f| -> Result<_, CubeError> { Ok(ArrowWriter::try_new( File::create(f)?, @@ -1254,6 +1269,7 @@ async fn write_to_files_impl( async fn write_to_files_by_keys( records: SendableRecordBatchStream, store: ParquetTableStore, + table: &IdRow
, files: Vec, keys: Vec, ) -> Result, CubeError> { @@ -1297,7 +1313,7 @@ async fn write_to_files_by_keys( panic!("impossible") }; let num_files = files.len(); - write_to_files_impl(records, store, files, pick_writer).await?; + write_to_files_impl(records, store, files, table, pick_writer).await?; let mut row_counts: Vec = take(row_counts.lock().unwrap().as_mut()); assert!( @@ -1418,6 +1434,7 @@ mod tests { use crate::remotefs::LocalDirRemoteFs; use crate::store::MockChunkDataStore; use crate::table::data::rows_to_columns; + use crate::table::parquet::CubestoreMetadataCacheFactoryImpl; use crate::table::{cmp_same_types, Row, TableValue}; use cuberockstore::rocksdb::{Options, DB}; use datafusion::arrow::array::{Int64Array, StringArray}; @@ -1540,7 +1557,7 @@ mod tests { Arc::new(chunk_store), remote_fs, Arc::new(config), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), ); compaction_service .compact(1, DataLoadedSize::new()) @@ -1680,7 +1697,7 @@ mod tests { remote_fs.clone(), Arc::new(cluster), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), 10, ); metastore @@ -1768,7 +1785,7 @@ mod tests { chunk_store.clone(), remote_fs, config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), ); compaction_service .compact_in_memory_chunks(partition.get_id()) @@ -1856,7 +1873,7 @@ mod tests { remote_fs.clone(), Arc::new(MockCluster::new()), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), 50, ); @@ -1959,7 +1976,7 @@ mod tests { chunk_store.clone(), remote_fs.clone(), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), ); compaction_service .compact(partition.get_id(), DataLoadedSize::new()) @@ -2190,7 +2207,7 @@ mod tests { struct MultiSplit { meta: Arc, fs: Arc, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, keys: Vec, key_len: usize, multi_partition_id: u64, @@ -2206,7 +2223,7 @@ impl MultiSplit { fn new( meta: Arc, fs: Arc, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, keys: Vec, key_len: usize, multi_partition_id: u64, @@ -2270,6 +2287,10 @@ impl MultiSplit { } }); + let table = self + .meta + .get_table_by_id(p.index.get_row().table_id()) + .await?; let store = ParquetTableStore::new( p.index.get_row().clone(), ROW_GROUP_SIZE, @@ -2278,7 +2299,7 @@ impl MultiSplit { let records = if !in_files.is_empty() { read_files( &in_files.into_iter().map(|(f, _)| f).collect::>(), - self.metadata_cache_factory.as_ref(), + self.metadata_cache_factory.cache_factory().as_ref(), self.key_len, None, ) @@ -2290,8 +2311,14 @@ impl MultiSplit { .execute(0) .await? }; - let row_counts = - write_to_files_by_keys(records, store, out_files.to_vec(), self.keys.clone()).await?; + let row_counts = write_to_files_by_keys( + records, + store, + &table, + out_files.to_vec(), + self.keys.clone(), + ) + .await?; for i in 0..row_counts.len() { mrow_counts[i] += row_counts[i] as u64; diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index d5393c37a23a7..e34ccf31bcd5a 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -10,7 +10,6 @@ use datafusion::physical_plan::hash_aggregate::{ AggregateMode, AggregateStrategy, HashAggregateExec, }; use datafusion::physical_plan::memory::MemoryExec; -use datafusion::physical_plan::parquet::MetadataCacheFactory; use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr}; use serde::{de, Deserialize, Serialize}; extern crate bincode; @@ -39,7 +38,7 @@ use crate::config::ConfigObj; use crate::metastore::chunks::chunk_file_name; use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::table::data::cmp_partition_key; -use crate::table::parquet::{arrow_schema, ParquetTableStore}; +use crate::table::parquet::{arrow_schema, CubestoreMetadataCacheFactory, ParquetTableStore}; use compaction::{merge_chunks, merge_replay_handles}; use datafusion::arrow::array::{Array, ArrayRef, Int64Builder, StringBuilder, UInt64Array}; use datafusion::arrow::record_batch::RecordBatch; @@ -183,7 +182,7 @@ pub struct ChunkStore { remote_fs: Arc, cluster: Arc, config: Arc, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, memory_chunks: RwLock>, chunk_size: usize, } @@ -344,7 +343,7 @@ impl ChunkStore { remote_fs: Arc, cluster: Arc, config: Arc, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, chunk_size: usize, ) -> Arc { let store = ChunkStore { @@ -592,7 +591,7 @@ impl ChunkDataStore for ChunkStore { )))]) } else { let (local_file, index) = self.download_chunk(chunk, partition, index).await?; - let metadata_cache_factory: Arc = + let metadata_cache_factory: Arc = self.metadata_cache_factory.clone(); Ok(cube_ext::spawn_blocking(move || -> Result<_, CubeError> { let parquet = ParquetTableStore::new(index, ROW_GROUP_SIZE, metadata_cache_factory); @@ -807,6 +806,7 @@ mod tests { use crate::metastore::{BaseRocksStoreFs, IndexDef, IndexType, RocksMetaStore}; use crate::remotefs::LocalDirRemoteFs; use crate::table::data::{concat_record_batches, rows_to_columns}; + use crate::table::parquet::CubestoreMetadataCacheFactoryImpl; use crate::{metastore::ColumnType, table::TableValue}; use cuberockstore::rocksdb::{Options, DB}; use datafusion::arrow::array::{Int64Array, StringArray}; @@ -950,7 +950,7 @@ mod tests { remote_fs.clone(), Arc::new(MockCluster::new()), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), 10, ); @@ -1054,7 +1054,7 @@ mod tests { remote_fs.clone(), Arc::new(MockCluster::new()), config.config_obj(), - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), 10, ); @@ -1384,16 +1384,23 @@ impl ChunkStore { let local_file = self.remote_fs.temp_upload_path(remote_path.clone()).await?; let local_file = scopeguard::guard(local_file, ensure_temp_file_is_dropped); let local_file_copy = local_file.clone(); - let metadata_cache_factory: Arc = + let metadata_cache_factory: Arc = self.metadata_cache_factory.clone(); + + let table = self + .meta_store + .get_table_by_id(index.get_row().table_id()) + .await?; + + let parquet = ParquetTableStore::new( + index.get_row().clone(), + ROW_GROUP_SIZE, + metadata_cache_factory, + ); + + let writer_props = parquet.writer_props(&table).await?; cube_ext::spawn_blocking(move || -> Result<(), CubeError> { - let parquet = ParquetTableStore::new( - index.get_row().clone(), - ROW_GROUP_SIZE, - metadata_cache_factory, - ); - parquet.write_data(&local_file_copy, data)?; - Ok(()) + parquet.write_data_given_props(&local_file_copy, data, writer_props) }) .await??; diff --git a/rust/cubestore/cubestore/src/table/parquet.rs b/rust/cubestore/cubestore/src/table/parquet.rs index 4ba8a5d5d2f7a..31b763f37ce1c 100644 --- a/rust/cubestore/cubestore/src/table/parquet.rs +++ b/rust/cubestore/cubestore/src/table/parquet.rs @@ -1,11 +1,15 @@ use crate::config::injection::DIService; -use crate::metastore::Index; +use crate::metastore::table::Table; +use crate::metastore::{IdRow, Index}; use crate::CubeError; +use async_trait::async_trait; use datafusion::arrow::array::ArrayRef; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader}; -use datafusion::parquet::file::properties::{WriterProperties, WriterVersion}; +use datafusion::parquet::file::properties::{ + WriterProperties, WriterPropertiesBuilder, WriterVersion, +}; use datafusion::physical_plan::parquet::{MetadataCacheFactory, ParquetMetadataCache}; use std::fs::File; use std::sync::Arc; @@ -36,8 +40,19 @@ impl CubestoreParquetMetadataCache for CubestoreParquetMetadataCacheImpl { } } +#[async_trait] pub trait CubestoreMetadataCacheFactory: DIService + Send + Sync { - fn cache_factory(&self) -> Arc; + // Once we use a Rust that supports trait upcasting as a stable feature, we could make + // CubestoreMetadataCacheFactory inherit from the MetadataCacheFactory trait and use trait + // upcasting. + fn cache_factory(&self) -> &Arc; + async fn build_writer_props( + &self, + _table: &IdRow
, + builder: WriterPropertiesBuilder, + ) -> Result { + Ok(builder.build()) + } } pub struct CubestoreMetadataCacheFactoryImpl { @@ -60,21 +75,22 @@ impl CubestoreMetadataCacheFactoryImpl { } impl CubestoreMetadataCacheFactory for CubestoreMetadataCacheFactoryImpl { - fn cache_factory(&self) -> Arc { - self.metadata_cache_factory.clone() + fn cache_factory(&self) -> &Arc { + &self.metadata_cache_factory } } pub struct ParquetTableStore { table: Index, row_group_size: usize, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, } impl ParquetTableStore { pub fn read_columns(&self, path: &str) -> Result, CubeError> { let mut r = ParquetFileArrowReader::new(Arc::new( self.metadata_cache_factory + .cache_factory() .make_noop_cache() .file_reader(path)?, )); @@ -90,7 +106,7 @@ impl ParquetTableStore { pub fn new( table: Index, row_group_size: usize, - metadata_cache_factory: Arc, + metadata_cache_factory: Arc, ) -> ParquetTableStore { ParquetTableStore { table, @@ -113,22 +129,37 @@ impl ParquetTableStore { arrow_schema(&self.table) } - pub fn writer_props(&self) -> Result { + pub async fn writer_props(&self, table: &IdRow
) -> Result { self.metadata_cache_factory .build_writer_props( + table, WriterProperties::builder() .set_max_row_group_size(self.row_group_size) .set_writer_version(WriterVersion::PARQUET_2_0), ) + .await .map_err(CubeError::from) } - pub fn write_data(&self, dest_file: &str, columns: Vec) -> Result<(), CubeError> { + pub async fn write_data( + &self, + dest_file: &str, + columns: Vec, + table: &IdRow
, + ) -> Result<(), CubeError> { + self.write_data_given_props(dest_file, columns, self.writer_props(table).await?) + } + + pub fn write_data_given_props( + &self, + dest_file: &str, + columns: Vec, + props: WriterProperties, + ) -> Result<(), CubeError> { let schema = Arc::new(arrow_schema(&self.table)); let batch = RecordBatch::try_new(schema.clone(), columns.to_vec())?; - let mut w = - ArrowWriter::try_new(File::create(dest_file)?, schema, Some(self.writer_props()?))?; + let mut w = ArrowWriter::try_new(File::create(dest_file)?, schema, Some(props))?; w.write(&batch)?; w.close()?; @@ -145,10 +176,13 @@ mod tests { extern crate test; use crate::assert_eq_columns; - use crate::metastore::{Column, ColumnType, Index}; + use crate::metastore::table::Table; + use crate::metastore::{Column, ColumnType, IdRow, Index}; use crate::store::{compaction, ROW_GROUP_SIZE}; use crate::table::data::{cmp_row_key_heap, concat_record_batches, rows_to_columns, to_stream}; - use crate::table::parquet::{arrow_schema, ParquetTableStore}; + use crate::table::parquet::{ + arrow_schema, CubestoreMetadataCacheFactoryImpl, ParquetTableStore, + }; use crate::table::{Row, TableValue}; use crate::util::decimal::Decimal; use datafusion::arrow::array::{ @@ -166,8 +200,8 @@ mod tests { use std::sync::Arc; use tempfile::NamedTempFile; - #[test] - fn column_statistics() { + #[tokio::test] + async fn column_statistics() { let index = Index::try_new( "table".to_string(), 1, @@ -192,12 +226,13 @@ mod tests { Index::index_type_default(), ) .unwrap(); + let table = dummy_table_row(index.table_id(), index.get_name()); let dest_file = NamedTempFile::new().unwrap(); let store = ParquetTableStore::new( index, ROW_GROUP_SIZE, - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), ); let data: Vec = vec![ @@ -236,7 +271,8 @@ mod tests { // TODO: check floats use total_cmp. store - .write_data(dest_file.path().to_str().unwrap(), data) + .write_data(dest_file.path().to_str().unwrap(), data, &table) + .await .unwrap(); let r = SerializedFileReader::new(dest_file.into_file()).unwrap(); @@ -261,6 +297,30 @@ mod tests { ); } + fn dummy_table_row(table_id: u64, table_name: &str) -> IdRow
{ + IdRow::
::new( + table_id, + Table::new( + table_name.to_string(), + table_id, + vec![], + None, + None, + true, + None, + None, + None, + None, + None, + None, + vec![], + None, + None, + None, + ), + ) + } + #[tokio::test] async fn gutter() { let store = ParquetTableStore { @@ -288,8 +348,11 @@ mod tests { ) .unwrap(), row_group_size: 10, - metadata_cache_factory: Arc::new(BasicMetadataCacheFactory::new()), + metadata_cache_factory: CubestoreMetadataCacheFactoryImpl::new(Arc::new( + BasicMetadataCacheFactory::new(), + )), }; + let table = dummy_table_row(store.table.table_id(), store.table.get_name()); let file = NamedTempFile::new().unwrap(); let file_name = file.path().to_str().unwrap(); @@ -318,7 +381,10 @@ mod tests { .collect::>(); first_rows.sort_by(|a, b| cmp_row_key_heap(3, &a.values(), &b.values())); let first_cols = rows_to_columns(&store.table.columns(), &first_rows); - store.write_data(file_name, first_cols.clone()).unwrap(); + store + .write_data(file_name, first_cols.clone(), &table) + .await + .unwrap(); let read_rows = concat_record_batches(&store.read_columns(file_name).unwrap()); assert_eq_columns!(&first_cols, read_rows.columns()); @@ -351,8 +417,9 @@ mod tests { ParquetTableStore::new( store.table.clone(), store.row_group_size, - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), ), + &dummy_table_row(store.table.table_id(), store.table.get_name()), vec![split_1.to_string(), split_2.to_string()], ) .await @@ -398,11 +465,11 @@ mod tests { ); } - #[test] - fn failed_rle_run_bools() { + #[tokio::test] + async fn failed_rle_run_bools() { const NUM_ROWS: usize = ROW_GROUP_SIZE; - let check_bools = |bools: Vec| { + let check_bools = async |bools: Vec| { let index = Index::try_new( "test".to_string(), 0, @@ -413,17 +480,20 @@ mod tests { Index::index_type_default(), ) .unwrap(); + let table = dummy_table_row(index.table_id(), index.get_name()); let tmp_file = NamedTempFile::new().unwrap(); let store = ParquetTableStore::new( index.clone(), NUM_ROWS, - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), ); store .write_data( tmp_file.path().to_str().unwrap(), vec![Arc::new(BooleanArray::from(bools))], + &table, ) + .await .unwrap(); }; @@ -434,7 +504,7 @@ mod tests { bools.push(true); bools.push(false); } - check_bools(bools); + check_bools(bools).await; // Second, in RLE encoding. let mut bools = Vec::with_capacity(NUM_ROWS); @@ -449,8 +519,8 @@ mod tests { check_bools(bools); } - #[test] - fn read_bytes() { + #[tokio::test] + async fn read_bytes() { const NUM_ROWS: usize = 8; let index = Index::try_new( "index".into(), @@ -465,6 +535,7 @@ mod tests { Index::index_type_default(), ) .unwrap(); + let table = dummy_table_row(index.table_id(), index.get_name()); let file = NamedTempFile::new().unwrap(); let file = file.path().to_str().unwrap(); @@ -478,9 +549,9 @@ mod tests { let w = ParquetTableStore::new( index.clone(), NUM_ROWS, - Arc::new(BasicMetadataCacheFactory::new()), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), ); - w.write_data(file, data.clone()).unwrap(); + w.write_data(file, data.clone(), &table).await.unwrap(); let r = concat_record_batches(&w.read_columns(file).unwrap()); assert_eq_columns!(r.columns(), &data); }