Skip to content

Commit

Permalink
Merge branch 'main' of github.com:MyJetTools/my-no-sql-server
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Dec 2, 2024
2 parents bcbf71b + 170cac8 commit d7e821d
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 161 deletions.
15 changes: 1 addition & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,5 @@ async fn main() {

app.states.wait_until_shutdown().await;

shut_down_task(app).await;
}

async fn shut_down_task(app: Arc<AppContext>) {
let duration = Duration::from_secs(1);

while !app.states.is_shutting_down() {
tokio::time::sleep(duration).await;
}

println!("Shut down detected. Waiting for 1 second to deliver all messages");
tokio::time::sleep(duration).await;

crate::operations::shutdown::execute(app.as_ref()).await;
crate::operations::shutdown(&app).await;
}
3 changes: 2 additions & 1 deletion src/operations/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod data_readers;
mod get_metrics;
pub mod persist;
pub mod shutdown;
mod shutdown;
pub use shutdown::*;
pub mod sync;
pub use get_metrics::*;
pub use persist::*;
Expand Down
13 changes: 9 additions & 4 deletions src/operations/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use crate::app::AppContext;

pub async fn execute(app: &AppContext) {
pub async fn shutdown(app: &Arc<AppContext>) {
println!("Doing persistence before shut down");
let duration = Duration::from_secs(1);
while !app.states.is_shutting_down() {
tokio::time::sleep(duration).await;

while app.persist_markers.has_something_to_persist().await {
println!("Has something to persist. Persisting and checking again...");
crate::operations::persist::persist(app).await;
tokio::time::sleep(duration).await;
}

print!("Stopping the application");
println!("Everthing is persisted. App can be closed now");
}
58 changes: 58 additions & 0 deletions src/persist_markers/persist_by_table_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,64 @@ impl PersistByTableItem {
.unwrap()
.clean_when_syncing_rows(rows);
}

pub fn get_metrics(&self) -> PersistMetrics {
let mut result = self.metrics.clone();

if let Some(value) = self.persist_whole_table_content {
result.next_persist_time = Some(value);
return result;
}

if let Some(value) = self.persist_table_attributes {
result.next_persist_time = Some(value);
return result;
}

for partition in self.persist_partitions.iter() {
if let Some(value) = partition.persist_whole_partition {
result.next_persist_time = Some(value);
return result;
}

for row in partition.rows_to_persist.iter() {
match result.next_persist_time {
Some(current) => {
if row.persist_moment < current {
result.next_persist_time = Some(row.persist_moment);
}
}
None => {
result.next_persist_time = Some(row.persist_moment);
}
}
}
}

result
}

pub fn has_something_to_persist(&self) -> bool {
if self.persist_whole_table_content.is_some() {
return true;
}

if self.persist_table_attributes.is_some() {
return true;
}

for partition in self.persist_partitions.iter() {
if partition.persist_whole_partition.is_some() {
return true;
}

if partition.rows_to_persist.len() > 0 {
return true;
}
}

false
}
}

impl EntityWithStrKey for PersistByTableItem {
Expand Down
146 changes: 4 additions & 142 deletions src/persist_markers/persist_markers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,151 +124,13 @@ impl PersistMarkers {
pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics {
let inner = self.inner.lock().await;
match inner.get_by_table(table_name) {
Some(metrics) => metrics.metrics.clone(),
Some(metrics) => metrics.get_metrics(),
None => PersistMetrics::default(),
}
}

/*
pub async fn persist_partition(
&self,
db_table: &DbTable,
partition_key: &impl PartitionKeyParameter,
sync_moment: DateTimeAsMicroseconds,
) {
if !db_table.attributes.persist {
return;
}
let mut write_access = self.by_table.lock().await;
match write_access.insert_or_update(db_table.name.as_str()) {
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert(
entry,
) => {
let mut item = PersistByTableItem {
table_name: db_table.name.clone(),
data: TablePersistData::new(),
};
item.data
.data_to_persist
.mark_partition_to_persist(partition_key, sync_moment);
entry.insert(item);
}
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update(
entry,
) => {
entry
.item
.data
.data_to_persist
.mark_partition_to_persist(partition_key, sync_moment);
}
}
}
pub async fn persist_table(&self, db_table: &DbTable, sync_moment: DateTimeAsMicroseconds) {
if !db_table.attributes.persist {
return;
}
let mut write_access = self.by_table.lock().await;
match write_access.insert_or_update(&db_table.name) {
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert(
entry,
) => {
let mut item = PersistByTableItem {
table_name: db_table.name.to_string(),
data: TablePersistData::new(),
};
item.data.data_to_persist.mark_table_to_persist(sync_moment);
entry.insert(item);
}
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update(
entry,
) => {
entry
.item
.data
.data_to_persist
.mark_table_to_persist(sync_moment);
}
}
}
pub async fn persist_table_attrs(&self, db_table: &DbTable) {
if !db_table.attributes.persist {
return;
}
let mut write_access = self.by_table.lock().await;
match write_access.insert_or_update(&db_table.name) {
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Insert(
entry,
) => {
let mut item = PersistByTableItem {
table_name: db_table.name.to_string(),
data: TablePersistData::new(),
};
item.data.data_to_persist.mark_persist_attrs();
entry.insert(item);
}
my_no_sql_sdk::core::rust_extensions::sorted_vec::InsertOrUpdateEntry::Update(
entry,
) => {
entry.item.data.data_to_persist.mark_persist_attrs();
}
}
}
pub async fn get_job_to_persist(
&self,
table_name: &str,
now: DateTimeAsMicroseconds,
is_shutting_down: bool,
) -> Option<PersistResult> {
let mut write_access = self.by_table.lock().await;
let item = write_access.get_mut(table_name)?;
item.data
.data_to_persist
.get_what_to_persist(now, is_shutting_down)
}
pub async fn set_persisted(&self, table_name: &str, duration: Duration) {
let mut write_access = self.by_table.lock().await;
if let Some(item) = write_access.get_mut(table_name) {
item.data.add_persist_duration(duration);
}
}
pub async fn get_persist_metrics(&self, table_name: &str) -> PersistMetrics {
let read_access = self.by_table.lock().await;
match read_access.get(table_name) {
Some(result) => PersistMetrics {
last_persist_time: result.data.last_persist_time.clone(),
next_persist_time: result.data.data_to_persist.get_next_persist_time(),
persist_amount: result.data.data_to_persist.get_persist_amount(),
last_persist_duration: result.data.persist_duration.clone(),
},
None => PersistMetrics {
last_persist_time: None,
next_persist_time: None,
persist_amount: 0,
last_persist_duration: vec![],
},
}
pub async fn has_something_to_persist(&self) -> bool {
let inner = self.inner.lock().await;
inner.has_something_to_persist()
}
*/
}
6 changes: 6 additions & 0 deletions src/persist_markers/persist_markers_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,10 @@ impl PersistMarkersInner {
let item = self.get_item_mut(table_name);
item.metrics.update(moment, duration);
}

pub fn has_something_to_persist(&self) -> bool {
self.items
.iter()
.any(|item| item.has_something_to_persist())
}
}

0 comments on commit d7e821d

Please sign in to comment.