Skip to content

Commit

Permalink
Listen to exit signal
Browse files Browse the repository at this point in the history
Add channel to unbind listener as well

Reorganize signal handler

Another typo

Fix typos in main doc
  • Loading branch information
johnnyasantoss committed Jul 8, 2024
1 parent 5a2b01b commit a4b4640
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 56 deletions.
98 changes: 54 additions & 44 deletions roles/mining-proxy/src/lib/downstream_mining.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#![allow(dead_code)]

use super::upstream_mining::{StdFrame as UpstreamFrame, UpstreamMiningNode};
use async_channel::{Receiver, SendError, Sender};
use async_channel::{Receiver, Sender, SendError};
use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use core::convert::TryInto;
use network_helpers_sv2::plain_connection_tokio::PlainConnection;
use roles_logic_sv2::{
common_messages_sv2::{SetupConnection, SetupConnectionSuccess},
common_properties::{CommonDownstreamData, IsDownstream, IsMiningDownstream},
Expand All @@ -15,9 +17,11 @@ use roles_logic_sv2::{
routing_logic::MiningProxyRoutingLogic,
utils::Mutex,
};
use tracing::info;
use std::sync::Arc;
use tokio::{net::TcpListener, sync::oneshot::Receiver as TokioReceiver, task};
use tracing::{error, info};

use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use super::upstream_mining::{ProxyRemoteSelector, StdFrame as UpstreamFrame, UpstreamMiningNode};

pub type Message = MiningDeviceMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
Expand Down Expand Up @@ -153,10 +157,6 @@ impl DownstreamMiningNodeStatus {
}
}

use core::convert::TryInto;
use std::sync::Arc;
use tokio::task;

impl PartialEq for DownstreamMiningNode {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
Expand Down Expand Up @@ -326,8 +326,6 @@ impl DownstreamMiningNode {
}
}

use super::upstream_mining::ProxyRemoteSelector;

/// It impl UpstreamMining cause the proxy act as an upstream node for the DownstreamMiningNode
impl
ParseDownstreamMiningMessages<
Expand Down Expand Up @@ -483,44 +481,56 @@ impl
}
}

use network_helpers_sv2::plain_connection_tokio::PlainConnection;
use std::net::SocketAddr;
use tokio::net::TcpListener;

pub async fn listen_for_downstream_mining(address: SocketAddr) {
info!("Listening for downstream mining connections on {}", address);
let listner = TcpListener::bind(address).await.unwrap();
pub async fn listen_for_downstream_mining(
listener: TcpListener,
mut shutdown_rx: TokioReceiver<()>,
) {
let mut ids = roles_logic_sv2::utils::Id::new();

while let Ok((stream, _)) = listner.accept().await {
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
PlainConnection::new(stream).await;
let node = DownstreamMiningNode::new(receiver, sender, ids.next());

task::spawn(async move {
let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));

// Call handle_setup_connection or fail
match DownstreamMiningNode::handle_message_common(
node.clone(),
message_type,
payload,
routing_logic,
) {
Ok(SendToCommon::RelayNewMessageToRemote(_, message)) => {
let message = match message {
roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => m,
loop {
tokio::select! {
accept_result = listener.accept() => {
match accept_result {
Ok((stream, _)) => {
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
PlainConnection::new(stream).await;
let node = DownstreamMiningNode::new(receiver, sender, ids.next());

task::spawn(async move {
let mut incoming: StdFrame =
node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));

// Call handle_setup_connection or fail
match DownstreamMiningNode::handle_message_common(
node.clone(),
message_type,
payload,
routing_logic,
) {
Ok(SendToCommon::RelayNewMessageToRemote(_, message)) => {
let message = match message {
roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => {
m
}
_ => panic!(),
};
DownstreamMiningNode::start(node, message).await
}
_ => panic!(),
};
DownstreamMiningNode::start(node, message).await
}
});
}
Err(e) => error!("Failed to accept downstream connection. {:?}", e)
}
_ => panic!(),
}
});
_ = &mut shutdown_rx => {
info!("Closing listener");
return;
}
}
}
}

Expand Down
49 changes: 37 additions & 12 deletions roles/mining-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Downstream means another proxy or a mining device
//!
//! UpstreamMining is the trait that a proxy must implement in order to
//! understant Downstream mining messages.
//! understand Downstream mining messages.
//!
//! DownstreamMining is the trait that a proxy must implement in order to
//! understand Upstream mining messages
Expand All @@ -18,12 +18,16 @@
//! A Downstream that signal the incapacity to handle group channels can open only one channel.
//!
#![allow(special_module_name)]
mod lib;
use std::{net::SocketAddr, sync::Arc};

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tracing::{error, info};

use lib::Config;
use roles_logic_sv2::utils::{GroupId, Mutex};
use std::{net::SocketAddr, sync::Arc};
use tracing::{error, info};

mod lib;

mod args {
use std::path::PathBuf;
Expand Down Expand Up @@ -89,12 +93,12 @@ mod args {
}

/// 1. the proxy scan all the upstreams and map them
/// 2. donwstream open a connetcion with proxy
/// 2. downstream open a connection with proxy
/// 3. downstream send SetupConnection
/// 4. a mining_channle::Upstream is created
/// 4. a mining_channels::Upstream is created
/// 5. upstream_mining::UpstreamMiningNodes is used to pair this downstream with the most suitable
/// upstream
/// 6. mining_channle::Upstream create a new downstream_mining::DownstreamMiningNode embedding
/// 6. mining_channels::Upstream create a new downstream_mining::DownstreamMiningNode embedding
/// itself in it
/// 7. normal operation between the paired downstream_mining::DownstreamMiningNode and
/// upstream_mining::UpstreamMiningNode begin
Expand Down Expand Up @@ -126,16 +130,37 @@ async fn main() {
lib::initialize_r_logic(&config.upstreams, group_id, config.clone()).await,
))
.expect("BUG: Failed to set ROUTING_LOGIC");
info!("PROXY INITIALIZING");

info!("Initializing upstream scanner");
lib::initialize_upstreams(config.min_supported_version, config.max_supported_version).await;
info!("PROXY INITIALIZED");
info!("Initializing downstream listener");

// Wait for downstream connection
let socket = SocketAddr::new(
config.listen_address.parse().unwrap(),
config.listen_mining_port,
);
let listener = TcpListener::bind(socket).await.unwrap();

info!("Listening for downstream mining connections on {}", socket);

let (shutdown_tx, shutdown_rx) = oneshot::channel();

let (_, res) = tokio::join!(
// Wait for downstream connection
lib::downstream_mining::listen_for_downstream_mining(listener, shutdown_rx),
// handle SIGTERM/QUIT / ctrl+c
tokio::spawn(async {
tokio::signal::ctrl_c()
.await
.expect("Failed to listen to signals");
let _ = shutdown_tx.send(());
info!("Interrupt received");
})
);

if let Err(e) = res {
panic!("Failed to wait for clean exit: {:?}", e);
}

info!("PROXY INITIALIZED");
crate::lib::downstream_mining::listen_for_downstream_mining(socket).await
info!("Shutdown done");
}

0 comments on commit a4b4640

Please sign in to comment.