Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(comms): correctly initialize hidden service #6124

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions comms/core/src/tor/control_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ impl TorControlPortClient {
pub async fn get_info(&mut self, key_name: &'static str) -> Result<Vec<Cow<'_, str>>, TorClientError> {
let command = commands::get_info(key_name);
let response = self.request_response(command).await?;
if response.is_empty() {
return Err(TorClientError::ServerNoResponse);
}
Ok(response)
}

Expand Down Expand Up @@ -202,7 +199,6 @@ impl TorControlPortClient {
let cmd_str = command.to_command_string().map_err(Into::into)?;
self.send_line(cmd_str).await?;
let responses = self.recv_next_responses().await?;
trace!(target: LOG_TARGET, "Response from tor: {:?}", responses);
if responses.is_empty() {
return Err(TorClientError::ServerNoResponse);
}
Expand Down
8 changes: 4 additions & 4 deletions comms/core/src/tor/control_client/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ where
match either {
// Received a command to send to the control server
Either::Left(Some(line)) => {
trace!(target: LOG_TARGET, "Writing command of length '{}'", line.len());
trace!(target: LOG_TARGET, "Tor send: {}", line);
if let Err(err) = sink.send(line).await {
error!(
target: LOG_TARGET,
Expand All @@ -64,7 +64,7 @@ where
},
// Command stream ended
Either::Left(None) => {
debug!(
warn!(
target: LOG_TARGET,
"Tor control server command receiver closed. Monitor is exiting."
);
Expand All @@ -73,7 +73,7 @@ where

// Received a line from the control server
Either::Right(Some(Ok(line))) => {
trace!(target: LOG_TARGET, "Read line of length '{}'", line.len());
trace!(target: LOG_TARGET, "Tor recv: {}", line);
match parsers::response_line(&line) {
Ok(mut line) => {
if line.is_multiline {
Expand Down Expand Up @@ -116,7 +116,7 @@ where
// The control server disconnected
Either::Right(None) => {
cmd_rx.close();
debug!(
warn!(
target: LOG_TARGET,
"Connection to tor control port closed. Monitor is exiting."
);
Expand Down
11 changes: 0 additions & 11 deletions comms/core/src/tor/hidden_service/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::{fs, io, net::SocketAddr, sync::Arc, time::Duration};

use futures::{future, future::Either, pin_mut, StreamExt};
use log::*;
use multiaddr::{multiaddr, Protocol};
use tari_shutdown::OptionalShutdownSignal;
use tari_utilities::hex::Hex;
use thiserror::Error;
Expand Down Expand Up @@ -127,16 +126,6 @@ impl HiddenServiceController {
pub async fn initialize_transport(&mut self) -> Result<SocksTransport, HiddenServiceControllerError> {
self.connect_and_auth().await?;

let socks_addr = self.get_socks_address().await?;
let mut proxied_addr = self.proxied_address();
if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) {
if let Some(Protocol::Tcp(port)) = socks_addr.iter().last() {
proxied_addr.pop();
proxied_addr.push(Protocol::Tcp(port));
}
self.set_proxied_addr(&proxied_addr);
}
self.create_hidden_service_from_identity().await?;
let socks_addr = self.get_socks_address().await?;
Ok(SocksTransport::new(SocksConfig {
proxy_address: socks_addr,
Expand Down
96 changes: 63 additions & 33 deletions comms/core/src/transports/hidden_service_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
use std::{io, io::ErrorKind, sync::Arc};

use log::*;
use multiaddr::Multiaddr;
use multiaddr::{multiaddr, Multiaddr, Protocol};
use tokio::sync::RwLock;

use crate::{
tor::{HiddenServiceController, TorIdentity},
transports::{SocksTransport, Transport},
transports::{tcp::TcpInbound, SocksTransport, Transport},
};

const LOG_TARGET: &str = "comms::transports::hidden_service_transport";
Expand All @@ -43,7 +43,7 @@ pub enum HiddenServiceTransportError {

struct HiddenServiceTransportInner {
socks_transport: Option<SocksTransport>,
hidden_service_ctl: HiddenServiceController,
hidden_service_ctl: Option<HiddenServiceController>,
}

#[derive(Clone)]
Expand All @@ -57,37 +57,55 @@ impl<F: Fn(TorIdentity)> HiddenServiceTransport<F> {
Self {
inner: Arc::new(RwLock::new(HiddenServiceTransportInner {
socks_transport: None,
hidden_service_ctl,
hidden_service_ctl: Some(hidden_service_ctl),
})),
after_init,
}
}

async fn ensure_initialized(&self) -> Result<(), io::Error> {
let inner = self.inner.read().await;
if inner.socks_transport.is_none() {
drop(inner);
let mut mut_inner = self.inner.write().await;
if mut_inner.socks_transport.is_none() {
let transport = mut_inner.hidden_service_ctl.initialize_transport().await.map_err(|e| {
error!(
target: LOG_TARGET,
"Error initializing hidden transport service stack{}",
e
);
io::Error::new(ErrorKind::Other, e.to_string())
})?;
(self.after_init)(
mut_inner
.hidden_service_ctl
.identity
.clone()
.ok_or(io::Error::new(ErrorKind::Other, "Missing tor identity".to_string()))?,
);
mut_inner.socks_transport = Some(transport);
async fn is_initialized(&self) -> bool {
self.inner.read().await.socks_transport.is_some()
}

async fn initialize(&self, listen_addr: &Multiaddr) -> Result<(TcpInbound, Multiaddr), io::Error> {
let mut inner_mut = self.inner.write().await;
let mut hs_ctl = inner_mut.hidden_service_ctl.take().ok_or(io::Error::new(
ErrorKind::Other,
"BUG: Hidden service controller not set in transport".to_string(),
))?;

let transport = hs_ctl.initialize_transport().await.map_err(|e| {
error!(
target: LOG_TARGET,
"Error initializing hidden transport service stack{}",
e
);
io::Error::new(ErrorKind::Other, e.to_string())
})?;
let (inbound, listen_addr) = transport.listen(listen_addr).await?;
inner_mut.socks_transport = Some(transport);

// Set the proxied address to the port we just listened on
let mut proxied_addr = hs_ctl.proxied_address();
if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) {
if let Some(Protocol::Tcp(port)) = listen_addr.iter().last() {
proxied_addr.pop();
proxied_addr.push(Protocol::Tcp(port));
}
hs_ctl.set_proxied_addr(&proxied_addr);
}
Ok(())

let hidden_service = hs_ctl.create_hidden_service().await.map_err(|err| {
error!(
target: LOG_TARGET,
"Error creating hidden service: {}",
err
);
io::Error::new(ErrorKind::Other, err.to_string())
})?;

(self.after_init)(hidden_service.tor_identity().clone());
Ok((inbound, listen_addr))
}
}
#[crate::async_trait]
Expand All @@ -97,15 +115,27 @@ impl<F: Fn(TorIdentity) + Send + Sync> Transport for HiddenServiceTransport<F> {
type Output = <SocksTransport as Transport>::Output;

async fn listen(&self, addr: &Multiaddr) -> Result<(Self::Listener, Multiaddr), Self::Error> {
self.ensure_initialized().await?;
let inner = self.inner.read().await;

Ok(inner.socks_transport.as_ref().unwrap().listen(addr).await?)
if self.is_initialized().await {
// For now, we only can listen on a single Tor hidden service. This behaviour is not technically correct as
// per the Transport trait, but we only ever call listen once in practice. The fix for this is to
// improve the tor client implementation to allow for multiple hidden services.
return Err(io::Error::new(
ErrorKind::Other,
"BUG: Hidden service transport already initialized".to_string(),
));
}
let (listener, addr) = self.initialize(addr).await?;
Ok((listener, addr))
}

async fn dial(&self, addr: &Multiaddr) -> Result<Self::Output, Self::Error> {
self.ensure_initialized().await?;
let inner = self.inner.read().await;
Ok(inner.socks_transport.as_ref().unwrap().dial(addr).await?)
let transport = inner.socks_transport.as_ref().ok_or_else(|| {
io::Error::new(
ErrorKind::Other,
"BUG: Hidden service transport not initialized before dialling".to_string(),
)
})?;
transport.dial(addr).await
}
}
4 changes: 2 additions & 2 deletions infrastructure/libtor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "BSD-3-Clause"
[dependencies]
tari_common = { path = "../../common" }
tari_p2p = { path = "../../base_layer/p2p" }
tari_shutdown = { path = "../shutdown"}
tari_shutdown = { path = "../shutdown" }

derivative = "2.2.0"
log = "0.4.8"
Expand All @@ -16,7 +16,7 @@ tempfile = "3.1.0"
tor-hash-passwd = "1.0.1"

[target.'cfg(unix)'.dependencies]
libtor = { version="46.9.0"}
libtor = { version = "46.9.0" }
openssl = { version = "0.10.61", features = ["vendored"] }

[package.metadata.cargo-machete]
Expand Down
3 changes: 3 additions & 0 deletions infrastructure/libtor/src/tor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ impl Tor {
tor.flag(TorFlag::DataDirectory(data_dir.clone()))
.flag(TorFlag::SocksPort(socks_port))
.flag(TorFlag::ControlPort(control_port))
// Disable signal handlers so that ctrl+c can be handled by our application
// https://github.com/torproject/torspec/blob/8961bb4d83fccb2b987f9899ca83aa430f84ab0c/control-spec.txt#L3946
.flag(TorFlag::Custom("__DisableSignalHandlers 1".to_string()))
.flag(TorFlag::Hush())
.flag(TorFlag::LogTo(log_level, LogDestination::File(log_destination)));

Expand Down
Loading