Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio::io::copy_bidirectional can return with Error on MacOS #4674

Open
GlenDC opened this issue May 10, 2022 · 11 comments
Open

tokio::io::copy_bidirectional can return with Error on MacOS #4674

GlenDC opened this issue May 10, 2022 · 11 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-net Module: tokio/net

Comments

@GlenDC
Copy link

GlenDC commented May 10, 2022

Version

➜ cargo tree | grep tokio
│   └── tokio v1.18.2
│       └── tokio-macros v1.7.0 (proc-macro)
│   ├── tokio v1.18.2 (*)
│   └── tokio-stream v0.1.8
│       └── tokio v1.18.2 (*)
│   ├── tokio v1.18.2 (*)
│   ├── tokio-native-tls v0.3.0
│   │   └── tokio v1.18.2 (*)
│   ├── tokio-util v0.6.9
│   │   └── tokio v1.18.2 (*)
├── tokio v1.18.2 (*)
├── tokio-stream v0.1.8 (*)
└── tokio-test v0.4.2
    ├── tokio v1.18.2 (*)
    └── tokio-stream v0.1.8 (*)

Platform

Darwin Kernel Version 20.6.0: Wed Jan 12 22:22:42 PST 2022; root:xnu-7195.141.19~2/RELEASE_X86_64 x86_64

Description

On MacOS it can be observed that when copying bidirectionally between two TCP streams that
the function usually returns in one of two errors: ErrorKind::NotConnected or ErrorKind::ConnectionReset.

If I'm lucky it does return with an Ok answer. In a way it is innocent enough as one can explicitly match on these error kinds and map them into Ok results. Only problem for me is that I really want to have access to the bytes read/written, which only seems to be exposed for a regular Ok result?

I tried this code:

match tokio::io::copy_bidirectional(&mut a, &mut b).await {
    Ok(res) => {
        info!("socket transfer closed ({}, {})", res.0, res.1);
    }
    Err(err) => match err.kind() {
        ErrorKind::NotConnected => {
            error!("socket transfer closed by not connected error");
        }
        ErrorKind::ConnectionReset => {
            error!("socket transfer closed by conn reset");
        }
        _ => error!("socket transfer error: {:#}", err),
    },
}

I expected to see this happen: that it logs me the read/written byte count

Instead, this happened: error log either the NotConnected or ConnectionReset.
Sometimes when I'm lucky (at least it's how it feels) I do get the bytes read/written log line.

Some more context:

  • My client is curl (7.64.1);
  • My target is another proxy (either Socks5 or HTTP over TCP);
  • The bidirectional copy is happening within a socks5 server which accepts the incoming stream (a) from the client and connects it with the outgoing connection to the other proxy (b);
@GlenDC GlenDC added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels May 10, 2022
@Darksonn Darksonn added the M-net Module: tokio/net label May 10, 2022
@Darksonn
Copy link
Contributor

My guess is that this is a duplicate of #4665.

@GlenDC
Copy link
Author

GlenDC commented May 10, 2022

Could very well be. Thank you for pointing out.
I tried looking for my specific errors and the function, but didn't find any results.
My bad!

@Darksonn
Copy link
Contributor

Unfortunately I don't know why shutdown is behaving this way or how to fix it.

@GlenDC
Copy link
Author

GlenDC commented May 10, 2022

I suppose it might be undesired and/or out of scope. But would there be a way to have this data also be included in the error cases? Just asking.

@Darksonn
Copy link
Contributor

I don't think that could be done without a breaking change.

@GlenDC
Copy link
Author

GlenDC commented May 10, 2022

I see. I asked as that is really my main issue here, that I do not have access to that information. Because the error I can capture very easily specific for those error kinds. But then I do not have any idea on how much data has been transferred until that point, which could be a lot. In a sense it's most likely even the entire stream anyway, as it just seems that the shutdown happens a bit dirty at one layer.

I guess if I want to have that data I would need to emulate that functionality myself I suppose?

That or my other hope would be that we find out why that shutdown happens like that I imagine. GIven it only happens on MacOS and localhost though, it might be not the biggest issue in the world, as that is not related to my actual production usage. So on that end I'l also do some tests in near future in a more production-like setting and see how it performs here.

Thanks already so far for the input, and do let me know if I can help mitigate this issue and related issues in any form. Be it in form of actual code contributions, or some more input you need, or anything in between really.

@Darksonn
Copy link
Contributor

Right now, the easiest way to get access to the data would be to copy the implementation of copy_bidirectional and modify it (or otherwise reimplement it yourself).

As for how you can help, well, I don't have access to a mac. If you are able to figure out why this is happening, that would be helpful. I suspect this is an issue that happens only on recent versions of rustc, so it would be helpful to know which release introduced the issue.

@Noah-Kennedy
Copy link
Contributor

I've got access to a mac and can look into this tomorrow.

@GlenDC
Copy link
Author

