diff --git a/src/background/backup_timer.rs b/src/background/backup_timer.rs
new file mode 100644
index 0000000..26cc7ba
--- /dev/null
+++ b/src/background/backup_timer.rs
@@ -0,0 +1,23 @@
+use std::sync::Arc;
+
+use rust_extensions::MyTimerTick;
+
+use crate::app::AppContext;
+
+pub struct BackupTimer {
+    app: Arc<AppContext>,
+}
+
+impl BackupTimer {
+    pub fn new(app: Arc<AppContext>) -> Self {
+        Self { app }
+    }
+}
+
+#[async_trait::async_trait]
+impl MyTimerTick for BackupTimer {
+    async fn tick(&self) {
+        crate::operations::backup::save_backup(&self.app, false).await;
+        crate::operations::backup::gc_backups(&self.app).await;
+    }
+}
diff --git a/src/background/gc_db_rows.rs b/src/background/gc_db_rows.rs
index a2dfac5..e777165 100644
--- a/src/background/gc_db_rows.rs
+++ b/src/background/gc_db_rows.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 use rust_extensions::{date_time::DateTimeAsMicroseconds, MyTimerTick};
 
@@ -80,12 +80,16 @@ async fn gc_it(app: &AppContext) {
         )
         .await
         {
+            let mut ctx = HashMap::new();
+
+            ctx.insert("TableName".to_string(), table.name.to_string());
+
             app.logs.add_error(
                 table.name.to_string().into(),
                 SystemProcess::Timer,
                 "GcRows".to_string().into(),
                 format!("{:?}", err),
-                None,
+                Some(ctx),
             )
         }
     }
diff --git a/src/background/mod.rs b/src/background/mod.rs
index 89169de..7e059df 100644
--- a/src/background/mod.rs
+++ b/src/background/mod.rs
@@ -1,6 +1,8 @@
+mod backup_timer;
 pub mod gc_db_rows;
 pub mod gc_http_sessions;
 pub mod gc_multipart;
 pub mod metrics_updater;
 pub mod persist;
 pub mod sync;
+pub use backup_timer::*;
diff --git a/src/http/controllers/tables_controller/download_action.rs b/src/http/controllers/backup_controller/download_action.rs
similarity index 62%
rename from src/http/controllers/tables_controller/download_action.rs
rename to src/http/controllers/backup_controller/download_action.rs
index 0ed520e..fbbd4af 100644
--- a/src/http/controllers/tables_controller/download_action.rs
+++ b/src/http/controllers/backup_controller/download_action.rs
@@ -3,11 +3,11 @@ use my_http_server::{HttpContext, HttpFailResult, HttpOkResult, HttpOutput};
 use rust_extensions::date_time::DateTimeAsMicroseconds;
 use std::sync::Arc;
 
-use crate::{app::AppContext, zip::DbZipBuilder};
+use crate::app::AppContext;
 
 #[http_route(
     method: "GET",
-    route: "/Tables/Download",
+    route: "/Backup/Download",
     description: "Download all tables as Zip Archive",
     summary: "Download all tables as Zip Archive",
     controller: "Tables",
@@ -29,24 +29,13 @@ async fn handle_request(
     action: &DownloadAction,
     _ctx: &mut HttpContext,
 ) -> Result<HttpOkResult, HttpFailResult> {
-    crate::db_operations::check_app_states(action.app.as_ref())?;
-    let tables = action.app.db.get_tables().await;
-
-    let mut zip_builder = DbZipBuilder::new();
-
-    for db_table in &tables {
-        let table_snapshot = db_table.get_table_snapshot().await;
-
-        zip_builder
-            .add_table(&db_table.name, &table_snapshot)
-            .unwrap();
-    }
+    let db_snapshot_as_zip = crate::operations::build_db_snapshot_as_zip_archive(&action.app).await;
 
     let now = DateTimeAsMicroseconds::now();
 
     let filename = format!("{}.zip", &now.to_rfc3339().replace(":", "_")[..19]);
 
-    HttpOutput::as_file(filename.to_string(), zip_builder.get_payload().unwrap())
+    HttpOutput::as_file(filename.to_string(), db_snapshot_as_zip)
         .into_ok_result(true)
         .into()
 }
diff --git a/src/http/controllers/backup_controller/mod.rs b/src/http/controllers/backup_controller/mod.rs
new file mode 100644
index 0000000..b1c713b
--- /dev/null
+++ b/src/http/controllers/backup_controller/mod.rs
@@ -0,0 +1,2 @@
+mod download_action;
+pub use download_action::*;
diff --git a/src/http/controllers/builder.rs b/src/http/controllers/builder.rs
index 1a9a093..7c6f0bd 100644
--- a/src/http/controllers/builder.rs
+++ b/src/http/controllers/builder.rs
@@ -53,10 +53,6 @@ pub fn build(app: &Arc<AppContext>) -> ControllersMiddleware {
         super::tables_controller::CreateIfNotExistsAction::new(app.clone()),
     ));
 
-    result.register_get_action(Arc::new(super::tables_controller::DownloadAction::new(
-        app.clone(),
-    )));
-
     result.register_post_action(Arc::new(super::transactions::StartTransactionAction::new(
         app.clone(),
     )));
@@ -211,5 +207,11 @@ pub fn build(app: &Arc<AppContext>) -> ControllersMiddleware {
         app.clone(),
     )));
 
+    // Backup controller
+
+    result.register_get_action(Arc::new(super::backup_controller::DownloadAction::new(
+        app.clone(),
+    )));
+
     result
 }
diff --git a/src/http/controllers/mod.rs b/src/http/controllers/mod.rs
index d62c433..dd6387d 100644
--- a/src/http/controllers/mod.rs
+++ b/src/http/controllers/mod.rs
@@ -18,4 +18,5 @@ pub mod status_controller;
 pub mod tables_controller;
 pub mod transactions;
 pub use mappers::*;
+mod backup_controller;
 pub mod partitions;
diff --git a/src/http/controllers/tables_controller/mod.rs b/src/http/controllers/tables_controller/mod.rs
index d71cd09..8ff8f57 100644
--- a/src/http/controllers/tables_controller/mod.rs
+++ b/src/http/controllers/tables_controller/mod.rs
@@ -2,7 +2,7 @@ mod clean_table_action;
 mod create_if_not_exists_action;
 mod create_table_action;
 mod delete_table_action;
-mod download_action;
+
 mod get_list_action;
 mod get_partitions_count_action;
 mod migration_action;
@@ -13,7 +13,6 @@ pub use clean_table_action::CleanTableAction;
 pub use create_if_not_exists_action::CreateIfNotExistsAction;
 pub use create_table_action::CreateTableAction;
 pub use delete_table_action::DeleteTableAction;
-pub use download_action::*;
 pub use get_list_action::GetListAction;
 pub use get_partitions_count_action::GetPartitionsCountAction;
 pub use migration_action::MigrationAction;
diff --git a/src/main.rs b/src/main.rs
index ab3001b..fc97eda 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,7 @@
 use app::AppContext;
 use background::{
     gc_db_rows::GcDbRows, gc_http_sessions::GcHttpSessionsTimer, gc_multipart::GcMultipart,
-    metrics_updater::MetricsUpdater, persist::PersistTimer, sync::SyncEventLoop,
+    metrics_updater::MetricsUpdater, persist::PersistTimer, sync::SyncEventLoop, BackupTimer,
 };
 use my_no_sql_sdk::tcp_contracts::MyNoSqlReaderTcpSerializer;
@@ -80,6 +80,12 @@ async fn main() {
     timer_30s.start(app.states.clone(), app.clone());
     persist_timer.start(app.states.clone(), app.clone());
 
+    let mut backup_timer = MyTimer::new(Duration::from_secs(60));
+
+    backup_timer.register_timer("BackupDb", Arc::new(BackupTimer::new(app.clone())));
+
+    backup_timer.start(app.states.clone(), app.clone());
+
     app.sync.start(app.states.clone(), app.clone()).await;
 
     crate::http::start_up::setup_server(&app);
diff --git a/src/operations/backup/gc_backups.rs b/src/operations/backup/gc_backups.rs
new file mode 100644
index 0000000..ab71d52
--- /dev/null
+++ b/src/operations/backup/gc_backups.rs
@@ -0,0 +1,65 @@
+use rust_extensions::ShortString;
+
+use crate::app::AppContext;
+
+use std::{collections::BTreeMap, path::MAIN_SEPARATOR};
+
+use super::utils::compile_backup_file;
+
+pub async fn gc_backups(app: &AppContext) {
+    let backup_folder = app.settings.get_backup_folder();
+
+    let mut read_dir = tokio::fs::read_dir(backup_folder.as_str()).await.unwrap();
+
+    let mut result = BTreeMap::new();
+
+    while let Ok(entry) = read_dir.next_entry().await {
+        if entry.is_none() {
+            break;
+        }
+
+        let entry = entry.unwrap();
+
+        let file_type = entry.file_type().await.unwrap();
+
+        if file_type.is_file() {
+            let path = entry.path();
+
+            let path = format!("{}", path.display());
+
+            let file_name = extract_file_name(path.as_str(), MAIN_SEPARATOR);
+
+            if if_filename_is_backup(file_name) {
+                result.insert(file_name.to_string(), ());
+            }
+        }
+    }
+
+    while result.len() > app.settings.max_backups_to_keep {
+        let file_name: ShortString = result.keys().next().unwrap().into();
+        println!("Deleting backup file: {}", file_name.as_str());
+        delete_backup(app, file_name.as_str()).await;
+        result.remove(file_name.as_str());
+    }
+}
+
+async fn delete_backup(app: &AppContext, file_name: &str) {
+    let file_full_path = compile_backup_file(app, file_name);
+    tokio::fs::remove_file(file_full_path).await.unwrap();
+}
+
+pub fn extract_file_name(full_path: &str, separator: char) -> &str {
+    let full_path_as_bytes = full_path.as_bytes();
+
+    for index in (0..full_path_as_bytes.len()).rev() {
+        if full_path_as_bytes[index] == separator as u8 {
+            return &full_path[index + 1..];
+        }
+    }
+
+    panic!("Can not extract filename from full path [{}]", full_path);
+}
+
+fn if_filename_is_backup(src: &str) -> bool {
+    return src.ends_with(".zip");
+}
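Reviewer note: `gc_backups` leans on two properties — `BTreeMap` iterates its keys in sorted order, and the timestamp-derived archive names (see `save_backup` below) sort chronologically — so `result.keys().next()` is always the oldest backup. Also note that `extract_file_name` panics when the path contains no separator, and the `separator as u8` comparison only holds for ASCII separators, which covers both `/` and `\`. A hypothetical test module (not part of the diff) illustrating the contract:

```rust
#[cfg(test)]
mod tests {
    use super::extract_file_name;

    #[test]
    fn extracts_last_path_segment() {
        // Scans backwards for the separator and returns everything after it.
        assert_eq!(
            extract_file_name("/var/backups/20230115T103045.zip", '/'),
            "20230115T103045.zip"
        );
        // Works when the file sits directly under the root.
        assert_eq!(extract_file_name("/file.zip", '/'), "file.zip");
    }
}
```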
diff --git a/src/operations/backup/mod.rs b/src/operations/backup/mod.rs
new file mode 100644
index 0000000..c71c6d0
--- /dev/null
+++ b/src/operations/backup/mod.rs
@@ -0,0 +1,5 @@
+mod save_backup;
+pub use save_backup::*;
+mod gc_backups;
+mod utils;
+pub use gc_backups::*;
diff --git a/src/operations/backup/save_backup.rs b/src/operations/backup/save_backup.rs
new file mode 100644
index 0000000..be6f501
--- /dev/null
+++ b/src/operations/backup/save_backup.rs
@@ -0,0 +1,75 @@
+use rust_extensions::date_time::DateTimeAsMicroseconds;
+
+use crate::app::AppContext;
+
+use super::utils::*;
+
+pub async fn save_backup(app: &AppContext, force_write: bool) {
+    let now = DateTimeAsMicroseconds::now();
+
+    if !force_write {
+        if let Some(last_backup_time) = get_last_backup_time(app).await {
+            let backup_interval_seconds = app.settings.backup_interval_hours * 60 * 60;
+
+            if now
+                .duration_since(last_backup_time)
+                .as_positive_or_zero()
+                .as_secs()
+                < backup_interval_seconds
+            {
+                return;
+            }
+        }
+    }
+
+    let backup_content = super::super::build_db_snapshot_as_zip_archive(app).await;
+
+    let file_name = now.to_rfc3339().replace(":", "").replace("-", "");
+
+    let file_name = compile_backup_file(app, format!("{}.zip", &file_name[..15]).as_str());
+
+    tokio::fs::write(file_name.as_str(), backup_content)
+        .await
+        .unwrap();
+
+    save_last_backup_time(app, now).await;
+}
+
+async fn get_last_backup_time(app: &AppContext) -> Option<DateTimeAsMicroseconds> {
+    let file_name = compile_backup_file(app, LAST_TIME_BACKUP_FILE_NAME);
+
+    let content = tokio::fs::read(file_name.as_str()).await;
+
+    if content.is_err() {
+        println!("Can not open file: {}", file_name.as_str());
+        return None;
+    }
+
+    let content = content.unwrap();
+
+    let content = String::from_utf8(content);
+
+    if content.is_err() {
+        println!("Can not parse file: {}", file_name.as_str());
+        return None;
+    }
+
+    let content = content.unwrap();
+
+    let result = DateTimeAsMicroseconds::from_str(content.as_str());
+
+    if result.is_none() {
+        println!("Can not parse date_time from file: {}", file_name.as_str());
+    }
+
+    result
+}
+
+async fn save_last_backup_time(app: &AppContext, now: DateTimeAsMicroseconds) {
+    let file_name = compile_backup_file(app, LAST_TIME_BACKUP_FILE_NAME);
+
+    let backup_content = now.to_rfc3339();
+    tokio::fs::write(file_name.as_str(), backup_content)
+        .await
+        .unwrap();
+}
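Reviewer note: the throttle converts `BackupIntervalHours` into seconds (`backup_interval_hours * 60 * 60`) and compares it against the age of the `.last_backup_time` marker, so a missing or unparsable marker simply forces a backup. The archive name is the RFC3339 timestamp with `:` and `-` stripped, cut to 15 characters — a `yyyymmddThhmmss` stamp that sorts lexicographically, which is exactly what `gc_backups` relies on. A self-contained sketch of that string arithmetic, assuming a timestamp with sub-second precision:

```rust
fn main() {
    // Stand-in for DateTimeAsMicroseconds::now().to_rfc3339().
    let rfc3339 = "2023-01-15T10:30:45.123456+00:00";

    // Strip ':' and '-' exactly as save_backup does.
    let compact = rfc3339.replace(":", "").replace("-", "");

    // The first 15 characters form the archive name.
    assert_eq!(&compact[..15], "20230115T103045");
    assert_eq!(format!("{}.zip", &compact[..15]), "20230115T103045.zip");
}
```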
diff --git a/src/operations/backup/utils.rs b/src/operations/backup/utils.rs
new file mode 100644
index 0000000..fc0077c
--- /dev/null
+++ b/src/operations/backup/utils.rs
@@ -0,0 +1,14 @@
+use std::path::MAIN_SEPARATOR;
+
+use crate::app::AppContext;
+
+pub const LAST_TIME_BACKUP_FILE_NAME: &str = ".last_backup_time";
+
+pub fn compile_backup_file(app: &AppContext, file_name: &str) -> String {
+    let backup_folder = app.settings.get_backup_folder();
+    if backup_folder.as_str().ends_with(MAIN_SEPARATOR) {
+        format!("{}{}", backup_folder.as_str(), file_name)
+    } else {
+        format!("{}{}{}", backup_folder.as_str(), MAIN_SEPARATOR, file_name)
+    }
+}
diff --git a/src/operations/build_db_snapshot_as_zip.rs b/src/operations/build_db_snapshot_as_zip.rs
new file mode 100644
index 0000000..1e285e9
--- /dev/null
+++ b/src/operations/build_db_snapshot_as_zip.rs
@@ -0,0 +1,17 @@
+use crate::{app::AppContext, zip::DbZipBuilder};
+
+pub async fn build_db_snapshot_as_zip_archive(app: &AppContext) -> Vec<u8> {
+    let tables = app.db.get_tables().await;
+
+    let mut zip_builder = DbZipBuilder::new();
+
+    for db_table in &tables {
+        let table_snapshot = db_table.get_table_snapshot().await;
+
+        zip_builder
+            .add_table(&db_table.name, &table_snapshot)
+            .unwrap();
+    }
+
+    zip_builder.get_payload().unwrap()
+}
diff --git a/src/operations/mod.rs b/src/operations/mod.rs
index 5bca7d8..2c2db15 100644
--- a/src/operations/mod.rs
+++ b/src/operations/mod.rs
@@ -5,3 +5,7 @@ pub mod shutdown;
 pub mod sync;
 pub use get_metrics::*;
 pub use persist::persist;
+
+mod build_db_snapshot_as_zip;
+pub use build_db_snapshot_as_zip::*;
+pub mod backup;
diff --git a/src/persist_io/mod.rs b/src/persist_io/mod.rs
index 34606d8..c711f67 100644
--- a/src/persist_io/mod.rs
+++ b/src/persist_io/mod.rs
@@ -3,3 +3,5 @@ mod table_file;
 mod with_retries;
 pub use persist_io_operations::{PersistIoOperations, TableListOfFilesUploader};
 pub use table_file::TableFile;
+
+pub const TABLE_METADATA_FILE_NAME: &str = ".metadata";
diff --git a/src/persist_io/table_file.rs b/src/persist_io/table_file.rs
index 4a49e3d..111f626 100644
--- a/src/persist_io/table_file.rs
+++ b/src/persist_io/table_file.rs
@@ -1,4 +1,4 @@
-pub const TABLE_METADATA_FILE_NAME: &str = ".metadata";
+use crate::persist_io::TABLE_METADATA_FILE_NAME;
 
 pub enum TableFile {
     TableAttributes,
diff --git a/src/persist_operations/data_initializer/loaded_table_item.rs b/src/persist_operations/data_initializer/loaded_table_item.rs
index 605bf7d..766231c 100644
--- a/src/persist_operations/data_initializer/loaded_table_item.rs
+++ b/src/persist_operations/data_initializer/loaded_table_item.rs
@@ -20,9 +20,7 @@ impl LoadedTableItem {
             }
             TableFile::DbPartition(partition_key) => {
                 let db_partition =
-                    crate::persist_operations::serializers::db_partition::deserialize_from_io(
-                        content,
-                    )?;
+                    crate::persist_operations::serializers::db_partition::deserialize(content)?;
 
                 let result = LoadedTableItem::DbPartition {
                     partition_key: partition_key.to_string(),
diff --git a/src/persist_operations/serializers/db_partition.rs b/src/persist_operations/serializers/db_partition.rs
index 5de4b4a..e948917 100644
--- a/src/persist_operations/serializers/db_partition.rs
+++ b/src/persist_operations/serializers/db_partition.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 use my_json::json_reader::array_parser::ArrayToJsonObjectsSplitter;
 use my_no_sql_sdk::core::{db::DbPartition, db_json_entity::DbJsonEntity};
 
-pub fn deserialize_from_io(raw: &[u8]) -> Result<DbPartition, String> {
+pub fn deserialize(raw: &[u8]) -> Result<DbPartition, String> {
     let mut db_partition = DbPartition::new();
 
     for db_entity_json_result in raw.split_array_json_to_objects() {
diff --git a/src/persist_operations/serializers/table_attrs.rs b/src/persist_operations/serializers/table_attrs.rs
index 9176ac1..edbe2ce 100644
--- a/src/persist_operations/serializers/table_attrs.rs
+++ b/src/persist_operations/serializers/table_attrs.rs
@@ -11,6 +11,8 @@ pub struct TableMetadataFileContract {
     pub max_partitions_amount: Option<usize>,
     #[serde(rename = "MaxRowsPerPartitionAmount")]
     pub max_rows_per_partition_amount: Option<usize>,
+    #[serde(rename = "Created")]
+    pub created: Option<String>,
 }
 
 impl TableMetadataFileContract {
@@ -23,6 +25,7 @@ impl TableMetadataFileContract {
             max_partitions_amount: None,
             max_rows_per_partition_amount: None,
             persist: true,
+            created: Some(DateTimeAsMicroseconds::now().to_rfc3339()),
         },
     }
 }
@@ -34,12 +37,33 @@ fn default_persist() -> bool {
 
 impl Into<DbTableAttributes> for TableMetadataFileContract {
     fn into(self) -> DbTableAttributes {
-        DbTableAttributes {
-            created: DateTimeAsMicroseconds::now(),
+        let mut result = DbTableAttributes {
+            created: if let Some(created) = &self.created {
+                match DateTimeAsMicroseconds::from_str(created) {
+                    Some(value) => value,
+                    None => DateTimeAsMicroseconds::now(),
+                }
+            } else {
+                DateTimeAsMicroseconds::now()
+            },
             max_partitions_amount: self.max_partitions_amount,
             max_rows_per_partition_amount: self.max_rows_per_partition_amount,
             persist: self.persist,
+        };
+
+        if let Some(value) = result.max_partitions_amount {
+            if value == 0 {
+                result.max_partitions_amount = None;
+            }
         }
+
+        if let Some(value) = result.max_rows_per_partition_amount {
+            if value == 0 {
+                result.max_rows_per_partition_amount = None;
+            }
+        }
+
+        result
     }
 }
@@ -48,6 +72,7 @@ pub fn serialize(attrs: &DbTableAttributes) -> Vec<u8> {
         max_partitions_amount: attrs.max_partitions_amount,
         max_rows_per_partition_amount: attrs.max_rows_per_partition_amount,
         persist: attrs.persist,
+        created: Some(attrs.created.to_rfc3339()),
     };
 
     serde_json::to_vec(&contract).unwrap()
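Reviewer note: with `Created` now round-tripped through the contract, a `.metadata` payload produced by `serialize` looks roughly like the sketch below. The `Persist` and `MaxPartitionsAmount` key names are assumptions — their serde attributes sit outside the visible hunks — and `Created` carries whatever `to_rfc3339()` emits:

```json
{
  "Persist": true,
  "MaxPartitionsAmount": null,
  "MaxRowsPerPartitionAmount": null,
  "Created": "2023-01-15T10:30:45.123456"
}
```

The zero-to-`None` normalization in `Into<DbTableAttributes>` means a metadata file persisted with `0` limits no longer comes back as a table capped at zero partitions or rows.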
diff --git a/src/settings_reader.rs b/src/settings_reader.rs
index b94cbd2..8b800eb 100644
--- a/src/settings_reader.rs
+++ b/src/settings_reader.rs
@@ -1,5 +1,6 @@
 use my_azure_storage_sdk::AzureStorageConnection;
 use my_no_sql_server_core::logs::Logs;
+use rust_extensions::StrOrString;
 use serde::{Deserialize, Serialize};
 use std::{env, sync::Arc};
 use tokio::{fs::File, io::AsyncReadExt};
@@ -28,6 +29,15 @@ pub struct SettingsModel {
 
     #[serde(rename = "TcpSendTimeoutSec")]
     pub tcp_send_time_out: u64,
+
+    #[serde(rename = "BackupFolder")]
+    backup_folder: String,
+
+    #[serde(rename = "BackupIntervalHours")]
+    pub backup_interval_hours: u64,
+
+    #[serde(rename = "MaxBackupsToKeep")]
+    pub max_backups_to_keep: usize,
 }
 
 impl SettingsModel {
@@ -35,6 +45,10 @@ impl SettingsModel {
         let conn_string = AzureStorageConnection::from_conn_string(self.persistence_dest.as_str());
         PersistIoOperations::new(Arc::new(conn_string), logs)
     }
+
+    pub fn get_backup_folder<'s>(&'s self) -> StrOrString<'s> {
+        rust_extensions::file_utils::format_path(self.backup_folder.as_str())
+    }
 }
 
 pub async fn read_settings() -> SettingsModel {
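Reviewer note: three new settings drive the feature. A sketch of the new entries, with illustrative values and the surrounding keys omitted (shown as JSON; the actual file format is whatever `read_settings` parses):

```json
{
  "BackupFolder": "/var/lib/my-no-sql-server/backups",
  "BackupIntervalHours": 1,
  "MaxBackupsToKeep": 24
}
```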
diff --git a/src/zip/db_zip_builder.rs b/src/zip/db_zip_builder.rs
index ea19e7a..60eab58 100644
--- a/src/zip/db_zip_builder.rs
+++ b/src/zip/db_zip_builder.rs
@@ -25,6 +25,15 @@ impl DbZipBuilder {
         table_name: &str,
         content: &DbTableSnapshot,
     ) -> Result<(), zip::result::ZipError> {
+        let file_name = format!(
+            "{}/{}",
+            table_name,
+            crate::persist_io::TABLE_METADATA_FILE_NAME
+        );
+        self.zip_writer.start_file(file_name, self.options)?;
+        let payload = crate::persist_operations::serializers::table_attrs::serialize(&content.attr);
+        write_to_zip_file(&mut self.zip_writer, &payload)?;
+
         for (partition_key, content) in &content.by_partition {
             use base64::Engine;
             let encoded_file_name =
@@ -37,12 +46,7 @@ impl DbZipBuilder {
             let payload = json.build();
 
-            let mut pos = 0;
-            while pos < payload.len() {
-                let size = self.zip_writer.write(&payload[pos..])?;
-
-                pos += size;
-            }
+            write_to_zip_file(&mut self.zip_writer, &payload)?;
         }
 
         Ok(())
     }
@@ -53,3 +57,17 @@ impl DbZipBuilder {
     }
 }
+
+fn write_to_zip_file<W: std::io::Write + std::io::Seek>(
+    zip_writer: &mut zip::ZipWriter<W>,
+    payload: &[u8],
+) -> Result<(), zip::result::ZipError> {
+    let mut pos = 0;
+    while pos < payload.len() {
+        let size = zip_writer.write(&payload[pos..])?;
+
+        pos += size;
+    }
+
+    Ok(())
+}
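Reviewer note: each table now becomes a directory inside the archive — a `.metadata` file serialized by `table_attrs::serialize` plus one base64-named file per partition. `write_to_zip_file` exists because `io::Write::write` may accept fewer bytes than offered, so it loops until the whole payload is written. A minimal standalone sketch of the same loop against a `Cursor`-backed writer, assuming the stock `zip` crate API this module already uses (the file name and payload are made up):

```rust
use std::io::{Cursor, Write};

fn main() -> Result<(), zip::result::ZipError> {
    let mut zip_writer = zip::ZipWriter::new(Cursor::new(Vec::new()));
    zip_writer.start_file("MyTable/.metadata", zip::write::FileOptions::default())?;

    let payload = br#"{"Persist":true}"#;

    // Same pattern as write_to_zip_file: a short write only advances `pos`
    // by the number of bytes actually accepted.
    let mut pos = 0;
    while pos < payload.len() {
        pos += zip_writer.write(&payload[pos..])?;
    }

    let archive = zip_writer.finish()?.into_inner();
    println!("archive size: {} bytes", archive.len());
    Ok(())
}
```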