Skip to content

Commit

Permalink
New persist format (#11)
Browse files Browse the repository at this point in the history
* New Persist Format

* Minro Fixes

* Debugging Backup
  • Loading branch information
amigin authored Nov 22, 2024
1 parent b9de821 commit 80d2c65
Show file tree
Hide file tree
Showing 118 changed files with 1,722 additions and 2,399 deletions.
28 changes: 11 additions & 17 deletions src/app/app_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,9 @@ use my_no_sql_sdk::core::rust_extensions::{date_time::DateTimeAsMicroseconds, Ap
use my_no_sql_server_core::DbInstance;

use crate::{
data_readers::DataReadersList,
db_operations::multipart::MultipartList,
db_transactions::ActiveTransactions,
persist::PersistMarkersByTable,
persist_io::PersistIoOperations,
persist_operations::{
blob_content_cache::BlobContentCache, data_initializer::load_tasks::InitState,
},
settings_reader::SettingsModel,
data_readers::DataReadersList, db_operations::multipart::MultipartList,
db_transactions::ActiveTransactions, operations::init::InitState,
persist_markers::PersistMarkers, settings_reader::SettingsModel,
};

use super::{EventsSync, HttpWriters, PrometheusMetrics};
Expand All @@ -34,40 +28,40 @@ pub struct AppContext {
pub active_transactions: ActiveTransactions,
pub process_id: String,

pub blob_content_cache: BlobContentCache,
pub data_readers: DataReadersList,

pub multipart_list: MultipartList,
pub persist_io: PersistIoOperations,
//pub persist_io: PersistIoOperations,
pub init_state: InitState,
pub repo: crate::sqlite_repo::SqlLiteRepo,

pub settings: Arc<SettingsModel>,
pub sync: EventsSync,
pub states: Arc<AppStates>,
pub persist_markers: PersistMarkersByTable,
pub persist_markers: PersistMarkers,
pub http_writers: HttpWriters,
persist_amount: AtomicUsize,
}

impl AppContext {
pub fn new(settings: Arc<SettingsModel>, persist_io: PersistIoOperations) -> Self {
pub async fn new(settings: Arc<SettingsModel>) -> Self {
AppContext {
persist_markers: PersistMarkersByTable::new(),
persist_markers: PersistMarkers::new(),
created: DateTimeAsMicroseconds::now(),
init_state: InitState::new(),
db: DbInstance::new(),
metrics: PrometheusMetrics::new(),
active_transactions: ActiveTransactions::new(),
process_id: uuid::Uuid::new_v4().to_string(),
states: Arc::new(AppStates::create_un_initialized()),

blob_content_cache: BlobContentCache::new(),
data_readers: DataReadersList::new(Duration::from_secs(30)),
multipart_list: MultipartList::new(),
persist_io,
repo: settings.get_sqlite_repo().await,
settings,
persist_amount: AtomicUsize::new(0),
sync: EventsSync::new(),
http_writers: HttpWriters::new(),
init_state: InitState::new(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/read/get_highest_row_and_below.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn get_highest_row_and_below(
break;
}
}
update_statistics.update(db_partition, Some(db_row), now);
update_statistics.update(db_table_wrapper, db_partition, Some(db_row), now);
json_array_writer.write(db_row.as_ref());

count += 1;
Expand Down
4 changes: 2 additions & 2 deletions src/db_operations/read/get_rows_as_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn get_as_partition_key_and_row_key(

let db_row = db_partition.get_row_and_clone(row_key)?;

update_statistics.update(db_partition, Some(&db_row), now.date_time);
update_statistics.update(table, db_partition, Some(&db_row), now.date_time);

Some(db_row)
}
Expand Down Expand Up @@ -109,7 +109,7 @@ async fn get_as_row_key_only(
let mut result = Vec::new();

for (db_partition, db_row) in read_access.get_by_row_key(row_key, skip, limit) {
update_statistics.update(db_partition, Some(db_row), now.date_time);
update_statistics.update(table, db_partition, Some(db_row), now.date_time);
result.push(db_row.clone());
}

Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/read/rows/get_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub async fn get_all(

let mut json_array_writer = JsonArrayWriter::new();
for (db_partition, db_row) in table_data.get_all_rows(skip, limit) {
update_statistics.update(db_partition, Some(db_row), now);
update_statistics.update(db_table_wrapper, db_partition, Some(db_row), now);
json_array_writer.write(db_row.as_ref());
}

Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/read/rows/get_all_by_partition_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn get_all_by_partition_key(
limit,
skip,
|db_row| {
update_statistics.update(db_partition, Some(db_row), now);
update_statistics.update(db_table, db_partition, Some(db_row), now);
},
);

Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/read/rows/get_all_by_row_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn get_all_by_row_key(

let mut json_array_writer = JsonArrayWriter::new();
for (db_partition, db_row) in table_data.get_by_row_key(row_key, skip, limit) {
update_statistics.update(db_partition, Some(db_row), now);
update_statistics.update(db_table, db_partition, Some(db_row), now);
json_array_writer.write(db_row.as_ref());
}

Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/read/rows/get_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn get_single(

let db_row = db_row.unwrap();

update_statistics.update(db_partition, Some(db_row), now);
update_statistics.update(db_table, db_partition, Some(db_row), now);

return Ok(ReadOperationResult::SingleRow(db_row.to_vec()));
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub async fn get_single_partition_multiple_rows(
let db_row = db_partition.get_row(row_key);

if let Some(db_row) = db_row {
update_statistics.update(db_partition, Some(db_row), now);
update_statistics.update(db_table_wrapper, db_partition, Some(db_row), now);
json_array_writer.write(db_row.as_ref());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,18 @@ use std::sync::Arc;
use my_no_sql_sdk::core::rust_extensions::date_time::DateTimeAsMicroseconds;
use my_no_sql_server_core::DbTableWrapper;

use crate::app::AppContext;

pub fn update_partition_expiration_time(
app: &Arc<AppContext>,
db_table: &Arc<DbTableWrapper>,
partition_key: &str,
partition_key: String,
set_expiration_time: Option<DateTimeAsMicroseconds>,
) {
let partition_key = partition_key.to_string();

let db_table = db_table.clone();

let app = app.clone();

tokio::spawn(async move {
let mut table_data = db_table.data.write().await;

if let Some(db_partition) = table_data.get_partition_mut(&partition_key) {
if let Some(db_partition) = table_data.get_partition_mut(partition_key.as_str()) {
db_partition.expires = set_expiration_time;
}

let mut sync_moment = DateTimeAsMicroseconds::now();
sync_moment.add_minutes(5);

app.persist_markers
.persist_partition(&table_data, &partition_key, sync_moment)
.await;
});
}
17 changes: 15 additions & 2 deletions src/db_operations/update_statistics/update_rows_expiration_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub fn update_rows_expiration_time<'s, TRowKeys: Iterator<Item = &'s str>>(

let mut updated_db_rows = Vec::new();

let mut db_partition_key = None;

if let Some(db_partition) = table_data.get_partition_mut(&partition_key) {
for row_key in row_keys {
let db_row = db_partition
Expand All @@ -33,14 +35,25 @@ pub fn update_rows_expiration_time<'s, TRowKeys: Iterator<Item = &'s str>>(

if let Some(db_row) = db_row {
updated_db_rows.push(db_row);

if db_partition_key.is_none() {
db_partition_key = Some(db_partition.partition_key.clone());
}
}
}
}

if let Some(db_partition_key) = db_partition_key {
let mut sync_moment = DateTimeAsMicroseconds::now();
sync_moment.add_minutes(5);
sync_moment.add_seconds(10);

app.persist_markers
.persist_partition(&table_data, &partition_key, sync_moment)
.persist_rows(
&table_data.name,
&db_partition_key,
sync_moment,
updated_db_rows.iter(),
)
.await;
}
});
Expand Down
11 changes: 7 additions & 4 deletions src/db_operations/update_statistics/update_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use my_no_sql_sdk::core::db::{DbPartition, DbRow};
use my_no_sql_sdk::core::rust_extensions::date_time::DateTimeAsMicroseconds;
use my_no_sql_server_core::DbTableWrapper;

#[derive(Clone, Debug)]
pub struct UpdateStatistics {
Expand All @@ -14,6 +15,7 @@ pub struct UpdateStatistics {
impl UpdateStatistics {
pub fn update(
&self,
db_table: &Arc<DbTableWrapper>,
db_partition: &DbPartition,
db_row: Option<&Arc<DbRow>>,
now: DateTimeAsMicroseconds,
Expand All @@ -22,12 +24,13 @@ impl UpdateStatistics {
db_partition.update_last_read_moment(now);
}

/*
todo!("Not Implemented yet");
if let Some(update_partition_expiration_time) = self.update_partition_expiration_time {
db_partition.update_expires(update_partition_expiration_time);
crate::db_operations::update_statistics::update_partition_expiration_time(
db_table,
db_partition.partition_key.to_string(),
update_partition_expiration_time,
);
}
*/

if let Some(db_row) = db_row {
if self.update_rows_last_read_access_time {
Expand Down
17 changes: 13 additions & 4 deletions src/db_operations/write/bulk_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,23 @@ pub async fn bulk_delete(
table_data.bulk_remove_rows(&partition_key, row_keys.into_iter(), true, Some(now));

if let Some((partition_key, removed_rows, partition_is_empty)) = removed_rows_result {
app.persist_markers
.persist_partition(&table_data, &partition_key, persist_moment)
.await;

if partition_is_empty {
sync_data.new_deleted_partition(&partition_key);

app.persist_markers
.persist_partition(&table_data.name, &partition_key, persist_moment)
.await;
} else {
sync_data.add_deleted_rows(&partition_key, &removed_rows);

app.persist_markers
.delete_db_rows(
&table_data.name,
&partition_key,
persist_moment,
removed_rows.iter(),
)
.await;
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/db_operations/write/bulk_insert_or_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ pub async fn execute(
has_insert_or_replace = true;

app.persist_markers
.persist_partition(&table_data, &partition_key, persist_moment)
.persist_rows(
&table_data.name,
&partition_key,
persist_moment,
db_rows.iter(),
)
.await;

update_rows_state
Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/write/clean_partition_and_bulk_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub async fn execute(

for partition_key in partition_keys {
app.persist_markers
.persist_partition(&table_data, &partition_key, persist_moment)
.persist_partition(&table_data.name, &partition_key, persist_moment)
.await;

let state = InitPartitionsSyncEventData::new_as_update_partition(
Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/write/clean_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn execute(
crate::operations::sync::dispatch(app, SyncEvent::InitTable(sync_data));

app.persist_markers
.persist_table(&table_data, persist_moment)
.persist_table_content(&table_data.name, persist_moment)
.await;
}

Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/write/clean_table_and_bulk_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn execute(
}

app.persist_markers
.persist_table(&table_data, persist_moment)
.persist_table_content(&table_data.name, persist_moment)
.await;

if let Some(event_src) = event_src {
Expand Down
8 changes: 6 additions & 2 deletions src/db_operations/write/delete_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ pub async fn delete_partitions(
let remove_partition_result =
table_write_access.remove_partition(&partition_key, Some(now));

if remove_partition_result.is_some() {
if let Some(removed_partition) = remove_partition_result {
app.persist_markers
.persist_partition(&table_write_access, &partition_key, persist_moment)
.persist_partition(
&db_table.name,
&removed_partition.partition_key,
persist_moment,
)
.await;

sync_data.add(partition_key.into_partition_key(), None);
Expand Down
7 changes: 6 additions & 1 deletion src/db_operations/write/delete_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ pub async fn execute(
let mut sync_data = DeleteRowsEventSyncData::new(&table_data, event_src);

app.persist_markers
.persist_partition(&table_data, &partition_key, persist_moment)
.persist_rows(
&table_data.name,
&partition_key,
persist_moment,
[&removed_row].into_iter(),
)
.await;

if partition_is_empty {
Expand Down
7 changes: 6 additions & 1 deletion src/db_operations/write/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ pub async fn execute(
let partition_key = partition_key.unwrap();

app.persist_markers
.persist_partition(&table_data, &partition_key, persist_moment)
.persist_rows(
&table_data.name,
&partition_key,
persist_moment,
[&db_row].into_iter(),
)
.await;

let mut update_rows_state = UpdateRowsSyncData::new(&table_data, event_src);
Expand Down
7 changes: 6 additions & 1 deletion src/db_operations/write/insert_or_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ pub async fn execute(
let (partition_key, _) = table_data.insert_or_replace_row(&db_row, Some(now));

app.persist_markers
.persist_partition(&table_data, &partition_key, persist_moment)
.persist_rows(
&table_data.name,
&partition_key,
persist_moment,
[&db_row].into_iter(),
)
.await;

let mut update_rows_state = UpdateRowsSyncData::new(&table_data, event_src);
Expand Down
7 changes: 6 additions & 1 deletion src/db_operations/write/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ pub async fn execute(
table_data.insert_row(&db_row, Some(now.date_time));

app.persist_markers
.persist_partition(&table_data, &db_row, persist_moment)
.persist_rows(
&table_data.name,
&partition_key,
persist_moment,
[&db_row].into_iter(),
)
.await;

let mut update_rows_state = UpdateRowsSyncData::new(&table_data, event_src);
Expand Down
Loading

0 comments on commit 80d2c65

Please sign in to comment.