From efafb26bc6885e4c6cce94735f2f216eccb1d56c Mon Sep 17 00:00:00 2001 From: amigin Date: Sat, 23 Nov 2024 00:09:35 +0200 Subject: [PATCH 1/2] Now Next persist time is shown --- src/persist_markers/persist_by_table_item.rs | 36 +++++ src/persist_markers/persist_markers.rs | 145 +------------------ 2 files changed, 37 insertions(+), 144 deletions(-) diff --git a/src/persist_markers/persist_by_table_item.rs b/src/persist_markers/persist_by_table_item.rs index b32834e..b635512 100644 --- a/src/persist_markers/persist_by_table_item.rs +++ b/src/persist_markers/persist_by_table_item.rs @@ -161,6 +161,42 @@ impl PersistByTableItem { .unwrap() .clean_when_syncing_rows(rows); } + + pub fn get_metrics(&self) -> PersistMetrics { + let mut result = self.metrics.clone(); + + if let Some(value) = self.persist_whole_table_content { + result.next_persist_time = Some(value); + return result; + } + + if let Some(value) = self.persist_table_attributes { + result.next_persist_time = Some(value); + return result; + } + + for partition in self.persist_partitions.iter() { + if let Some(value) = partition.persist_whole_partition { + result.next_persist_time = Some(value); + return result; + } + + for row in partition.rows_to_persist.iter() { + match result.next_persist_time { + Some(current) => { + if row.persist_moment < current { + result.next_persist_time = Some(row.persist_moment); + } + } + None => { + result.next_persist_time = Some(row.persist_moment); + } + } + } + } + + result + } } impl EntityWithStrKey for PersistByTableItem { diff --git a/src/persist_markers/persist_markers.rs b/src/persist_markers/persist_markers.rs index 8d55c7a..5ee69d6 100644 --- a/src/persist_markers/persist_markers.rs +++ b/src/persist_markers/persist_markers.rs @@ -124,151 +124,8 @@ impl PersistMarkers { pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics { let inner = self.inner.lock().await; match inner.get_by_table(table_name) { - Some(metrics) => metrics.metrics.clone(), + Some(metrics) => metrics.get_metrics(), None => PersistMetrics::default(), } } - - /* - pub async fn persist_partition( - &self, - db_table: &DbTable, - partition_key: &impl PartitionKeyParameter, - sync_moment: DateTimeAsMicroseconds, - ) { - if !db_table.attributes.persist { - return; - } - - let mut write_access = self.by_table.lock().await; - - match write_access.insert_or_update(db_table.name.as_str()) { - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert( - entry, - ) => { - let mut item = PersistByTableItem { - table_name: db_table.name.clone(), - data: TablePersistData::new(), - }; - - item.data - .data_to_persist - .mark_partition_to_persist(partition_key, sync_moment); - - entry.insert(item); - } - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update( - entry, - ) => { - entry - .item - .data - .data_to_persist - .mark_partition_to_persist(partition_key, sync_moment); - } - } - } - - pub async fn persist_table(&self, db_table: &DbTable, sync_moment: DateTimeAsMicroseconds) { - if !db_table.attributes.persist { - return; - } - - let mut write_access = self.by_table.lock().await; - - match write_access.insert_or_update(&db_table.name) { - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert( - entry, - ) => { - let mut item = PersistByTableItem { - table_name: db_table.name.to_string(), - data: TablePersistData::new(), - }; - - item.data.data_to_persist.mark_table_to_persist(sync_moment); - - entry.insert(item); - } - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update( - entry, - ) => { - entry - .item - .data - .data_to_persist - .mark_table_to_persist(sync_moment); - } - } - } - - pub async fn persist_table_attrs(&self, db_table: &DbTable) { - if !db_table.attributes.persist { - return; - } - - let mut write_access = self.by_table.lock().await; - - match write_access.insert_or_update(&db_table.name) { - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert( - entry, - ) => { - let mut item = PersistByTableItem { - table_name: db_table.name.to_string(), - data: TablePersistData::new(), - }; - - item.data.data_to_persist.mark_persist_attrs(); - - entry.insert(item); - } - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update( - entry, - ) => { - entry.item.data.data_to_persist.mark_persist_attrs(); - } - } - } - - pub async fn get_job_to_persist( - &self, - table_name: &str, - now: DateTimeAsMicroseconds, - is_shutting_down: bool, - ) -> Option { - let mut write_access = self.by_table.lock().await; - - let item = write_access.get_mut(table_name)?; - - item.data - .data_to_persist - .get_what_to_persist(now, is_shutting_down) - } - - pub async fn set_persisted(&self, table_name: &str, duration: Duration) { - let mut write_access = self.by_table.lock().await; - - if let Some(item) = write_access.get_mut(table_name) { - item.data.add_persist_duration(duration); - } - } - - pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics { - let read_access = self.by_table.lock().await; - - match read_access.get(table_name) { - Some(result) => PersistMetrics { - last_persist_time: result.data.last_persist_time.clone(), - next_persist_time: result.data.data_to_persist.get_next_persist_time(), - persist_amount: result.data.data_to_persist.get_persist_amount(), - last_persist_duration: result.data.persist_duration.clone(), - }, - None => PersistMetrics { - last_persist_time: None, - next_persist_time: None, - persist_amount: 0, - last_persist_duration: vec![], - }, - } - } - */ } From 170cac8457f4323ee2c88c6050f48d96fc6abddc Mon Sep 17 00:00:00 2001 From: amigin Date: Sat, 23 Nov 2024 00:30:50 +0200 Subject: [PATCH 2/2] Fixes with shutdown case --- src/main.rs | 15 +------------ src/operations/mod.rs | 3 ++- src/operations/shutdown.rs | 13 ++++++++---- src/persist_markers/persist_by_table_item.rs | 22 ++++++++++++++++++++ src/persist_markers/persist_markers.rs | 5 +++++ src/persist_markers/persist_markers_inner.rs | 6 ++++++ 6 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index 517ce8c..aaccf40 100644 --- a/src/main.rs +++ b/src/main.rs @@ -110,18 +110,5 @@ async fn main() { app.states.wait_until_shutdown().await; - shut_down_task(app).await; -} - -async fn shut_down_task(app: Arc) { - let duration = Duration::from_secs(1); - - while !app.states.is_shutting_down() { - tokio::time::sleep(duration).await; - } - - println!("Shut down detected. Waiting for 1 second to deliver all messages"); - tokio::time::sleep(duration).await; - - crate::operations::shutdown::execute(app.as_ref()).await; + crate::operations::shutdown(&app).await; } diff --git a/src/operations/mod.rs b/src/operations/mod.rs index 15cac11..2f59c99 100644 --- a/src/operations/mod.rs +++ b/src/operations/mod.rs @@ -1,7 +1,8 @@ pub mod data_readers; mod get_metrics; pub mod persist; -pub mod shutdown; +mod shutdown; +pub use shutdown::*; pub mod sync; pub use get_metrics::*; pub use persist::*; diff --git a/src/operations/shutdown.rs b/src/operations/shutdown.rs index c958b73..76ec647 100644 --- a/src/operations/shutdown.rs +++ b/src/operations/shutdown.rs @@ -1,12 +1,17 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use crate::app::AppContext; -pub async fn execute(app: &AppContext) { +pub async fn shutdown(app: &Arc) { + println!("Doing persistence before shut down"); let duration = Duration::from_secs(1); - while !app.states.is_shutting_down() { + tokio::time::sleep(duration).await; + + while app.persist_markers.has_something_to_persist().await { + println!("Has something to persist. Persisting and checking again..."); + crate::operations::persist::persist(app).await; tokio::time::sleep(duration).await; } - print!("Stopping the application"); + println!("Everthing is persisted. App can be closed now"); } diff --git a/src/persist_markers/persist_by_table_item.rs b/src/persist_markers/persist_by_table_item.rs index b635512..97c0da6 100644 --- a/src/persist_markers/persist_by_table_item.rs +++ b/src/persist_markers/persist_by_table_item.rs @@ -197,6 +197,28 @@ impl PersistByTableItem { result } + + pub fn has_something_to_persist(&self) -> bool { + if self.persist_whole_table_content.is_some() { + return true; + } + + if self.persist_table_attributes.is_some() { + return true; + } + + for partition in self.persist_partitions.iter() { + if partition.persist_whole_partition.is_some() { + return true; + } + + if partition.rows_to_persist.len() > 0 { + return true; + } + } + + false + } } impl EntityWithStrKey for PersistByTableItem { diff --git a/src/persist_markers/persist_markers.rs b/src/persist_markers/persist_markers.rs index 5ee69d6..6545dbc 100644 --- a/src/persist_markers/persist_markers.rs +++ b/src/persist_markers/persist_markers.rs @@ -128,4 +128,9 @@ impl PersistMarkers { None => PersistMetrics::default(), } } + + pub async fn has_something_to_persist(&self) -> bool { + let inner = self.inner.lock().await; + inner.has_something_to_persist() + } } diff --git a/src/persist_markers/persist_markers_inner.rs b/src/persist_markers/persist_markers_inner.rs index 736564c..6312567 100644 --- a/src/persist_markers/persist_markers_inner.rs +++ b/src/persist_markers/persist_markers_inner.rs @@ -123,4 +123,10 @@ impl PersistMarkersInner { let item = self.get_item_mut(table_name); item.metrics.update(moment, duration); } + + pub fn has_something_to_persist(&self) -> bool { + self.items + .iter() + .any(|item| item.has_something_to_persist()) + } }