Change the compression to lz4 on main (#12747)
larry-aptos committed Apr 9, 2024
1 parent d43fc86 commit 757a071
Showing 8 changed files with 70 additions and 53 deletions.
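
In short: this commit switches the indexer-grpc cache and file store from gzip compression (the flate2 crate) to LZ4 (the lz4 crate). StorageFormat::GzipCompressedProto becomes StorageFormat::Lz4CompressedProto, cache keys change prefix from gz: to l4:, file-store objects move from compressed_files/gzip/ to compressed_files/lz4/, and the file-store processor now caps work per loop iteration at MAX_CONCURRENT_BATCHES batches and asserts that each fetched transaction's version matches its expected position.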
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

@@ -80,7 +80,7 @@ impl Worker {
         enable_cache_compression: bool,
     ) -> Result<Self> {
         let cache_storage_format = if enable_cache_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::Base64UncompressedProto
         };
@@ -134,7 +134,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
            .map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?;

        let cache_storage_format: StorageFormat = if self.enable_cache_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
        } else {
            StorageFormat::Base64UncompressedProto
        };
17 changes: 15 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs
@@ -18,6 +18,7 @@ use tracing::debug;
 // If the version is ahead of the cache head, retry after a short sleep.
 const AHEAD_OF_CACHE_SLEEP_DURATION_IN_MILLIS: u64 = 100;
 const SERVICE_TYPE: &str = "file_worker";
+const MAX_CONCURRENT_BATCHES: usize = 50;

 /// Processor tails the data in cache and stores the data in file store.
 pub struct Processor {
@@ -34,7 +35,7 @@ impl Processor {
         enable_cache_compression: bool,
     ) -> Result<Self> {
         let cache_storage_format = if enable_cache_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::Base64UncompressedProto
         };
@@ -132,8 +133,17 @@ impl Processor {
         while start_version + (FILE_ENTRY_TRANSACTION_COUNT) < cache_worker_latest {
             batches.push(start_version);
             start_version += FILE_ENTRY_TRANSACTION_COUNT;
+            if batches.len() >= MAX_CONCURRENT_BATCHES {
+                break;
+            }
         }

+        tracing::info!(
+            batch_start_version = batch_start_version,
+            cache_worker_latest = cache_worker_latest,
+            batches = ?batches,
+            "Filestore processor loop"
+        );
         // we're too close to the head
         if batches.is_empty() {
             debug!(
@@ -150,6 +160,7 @@

         // Create thread and fetch transactions
         let mut tasks = vec![];
+
         for start_version in batches {
             let mut cache_operator_clone = self.cache_operator.clone();
             let mut file_store_operator_clone = self.file_store_operator.clone_box();
@@ -172,7 +183,9 @@
                        Some(FILE_ENTRY_TRANSACTION_COUNT as i64),
                        None,
                    );
+
                    for (i, txn) in transactions.iter().enumerate() {
+                        assert_eq!(txn.version, start_version + i as u64);
                    }
                    let upload_start_time = std::time::Instant::now();
                    let (start, end) = file_store_operator_clone
                        .upload_transaction_batch(chain_id, transactions)
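
The processor.rs changes above bound how much work one pass of the file-store loop can schedule. Below is a minimal, self-contained sketch of that capped batch-collection logic, simplified from the diff; `collect_batches` is a hypothetical helper name, not a function in the actual file (the real loop runs inside `Processor` and feeds the collected start versions to upload tasks):

```rust
// Sketch of the batch-collection loop from processor.rs, simplified.
// The two constants mirror the diff above.
const FILE_ENTRY_TRANSACTION_COUNT: u64 = 1000;
const MAX_CONCURRENT_BATCHES: usize = 50;

fn collect_batches(mut start_version: u64, cache_worker_latest: u64) -> Vec<u64> {
    let mut batches = vec![];
    // Schedule only full batches that sit entirely below the cache head,
    // and never more than MAX_CONCURRENT_BATCHES per pass.
    while start_version + FILE_ENTRY_TRANSACTION_COUNT < cache_worker_latest {
        batches.push(start_version);
        start_version += FILE_ENTRY_TRANSACTION_COUNT;
        if batches.len() >= MAX_CONCURRENT_BATCHES {
            break;
        }
    }
    batches
}

fn main() {
    // Far behind the cache head: the cap limits one pass to 50 batches.
    assert_eq!(collect_batches(0, 10_000_000).len(), MAX_CONCURRENT_BATCHES);
    // Too close to the head: nothing to schedule; the caller sleeps and retries.
    assert!(collect_batches(0, 500).is_empty());
}
```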
2 changes: 1 addition & 1 deletion ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml
@@ -22,9 +22,9 @@ base64 = { workspace = true }
 chrono = { workspace = true }
 cloud-storage = { workspace = true }
 dashmap = { workspace = true }
-flate2 = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
+lz4 = { workspace = true }
 once_cell = { workspace = true }
 prometheus = { workspace = true }
 prost = { workspace = true }
94 changes: 49 additions & 45 deletions ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs
@@ -3,17 +3,17 @@

 use crate::default_file_storage_format;
 use aptos_protos::{indexer::v1::TransactionsInStorage, transaction::v1::Transaction};
-use flate2::read::{GzDecoder, GzEncoder};
+use lz4::{Decoder, EncoderBuilder};
 use prost::Message;
 use ripemd::{Digest, Ripemd128};
 use serde::{Deserialize, Serialize};
-use std::io::Read;
+use std::io::{Read, Write};

 pub const FILE_ENTRY_TRANSACTION_COUNT: u64 = 1000;

 #[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
 pub enum StorageFormat {
-    GzipCompressedProto,
+    Lz4CompressedProto,
     // Only used for legacy file format.
     // Use by cache only.
     Base64UncompressedProto,
@@ -66,15 +66,15 @@ impl FileStoreMetadata {
 }

 pub enum CacheEntry {
-    GzipCompressionProto(Vec<u8>),
+    Lz4CompressionProto(Vec<u8>),
     // Only used for legacy cache entry.
     Base64UncompressedProto(Vec<u8>),
 }

 impl CacheEntry {
     pub fn new(bytes: Vec<u8>, storage_format: StorageFormat) -> Self {
         match storage_format {
-            StorageFormat::GzipCompressedProto => Self::GzipCompressionProto(bytes),
+            StorageFormat::Lz4CompressedProto => Self::Lz4CompressionProto(bytes),
             // Legacy format.
             StorageFormat::Base64UncompressedProto => Self::Base64UncompressedProto(bytes),
             StorageFormat::JsonBase64UncompressedProto => {
@@ -85,14 +85,14 @@ impl CacheEntry {

     pub fn into_inner(self) -> Vec<u8> {
         match self {
-            CacheEntry::GzipCompressionProto(bytes) => bytes,
+            CacheEntry::Lz4CompressionProto(bytes) => bytes,
             CacheEntry::Base64UncompressedProto(bytes) => bytes,
         }
     }

     pub fn size(&self) -> usize {
         match self {
-            CacheEntry::GzipCompressionProto(bytes) => bytes.len(),
+            CacheEntry::Lz4CompressionProto(bytes) => bytes.len(),
             CacheEntry::Base64UncompressedProto(bytes) => bytes.len(),
         }
     }
@@ -103,13 +103,15 @@ impl CacheEntry {
             .encode(&mut bytes)
             .expect("proto serialization failed.");
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
-                let mut compressed = GzEncoder::new(bytes.as_slice(), flate2::Compression::fast());
-                let mut result = Vec::new();
+            StorageFormat::Lz4CompressedProto => {
+                let mut compressed = EncoderBuilder::new()
+                    .level(4)
+                    .build(Vec::new())
+                    .expect("Lz4 compression failed.");
                 compressed
-                    .read_to_end(&mut result)
-                    .expect("Gzip compression failed.");
-                CacheEntry::GzipCompressionProto(result)
+                    .write_all(&bytes)
+                    .expect("Lz4 compression failed.");
+                CacheEntry::Lz4CompressionProto(compressed.finish().0)
             },
             StorageFormat::Base64UncompressedProto => {
                 let base64 = base64::encode(bytes).into_bytes();
@@ -124,8 +126,8 @@

     pub fn build_key(version: u64, storage_format: StorageFormat) -> String {
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
-                format!("gz:{}", version)
+            StorageFormat::Lz4CompressedProto => {
+                format!("l4:{}", version)
             },
             StorageFormat::Base64UncompressedProto => {
                 format!("{}", version)
@@ -139,12 +141,12 @@

     pub fn into_transaction(self) -> Transaction {
         match self {
-            CacheEntry::GzipCompressionProto(bytes) => {
-                let mut decompressor = GzDecoder::new(&bytes[..]);
+            CacheEntry::Lz4CompressionProto(bytes) => {
+                let mut decompressor = Decoder::new(&bytes[..]).expect("Lz4 decompression failed.");
                 let mut decompressed = Vec::new();
                 decompressor
                     .read_to_end(&mut decompressed)
-                    .expect("Gzip decompression failed.");
+                    .expect("Lz4 decompression failed.");
                 Transaction::decode(decompressed.as_slice()).expect("proto deserialization failed.")
             },
             CacheEntry::Base64UncompressedProto(bytes) => {
@@ -156,15 +158,15 @@
 }

 pub enum FileEntry {
-    GzipCompressionProto(Vec<u8>),
+    Lz4CompressionProto(Vec<u8>),
     // Only used for legacy file format.
     JsonBase64UncompressedProto(Vec<u8>),
 }

 impl FileEntry {
     pub fn new(bytes: Vec<u8>, storage_format: StorageFormat) -> Self {
         match storage_format {
-            StorageFormat::GzipCompressedProto => Self::GzipCompressionProto(bytes),
+            StorageFormat::Lz4CompressedProto => Self::Lz4CompressionProto(bytes),
             StorageFormat::Base64UncompressedProto => {
                 panic!("Base64UncompressedProto is not supported.")
             },
@@ -174,14 +176,14 @@

     pub fn into_inner(self) -> Vec<u8> {
         match self {
-            FileEntry::GzipCompressionProto(bytes) => bytes,
+            FileEntry::Lz4CompressionProto(bytes) => bytes,
             FileEntry::JsonBase64UncompressedProto(bytes) => bytes,
         }
     }

     pub fn size(&self) -> usize {
         match self {
-            FileEntry::GzipCompressionProto(bytes) => bytes.len(),
+            FileEntry::Lz4CompressionProto(bytes) => bytes.len(),
             FileEntry::JsonBase64UncompressedProto(bytes) => bytes.len(),
         }
     }
@@ -203,18 +205,20 @@
             panic!("Starting version has to be a multiple of FILE_ENTRY_TRANSACTION_COUNT.")
         }
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
+            StorageFormat::Lz4CompressedProto => {
                 let t = TransactionsInStorage {
                     starting_version: Some(transactions.first().unwrap().version),
                     transactions,
                 };
                 t.encode(&mut bytes).expect("proto serialization failed.");
-                let mut compressed = GzEncoder::new(bytes.as_slice(), flate2::Compression::fast());
-                let mut result = Vec::new();
+                let mut compressed = EncoderBuilder::new()
+                    .level(4)
+                    .build(Vec::new())
+                    .expect("Lz4 compression failed.");
                 compressed
-                    .read_to_end(&mut result)
-                    .expect("Gzip compression failed.");
-                FileEntry::GzipCompressionProto(result)
+                    .write_all(&bytes)
+                    .expect("Lz4 compression failed.");
+                FileEntry::Lz4CompressionProto(compressed.finish().0)
             },
             StorageFormat::Base64UncompressedProto => {
                 panic!("Base64UncompressedProto is not supported.")
@@ -247,9 +251,9 @@
         hasher.update(starting_version.to_string());
         let file_prefix = format!("{:x}", hasher.finalize());
         match storage_format {
-            StorageFormat::GzipCompressedProto => {
+            StorageFormat::Lz4CompressedProto => {
                 format!(
-                    "compressed_files/gzip/{}_{}.bin",
+                    "compressed_files/lz4/{}_{}.bin",
                     file_prefix, starting_version
                 )
             },
@@ -264,12 +268,12 @@

     pub fn into_transactions_in_storage(self) -> TransactionsInStorage {
         match self {
-            FileEntry::GzipCompressionProto(bytes) => {
-                let mut decompressor = GzDecoder::new(&bytes[..]);
+            FileEntry::Lz4CompressionProto(bytes) => {
+                let mut decompressor = Decoder::new(&bytes[..]).expect("Lz4 decompression failed.");
                 let mut decompressed = Vec::new();
                 decompressor
                     .read_to_end(&mut decompressed)
-                    .expect("Gzip decompression failed.");
+                    .expect("Lz4 decompression failed.");
                 TransactionsInStorage::decode(decompressed.as_slice())
                     .expect("proto deserialization failed.")
             },
@@ -317,7 +321,7 @@ mod tests {
     }

     #[test]
-    fn test_cache_entry_builder_gzip_compressed_proto() {
+    fn test_cache_entry_builder_lz4_compressed_proto() {
         let transaction = Transaction {
             version: 42,
             epoch: 333,
@@ -326,7 +330,7 @@
         let transaction_clone = transaction.clone();
         let proto_size = transaction.encoded_len();
         let cache_entry =
-            CacheEntry::from_transaction(transaction, StorageFormat::GzipCompressedProto);
+            CacheEntry::from_transaction(transaction, StorageFormat::Lz4CompressedProto);
         let compressed_size = cache_entry.size();
         assert!(compressed_size != proto_size);
         let deserialized_transaction = cache_entry.into_transaction();
@@ -379,7 +383,7 @@
     }

     #[test]
-    fn test_file_entry_builder_gzip_compressed_proto() {
+    fn test_file_entry_builder_lz4_compressed_proto() {
         let transactions = (1000..2000)
             .map(|version| Transaction {
                 version,
@@ -393,7 +397,7 @@
         };
         let transactions_in_storage_size = transactions_in_storage.encoded_len();
         let file_entry =
-            FileEntry::from_transactions(transactions.clone(), StorageFormat::GzipCompressedProto);
+            FileEntry::from_transactions(transactions.clone(), StorageFormat::Lz4CompressedProto);
         assert_ne!(file_entry.size(), transactions_in_storage_size);
         let deserialized_transactions = file_entry.into_transactions_in_storage();
         for (i, transaction) in transactions.iter().enumerate() {
@@ -402,10 +406,10 @@
     }

     #[test]
-    fn test_cache_entry_key_to_string_gzip_compressed_proto() {
+    fn test_cache_entry_key_to_string_lz4_compressed_proto() {
         assert_eq!(
-            CacheEntry::build_key(42, StorageFormat::GzipCompressedProto),
-            "gz:42"
+            CacheEntry::build_key(42, StorageFormat::Lz4CompressedProto),
+            "l4:42"
         );
     }

@@ -424,10 +428,10 @@
     }

     #[test]
-    fn test_file_entry_key_to_string_gzip_compressed_proto() {
+    fn test_file_entry_key_to_string_lz4_compressed_proto() {
         assert_eq!(
-            FileEntry::build_key(42, StorageFormat::GzipCompressedProto),
-            "compressed_files/gzip/3d1bff1ba654ca5fdb6ac1370533d876_0.bin"
+            FileEntry::build_key(42, StorageFormat::Lz4CompressedProto),
+            "compressed_files/lz4/3d1bff1ba654ca5fdb6ac1370533d876_0.bin"
         );
     }

@@ -470,15 +474,15 @@
             "chain_id": 1,
             "file_folder_size": 1000,
             "version": 1,
-            "storage_format": "GzipCompressedProto"
+            "storage_format": "Lz4CompressedProto"
         }"#;

         let file_metadata: FileStoreMetadata = serde_json::from_str(file_metadata_serialized_json)
             .expect("FileStoreMetadata deserialization failed.");

         assert_eq!(
             file_metadata.storage_format,
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         );
         assert_eq!(file_metadata.chain_id, 1);
         assert_eq!(file_metadata.file_folder_size, 1000);
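
For reference, here is a minimal sketch of the LZ4 frame round-trip that the new CacheEntry/FileEntry paths perform, using the same lz4 crate calls (EncoderBuilder, Decoder) and compression level as the diff above. The payload is a stand-in for protobuf-encoded bytes, and the expect-based error handling is for brevity only, not the recommended production pattern:

```rust
use std::io::{Read, Write};

use lz4::{Decoder, EncoderBuilder};

// Compress with an LZ4 frame encoder writing into a Vec<u8>,
// mirroring the `.level(4)` setting used in compression_util.rs.
fn lz4_compress(bytes: &[u8]) -> Vec<u8> {
    let mut encoder = EncoderBuilder::new()
        .level(4)
        .build(Vec::new())
        .expect("failed to build LZ4 encoder");
    encoder.write_all(bytes).expect("LZ4 compression failed");
    // finish() flushes the frame and returns (inner_writer, Result);
    // the diff takes `.0` directly, while this sketch also checks the Result.
    let (compressed, result) = encoder.finish();
    result.expect("LZ4 stream finalization failed");
    compressed
}

// Decompress by reading the frame back to the end, as into_transaction
// and into_transactions_in_storage do before proto decoding.
fn lz4_decompress(bytes: &[u8]) -> Vec<u8> {
    let mut decoder = Decoder::new(bytes).expect("failed to build LZ4 decoder");
    let mut decompressed = Vec::new();
    decoder
        .read_to_end(&mut decompressed)
        .expect("LZ4 decompression failed");
    decompressed
}

fn main() {
    let payload = b"protobuf-encoded transaction bytes".to_vec();
    assert_eq!(lz4_decompress(&lz4_compress(&payload)), payload);
}
```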
@@ -31,7 +31,7 @@ impl GcsFileStoreOperator {
     ) -> Self {
         env::set_var(SERVICE_ACCOUNT_ENV_VAR, service_account_path);
         let storage_format = if enable_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::JsonBase64UncompressedProto
         };
@@ -23,7 +23,7 @@ pub struct LocalFileStoreOperator {
 impl LocalFileStoreOperator {
     pub fn new(path: PathBuf, enable_compression: bool) -> Self {
         let storage_format = if enable_compression {
-            StorageFormat::GzipCompressedProto
+            StorageFormat::Lz4CompressedProto
         } else {
             StorageFormat::JsonBase64UncompressedProto
         };
