Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature request: ability to track of bytes send and received #20

Closed
GlenDC opened this issue May 5, 2022 · 3 comments
Closed

feature request: ability to track of bytes send and received #20

GlenDC opened this issue May 5, 2022 · 3 comments

Comments

@GlenDC
Copy link

GlenDC commented May 5, 2022

In regards to #15. I managed to make that all work.
However, and please check the code linked there, I had to make it catch
ErroKind (NotConnected/ConnectionReset).

This essentially does mean that we do have access to the bytes send/received (written/read).
However even when no error that number does always seem to be (0,0) (used as a router, but even when running a regular server as per your example).

I tried to write copy directional myself (copying the code and catching the error):

use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use fast_socks5::ready;

use std::future::Future;
use std::io::{self, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};

enum TransferState {
    Running(CopyBuffer),
    ShuttingDown(u64),
    Done(u64),
}

struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> {
    a: &'a mut A,
    b: &'a mut B,
    a_to_b: TransferState,
    b_to_a: TransferState,
}

fn transfer_one_direction<A, B>(
    cx: &mut Context<'_>,
    state: &mut TransferState,
    r: &mut A,
    w: &mut B,
) -> Poll<io::Result<u64>>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    let mut r = Pin::new(r);
    let mut w = Pin::new(w);

    loop {
        match state {
            TransferState::Running(buf) => {
                let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
                *state = TransferState::ShuttingDown(count);
            }
            TransferState::ShuttingDown(count) => {
                match ready!(w.as_mut().poll_shutdown(cx)) {
                    Ok(_) => (),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                };

                *state = TransferState::Done(*count);
            }
            TransferState::Done(count) => return Poll::Ready(Ok(*count)),
        }
    }
}

impl<'a, A, B> Future for CopyBidirectional<'a, A, B>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<(u64, u64)>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Unpack self into mut refs to each field to avoid borrow check issues.
        let CopyBidirectional {
            a,
            b,
            a_to_b,
            b_to_a,
        } = &mut *self;

        let a_to_b = transfer_one_direction(cx, a_to_b, &mut *a, &mut *b)?;
        let b_to_a = transfer_one_direction(cx, b_to_a, &mut *b, &mut *a)?;

        // It is not a problem if ready! returns early because transfer_one_direction for the
        // other direction will keep returning TransferState::Done(count) in future calls to poll
        let a_to_b = ready!(a_to_b);
        let b_to_a = ready!(b_to_a);

        Poll::Ready(Ok((a_to_b, b_to_a)))
    }
}

/// Copies data in both directions between `a` and `b`.
///
/// This function returns a future that will read from both streams,
/// writing any data read to the opposing stream.
/// This happens in both directions concurrently.
///
/// If an EOF is observed on one stream, [`shutdown()`] will be invoked on
/// the other, and reading from that stream will stop. Copying of data in
/// the other direction will continue.
///
/// The future will complete successfully once both directions of communication has been shut down.
/// A direction is shut down when the reader reports EOF,
/// at which point [`shutdown()`] is called on the corresponding writer. When finished,
/// it will return a tuple of the number of bytes copied from a to b
/// and the number of bytes copied from b to a, in that order.
///
/// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown
///
/// # Errors
///
/// The future will immediately return an error if any IO operation on `a`
/// or `b` returns an error. Some data read from either stream may be lost (not
/// written to the other stream) in this case.
///
/// # Return value
///
/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    CopyBidirectional {
        a,
        b,
        a_to_b: TransferState::Running(CopyBuffer::new()),
        b_to_a: TransferState::Running(CopyBuffer::new()),
    }
    .await
}

#[derive(Debug)]
struct CopyBuffer {
    read_done: bool,
    need_flush: bool,
    pos: usize,
    cap: usize,
    amt: u64,
    buf: Box<[u8]>,
}

impl CopyBuffer {
    pub(super) fn new() -> Self {
        Self {
            read_done: false,
            need_flush: false,
            pos: 0,
            cap: 0,
            amt: 0,
            buf: vec![0; 2048].into_boxed_slice(),
        }
    }