GlenDC commented May 12, 2022

At the moment I do not seem to be able to allocate time to look further into it. For now I've forked the copy_bidirectional- and copy code as follows:

//! Fork of https://github.com/tokio-rs/tokio/blob/master/tokio/src/io/util/copy_bidirectional.rs
//! to allow us to get the read/write bytes count even
//! when an error occurred, see <https://github.com/tokio-rs/tokio/issues/4674>
//! for more info (and we can delete this fork once the original code in Rust/Tokio is fixed for MacOS localhost shutdown)

use super::copy::CopyBuffer;
use futures_core::ready;
use tokio::io::{AsyncRead, AsyncWrite};

use std::future::Future;
use std::io::{self, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};

use log::debug;

enum TransferState {
    Running(CopyBuffer),
    ShuttingDown(u64),
    Done(u64),
}

struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> {
    a: &'a mut A,
    b: &'a mut B,
    a_to_b: TransferState,
    b_to_a: TransferState,
}

fn transfer_one_direction<A, B>(
    cx: &mut Context<'_>,
    state: &mut TransferState,
    r: &mut A,
    w: &mut B,
) -> Poll<io::Result<u64>>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    let mut r = Pin::new(r);
    let mut w = Pin::new(w);

    loop {
        match state {
            TransferState::Running(buf) => {
                let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
                *state = TransferState::ShuttingDown(count);
            }
            TransferState::ShuttingDown(count) => {
                (match ready!(w.as_mut().poll_shutdown(cx)) {
                    Ok(_) => Ok(()),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => {
                            debug!("tokio copy bidirectional: shutting down: ignore NotConnected/ConnectionReset error ignored");
                            Ok(())
                        }
                        _ => Err(err),
                    },
                })?;

                *state = TransferState::Done(*count);
            }
            TransferState::Done(count) => return Poll::Ready(Ok(*count)),
        }
    }
}

impl<'a, A, B> Future for CopyBidirectional<'a, A, B>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<(u64, u64)>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Unpack self into mut refs to each field to avoid borrow check issues.
        let CopyBidirectional {
            a,
            b,
            a_to_b,
            b_to_a,
        } = &mut *self;

        let a_to_b = transfer_one_direction(cx, a_to_b, &mut *a, &mut *b)?;
        let b_to_a = transfer_one_direction(cx, b_to_a, &mut *b, &mut *a)?;

        // It is not a problem if ready! returns early because transfer_one_direction for the
        // other direction will keep returning TransferState::Done(count) in future calls to poll
        let a_to_b = ready!(a_to_b);
        let b_to_a = ready!(b_to_a);

        Poll::Ready(Ok((a_to_b, b_to_a)))
    }
}

/// Copies data in both directions between `a` and `b`.
///
/// This function returns a future that will read from both streams,
/// writing any data read to the opposing stream.
/// This happens in both directions concurrently.
///
/// If an EOF is observed on one stream, [`shutdown()`] will be invoked on
/// the other, and reading from that stream will stop. Copying of data in
/// the other direction will continue.
///
/// The future will complete successfully once both directions of communication has been shut down.
/// A direction is shut down when the reader reports EOF,
/// at which point [`shutdown()`] is called on the corresponding writer. When finished,
/// it will return a tuple of the number of bytes copied from a to b
/// and the number of bytes copied from b to a, in that order.
///
/// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown
///
/// # Errors
///
/// The future will immediately return an error if any IO operation on `a`
/// or `b` returns an error. Some data read from either stream may be lost (not
/// written to the other stream) in this case.
///
/// # Return value
///
/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    CopyBidirectional {
        a,
        b,
        a_to_b: TransferState::Running(CopyBuffer::new()),
        b_to_a: TransferState::Running(CopyBuffer::new()),
    }
    .await
}
//! Fork of https://github.com/tokio-rs/tokio/blob/master/tokio/src/io/util/copy.rs
//! to allow us to get the read/write bytes count even
//! when an error occurred, see <https://github.com/tokio-rs/tokio/issues/4674>
//! for more info (and we can delete this fork once the original code in Rust/Tokio is fixed for MacOS localhost shutdown)

use futures_core::ready;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use std::future::Future;
use std::io::{self, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};

use log::debug;

#[derive(Debug)]
pub(super) struct CopyBuffer {
    read_done: bool,
    need_flush: bool,
    pos: usize,
    cap: usize,
    amt: u64,
    buf: Box<[u8]>,
}

impl CopyBuffer {
    pub(super) fn new() -> Self {
        Self {
            read_done: false,
            need_flush: false,
            pos: 0,
            cap: 0,
            amt: 0,
            buf: vec![0; super::DEFAULT_BUF_SIZE].into_boxed_slice(),
        }
    }

