From 757a071f48196195424e52b95bcfc81678aea816 Mon Sep 17 00:00:00 2001
From: larry-aptos <112209412+larry-aptos@users.noreply.github.com>
Date: Tue, 9 Apr 2024 11:16:14 -0700
Subject: [PATCH] Change the compression to lz4 on main (#12747)

---
 Cargo.lock                                    |  2 +-
 .../indexer-grpc-cache-worker/src/worker.rs   |  2 +-
 .../indexer-grpc-data-service/src/config.rs   |  2 +-
 .../indexer-grpc-file-store/src/processor.rs  | 17 +++-
 .../indexer-grpc-utils/Cargo.toml             |  2 +-
 .../src/compression_util.rs                   | 94 ++++++++++---------
 .../src/file_store_operator/gcs.rs            |  2 +-
 .../src/file_store_operator/local.rs          |  2 +-
 8 files changed, 70 insertions(+), 53 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4789cb16e365f..42aeac3c33941 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2164,9 +2164,9 @@ dependencies = [
  "chrono",
  "cloud-storage",
  "dashmap",
- "flate2",
  "futures",
  "itertools 0.12.1",
+ "lz4",
  "once_cell",
  "prometheus",
  "prost 0.12.3",
diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs
index 051adfaca691a..2af05115f8de5 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs
@@ -80,7 +80,7 @@ impl Worker {
         enable_cache_compression: bool,
     ) -> Result<Self> {
         let cache_storage_format = if enable_cache_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::Base64UncompressedProto
         };
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
index 8e66977538873..71460c361f1fa 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
@@ -134,7 +134,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
             .map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?;
 
         let cache_storage_format: StorageFormat = if self.enable_cache_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::Base64UncompressedProto
         };
diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs
index 82dc8b0938836..1d875da609e83 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs
@@ -18,6 +18,7 @@ use tracing::debug;
 // If the version is ahead of the cache head, retry after a short sleep.
 const AHEAD_OF_CACHE_SLEEP_DURATION_IN_MILLIS: u64 = 100;
 const SERVICE_TYPE: &str = "file_worker";
+const MAX_CONCURRENT_BATCHES: usize = 50;
 
 /// Processor tails the data in cache and stores the data in file store.
 pub struct Processor {
@@ -34,7 +35,7 @@ impl Processor {
         enable_cache_compression: bool,
     ) -> Result<Self> {
         let cache_storage_format = if enable_cache_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::Base64UncompressedProto
         };
@@ -132,8 +133,17 @@
         while start_version + (FILE_ENTRY_TRANSACTION_COUNT) < cache_worker_latest {
             batches.push(start_version);
             start_version += FILE_ENTRY_TRANSACTION_COUNT;
+            if batches.len() >= MAX_CONCURRENT_BATCHES {
+                break;
+            }
         }
 
+        tracing::info!(
+            batch_start_version = batch_start_version,
+            cache_worker_latest = cache_worker_latest,
+            batches = ?batches,
+            "Filestore processor loop"
+        );
         // we're too close to the head
         if batches.is_empty() {
             debug!(
@@ -150,6 +160,7 @@
 
         // Create thread and fetch transactions
         let mut tasks = vec![];
+
         for start_version in batches {
             let mut cache_operator_clone = self.cache_operator.clone();
             let mut file_store_operator_clone = self.file_store_operator.clone_box();
@@ -172,7 +183,9 @@
                     Some(FILE_ENTRY_TRANSACTION_COUNT as i64),
                     None,
                 );
-
+                for (i, txn) in transactions.iter().enumerate() {
+                    assert_eq!(txn.version, start_version + i as u64);
+                }
                 let upload_start_time = std::time::Instant::now();
                 let (start, end) = file_store_operator_clone
                     .upload_transaction_batch(chain_id, transactions)
diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml
index 50a1c31a8f636..2c0516ddda804 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml
+++ b/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml
@@ -22,9 +22,9 @@ base64 = { workspace = true }
 chrono = { workspace = true }
 cloud-storage = { workspace = true }
 dashmap = { workspace = true }
-flate2 = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
+lz4 = { workspace = true }
 once_cell = { workspace = true }
 prometheus = { workspace = true }
 prost = { workspace = true }
diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs
index 92094a1be1d5f..07f528e6df124 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs
@@ -3,17 +3,17 @@
 
 use crate::default_file_storage_format;
 use aptos_protos::{indexer::v1::TransactionsInStorage, transaction::v1::Transaction};
-use flate2::read::{GzDecoder, GzEncoder};
+use lz4::{Decoder, EncoderBuilder};
 use prost::Message;
 use ripemd::{Digest, Ripemd128};
 use serde::{Deserialize, Serialize};
-use std::io::Read;
+use std::io::{Read, Write};
 
 pub const FILE_ENTRY_TRANSACTION_COUNT: u64 = 1000;
 
 #[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
 pub enum StorageFormat {
-    GzipCompressedProto,
+    Lz4CompressedProto,
     // Only used for legacy file format.
     // Use by cache only.
     Base64UncompressedProto,
@@ -66,7 +66,7 @@ impl FileStoreMetadata {
 }
 
 pub enum CacheEntry {
-    GzipCompressionProto(Vec<u8>),
+    Lz4CompressionProto(Vec<u8>),
     // Only used for legacy cache entry.
     Base64UncompressedProto(Vec<u8>),
 }
@@ -74,7 +74,7 @@
 impl CacheEntry {
     pub fn new(bytes: Vec<u8>, storage_format: StorageFormat) -> Self {
         match storage_format {
-            StorageFormat::GzipCompressedProto => Self::GzipCompressionProto(bytes),
+            StorageFormat::Lz4CompressedProto => Self::Lz4CompressionProto(bytes),
             // Legacy format.
             StorageFormat::Base64UncompressedProto => Self::Base64UncompressedProto(bytes),
             StorageFormat::JsonBase64UncompressedProto => {
@@ -85,14 +85,14 @@ impl CacheEntry {
 
     pub fn into_inner(self) -> Vec<u8> {
         match self {
-            CacheEntry::GzipCompressionProto(bytes) => bytes,
+            CacheEntry::Lz4CompressionProto(bytes) => bytes,
             CacheEntry::Base64UncompressedProto(bytes) => bytes,
         }
     }
 
     pub fn size(&self) -> usize {
         match self {
-            CacheEntry::GzipCompressionProto(bytes) => bytes.len(),
+            CacheEntry::Lz4CompressionProto(bytes) => bytes.len(),
             CacheEntry::Base64UncompressedProto(bytes) => bytes.len(),
         }
     }
@@ -103,13 +103,15 @@
             .encode(&mut bytes)
             .expect("proto serialization failed.");
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
-                let mut compressed = GzEncoder::new(bytes.as_slice(), flate2::Compression::fast());
-                let mut result = Vec::new();
+            StorageFormat::Lz4CompressedProto => {
+                let mut compressed = EncoderBuilder::new()
+                    .level(4)
+                    .build(Vec::new())
+                    .expect("Lz4 compression failed.");
                 compressed
-                    .read_to_end(&mut result)
-                    .expect("Gzip compression failed.");
-                CacheEntry::GzipCompressionProto(result)
+                    .write_all(&bytes)
+                    .expect("Lz4 compression failed.");
+                CacheEntry::Lz4CompressionProto(compressed.finish().0)
             },
             StorageFormat::Base64UncompressedProto => {
                 let base64 = base64::encode(bytes).into_bytes();
@@ -124,8 +126,8 @@
 
     pub fn build_key(version: u64, storage_format: StorageFormat) -> String {
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
-                format!("gz:{}", version)
+            StorageFormat::Lz4CompressedProto => {
+                format!("l4:{}", version)
             },
             StorageFormat::Base64UncompressedProto => {
                 format!("{}", version)
@@ -139,12 +141,12 @@
 
     pub fn into_transaction(self) -> Transaction {
         match self {
-            CacheEntry::GzipCompressionProto(bytes) => {
-                let mut decompressor = GzDecoder::new(&bytes[..]);
+            CacheEntry::Lz4CompressionProto(bytes) => {
+                let mut decompressor = Decoder::new(&bytes[..]).expect("Lz4 decompression failed.");
                 let mut decompressed = Vec::new();
                 decompressor
                     .read_to_end(&mut decompressed)
-                    .expect("Gzip decompression failed.");
+                    .expect("Lz4 decompression failed.");
                 Transaction::decode(decompressed.as_slice()).expect("proto deserialization failed.")
             },
             CacheEntry::Base64UncompressedProto(bytes) => {
@@ -156,7 +158,7 @@
 }
 
 pub enum FileEntry {
-    GzipCompressionProto(Vec<u8>),
+    Lz4CompressionProto(Vec<u8>),
     // Only used for legacy file format.
     JsonBase64UncompressedProto(Vec<u8>),
 }
@@ -164,7 +166,7 @@
 impl FileEntry {
     pub fn new(bytes: Vec<u8>, storage_format: StorageFormat) -> Self {
         match storage_format {
-            StorageFormat::GzipCompressedProto => Self::GzipCompressionProto(bytes),
+            StorageFormat::Lz4CompressedProto => Self::Lz4CompressionProto(bytes),
             StorageFormat::Base64UncompressedProto => {
                 panic!("Base64UncompressedProto is not supported.")
             },
@@ -174,14 +176,14 @@
 
     pub fn into_inner(self) -> Vec<u8> {
         match self {
-            FileEntry::GzipCompressionProto(bytes) => bytes,
+            FileEntry::Lz4CompressionProto(bytes) => bytes,
             FileEntry::JsonBase64UncompressedProto(bytes) => bytes,
         }
     }
 
     pub fn size(&self) -> usize {
         match self {
-            FileEntry::GzipCompressionProto(bytes) => bytes.len(),
+            FileEntry::Lz4CompressionProto(bytes) => bytes.len(),
             FileEntry::JsonBase64UncompressedProto(bytes) => bytes.len(),
         }
     }
@@ -203,18 +205,20 @@
             panic!("Starting version has to be a multiple of FILE_ENTRY_TRANSACTION_COUNT.")
         }
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
+            StorageFormat::Lz4CompressedProto => {
                 let t = TransactionsInStorage {
                     starting_version: Some(transactions.first().unwrap().version),
                     transactions,
                 };
                 t.encode(&mut bytes).expect("proto serialization failed.");
-                let mut compressed = GzEncoder::new(bytes.as_slice(), flate2::Compression::fast());
-                let mut result = Vec::new();
+                let mut compressed = EncoderBuilder::new()
+                    .level(4)
+                    .build(Vec::new())
+                    .expect("Lz4 compression failed.");
                 compressed
-                    .read_to_end(&mut result)
-                    .expect("Gzip compression failed.");
-                FileEntry::GzipCompressionProto(result)
+                    .write_all(&bytes)
+                    .expect("Lz4 compression failed.");
+                FileEntry::Lz4CompressionProto(compressed.finish().0)
             },
             StorageFormat::Base64UncompressedProto => {
                 panic!("Base64UncompressedProto is not supported.")
@@ -247,9 +251,9 @@
         hasher.update(starting_version.to_string());
         let file_prefix = format!("{:x}", hasher.finalize());
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
+            StorageFormat::Lz4CompressedProto => {
                 format!(
-                    "compressed_files/gzip/{}_{}.bin",
+                    "compressed_files/lz4/{}_{}.bin",
                     file_prefix, starting_version
                 )
             },
@@ -264,12 +268,12 @@
 
     pub fn into_transactions_in_storage(self) -> TransactionsInStorage {
         match self {
-            FileEntry::GzipCompressionProto(bytes) => {
-                let mut decompressor = GzDecoder::new(&bytes[..]);
+            FileEntry::Lz4CompressionProto(bytes) => {
+                let mut decompressor = Decoder::new(&bytes[..]).expect("Lz4 decompression failed.");
                 let mut decompressed = Vec::new();
                 decompressor
                     .read_to_end(&mut decompressed)
-                    .expect("Gzip decompression failed.");
+                    .expect("Lz4 decompression failed.");
                 TransactionsInStorage::decode(decompressed.as_slice())
                     .expect("proto deserialization failed.")
             },
@@ -317,7 +321,7 @@
     }
 
     #[test]
-    fn test_cache_entry_builder_gzip_compressed_proto() {
+    fn test_cache_entry_builder_lz4_compressed_proto() {
         let transaction = Transaction {
             version: 42,
             epoch: 333,
@@ -326,7 +330,7 @@
         let transaction_clone = transaction.clone();
         let proto_size = transaction.encoded_len();
         let cache_entry =
-            CacheEntry::from_transaction(transaction, StorageFormat::GzipCompressedProto);
+            CacheEntry::from_transaction(transaction, StorageFormat::Lz4CompressedProto);
         let compressed_size = cache_entry.size();
         assert!(compressed_size != proto_size);
         let deserialized_transaction = cache_entry.into_transaction();
@@ -379,7 +383,7 @@
     }
 
     #[test]
-    fn test_file_entry_builder_gzip_compressed_proto() {
+    fn test_file_entry_builder_lz4_compressed_proto() {
         let transactions = (1000..2000)
             .map(|version| Transaction {
                 version,
@@ -393,7 +397,7 @@
         };
         let transactions_in_storage_size = transactions_in_storage.encoded_len();
         let file_entry =
-            FileEntry::from_transactions(transactions.clone(), StorageFormat::GzipCompressedProto);
+            FileEntry::from_transactions(transactions.clone(), StorageFormat::Lz4CompressedProto);
         assert_ne!(file_entry.size(), transactions_in_storage_size);
         let deserialized_transactions = file_entry.into_transactions_in_storage();
         for (i, transaction) in transactions.iter().enumerate() {
@@ -402,10 +406,10 @@
     }
 
     #[test]
-    fn test_cache_entry_key_to_string_gzip_compressed_proto() {
+    fn test_cache_entry_key_to_string_lz4_compressed_proto() {
         assert_eq!(
-            CacheEntry::build_key(42, StorageFormat::GzipCompressedProto),
-            "gz:42"
+            CacheEntry::build_key(42, StorageFormat::Lz4CompressedProto),
+            "l4:42"
         );
     }
 
@@ -424,10 +428,10 @@
     }
 
     #[test]
-    fn test_file_entry_key_to_string_gzip_compressed_proto() {
+    fn test_file_entry_key_to_string_lz4_compressed_proto() {
         assert_eq!(
-            FileEntry::build_key(42, StorageFormat::GzipCompressedProto),
-            "compressed_files/gzip/3d1bff1ba654ca5fdb6ac1370533d876_0.bin"
+            FileEntry::build_key(42, StorageFormat::Lz4CompressedProto),
+            "compressed_files/lz4/3d1bff1ba654ca5fdb6ac1370533d876_0.bin"
         );
     }
 
@@ -470,7 +474,7 @@
             "chain_id": 1,
             "file_folder_size": 1000,
             "version": 1,
-            "storage_format": "GzipCompressedProto"
+            "storage_format": "Lz4CompressedProto"
         }"#;
 
         let file_metadata: FileStoreMetadata = serde_json::from_str(file_metadata_serialized_json)
@@ -478,7 +482,7 @@
 
         assert_eq!(
             file_metadata.storage_format,
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
        );
         assert_eq!(file_metadata.chain_id, 1);
         assert_eq!(file_metadata.file_folder_size, 1000);
diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs
index 05578bccdf898..444791cc23c63 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs
@@ -31,7 +31,7 @@
     ) -> Self {
         env::set_var(SERVICE_ACCOUNT_ENV_VAR, service_account_path);
         let storage_format = if enable_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::JsonBase64UncompressedProto
         };
diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/local.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/local.rs
index 65245733935a9..ada1740511fd5 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/local.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/local.rs
@@ -23,7 +23,7 @@
 impl LocalFileStoreOperator {
     pub fn new(path: PathBuf, enable_compression: bool) -> Self {
         let storage_format = if enable_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::JsonBase64UncompressedProto
        };
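
Note on the new lz4 usage: below is a minimal, self-contained round-trip sketch of the `lz4` crate API this patch adopts (`EncoderBuilder::new().level(4).build(..)`, `write_all`, `finish()`, `Decoder::new`, `read_to_end`). It is illustrative only and not part of the patch; the helper name and payload are made up. One detail worth flagging: `Encoder::finish()` returns a `(writer, io::Result<()>)` tuple, and the patch keeps only `.0`, so the sketch shows the stricter variant that also checks the `Result`.

```rust
use std::io::{Read, Write};

use lz4::{Decoder, EncoderBuilder};

/// Hypothetical helper mirroring the patch's compress path, plus the
/// matching decompress path, over an in-memory buffer.
fn lz4_round_trip(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
    // Compress: same builder pattern and compression level as the new
    // compression_util.rs code, writing into a Vec<u8>.
    let mut encoder = EncoderBuilder::new().level(4).build(Vec::new())?;
    encoder.write_all(bytes)?;
    // finish() hands back the inner writer and a Result. The patch keeps
    // only `.0`, which silently ignores a failed final flush; propagating
    // the Result, as here, is the defensive alternative.
    let (compressed, result) = encoder.finish();
    result?;

    // Decompress: same Decoder::new + read_to_end pattern as the patch.
    let mut decoder = Decoder::new(compressed.as_slice())?;
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed)?;
    Ok(decompressed)
}

fn main() -> std::io::Result<()> {
    let payload = b"serialized transaction proto bytes".to_vec();
    assert_eq!(lz4_round_trip(&payload)?, payload);
    Ok(())
}
```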