Skip to content

Commit

Permalink
Added the ability to restore a single table, or all tables, from a backup
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Dec 2, 2024
1 parent d7e821d commit 072a075
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/db_operations/write/clean_partition_and_bulk_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use crate::{
db_sync::{states::InitPartitionsSyncEventData, EventSource, SyncEvent},
};

pub async fn execute(
pub async fn clean_partition_and_bulk_insert(
app: &AppContext,
db_table: Arc<DbTableWrapper>,
partition_key: String,
partition_to_clean: String,
entities: Vec<(String, Vec<Arc<DbRow>>)>,
event_src: EventSource,
persist_moment: DateTimeAsMicroseconds,
Expand All @@ -22,7 +22,7 @@ pub async fn execute(
super::super::check_app_states(app)?;
let mut table_data = db_table.data.write().await;

table_data.remove_partition(&partition_key, None);
table_data.remove_partition(&partition_to_clean, None);

let mut partition_keys = Vec::new();

Expand Down
4 changes: 3 additions & 1 deletion src/db_operations/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod bulk_delete;
pub mod bulk_insert_or_update;
pub mod clean_partition_and_bulk_insert;

mod clean_partition_and_bulk_insert;
pub use clean_partition_and_bulk_insert::*;
pub mod clean_table;
pub mod clean_table_and_bulk_insert;
mod delete_partitions;
Expand Down
1 change: 1 addition & 0 deletions src/db_sync/sync_attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub enum EventSource {
ClientRequest(ClientRequestsSourceData),
GarbageCollector,
Subscriber,
Backup,
}

impl EventSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ async fn handle_request(
}
};

HttpOutput::as_text(format!("{:?}", restore_result))
.into_ok_result(true)
.into()
match restore_result {
Ok(_) => HttpOutput::Empty.into_ok_result(true).into(),
Err(err) => Err(HttpFailResult::as_fatal_error(format!("{:?}", err))),
}
}

#[derive(MyHttpInput)]
Expand Down
2 changes: 1 addition & 1 deletion src/http/controllers/bulk/clean_and_bulk_insert_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn handle_request(

match input_data.partition_key {
Some(partition_key) => {
crate::db_operations::write::clean_partition_and_bulk_insert::execute(
crate::db_operations::write::clean_partition_and_bulk_insert(
action.app.as_ref(),
db_table,
partition_key,
Expand Down
2 changes: 2 additions & 0 deletions src/operations/backup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ mod utils;
pub use gc_backups::*;
mod restore;
pub use restore::*;
mod restore_file_name;
pub use restore_file_name::*;
139 changes: 133 additions & 6 deletions src/operations/backup/restore.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,148 @@
use crate::app::AppContext;
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use my_no_sql_sdk::core::db_json_entity::DbJsonEntity;
use my_no_sql_server_core::rust_extensions::date_time::DateTimeAsMicroseconds;

use crate::{
app::AppContext, db_sync::EventSource,
scripts::serializers::table_attrs::TableMetadataFileContract, zip::ZipReader,
};

use super::RestoreFileName;

/// Errors that can occur while restoring tables from a backup archive.
///
/// Several payloads are only ever read through `Debug` formatting at the
/// HTTP layer, hence the `#[allow(dead_code)]` markers on those fields.
#[derive(Debug)]
pub enum BackupError {
    /// The archive contained no restorable entries, or the requested
    /// table was not present in the archive.
    TableNotFoundInBackupFile,
    /// An archive entry name did not follow the expected
    /// `<table>/<partition-or-metadata>` scheme.
    #[allow(dead_code)]
    InvalidFileName(String),
    /// The zip archive itself could not be read (message is the
    /// `Debug`-formatted underlying error).
    #[allow(dead_code)]
    ZipArchiveError(String),
    /// The archive has partition files for a table that does not exist
    /// and carries no metadata entry to create it from.
    #[allow(dead_code)]
    TableNotFoundToRestoreBackupAndNoMetadataFound(String),
    /// A partition file could not be deserialized into db rows.
    #[allow(dead_code)]
    InvalidFileContent {
        file_name: String,
        partition_key: String,
        err: String,
    },
}

/// Restores one table (when `table_name` is `Some`) or every table found
/// in the archive (when `None`) from the zipped backup in `backup_content`.
///
/// The archive is expected to contain one directory per table holding an
/// optional metadata entry plus one base64-named file per partition
/// (see [`RestoreFileName`]).
///
/// # Errors
///
/// Returns [`BackupError`] when the archive is malformed, contains no
/// tables, or does not contain the requested table.
pub async fn restore(
    app: &Arc<AppContext>,
    backup_content: Vec<u8>,
    table_name: Option<&str>,
) -> Result<(), BackupError> {
    let mut zip_reader = ZipReader::new(backup_content);

    // Group archive entries by table. A metadata entry (if any) is kept at
    // index 0 so restore_to_db can detect it by looking at the first element.
    let mut partitions: BTreeMap<String, Vec<RestoreFileName>> = BTreeMap::new();

    for file_name in zip_reader.get_file_names() {
        let file_name = RestoreFileName::new(file_name).map_err(BackupError::InvalidFileName)?;

        match partitions.get_mut(&file_name.table_name) {
            Some(by_table) => {
                if file_name.file_type.is_metadata() {
                    by_table.insert(0, file_name);
                } else {
                    by_table.push(file_name);
                }
            }
            None => {
                partitions.insert(file_name.table_name.to_string(), vec![file_name]);
            }
        }
    }

    if partitions.is_empty() {
        return Err(BackupError::TableNotFoundInBackupFile);
    }

    match table_name {
        Some(table_name) => match partitions.remove(table_name) {
            Some(files) => {
                restore_to_db(app, table_name, files, &mut zip_reader).await?;
            }
            None => {
                return Err(BackupError::TableNotFoundInBackupFile);
            }
        },
        None => {
            for (table_name, files) in partitions {
                restore_to_db(app, table_name.as_str(), files, &mut zip_reader).await?;
            }
        }
    }

    Ok(())
}

async fn restore_to_db(
app: &Arc<AppContext>,
table_name: &str,
mut files: Vec<RestoreFileName>,
zip: &mut ZipReader,
) -> Result<(), BackupError> {
let persist_moment = DateTimeAsMicroseconds::now().add(Duration::from_secs(5));
let db_table = if files.get(0).unwrap().file_type.is_metadata() {
let metadata_file = files.remove(0);

let content = zip
.get_content_as_vec(&metadata_file.file_name)
.map_err(|err| BackupError::ZipArchiveError(format!("{:?}", err)))?;

let table = TableMetadataFileContract::parse(content.as_slice());

let db_table = crate::db_operations::write::table::create_if_not_exist(
app,
table_name,
table.persist,
table.max_partitions_amount,
table.max_rows_per_partition_amount,
EventSource::Backup,
persist_moment,
)
.await
.unwrap();
db_table
} else {
let db_table = app.db.get_table(table_name).await;

if db_table.is_none() {
return Err(BackupError::TableNotFoundToRestoreBackupAndNoMetadataFound(
table_name.to_string(),
));
}

db_table.unwrap()
};

for partition_file in files {
let partition_key = partition_file.file_type.unwrap_as_partition_key();

let content = zip
.get_content_as_vec(&partition_file.file_name)
.map_err(|err| BackupError::ZipArchiveError(format!("{:?}", err)))?;

let zip = zip::ZipArchive::new(zip_cursor).unwrap();
let db_rows = DbJsonEntity::restore_as_vec(content.as_slice()).map_err(|itm| {
BackupError::InvalidFileContent {
file_name: partition_file.file_name,
partition_key: partition_key.to_string(),
err: format!("{:?}", itm),
}
})?;

for itm in zip.file_names() {
println!("File: {}", itm);
crate::db_operations::write::clean_partition_and_bulk_insert(
app,
db_table.clone(),
partition_key.to_string(),
vec![(partition_key, db_rows)],
EventSource::Backup,
persist_moment,
DateTimeAsMicroseconds::now(),
)
.await
.unwrap();
}

Ok(())
Expand Down
96 changes: 96 additions & 0 deletions src/operations/backup/restore_file_name.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use my_no_sql_server_core::rust_extensions::base64::FromBase64;

use crate::scripts::TABLE_METADATA_FILE_NAME;

/// Classifies one entry inside a backup archive: either the table
/// metadata file or a partition payload carrying its partition key.
pub enum ArchiveFileType {
    Metadata,
    PartitionKey(String),
}

impl ArchiveFileType {
    /// Returns `true` when this entry is the table metadata file.
    pub fn is_metadata(&self) -> bool {
        match self {
            Self::Metadata => true,
            Self::PartitionKey(_) => false,
        }
    }

    /// Consumes the value and yields the contained partition key.
    ///
    /// # Panics
    ///
    /// Panics when called on the [`ArchiveFileType::Metadata`] variant.
    pub fn unwrap_as_partition_key(self) -> String {
        if let Self::PartitionKey(key) = self {
            key
        } else {
            panic!("Can not unwrap partition key")
        }
    }
}

/// A parsed backup-archive entry name of the form
/// `<table_name><separator><partition-or-metadata>`.
pub struct RestoreFileName {
    /// Table the entry belongs to (the path prefix of the entry name).
    pub table_name: String,
    /// Whether the entry is the metadata file or a partition payload.
    pub file_type: ArchiveFileType,
    /// The full, unparsed entry name exactly as stored in the archive.
    pub file_name: String,
}

impl RestoreFileName {
    /// Parses an archive entry name of the form `<table>/<file>`.
    ///
    /// `<file>` is either the table metadata file name or a base64-encoded
    /// partition key. Zip entry names conventionally use `/` on every
    /// platform, so both `/` and the platform separator are accepted
    /// (the previous `MAIN_SEPARATOR`-only lookup failed on Windows for
    /// archives written with `/`-separated names).
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the name has no separator or
    /// the partition-key part is not valid base64 / UTF-8.
    pub fn new(file_name: &str) -> Result<Self, String> {
        let table_separator =
            match file_name.find(|c: char| c == '/' || c == std::path::MAIN_SEPARATOR) {
                Some(pos) => pos,
                None => return Err(format!("Invalid table file_name [{}]", file_name)),
            };

        let table_name = file_name[..table_separator].to_string();
        let payload = &file_name[table_separator + 1..];

        if payload == TABLE_METADATA_FILE_NAME {
            return Ok(Self {
                table_name,
                file_type: ArchiveFileType::Metadata,
                file_name: file_name.to_string(),
            });
        }

        let partition_key = payload.from_base64().map_err(|_| {
            format!(
                "Invalid file_name key [{}]. Can not extract partition key",
                file_name
            )
        })?;

        let partition_key = String::from_utf8(partition_key).map_err(|_| {
            format!(
                "Invalid file_name key [{}]. Can not convert partition key to string",
                file_name
            )
        })?;

        Ok(Self {
            table_name,
            file_type: ArchiveFileType::PartitionKey(partition_key),
            file_name: file_name.to_string(),
        })
    }
}

#[cfg(test)]
mod tests {

    // The metadata entry `<table>/.metadata` must be classified as metadata.
    #[test]
    fn test_metadata_file() {
        let result = super::RestoreFileName::new("key-value/.metadata").unwrap();
        assert_eq!(result.table_name, "key-value");
        assert!(result.file_type.is_metadata());
    }

    // "Yw==" is base64 for "c" — the partition key must round-trip.
    // assert_eq! (not assert!(a == b)) so failures print both values.
    #[test]
    fn test_partition_key() {
        let result = super::RestoreFileName::new("key-value/Yw==").unwrap();
        assert_eq!(result.table_name, "key-value");
        assert_eq!(result.file_type.unwrap_as_partition_key(), "c");
    }
}
2 changes: 2 additions & 0 deletions src/zip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ mod db_zip_builder;
mod vec_writer;
pub use db_zip_builder::*;
pub use vec_writer::*;
mod zip_reader;
pub use zip_reader::*;
36 changes: 36 additions & 0 deletions src/zip/zip_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::io::Read;

/// Random-access reader over an in-memory zip archive.
pub struct ZipReader {
    zip: zip::ZipArchive<std::io::Cursor<Vec<u8>>>,
}

impl ZipReader {
    /// Wraps the raw bytes of a zip archive.
    ///
    /// # Panics
    ///
    /// Panics when the bytes are not a valid zip archive.
    pub fn new(zip_content: Vec<u8>) -> Self {
        let zip_cursor = std::io::Cursor::new(zip_content);
        let zip = zip::ZipArchive::new(zip_cursor).unwrap();
        Self { zip }
    }

    /// Iterates over the names of all entries in the archive.
    pub fn get_file_names(&mut self) -> impl Iterator<Item = &str> {
        self.zip.file_names()
    }

    /// Reads the entire content of the entry named `file_name`.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` when the entry is missing or reading it
    /// fails.
    pub fn get_content_as_vec(&mut self, file_name: &str) -> Result<Vec<u8>, std::io::Error> {
        let mut file = self.zip.by_name(file_name)?;

        // BUG FIX: the previous version read into `&mut content[pos..]` of a
        // Vec created with `with_capacity` — whose *length* is 0 — so every
        // read slice was empty and the function always returned an empty Vec.
        // read_to_end grows the buffer correctly and reuses the reserved
        // capacity.
        let mut content: Vec<u8> = Vec::with_capacity(file.size() as usize);
        file.read_to_end(&mut content)?;

        Ok(content)
    }
}

0 comments on commit 072a075

Please sign in to comment.