    pub(super) fn poll_copy<R, W>(
        &mut self,
        cx: &mut Context<'_>,
        mut reader: Pin<&mut R>,
        mut writer: Pin<&mut W>,
    ) -> Poll<io::Result<u64>>
    where
        R: AsyncRead + ?Sized,
        W: AsyncWrite + ?Sized,
    {
        loop {
            // If our buffer is empty, then we need to read some data to
            // continue.
            if self.pos == self.cap && !self.read_done {
                let me = &mut *self;
                let mut buf = ReadBuf::new(&mut me.buf);

                match reader.as_mut().poll_read(cx, &mut buf) {
                    Poll::Ready(Ok(_)) => (),
                    Poll::Ready(Err(err)) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => {
                            debug!("tokio copy: read data: ignore NotConnected/ConnectionReset error ignored");
                            ()
                        }
                        _ => return Poll::Ready(Err(err)),
                    },
                    Poll::Pending => {
                        // Try flushing when the reader has no progress to avoid deadlock
                        // when the reader depends on buffered writer.
                        if self.need_flush {
                            ready!(writer.as_mut().poll_flush(cx))?;
                            self.need_flush = false;
                        }

                        return Poll::Pending;
                    }
                }

                let n = buf.filled().len();
                if n == 0 {
                    self.read_done = true;
                } else {
                    self.pos = 0;
                    self.cap = n;
                }
            }

            // If our buffer has some data, let's write it out!
            while self.pos < self.cap {
                let me = &mut *self;
                let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?;
                if i == 0 {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "write zero byte into writer",
                    )));
                } else {
                    self.pos += i;
                    self.amt += i as u64;
                    self.need_flush = true;
                }
            }

            // If pos larger than cap, this loop will never stop.
            // In particular, user's wrong poll_write implementation returning
            // incorrect written length may lead to thread blocking.
            debug_assert!(
                self.pos <= self.cap,
                "writer returned length larger than input slice"
            );

            // If we've written all the data and we've seen EOF, flush out the
            // data and finish the transfer.
            if self.pos == self.cap && self.read_done {
                ready!(writer.as_mut().poll_flush(cx))?;
                return Poll::Ready(Ok(self.amt));
            }
        }
    }
}

/// A future that asynchronously copies the entire contents of a reader into a
/// writer.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct Copy<'a, R: ?Sized, W: ?Sized> {
    reader: &'a mut R,
    writer: &'a mut W,
    buf: CopyBuffer,
}

// cfg_io_util! {
//     /// Asynchronously copies the entire contents of a reader into a writer.
//     ///
//     /// This function returns a future that will continuously read data from
//     /// `reader` and then write it into `writer` in a streaming fashion until
//     /// `reader` returns EOF.
//     ///
//     /// On success, the total number of bytes that were copied from `reader` to
//     /// `writer` is returned.
//     ///
//     /// This is an asynchronous version of [`std::io::copy`][std].
//     ///
//     /// [std]: std::io::copy
//     ///
//     /// # Errors
//     ///
//     /// The returned future will return an error immediately if any call to
//     /// `poll_read` or `poll_write` returns an error.
//     ///
//     /// # Examples
//     ///
//     /// ```
//     /// use tokio::io;
//     ///
//     /// # async fn dox() -> std::io::Result<()> {
//     /// let mut reader: &[u8] = b"hello";
//     /// let mut writer: Vec<u8> = vec![];
//     ///
//     /// io::copy(&mut reader, &mut writer).await?;
//     ///
//     /// assert_eq!(&b"hello"[..], &writer[..]);
//     /// # Ok(())
//     /// # }
//     /// ```
//     pub async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
//     where
//         R: AsyncRead + Unpin + ?Sized,
//         W: AsyncWrite + Unpin + ?Sized,
//     {
//         Copy {
//             reader,
//             writer,
//             buf: CopyBuffer::new()
//         }.await
//     }
// }

impl<R, W> Future for Copy<'_, R, W>
where
    R: AsyncRead + Unpin + ?Sized,
    W: AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<u64>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        let me = &mut *self;

        me.buf
            .poll_copy(cx, Pin::new(&mut *me.reader), Pin::new(&mut *me.writer))
    }
}

My apologies that this doesn't help you further, but given the project for which I need it is already going over its original scope I'm afraid that for now I need to keep it within reasonable limits.

Over the next weeks I might be able to squeeze some free time into helping find the cause for the actual issue, but for now this helps me out just fine.

@GlenDC
Copy link
Author

GlenDC commented May 14, 2022

Also added BrokenPipe as an error I ignore. This might be the reason that I will always have to refer to my own forked version, as I can imagine that you do not want to capture that one in your current setup.

For me however it's important that in a best-effort manner I do have an idea (even if just roughly) what each tcp session has transferred in terms of bytes r/w.

@soul2eat
Copy link

I would also like to receive somehow the number of bytes transferred. Thanks to this, it will be possible to decide whether the error was critical or just a dirty connection.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-net Module: tokio/net
Projects
None yet
Development

No branches or pull requests

4 participants