    pub(super) fn poll_copy<R, W>(
        &mut self,
        cx: &mut Context<'_>,
        mut reader: Pin<&mut R>,
        mut writer: Pin<&mut W>,
    ) -> Poll<io::Result<u64>>
    where
        R: AsyncRead + ?Sized,
        W: AsyncWrite + ?Sized,
    {
        loop {
            // If our buffer is empty, then we need to read some data to
            // continue.
            if self.pos == self.cap && !self.read_done {
                let me = &mut *self;
                let mut buf = ReadBuf::new(&mut me.buf);

                match reader.as_mut().poll_read(cx, &mut buf) {
                    Poll::Ready(Ok(_)) => (),
                    Poll::Ready(Err(err)) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                    Poll::Pending => {
                        // Try flushing when the reader has no progress to avoid deadlock
                        // when the reader depends on buffered writer.
                        if self.need_flush {
                            match ready!(writer.as_mut().poll_flush(cx)) {
                                Ok(_) => (),
                                Err(err) => match err.kind() {
                                    ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                                    _ => return Poll::Ready(Err(err)),
                                },
                            };
                            self.need_flush = false;
                        }

                        return Poll::Pending;
                    }
                }

                let n = buf.filled().len();
                if n == 0 {
                    self.read_done = true;
                } else {
                    self.pos = 0;
                    self.cap = n;
                }
            }

            // If our buffer has some data, let's write it out!
            while self.pos < self.cap {
                let me = &mut *self;
                let i = match ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap])) {
                    Ok(i) => (i),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => 1,
                        _ => return Poll::Ready(Err(err)),
                    },
                };
                if i == 0 {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "write zero byte into writer",
                    )));
                } else {
                    self.pos += i;
                    self.amt += i as u64;
                    self.need_flush = true;
                }
            }

            // If pos larger than cap, this loop will never stop.
            // In particular, user's wrong poll_write implementation returning
            // incorrect written length may lead to thread blocking.
            debug_assert!(
                self.pos <= self.cap,
                "writer returned length larger than input slice"
            );

            // If we've written all the data and we've seen EOF, flush out the
            // data and finish the transfer.
            if self.pos == self.cap && self.read_done {
                match ready!(writer.as_mut().poll_flush(cx)) {
                    Ok(_) => (),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                };
                return Poll::Ready(Ok(self.amt));
            }
        }
    }
}

However this still has the same issue, that is, the returned data read/written is (0,0).

It is not as simple as that:

fast_socks5::server] incoming connection from peer 127.0.0.1:64345 @ 127.0.0.1:1337
[2022-05-05T22:26:57Z DEBUG proxy_gateway] handle incoming socket
[2022-05-05T22:26:57Z DEBUG proxy_gateway] upgrade incoming socket as socks5 proxy
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Handshake headers: [version: 5, methods len: 3]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] methods supported sent by the client: [0, 1, 2]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Reply with method AuthenticationMethod::Password (2)
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Auth: [version: 1, user len: 9]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] username bytes: [112, 117, 112, 112, 101, 116, 101, 101, 114]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Auth: [pass len: 3]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] password bytes: [98, 97, 114]
[2022-05-05T22:26:57Z INFO  fast_socks5::server] User `puppeteer` logged successfully.
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Request: [version: 5, command: 1, rev: 0, address_type: 1]
[2022-05-05T22:26:57Z DEBUG fast_socks5::util::target_addr] Address type `IPv4`
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Request target is 185.199.108.153:443
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Domain won't be resolved because `dns_resolve`'s config has been turned off.
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Connected to remote destination
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Wrote success
[2022-05-05T22:26:58Z INFO  fast_socks5::server] transfer closed (615, 11010)
[2022-05-05T22:26:58Z DEBUG proxy_gateway] log original target address of incoming socket
[2022-05-05T22:26:58Z DEBUG proxy_gateway] resolve target dns for incoming socket
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Connected @ 127.0.0.1:1338
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Send version and method len [5, 2]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] client auth methods supported: [0, 2]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Socks version (5), method chosen: 2.
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Password will be used
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Auth: [version: 1, is_success: 0]
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Requesting headers `Some(Ip(185.199.108.153:443))`...
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] TargetAddr::IpV4
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] addr ip [185, 199, 108, 153]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Bytes long version: [5, 1, 0, 1, 185, 199, 108, 153, 1, 187, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Bytes shorted version: [5, 1, 0, 1, 185, 199, 108, 153, 1, 187]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Padding: 10
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Reply received: [version: 5, reply: 0, rsv: 0, address_type: 1]
[2022-05-05T22:26:58Z DEBUG fast_socks5::util::target_addr] Address type `IPv4`
[2022-05-05T22:26:58Z INFO  fast_socks5::client] Remote server bind on 127.0.0.1:0.
[2022-05-05T22:26:58Z INFO  proxy_gateway] socket transfer closed (0, 0)
"2022-05-05 22:26:57.929731 UTC","185.199.108.153:443","127.0.0.1:1338","","",0

From the logs we can see that the size is returned for the initial part of the socks5 stream (header),
but once we do the actual transfer we seem to log nothing.

What am I doing wrong here? both in my fork and vanilla code.
I really need for my purposes to be able to log the payload size.

@dizda
Copy link
Owner

dizda commented May 7, 2022

Hey there,

From running the server example by using RUST_LOG=debug cargo run --example server -- --listen-addr 127.0.0.1:1337 password -u admin -p password

I do get the right amount of bytes transferred as mentioned from the logs:

[2022-05-07T04:03:39Z INFO fast_socks5::server] transfer closed (79, 552)

An improvement we could bring, would be to have to server::transfer() function to return the amount of bytes transferred:

async fn transfer<I, O>(mut inbound: I, mut outbound: O) -> Result<()>

@GlenDC
Copy link
Author

GlenDC commented May 7, 2022

Yes that would be one step in the right direction. Even though it will need to bubble up.

A related problem though is that tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await doesn't return those values in case of an error, which on MacOS is possible due to the client closing (see #23).

@GlenDC
Copy link
Author

GlenDC commented May 13, 2022

Going to close this one as the other work already contributed by me in combination with the root cause of #23 are the reasons why we do not always receive it. Trying to track it from within this crate wouldn't resolve it or even work due to the issues causing it.

Closing this as such.

@GlenDC GlenDC closed this as completed May 13, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants