diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml
index 312eda77..31fac29d 100644
--- a/bottomless/Cargo.toml
+++ b/bottomless/Cargo.toml
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"
 [dependencies]
 anyhow = "1.0.66"
-async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
+async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
 aws-config = { version = "0.55" }
 aws-sdk-s3 = { version = "0.28" }
 bytes = "1"
diff --git a/bottomless/src/backup.rs b/bottomless/src/backup.rs
index bb6e9081..f3de86af 100644
--- a/bottomless/src/backup.rs
+++ b/bottomless/src/backup.rs
@@ -116,6 +116,11 @@ impl WalCopier {
                     wal.copy_frames(&mut gzip, len).await?;
                     gzip.shutdown().await?;
                 }
+                CompressionKind::Xz => {
+                    let mut xz = async_compression::tokio::write::XzEncoder::new(&mut out);
+                    wal.copy_frames(&mut xz, len).await?;
+                    xz.shutdown().await?;
+                }
             }
             if tracing::enabled!(tracing::Level::DEBUG) {
                 let elapsed = Instant::now() - period_start;
diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs
index 1177f60b..f1837c19 100644
--- a/bottomless/src/read.rs
+++ b/bottomless/src/read.rs
@@ -1,7 +1,7 @@
 use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
-use async_compression::tokio::bufread::GzipDecoder;
+use async_compression::tokio::bufread::{GzipDecoder, XzDecoder};
 use aws_sdk_s3::primitives::ByteStream;
 use std::io::ErrorKind;
 use std::pin::Pin;
@@ -32,6 +32,10 @@ impl BatchReader {
                     let gzip = GzipDecoder::new(reader);
                     Box::pin(gzip)
                 }
+                CompressionKind::Xz => {
+                    let xz = XzDecoder::new(reader);
+                    Box::pin(xz)
+                }
             },
         }
     }
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index f95bc973..5ad70a23 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
 use crate::wal::WalFileReader;
 use anyhow::{anyhow, bail};
 use arc_swap::ArcSwapOption;
-use async_compression::tokio::write::GzipEncoder;
+use async_compression::tokio::write::{GzipEncoder, XzEncoder};
 use aws_sdk_s3::config::{Credentials, Region};
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -171,7 +171,7 @@ impl Options {
         let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
         let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
         let max_frames_per_batch =
-            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
+            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
         let s3_upload_max_parallelism =
             env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
         let restore_transaction_page_swap_after =
@@ -653,7 +653,7 @@ impl Replicator {
             CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
             CompressionKind::Gzip => {
                 let mut reader = File::open(db_path).await?;
-                let gzip_path = Self::db_gzip_path(db_path);
+                let gzip_path = Self::db_compressed_path(db_path, "gz");
                 let compressed_file = OpenOptions::new()
                     .create(true)
                     .write(true)
@@ -671,13 +671,33 @@
                 );
                 Ok(ByteStream::from_path(gzip_path).await?)
             }
+            CompressionKind::Xz => {
+                let mut reader = File::open(db_path).await?;
+                let xz_path = Self::db_compressed_path(db_path, "xz");
+                let compressed_file = OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .read(true)
+                    .truncate(true)
+                    .open(&xz_path)
+                    .await?;
+                let mut writer = XzEncoder::new(compressed_file);
+                let size = tokio::io::copy(&mut reader, &mut writer).await?;
+                writer.shutdown().await?;
+                tracing::debug!(
+                    "Compressed database file ({} bytes) into `{}`",
+                    size,
+                    xz_path.display()
+                );
+                Ok(ByteStream::from_path(xz_path).await?)
+            }
         }
     }
 
-    fn db_gzip_path(db_path: &Path) -> PathBuf {
-        let mut gzip_path = db_path.to_path_buf();
-        gzip_path.pop();
-        gzip_path.join("db.gz")
+    fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
+        let mut compressed_path: PathBuf = db_path.to_path_buf();
+        compressed_path.pop();
+        compressed_path.join(format!("db.{suffix}"))
     }
 
     fn restore_db_path(&self) -> PathBuf {
@@ -816,9 +816,10 @@ impl Replicator {
             let _ = snapshot_notifier.send(Ok(Some(generation)));
             let elapsed = Instant::now() - start;
             tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
-            // cleanup gzip database snapshot if exists
-            let gzip_path = Self::db_gzip_path(&db_path);
-            let _ = tokio::fs::remove_file(gzip_path).await;
+            // clean up gzip/xz database snapshots if they exist
+            for suffix in ["gz", "xz"] {
+                let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
+            }
         });
         let elapsed = Instant::now() - start_ts;
         tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
@@ -1160,31 +1181,58 @@ impl Replicator {
     }
 
     async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
-        let main_db_path = match self.use_compression {
-            CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
-            CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+        let algos_to_try = match self.use_compression {
+            CompressionKind::None => &[
+                CompressionKind::None,
+                CompressionKind::Xz,
+                CompressionKind::Gzip,
+            ],
+            CompressionKind::Gzip => &[
+                CompressionKind::Gzip,
+                CompressionKind::Xz,
+                CompressionKind::None,
+            ],
+            CompressionKind::Xz => &[
+                CompressionKind::Xz,
+                CompressionKind::Gzip,
+                CompressionKind::None,
+            ],
         };
-        if let Ok(db_file) = self.get_object(main_db_path).send().await {
-            let mut body_reader = db_file.body.into_async_read();
-            let db_size = match self.use_compression {
-                CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
-                CompressionKind::Gzip => {
-                    let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
-                        tokio::io::BufReader::new(body_reader),
-                    );
-                    tokio::io::copy(&mut decompress_reader, db).await?
-                }
+        for algo in algos_to_try {
+            let main_db_path = match algo {
+                CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
+                CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+                CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
             };
-            db.flush().await?;
+            if let Ok(db_file) = self.get_object(main_db_path).send().await {
+                let mut body_reader = db_file.body.into_async_read();
+                let db_size = match algo {
+                    CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
+                    CompressionKind::Gzip => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::GzipDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                    CompressionKind::Xz => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::XzDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                };
+                db.flush().await?;
 
-            let page_size = Self::read_page_size(db).await?;
-            self.set_page_size(page_size)?;
-            tracing::info!("Restored the main database file ({} bytes)", db_size);
-            Ok(true)
-        } else {
-            Ok(false)
+                let page_size = Self::read_page_size(db).await?;
+                self.set_page_size(page_size)?;
+                tracing::info!("Restored the main database file ({} bytes)", db_size);
+                return Ok(true);
+            }
         }
+        Ok(false)
     }
 
     async fn restore_wal(
@@ -1235,6 +1283,7 @@ impl Replicator {
                 Some(result) => result,
                 None => {
                     if !key.ends_with(".gz")
+                        && !key.ends_with(".xz")
                         && !key.ends_with(".db")
                         && !key.ends_with(".meta")
                        && !key.ends_with(".dep")
@@ -1423,6 +1472,7 @@ impl Replicator {
             let str = fpath.to_str()?;
             if str.ends_with(".db")
                 | str.ends_with(".gz")
+                | str.ends_with(".xz")
                 | str.ends_with(".raw")
                 | str.ends_with(".meta")
                 | str.ends_with(".dep")
@@ -1670,6 +1720,7 @@ pub enum CompressionKind {
     #[default]
     None,
     Gzip,
+    Xz,
 }
 
 impl CompressionKind {
@@ -1677,6 +1728,7 @@
         match kind {
             "gz" | "gzip" => Ok(CompressionKind::Gzip),
             "raw" | "" => Ok(CompressionKind::None),
+            "xz" => Ok(CompressionKind::Xz),
             other => Err(other),
         }
     }
@@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
         match self {
             CompressionKind::None => write!(f, "raw"),
             CompressionKind::Gzip => write!(f, "gz"),
+            CompressionKind::Xz => write!(f, "xz"),
         }
     }
 }
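
A note on how the two halves of this change fit together: the backup paths (`WalCopier` in `backup.rs`, `compress_main_db_file` in `replicator.rs`) wrap their output in `async_compression::tokio::write::XzEncoder` and must call `shutdown()` to flush the xz stream footer, while the restore paths (`BatchReader` in `read.rs`, `restore_from_snapshot`) wrap the downloaded body in a buffered reader and `async_compression::tokio::bufread::XzDecoder`. Below is a minimal sketch of that round trip, assuming `async-compression = { version = "0.3", features = ["tokio", "xz"] }` and tokio with the `rt` and `macros` features; the buffer contents and the `main` harness are illustrative, not code from this patch:

```rust
use async_compression::tokio::bufread::XzDecoder;
use async_compression::tokio::write::XzEncoder;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Compress, as WalCopier does for frame batches: write through the
    // encoder, then shutdown() so the xz footer is flushed before upload.
    let mut encoder = XzEncoder::new(Vec::new());
    encoder.write_all(b"illustrative frame bytes").await?;
    encoder.shutdown().await?;
    let compressed = encoder.into_inner();

    // Decompress, as BatchReader does for downloaded batches: the bufread
    // decoder expects an AsyncBufRead, hence the BufReader wrapper.
    let mut decoder = XzDecoder::new(BufReader::new(compressed.as_slice()));
    let mut restored = Vec::new();
    decoder.read_to_end(&mut restored).await?;
    assert_eq!(restored, b"illustrative frame bytes");
    Ok(())
}
```

Note the asymmetry in the crate layout: encoders live under `write` and decoders under `bufread`, which is why `read.rs` imports `XzDecoder` while `backup.rs` and `replicator.rs` use `XzEncoder`.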
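The `CompressionKind` additions keep `parse` and `Display` symmetric: `Display` emits the short token that doubles as the snapshot suffix (`db.gz`, `db.xz`), and `parse` accepts it back. A hypothetical round-trip check, assuming `parse` and the `replicator` module are public and that the enum derives `Debug` and `PartialEq` (its derive list is outside this diff):

```rust
use bottomless::replicator::CompressionKind;

fn main() {
    // "xz" now parses alongside the existing "gz"/"gzip" and "raw"/"" tokens.
    assert_eq!(CompressionKind::parse("xz"), Ok(CompressionKind::Xz));
    // Display produces the suffix used to name snapshot objects on S3.
    assert_eq!(CompressionKind::Xz.to_string(), "xz");
    // Every kind survives a Display -> parse round trip.
    for kind in [CompressionKind::None, CompressionKind::Gzip, CompressionKind::Xz] {
        assert_eq!(CompressionKind::parse(&kind.to_string()), Ok(kind));
    }
}
```

This symmetry is what lets `restore_from_snapshot` probe all three suffixes in order, so a database can still be restored after the operator switches the configured compression kind.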