Skip to content

Commit

Permalink
Improvements with EventLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Jan 8, 2024
1 parent ad3a99d commit ed9be80
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 80 deletions.
4 changes: 2 additions & 2 deletions my-no-sql-tcp-reader/src/my_no_sql_tcp_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl MyNoSqlTcpConnection {
connect_timeout: Duration::from_secs(3),
tcp_events: Arc::new(TcpEvents::new(
app_name.to_string(),
Arc::new(SyncToMainNodeHandler::new()),
Arc::new(SyncToMainNodeHandler::new(my_logger::LOGGER.clone())),
)),
app_states: Arc::new(AppStates::create_un_initialized()),
}
Expand Down Expand Up @@ -74,7 +74,7 @@ impl MyNoSqlTcpConnection {

self.tcp_events
.sync_handler
.start(self.app_states.clone(), my_logger::LOGGER.clone())
.start(self.app_states.clone())
.await;
}
}
14 changes: 5 additions & 9 deletions my-no-sql-tcp-reader/src/tcp_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,9 @@ impl TcpEvents {
MyNoSqlTcpContract::Unsubscribe(_) => {}
MyNoSqlTcpContract::TableNotFound(_) => {}
MyNoSqlTcpContract::CompressedPayload(_) => {}
MyNoSqlTcpContract::Confirmation { confirmation_id } => {
self.sync_handler
.tcp_events_pusher_got_confirmation(confirmation_id)
.await
}
MyNoSqlTcpContract::Confirmation { confirmation_id } => self
.sync_handler
.tcp_events_pusher_got_confirmation(confirmation_id),
MyNoSqlTcpContract::UpdatePartitionsLastReadTime {
confirmation_id: _,
table_name: _,
Expand Down Expand Up @@ -126,13 +124,11 @@ impl SocketEventCallback<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer> for Tcp
}

self.sync_handler
.tcp_events_pusher_new_connection_established(connection)
.await;
.tcp_events_pusher_new_connection_established(connection);
}
ConnectionEvent::Disconnected(connection) => {
self.sync_handler
.tcp_events_pusher_connection_disconnected(connection)
.await;
.tcp_events_pusher_connection_disconnected(connection);
}
ConnectionEvent::Payload {
connection,
Expand Down
84 changes: 42 additions & 42 deletions my-no-sql-tcp-shared/src/sync_to_main/sync_to_main_node_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use rust_extensions::{events_loop::EventsLoop, ApplicationStates, Logger};
use tokio::sync::Mutex;

use super::{
sync_to_main_node_handler_inner::SyncToMainNodeHandlerInner, DataReaderTcpConnection,
Expand All @@ -9,55 +10,52 @@ use super::{

pub struct SyncToMainNodeHandler {
pub inner: Arc<SyncToMainNodeHandlerInner>,
events_loop: Mutex<EventsLoop<SyncToMainNodeEvent>>,
}

impl SyncToMainNodeHandler {
pub fn new() -> Self {
let inner = Arc::new(SyncToMainNodeHandlerInner::new());
pub fn new(logger: Arc<dyn Logger + Send + Sync + 'static>) -> Self {
let mut events_loop = EventsLoop::new("SyncToMainNodeQueues".to_string(), logger);

Self { inner }
}
let events_publisher = events_loop.get_publisher();

pub async fn start(
&self,
app_states: Arc<dyn ApplicationStates + Send + Sync + 'static>,
logger: Arc<dyn Logger + Send + Sync + 'static>,
) {
let mut events_loop = EventsLoop::new("SyncToMainNodeQueues".to_string(), logger);
let inner = Arc::new(SyncToMainNodeHandlerInner::new(events_publisher));

events_loop.register_event_loop(self.inner.clone());
events_loop.register_event_loop(inner.clone());

self.inner.set_event_loop(events_loop).await;
self.inner.start(app_states).await
Self {
inner,
events_loop: Mutex::new(events_loop),
}
}

pub async fn start(&self, app_states: Arc<dyn ApplicationStates + Send + Sync + 'static>) {
let mut events_loop = self.events_loop.lock().await;
events_loop.start(app_states);
}

pub async fn tcp_events_pusher_new_connection_established(
pub fn tcp_events_pusher_new_connection_established(
&self,
connection: Arc<DataReaderTcpConnection>,
) {
let queues = self.inner.queues.lock().await;

if let Some(events_loop) = queues.events_loop.as_ref() {
events_loop.send(SyncToMainNodeEvent::Connected(connection));
}
self.inner
.events_publisher
.send(SyncToMainNodeEvent::Connected(connection));
}

pub async fn tcp_events_pusher_connection_disconnected(
pub fn tcp_events_pusher_connection_disconnected(
&self,
connection: Arc<DataReaderTcpConnection>,
) {
let queues = self.inner.queues.lock().await;
if let Some(events_loop) = queues.events_loop.as_ref() {
events_loop.send(SyncToMainNodeEvent::Disconnected(connection));
}
self.inner
.events_publisher
.send(SyncToMainNodeEvent::Disconnected(connection));
}

pub async fn tcp_events_pusher_got_confirmation(&self, confirmation_id: i64) {
let queues = self.inner.queues.lock().await;

if let Some(events_loop) = queues.events_loop.as_ref() {
events_loop.send(SyncToMainNodeEvent::Delivered(confirmation_id));
}
pub fn tcp_events_pusher_got_confirmation(&self, confirmation_id: i64) {
self.inner
.events_publisher
.send(SyncToMainNodeEvent::Delivered(confirmation_id));
}

pub async fn update<'s, TRowKeys: Iterator<Item = &'s str>>(
Expand All @@ -78,9 +76,9 @@ impl SyncToMainNodeHandler {
.update_partitions_last_read_time_queue
.add_partition(table_name, partition_key);

if let Some(events_loop) = inner.events_loop.as_ref() {
events_loop.send(SyncToMainNodeEvent::PingToDeliver);
}
self.inner
.events_publisher
.send(SyncToMainNodeEvent::PingToDeliver);
}

if let Some(partition_expiration) = data.partition_expiration_moment {
Expand All @@ -90,19 +88,20 @@ impl SyncToMainNodeHandler {
partition_expiration,
);

if let Some(events_loop) = inner.events_loop.as_ref() {
events_loop.send(SyncToMainNodeEvent::PingToDeliver);
}
self.inner
.events_publisher
.send(SyncToMainNodeEvent::PingToDeliver);
}

if data.row_last_read_moment {
if data.row_last_read_moment {
inner
.update_rows_last_read_time_queue
.add(table_name, partition_key, row_keys());
if let Some(events_loop) = inner.events_loop.as_ref() {
events_loop.send(SyncToMainNodeEvent::PingToDeliver);
}

self.inner
.events_publisher
.send(SyncToMainNodeEvent::PingToDeliver);
}
}

Expand All @@ -113,9 +112,10 @@ impl SyncToMainNodeHandler {
row_keys(),
row_expiration,
);
if let Some(events_loop) = inner.events_loop.as_ref() {
events_loop.send(SyncToMainNodeEvent::PingToDeliver);
}

self.inner
.events_publisher
.send(SyncToMainNodeEvent::PingToDeliver);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use std::sync::Arc;

use rust_extensions::{
events_loop::{EventsLoop, EventsLoopTick},
ApplicationStates,
};
use rust_extensions::events_loop::{EventsLoopPublisher, EventsLoopTick};
use tokio::sync::Mutex;

use crate::sync_to_main::DeliverToMainNodeEvent;
Expand All @@ -12,24 +7,14 @@ use super::{SyncToMainNodeEvent, SyncToMainNodeQueue};

pub struct SyncToMainNodeHandlerInner {
pub queues: Mutex<SyncToMainNodeQueue>,
pub events_publisher: EventsLoopPublisher<SyncToMainNodeEvent>,
}

impl SyncToMainNodeHandlerInner {
pub fn new() -> Self {
pub fn new(events_publisher: EventsLoopPublisher<SyncToMainNodeEvent>) -> Self {
Self {
queues: Mutex::new(SyncToMainNodeQueue::new()),
}
}

pub async fn set_event_loop(&self, events_loop: EventsLoop<SyncToMainNodeEvent>) {
let mut queues = self.queues.lock().await;
queues.events_loop = Some(events_loop);
}

pub async fn start(&self, app_states: Arc<dyn ApplicationStates + Send + Sync + 'static>) {
let mut queues = self.queues.lock().await;
if let Some(event_loop) = queues.events_loop.as_mut() {
event_loop.start(app_states);
events_publisher,
}
}
}
Expand Down
12 changes: 4 additions & 8 deletions my-no-sql-tcp-shared/src/sync_to_main/sync_to_main_node_queue.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::Arc;

use rust_extensions::events_loop::EventsLoop;

use super::{
DataReaderTcpConnection, SyncToMainNodeEvent, UpdatePartitionExpirationEvent,
UpdatePartitionsExpirationTimeQueue, UpdatePartitionsLastReadTimeEvent,
UpdatePartitionsLastReadTimeQueue, UpdateRowsExpirationTimeEvent,
UpdateRowsExpirationTimeQueue, UpdateRowsLastReadTimeEvent, UpdateRowsLastReadTimeQueue,
DataReaderTcpConnection, UpdatePartitionExpirationEvent, UpdatePartitionsExpirationTimeQueue,
UpdatePartitionsLastReadTimeEvent, UpdatePartitionsLastReadTimeQueue,
UpdateRowsExpirationTimeEvent, UpdateRowsExpirationTimeQueue, UpdateRowsLastReadTimeEvent,
UpdateRowsLastReadTimeQueue,
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -61,7 +59,6 @@ pub struct SyncToMainNodeQueue {
pub update_rows_last_read_time_queue: UpdateRowsLastReadTimeQueue,
pub on_delivery: Option<DeliverToMainNodeEvent>,
pub connection: Option<Arc<DataReaderTcpConnection>>,
pub events_loop: Option<EventsLoop<SyncToMainNodeEvent>>,
}

impl SyncToMainNodeQueue {
Expand All @@ -74,7 +71,6 @@ impl SyncToMainNodeQueue {
update_partitions_last_read_time_queue: UpdatePartitionsLastReadTimeQueue::new(),
on_delivery: None,
connection: None,
events_loop: None,
}
}

Expand Down

0 comments on commit ed9be80

Please sign in to comment.