Skip to content

Commit

Permalink
Now Next persist time is shown
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Nov 22, 2024
1 parent 80d2c65 commit efafb26
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 144 deletions.
36 changes: 36 additions & 0 deletions src/persist_markers/persist_by_table_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,42 @@ impl PersistByTableItem {
.unwrap()
.clean_when_syncing_rows(rows);
}

pub fn get_metrics(&self) -> PersistMetrics {
let mut result = self.metrics.clone();

if let Some(value) = self.persist_whole_table_content {
result.next_persist_time = Some(value);
return result;
}

if let Some(value) = self.persist_table_attributes {
result.next_persist_time = Some(value);
return result;
}

for partition in self.persist_partitions.iter() {
if let Some(value) = partition.persist_whole_partition {
result.next_persist_time = Some(value);
return result;
}

for row in partition.rows_to_persist.iter() {
match result.next_persist_time {
Some(current) => {
if row.persist_moment < current {
result.next_persist_time = Some(row.persist_moment);
}
}
None => {
result.next_persist_time = Some(row.persist_moment);
}
}
}
}

result
}
}

impl EntityWithStrKey for PersistByTableItem {
Expand Down
145 changes: 1 addition & 144 deletions src/persist_markers/persist_markers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,151 +124,8 @@ impl PersistMarkers {
pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics {
let inner = self.inner.lock().await;
match inner.get_by_table(table_name) {
Some(metrics) => metrics.metrics.clone(),
Some(metrics) => metrics.get_metrics(),
None => PersistMetrics::default(),
}
}

/*
pub async fn persist_partition(
&self,
db_table: &DbTable,
partition_key: &impl PartitionKeyParameter,
sync_moment: DateTimeAsMicroseconds,
) {
if !db_table.attributes.persist {
return;
}
let mut write_access = self.by_table.lock().await;
match write_access.insert_or_update(db_table.name.as_str()) {
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert(
entry,
) => {
let mut item = PersistByTableItem {
table_name: db_table.name.clone(),
data: TablePersistData::new(),
};
item.data
.data_to_persist
.mark_partition_to_persist(partition_key, sync_moment);
entry.insert(item);
}
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update(
entry,
) => {
entry
.item
.data
.data_to_persist
.mark_partition_to_persist(partition_key, sync_moment);
}
}
}
pub async fn persist_table(&self, db_table: &DbTable, sync_moment: DateTimeAsMicroseconds) {
if !db_table.attributes.persist {
return;
}
let mut write_access = self.by_table.lock().await;
match write_access.insert_or_update(&db_table.name) {
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert(
entry,
) => {
let mut item = PersistByTableItem {
table_name: db_table.name.to_string(),
data: TablePersistData::new(),
};
item.data.data_to_persist.mark_table_to_persist(sync_moment);
entry.insert(item);
}
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update(
entry,
) => {
entry
.item
.data
.data_to_persist
.mark_table_to_persist(sync_moment);
}
}
}
pub async fn persist_table_attrs(&self, db_table: &DbTable) {
if !db_table.attributes.persist {
return;
}
let mut write_access = self.by_table.lock().await;
match write_access.insert_or_update(&db_table.name) {
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert(
entry,
) => {
let mut item = PersistByTableItem {
table_name: db_table.name.to_string(),
data: TablePersistData::new(),
};
item.data.data_to_persist.mark_persist_attrs();
entry.insert(item);
}
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update(
entry,
) => {
entry.item.data.data_to_persist.mark_persist_attrs();
}
}
}
pub async fn get_job_to_persist(
&self,
table_name: &str,
now: DateTimeAsMicroseconds,
is_shutting_down: bool,
) -> Option<PersistResult> {
let mut write_access = self.by_table.lock().await;
let item = write_access.get_mut(table_name)?;
item.data
.data_to_persist
.get_what_to_persist(now, is_shutting_down)
}
pub async fn set_persisted(&self, table_name: &str, duration: Duration) {
let mut write_access = self.by_table.lock().await;
if let Some(item) = write_access.get_mut(table_name) {
item.data.add_persist_duration(duration);
}
}
pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics {
let read_access = self.by_table.lock().await;
match read_access.get(table_name) {
Some(result) => PersistMetrics {
last_persist_time: result.data.last_persist_time.clone(),
next_persist_time: result.data.data_to_persist.get_next_persist_time(),
persist_amount: result.data.data_to_persist.get_persist_amount(),
last_persist_duration: result.data.persist_duration.clone(),
},
None => PersistMetrics {
last_persist_time: None,
next_persist_time: None,
persist_amount: 0,
last_persist_duration: vec![],
},
}
}
*/
}

0 comments on commit efafb26

Please sign in to comment.