Skip to content

Commit

Permalink
Applied Metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Jan 10, 2024
1 parent c91d0ad commit a22c4b2
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 41 deletions.
2 changes: 1 addition & 1 deletion my-no-sql-tcp-reader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ my-no-sql-abstractions = { path = "../my-no-sql-abstractions" }
my-no-sql-core = { path = "../my-no-sql-core" }

rust-extensions = { tag = "0.1.4", git = "https://github.com/MyJetTools/rust-extensions.git" }
my-tcp-sockets = { tag = "0.1.9", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
my-tcp-sockets = { branch = "main", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
my-logger = { tag = "1.1.0", git = "https://github.com/MyJetTools/my-logger.git" }
my-json = { tag = "0.2.2", git = "https://github.com/MyJetTools/my-json.git" }

Expand Down
8 changes: 2 additions & 6 deletions my-no-sql-tcp-reader/src/my_no_sql_tcp_connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use my_no_sql_abstractions::MyNoSqlEntity;
use my_no_sql_tcp_shared::{sync_to_main::SyncToMainNodeHandler, MyNoSqlReaderTcpSerializer};
use my_no_sql_tcp_shared::sync_to_main::SyncToMainNodeHandler;
use my_tcp_sockets::TcpClient;
use rust_extensions::{AppStates, StrOrString};

Expand Down Expand Up @@ -65,11 +65,7 @@ impl MyNoSqlTcpConnection {
self.app_states.set_initialized();

self.tcp_client
.start(
Arc::new(|| -> MyNoSqlReaderTcpSerializer { MyNoSqlReaderTcpSerializer::new() }),
self.tcp_events.clone(),
my_logger::LOGGER.clone(),
)
.start(self.tcp_events.clone(), my_logger::LOGGER.clone())
.await;

self.tcp_events
Expand Down
10 changes: 5 additions & 5 deletions my-no-sql-tcp-reader/src/tcp_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use my_tcp_sockets::{tcp_connection::TcpSocketConnection, ConnectionEvent, Socke

use crate::subscribers::Subscribers;

pub type TcpConnection = TcpSocketConnection<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer>;
pub type TcpConnection = TcpSocketConnection<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()>;
pub struct TcpEvents {
app_name: String,
pub subscribers: Subscribers,
Expand Down Expand Up @@ -102,25 +102,25 @@ impl TcpEvents {
}

#[async_trait::async_trait]
impl SocketEventCallback<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer> for TcpEvents {
impl SocketEventCallback<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()> for TcpEvents {
async fn handle(
&self,
connection_event: ConnectionEvent<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer>,
connection_event: ConnectionEvent<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()>,
) {
match connection_event {
ConnectionEvent::Connected(connection) => {
let contract = MyNoSqlTcpContract::Greeting {
name: self.app_name.to_string(),
};

connection.send(&contract).await;
connection.send(&contract, &()).await;

for table in self.subscribers.get_tables_to_subscribe().await {
let contract = MyNoSqlTcpContract::Subscribe {
table_name: table.to_string(),
};

connection.send(&contract).await;
connection.send(&contract, &()).await;
}

self.sync_handler
Expand Down
2 changes: 1 addition & 1 deletion my-no-sql-tcp-shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
my-tcp-sockets = { tag = "0.1.9", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
my-tcp-sockets = { branch = "main", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
rust-extensions = { tag = "0.1.4", git = "https://github.com/MyJetTools/rust-extensions.git" }

tokio = { version = "*", features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions my-no-sql-tcp-shared/src/sync_to_main/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ pub use sync_to_main_node_queue::*;
type DataReaderTcpConnection = my_tcp_sockets::tcp_connection::TcpSocketConnection<
crate::MyNoSqlTcpContract,
crate::MyNoSqlReaderTcpSerializer,
(),
>;
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ pub async fn to_main_node_pusher(
}

connection
.send(&MyNoSqlTcpContract::UpdatePartitionsExpirationTime {
confirmation_id,
table_name: event.table_name,
partitions,
})
.send(
&MyNoSqlTcpContract::UpdatePartitionsExpirationTime {
confirmation_id,
table_name: event.table_name,
partitions,
},
&(),
)
.await;
}
DeliverToMainNodeEvent::UpdatePartitionsLastReadTime {
Expand All @@ -89,11 +92,14 @@ pub async fn to_main_node_pusher(
}

connection
.send(&MyNoSqlTcpContract::UpdatePartitionsLastReadTime {
confirmation_id,
table_name: event.table_name,
partitions,
})
.send(
&MyNoSqlTcpContract::UpdatePartitionsLastReadTime {
confirmation_id,
table_name: event.table_name,
partitions,
},
&(),
)
.await;
}
DeliverToMainNodeEvent::UpdateRowsExpirationTime {
Expand All @@ -107,13 +113,16 @@ pub async fn to_main_node_pusher(
}

connection
.send(&MyNoSqlTcpContract::UpdateRowsExpirationTime {
confirmation_id,
table_name: event.table_name,
partition_key: event.partition_key,
row_keys,
expiration_time: event.expiration_time,
})
.send(
&MyNoSqlTcpContract::UpdateRowsExpirationTime {
confirmation_id,
table_name: event.table_name,
partition_key: event.partition_key,
row_keys,
expiration_time: event.expiration_time,
},
&(),
)
.await;
}
DeliverToMainNodeEvent::UpdateRowsLastReadTime {
Expand All @@ -127,12 +136,15 @@ pub async fn to_main_node_pusher(
}

connection
.send(&MyNoSqlTcpContract::UpdateRowsLastReadTime {
confirmation_id,
table_name: event.table_name,
partition_key: event.partition_key,
row_keys,
})
.send(
&MyNoSqlTcpContract::UpdateRowsLastReadTime {
confirmation_id,
table_name: event.table_name,
partition_key: event.partition_key,
row_keys,
},
&(),
)
.await;
}
}
Expand Down
17 changes: 12 additions & 5 deletions my-no-sql-tcp-shared/src/tcp_serializer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use my_tcp_sockets::{
socket_reader::{ReadingTcpContractFail, SocketReader},
TcpSocketSerializer, TcpWriteBuffer,
SerializationMetadata, TcpSocketSerializer, TcpWriteBuffer,
};

use crate::MyNoSqlTcpContract;
Expand All @@ -14,10 +14,8 @@ impl MyNoSqlReaderTcpSerializer {
}

#[async_trait::async_trait]
impl TcpSocketSerializer<MyNoSqlTcpContract> for MyNoSqlReaderTcpSerializer {
const PING_PACKET_IS_SINGLETON: bool = true;

fn serialize(&self, out: &mut impl TcpWriteBuffer, contract: &MyNoSqlTcpContract) {
impl TcpSocketSerializer<MyNoSqlTcpContract, ()> for MyNoSqlReaderTcpSerializer {
fn serialize(&self, out: &mut impl TcpWriteBuffer, contract: &MyNoSqlTcpContract, _: &()) {
contract.serialize(out)
}

Expand All @@ -28,7 +26,16 @@ impl TcpSocketSerializer<MyNoSqlTcpContract> for MyNoSqlReaderTcpSerializer {
async fn deserialize<TSocketReader: Send + Sync + 'static + SocketReader>(
&mut self,
socket_reader: &mut TSocketReader,
_: &(),
) -> Result<MyNoSqlTcpContract, ReadingTcpContractFail> {
MyNoSqlTcpContract::deserialize(socket_reader).await
}

fn create_serializer() -> Self {
Self::new()
}
}

impl SerializationMetadata<MyNoSqlTcpContract> for () {
fn apply_tcp_contract(&mut self, _: &MyNoSqlTcpContract) {}
}

0 comments on commit a22c4b2

Please sign in to comment.