Skip to content

Commit

Permalink
Implement streamlocal-forward for remote => local UDS forwarding (#312)
Browse files Browse the repository at this point in the history
I left a `// NEED HELP` comment on places where I didn't fully figure
out what to do, so I'd really appreciate it if some maintainers helped
me out in those places.

---------

Co-authored-by: Eugene <[email protected]>
  • Loading branch information
kanpov and Eugeny authored Aug 20, 2024
1 parent a78d798 commit 67a6ba8
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This is a fork of [Thrussh](https://nest.pijul.com/pijul/thrussh) by Pierre-Éti
* `direct-tcpip` (local port forwarding)
* `forward-tcpip` (remote port forwarding) ✨
* `direct-streamlocal` (local UNIX socket forwarding, client only) ✨
* `forward-streamlocal` (remote UNIX socket forwarding) ✨
* Ciphers:
* `[email protected]`
* `[email protected]`
Expand Down
23 changes: 23 additions & 0 deletions russh/src/client/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,17 @@ impl Session {
)
.await?
}
ChannelType::ForwardedStreamLocal(d) => {
confirm();
let channel = self.accept_server_initiated_channel(id, &msg);
client
.server_channel_open_forwarded_streamlocal(
channel,
&d.socket_path,
self,
)
.await?;
}
ChannelType::AgentForward => {
confirm();
client.server_channel_open_agent_forward(id, self).await?
Expand Down Expand Up @@ -848,6 +859,12 @@ impl Session {
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(true);
}
Some(GlobalRequestResponse::StreamLocalForward(return_channel)) => {
let _ = return_channel.send(true);
}
Some(GlobalRequestResponse::CancelStreamLocalForward(return_channel)) => {
let _ = return_channel.send(true);
}
None => {
error!("Received global request failure for unknown request!")
}
Expand All @@ -866,6 +883,12 @@ impl Session {
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(false);
}
Some(GlobalRequestResponse::StreamLocalForward(return_channel)) => {
let _ = return_channel.send(false);
}
Some(GlobalRequestResponse::CancelStreamLocalForward(return_channel)) => {
let _ = return_channel.send(false);
}
None => {
error!("Received global request failure for unknown request!")
}
Expand Down
78 changes: 78 additions & 0 deletions russh/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ pub enum Msg {
address: String,
port: u32,
},
StreamLocalForward {
/// Provide a channel for the reply result to request a reply from the server
reply_channel: Option<oneshot::Sender<bool>>,
socket_path: String,
},
CancelStreamLocalForward {
/// Provide a channel for the reply result to request a reply from the server
reply_channel: Option<oneshot::Sender<bool>>,
socket_path: String,
},
Close {
id: ChannelId,
},
Expand Down Expand Up @@ -571,6 +581,7 @@ impl<H: Handler> Handle<H> {
}
}

// Requests the server to close a TCP/IP forward channel
pub async fn cancel_tcpip_forward<A: Into<String>>(
&self,
address: A,
Expand All @@ -596,6 +607,54 @@ impl<H: Handler> Handle<H> {
}
}

// Requests the server to open a UDS forward channel
pub async fn streamlocal_forward<A: Into<String>>(
&mut self,
socket_path: A,
) -> Result<(), crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::StreamLocalForward {
reply_channel: Some(reply_send),
socket_path: socket_path.into(),
})
.await
.map_err(|_| crate::Error::SendError)?;

match reply_recv.await {
Ok(true) => Ok(()),
Ok(false) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive StreamLocalForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
}

// Requests the server to close a UDS forward channel
pub async fn cancel_streamlocal_forward<A: Into<String>>(
&self,
socket_path: A,
) -> Result<(), crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::CancelStreamLocalForward {
reply_channel: Some(reply_send),
socket_path: socket_path.into(),
})
.await
.map_err(|_| crate::Error::SendError)?;

match reply_recv.await {
Ok(true) => Ok(()),
Ok(false) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive CancelStreamLocalForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
}

