Skip to content

Commit

Permalink
Made backup
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Dec 2, 2024
1 parent fe0e215 commit 222eee4
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/db_operations/transactions/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn commit(
match event {
TransactionalOperationStep::CleanTable { table_name } => {
let db_table = tables.get(table_name.as_str()).unwrap();
crate::db_operations::write::clean_table::execute(
crate::db_operations::write::clean_table(
app,
&db_table,
event_src.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/write/clean_partition_and_bulk_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{

pub async fn clean_partition_and_bulk_insert(
app: &AppContext,
db_table: Arc<DbTableWrapper>,
db_table: &Arc<DbTableWrapper>,
partition_to_clean: String,
entities: Vec<(String, Vec<Arc<DbRow>>)>,
event_src: EventSource,
Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/write/clean_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
db_sync::{states::InitTableEventSyncData, EventSource, SyncEvent},
};

pub async fn execute(
pub async fn clean_table(
app: &AppContext,
db_table: &Arc<DbTableWrapper>,
event_src: EventSource,
Expand Down
3 changes: 2 additions & 1 deletion src/db_operations/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ pub mod bulk_insert_or_update;

mod clean_partition_and_bulk_insert;
pub use clean_partition_and_bulk_insert::*;
pub mod clean_table;
mod clean_table;
pub use clean_table::*;
pub mod clean_table_and_bulk_insert;
mod delete_partitions;
pub mod delete_row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn handle_request(
&action.app,
backup_content,
input_data.get_table_name(),
input_data.clean_table,
)
.await
}
Expand All @@ -64,13 +65,17 @@ async fn handle_request(

#[derive(MyHttpInput)]
pub struct RestoreFromBackupInputData {
#[http_form_data(name = "fileName", description = "File in backup folder")]
pub file_name: String,
#[http_form_data(
name = "tableName",
description = "Name of the table or '*' for all tables"
)]
pub table_name: String,

#[http_form_data(name = "fileName", description = "File in backup folder")]
pub file_name: String,

#[http_form_data(name = "cleanTable", description = "Clean table before restore")]
pub clean_table: bool,
}

impl RestoreFromBackupInputData {
Expand Down
9 changes: 7 additions & 2 deletions src/http_server/controllers/backup/restore_from_zip_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async fn handle_request(
&action.app,
input_data.zip.content,
table_name.as_deref(),
input_data.clean_table,
)
.await;

Expand All @@ -48,13 +49,17 @@ async fn handle_request(

#[derive(MyHttpInput)]
pub struct RestoreFromBackupZipFileInputData {
#[http_form_data(name = "fileName", description = "File in backup folder")]
pub zip: FileContent,
#[http_form_data(
name = "tableName",
description = "Name of the table or '*' for all tables"
)]
pub table_name: String,

#[http_form_data(name = "fileName", description = "File in backup folder")]
pub zip: FileContent,

#[http_form_data(name = "cleanTable", description = "Clean table before restore")]
pub clean_table: bool,
}

impl RestoreFromBackupZipFileInputData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn handle_request(
Some(partition_key) => {
crate::db_operations::write::clean_partition_and_bulk_insert(
action.app.as_ref(),
db_table,
&db_table,
partition_key,
rows_by_partition,
event_src,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn handle_request(

let event_src = EventSource::as_client_request(action.app.as_ref());

crate::db_operations::write::clean_table::execute(
crate::db_operations::write::clean_table(
action.app.as_ref(),
&db_table,
event_src,
Expand Down
26 changes: 23 additions & 3 deletions src/operations/backup/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub async fn restore(
app: &Arc<AppContext>,
backup_content: Vec<u8>,
table_name: Option<&str>,
clean_table: bool,
) -> Result<(), BackupError> {
let mut zip_reader = ZipReader::new(backup_content);

Expand Down Expand Up @@ -61,15 +62,22 @@ pub async fn restore(
match table_name {
Some(table_name) => match partitions.remove(table_name) {
Some(files) => {
restore_to_db(&app, table_name, files, &mut zip_reader).await?;
restore_to_db(&app, table_name, files, &mut zip_reader, clean_table).await?;
}
None => {
return Err(BackupError::TableNotFoundInBackupFile);
}
},
None => {
for (table_name, files) in partitions {
restore_to_db(&app, table_name.as_str(), files, &mut zip_reader).await?;
restore_to_db(
&app,
table_name.as_str(),
files,
&mut zip_reader,
clean_table,
)
.await?;
}
}
}
Expand All @@ -81,6 +89,7 @@ async fn restore_to_db(
table_name: &str,
mut files: Vec<RestoreFileName>,
zip: &mut ZipReader,
clean_table: bool,
) -> Result<(), BackupError> {
let persist_moment = DateTimeAsMicroseconds::now().add(Duration::from_secs(5));
let db_table = if files.get(0).unwrap().file_type.is_metadata() {
Expand Down Expand Up @@ -116,6 +125,17 @@ async fn restore_to_db(
db_table.unwrap()
};

if clean_table {
crate::db_operations::write::clean_table(
&app,
&db_table,
EventSource::Backup,
persist_moment,
)
.await
.unwrap();
}

for partition_file in files {
let partition_key = partition_file.file_type.unwrap_as_partition_key();

Expand All @@ -133,7 +153,7 @@ async fn restore_to_db(

crate::db_operations::write::clean_partition_and_bulk_insert(
app,
db_table.clone(),
&db_table,
partition_key.to_string(),
vec![(partition_key, db_rows)],
EventSource::Backup,
Expand Down

0 comments on commit 222eee4

Please sign in to comment.