From efafb26bc6885e4c6cce94735f2f216eccb1d56c Mon Sep 17 00:00:00 2001 From: amigin Date: Sat, 23 Nov 2024 00:09:35 +0200 Subject: [PATCH] Now Next persist time is shown --- src/persist_markers/persist_by_table_item.rs | 36 +++++ src/persist_markers/persist_markers.rs | 145 +------------------ 2 files changed, 37 insertions(+), 144 deletions(-) diff --git a/src/persist_markers/persist_by_table_item.rs b/src/persist_markers/persist_by_table_item.rs index b32834e..b635512 100644 --- a/src/persist_markers/persist_by_table_item.rs +++ b/src/persist_markers/persist_by_table_item.rs @@ -161,6 +161,42 @@ impl PersistByTableItem { .unwrap() .clean_when_syncing_rows(rows); } + + pub fn get_metrics(&self) -> PersistMetrics { + let mut result = self.metrics.clone(); + + if let Some(value) = self.persist_whole_table_content { + result.next_persist_time = Some(value); + return result; + } + + if let Some(value) = self.persist_table_attributes { + result.next_persist_time = Some(value); + return result; + } + + for partition in self.persist_partitions.iter() { + if let Some(value) = partition.persist_whole_partition { + result.next_persist_time = Some(value); + return result; + } + + for row in partition.rows_to_persist.iter() { + match result.next_persist_time { + Some(current) => { + if row.persist_moment < current { + result.next_persist_time = Some(row.persist_moment); + } + } + None => { + result.next_persist_time = Some(row.persist_moment); + } + } + } + } + + result + } } impl EntityWithStrKey for PersistByTableItem { diff --git a/src/persist_markers/persist_markers.rs b/src/persist_markers/persist_markers.rs index 8d55c7a..5ee69d6 100644 --- a/src/persist_markers/persist_markers.rs +++ b/src/persist_markers/persist_markers.rs @@ -124,151 +124,8 @@ impl PersistMarkers { pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics { let inner = self.inner.lock().await; match inner.get_by_table(table_name) { - Some(metrics) => metrics.metrics.clone(), + Some(metrics) => metrics.get_metrics(), None => PersistMetrics::default(), } } - - /* - pub async fn persist_partition( - &self, - db_table: &DbTable, - partition_key: &impl PartitionKeyParameter, - sync_moment: DateTimeAsMicroseconds, - ) { - if !db_table.attributes.persist { - return; - } - - let mut write_access = self.by_table.lock().await; - - match write_access.insert_or_update(db_table.name.as_str()) { - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert( - entry, - ) => { - let mut item = PersistByTableItem { - table_name: db_table.name.clone(), - data: TablePersistData::new(), - }; - - item.data - .data_to_persist - .mark_partition_to_persist(partition_key, sync_moment); - - entry.insert(item); - } - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update( - entry, - ) => { - entry - .item - .data - .data_to_persist - .mark_partition_to_persist(partition_key, sync_moment); - } - } - } - - pub async fn persist_table(&self, db_table: &DbTable, sync_moment: DateTimeAsMicroseconds) { - if !db_table.attributes.persist { - return; - } - - let mut write_access = self.by_table.lock().await; - - match write_access.insert_or_update(&db_table.name) { - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert( - entry, - ) => { - let mut item = PersistByTableItem { - table_name: db_table.name.to_string(), - data: TablePersistData::new(), - }; - - item.data.data_to_persist.mark_table_to_persist(sync_moment); - - entry.insert(item); - } - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update( - entry, - ) => { - entry - .item - .data - .data_to_persist - .mark_table_to_persist(sync_moment); - } - } - } - - pub async fn persist_table_attrs(&self, db_table: &DbTable) { - if !db_table.attributes.persist { - return; - } - - let mut write_access = self.by_table.lock().await; - - match write_access.insert_or_update(&db_table.name) { - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert( - entry, - ) => { - let mut item = PersistByTableItem { - table_name: db_table.name.to_string(), - data: TablePersistData::new(), - }; - - item.data.data_to_persist.mark_persist_attrs(); - - entry.insert(item); - } - my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update( - entry, - ) => { - entry.item.data.data_to_persist.mark_persist_attrs(); - } - } - } - - pub async fn get_job_to_persist( - &self, - table_name: &str, - now: DateTimeAsMicroseconds, - is_shutting_down: bool, - ) -> Option { - let mut write_access = self.by_table.lock().await; - - let item = write_access.get_mut(table_name)?; - - item.data - .data_to_persist - .get_what_to_persist(now, is_shutting_down) - } - - pub async fn set_persisted(&self, table_name: &str, duration: Duration) { - let mut write_access = self.by_table.lock().await; - - if let Some(item) = write_access.get_mut(table_name) { - item.data.add_persist_duration(duration); - } - } - - pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics { - let read_access = self.by_table.lock().await; - - match read_access.get(table_name) { - Some(result) => PersistMetrics { - last_persist_time: result.data.last_persist_time.clone(), - next_persist_time: result.data.data_to_persist.get_next_persist_time(), - persist_amount: result.data.data_to_persist.get_persist_amount(), - last_persist_duration: result.data.persist_duration.clone(), - }, - None => PersistMetrics { - last_persist_time: None, - next_persist_time: None, - persist_amount: 0, - last_persist_duration: vec![], - }, - } - } - */ }