/// Sends a disconnect message.
pub async fn disconnect(
&self,
Expand Down Expand Up @@ -1050,6 +1109,14 @@ impl Session {
address,
port,
} => self.cancel_tcpip_forward(reply_channel, &address, port),
Msg::StreamLocalForward {
reply_channel,
socket_path,
} => self.streamlocal_forward(reply_channel, &socket_path),
Msg::CancelStreamLocalForward {
reply_channel,
socket_path,
} => self.cancel_streamlocal_forward(reply_channel, &socket_path),
Msg::Disconnect {
reason,
description,
Expand Down Expand Up @@ -1551,6 +1618,17 @@ pub trait Handler: Sized + Send {
Ok(())
}

// Called when the server opens a channel for a new remote UDS forwarding connection
#[allow(unused_variables)]
async fn server_channel_open_forwarded_streamlocal(
&mut self,
channel: Channel<Msg>,
socket_path: &str,
session: &mut Session,
) -> Result<(), Self::Error> {
Ok(())
}

/// Called when the server opens an agent forwarding channel
#[allow(unused_variables)]
async fn server_channel_open_agent_forward(
Expand Down
54 changes: 53 additions & 1 deletion russh/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl Session {

/// Requests cancellation of TCP/IP forwarding from the server
///
/// If `want_reply` is `true`, returns a oneshot receiveing the server's reply:
/// If `reply_channel` is not None, sets want_reply and returns the server's response via the channel,
/// `true` for a success message, or `false` for failure
pub fn cancel_tcpip_forward(
&mut self,
Expand All @@ -318,6 +318,58 @@ impl Session {
}
}

/// Requests a UDS forwarding from the server, `socket path` being the server side socket path.
///
/// If `reply_channel` is not None, sets want_reply and returns the server's response via the channel,
/// `true` for a success message, or `false` for failure
pub fn streamlocal_forward(
&mut self,
reply_channel: Option<oneshot::Sender<bool>>,
socket_path: &str,
) {
if let Some(ref mut enc) = self.common.encrypted {
let want_reply = reply_channel.is_some();
if let Some(reply_channel) = reply_channel {
self.open_global_requests.push_back(
crate::session::GlobalRequestResponse::StreamLocalForward(reply_channel),
);
}
push_packet!(enc.write, {
enc.write.push(msg::GLOBAL_REQUEST);
enc.write
.extend_ssh_string(b"[email protected]");
enc.write.push(want_reply as u8);
enc.write.extend_ssh_string(socket_path.as_bytes());
});
}
}

/// Requests cancellation of UDS forwarding from the server
///
/// If `reply_channel` is not None, sets want_reply and returns the server's response via the channel,
/// `true` for a success message and `false` for failure.
pub fn cancel_streamlocal_forward(
&mut self,
reply_channel: Option<oneshot::Sender<bool>>,
socket_path: &str,
) {
if let Some(ref mut enc) = self.common.encrypted {
let want_reply = reply_channel.is_some();
if let Some(reply_channel) = reply_channel {
self.open_global_requests.push_back(
crate::session::GlobalRequestResponse::CancelStreamLocalForward(reply_channel),
);
}
push_packet!(enc.write, {
enc.write.push(msg::GLOBAL_REQUEST);
enc.write
.extend_ssh_string(b"[email protected]");
enc.write.push(want_reply as u8);
enc.write.extend_ssh_string(socket_path.as_bytes());
});
}
}

pub fn send_keepalive(&mut self, want_reply: bool) {
self.open_global_requests
.push_back(crate::session::GlobalRequestResponse::Keepalive);
Expand Down
19 changes: 19 additions & 0 deletions russh/src/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ impl OpenChannelMessage {
}
b"direct-tcpip" => ChannelType::DirectTcpip(TcpChannelInfo::new(r)?),
b"forwarded-tcpip" => ChannelType::ForwardedTcpIp(TcpChannelInfo::new(r)?),
b"[email protected]" => {
ChannelType::ForwardedStreamLocal(StreamLocalChannelInfo::new(r)?)
}
b"[email protected]" => ChannelType::AgentForward,
t => ChannelType::Unknown { typ: t.to_vec() },
};
Expand Down Expand Up @@ -91,6 +94,7 @@ pub enum ChannelType {
},
DirectTcpip(TcpChannelInfo),
ForwardedTcpIp(TcpChannelInfo),
ForwardedStreamLocal(StreamLocalChannelInfo),
AgentForward,
Unknown {
typ: Vec<u8>,
Expand All @@ -105,6 +109,21 @@ pub struct TcpChannelInfo {
pub originator_port: u32,
}

#[derive(Debug)]
pub struct StreamLocalChannelInfo {
pub socket_path: String,
}

impl StreamLocalChannelInfo {
fn new(r: &mut Position) -> Result<Self, crate::Error> {
let socket_path = std::str::from_utf8(r.read_string().map_err(crate::Error::from)?)
.map_err(crate::Error::from)?
.to_owned();

Ok(Self { socket_path })
}
}

impl TcpChannelInfo {
fn new(r: &mut Position) -> Result<Self, crate::Error> {
let host_to_connect = std::str::from_utf8(r.read_string().map_err(crate::Error::from)?)
Expand Down
48 changes: 46 additions & 2 deletions russh/src/server/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,40 @@ impl Session {
}
Ok(())
}
b"[email protected]" => {
let server_socket_path =
std::str::from_utf8(r.read_string().map_err(crate::Error::from)?)
.map_err(crate::Error::from)?;
debug!("handler.streamlocal_forward {:?}", server_socket_path);
let result = handler
.streamlocal_forward(server_socket_path, self)
.await?;
if let Some(ref mut enc) = self.common.encrypted {
if result {
push_packet!(enc.write, enc.write.push(msg::REQUEST_SUCCESS))
} else {
push_packet!(enc.write, enc.write.push(msg::REQUEST_FAILURE))
}
}
Ok(())
}
b"[email protected]" => {
let socket_path =
std::str::from_utf8(r.read_string().map_err(crate::Error::from)?)
.map_err(crate::Error::from)?;
debug!("handler.cancel_streamlocal_forward {:?}", socket_path);
let result = handler
.cancel_streamlocal_forward(socket_path, self)
.await?;
if let Some(ref mut enc) = self.common.encrypted {
if result {
push_packet!(enc.write, enc.write.push(msg::REQUEST_SUCCESS))
} else {
push_packet!(enc.write, enc.write.push(msg::REQUEST_FAILURE))
}
}
Ok(())
}
_ => {
if let Some(ref mut enc) = self.common.encrypted {
push_packet!(enc.write, {
Expand Down Expand Up @@ -1087,7 +1121,7 @@ impl Session {
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(true);
}
None => {
_ => {
error!("Received global request failure for unknown request!")
}
}
Expand All @@ -1105,7 +1139,7 @@ impl Session {
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(false);
}
None => {
_ => {
error!("Received global request failure for unknown request!")
}
}
Expand Down Expand Up @@ -1211,6 +1245,16 @@ impl Session {
}
result
}
ChannelType::ForwardedStreamLocal(_) => {
if let Some(ref mut enc) = self.common.encrypted {
msg.fail(
&mut enc.write,
msg::SSH_OPEN_ADMINISTRATIVELY_PROHIBITED,
b"Unsupported channel type",
);
}
Ok(false)
}
ChannelType::AgentForward => {
if let Some(ref mut enc) = self.common.encrypted {
msg.fail(
Expand Down
18 changes: 18 additions & 0 deletions russh/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,24 @@ pub trait Handler: Sized {
) -> Result<bool, Self::Error> {
Ok(false)
}

#[allow(unused_variables)]
async fn streamlocal_forward(
&mut self,
socket_path: &str,
session: &mut Session,
) -> Result<bool, Self::Error> {
Ok(false)
}

#[allow(unused_variables)]
async fn cancel_streamlocal_forward(
&mut self,
socket_path: &str,
session: &mut Session,
) -> Result<bool, Self::Error> {
Ok(false)
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit 67a6ba8

Please sign in to comment.