strand/executor issue #1589

Closed
Gibson85 opened this issue Apr 23, 2019 · 28 comments

Comments

@Gibson85

Hi,

my WebSession class looked like this:

class WebSession : public enable_shared_from_this<WebSession>
{
public:
	explicit WebSession(tcp::socket socket) :
		m_Buffer(),
		m_WebSocket(std::move(socket)),
		m_Strand(m_WebSocket.get_executor())
	{
	}

private:
	websocket::stream<tcp::socket> m_WebSocket;
	boost::asio::strand<boost::asio::io_context::executor_type> m_Strand;
	boost::beast::multi_buffer m_Buffer;

};

With the latest version, Boost 1.70, the code no longer compiles at the following line:

m_Strand(m_WebSocket.get_executor())

error C2664: 'boost::asio::strand<boost::asio::io_context::executor_type>::strand(boost::asio::strand<boost::asio::io_context::executor_type> &&) noexcept': cannot convert argument 1 from 'boost::asio::executor' to 'const boost::asio::io_context::executor_type &'
note: Reason: cannot convert from 'boost::asio::executor' to 'const boost::asio::io_context::executor_type'
note: No user-defined-conversion operator available that can perform this conversion, or the operator cannot be called

Any idea how to fix this?

@vinniefalco
Member

Yes, remove m_Strand and construct the websocket with boost::asio::make_strand(socket.get_executor())

@Gibson85
Author

Thanks for your fast reply. And what about the following calls?

m_WebSocket.async_accept(boost::asio::bind_executor(m_Strand, std::bind(&WebSession::OnAccept, shared_from_this(), std::placeholders::_1)));

m_WebSocket.async_read(m_Buffer, boost::asio::bind_executor(m_Strand, std::bind(&WebSession::OnRead, shared_from_this(), std::placeholders::_1, std::placeholders::_2)));

m_WebSocket.async_write(m_Buffer.data(), boost::asio::bind_executor(m_Strand, std::bind(&WebSession::OnWrite, shared_from_this(), std::placeholders::_1, std::placeholders::_2)));

@vinniefalco
Member

By constructing the websocket with the strand, you do not need to call bind_executor anymore, see:
boostorg/asio@59066d8

See also:
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1322r0.html

@Gibson85
Author

Ok, that solved it. Thanks again.

@ecoindev

What about running_in_this_thread()? How can it be used if the strand is encapsulated in the ws stream object?

@vinniefalco
Member

Use ws.get_executor().running_in_this_thread()

@ecoindev

error: no member named 'running_in_this_thread' in 'boost::asio::executor'
https://wandbox.org/permlink/ErNto4m8i0RwHF1a

@vinniefalco
Member

You have to use a NextLayer that names the strand instead of the polymorphic executor, something like:

beast::websocket::stream<
    net::basic_stream_socket<
        net::ip::tcp,
        net::strand<net::io_context::executor_type>>> ws;

@ecoindev

Thanks, so with SSL it would be something like

boost::beast::websocket::stream<boost::beast::ssl_stream< boost::beast::basic_stream<boost::asio::ip::tcp, boost::asio::strand<boost::asio::io_context::executor_type>, boost::beast::unlimited_rate_policy>>> ?

@vinniefalco
Member

Yes, just like that.

@ecoindev

Thanks

@ecoindev

@vinniefalco
How would the example from the docs:
https://www.boost.org/doc/libs/1_71_0/libs/beast/example/websocket/client/async/websocket_client_async.cpp
look when the strand from #1589 (comment) is applied:

https://wandbox.org/permlink/CtsYIKpNIjelFNUw

Will using a strand executor make it possible to call async_write concurrently from several threads, or is a queue necessary?

@vinniefalco
Member

Will using a strand executor make it possible to call async_write concurrently from several threads, or is a queue necessary?

You need a queue. See: https://github.com/boostorg/beast/blob/develop/example/websocket/server/chat-multi/websocket_session.cpp#L102

@ecoindev

ecoindev commented Oct 1, 2019

Thanks, @vinniefalco
I saw this implementation, but it’s not entirely clear to me how the queue (queue_) is protected from concurrent access from multiple threads.

Right now I am using an atomic write counter with a read-modify-write operation to prevent a race condition when calling async_write from multiple threads. As a queue I use boost::asio itself: if a write is already in progress, I just call boost::asio::post.

@vinniefalco
Member

The queue is protected because it is only accessed from the strand:
https://github.com/boostorg/beast/blob/develop/example/websocket/server/chat-multi/websocket_session.cpp#L90

@ecoindev

ecoindev commented Oct 1, 2019

Sorry for the stupid question, but if posting to the strand protects these members from concurrent access, why do we need to protect the member ws_ (the call to async_write) from concurrent execution?

Why can't we just do something like this:

// Can be executed from any thread
void websocket_session::write(std::string &&ss) {
  if (!ws_.get_executor().running_in_this_thread()) {
    net::post(ws_.get_executor(),
              beast::bind_front_handler(&websocket_session::write, shared_from_this(), std::move(ss)));
  } else {
    ws_.async_write(net::buffer(ss), beast::bind_front_handler(&websocket_session::on_write, shared_from_this()));
  }
}

@vinniefalco
Member

In your code, ss goes out of scope after ws_.async_write(net::buffer(ss)... returns and invalidates the buffer, resulting in undefined behavior.

@ecoindev

ecoindev commented Oct 2, 2019

In your code, ss goes out of scope after ws_.async_write(net::buffer(ss)... returns and invalidates the buffer, resulting in undefined behavior.

Thanks!
You are absolutely right!!! Perhaps the queue option is not so bad, although it looks a bit clumsy.

@chreniuc
Contributor

chreniuc commented Oct 15, 2019

@vinniefalco sorry for disturbing you again... We were also using an algorithm with atomics to try to fix the problem with multiple async calls. We found your comment recently and wanted to give it a try.

While trying to implement this I got very confused with the strand concept.

Before 1.70.0 we were creating the strands explicitly, just like OP did. Then we updated to 1.70.0 and did the changes to make it work: here is a discussion regarding this.

So let's start from the beginning. A strand object is (according to the official docs):

The io_context::strand class provides the ability to post and dispatch handlers with the guarantee that none of those handlers will execute concurrently.

All clear.

Before 1.70.0 we were creating the strands explicitly:

ws(std::move(socket(io_context)));

strand(io_context);

And when we wanted to use it, we would pass the strand to the async call:

ws.async_accept(
  boost::asio::bind_executor(
     strand, std::bind(&WebSession::on_accept, shared_from_this(), std::placeholders::_1)
  )
);

This gave us the freedom to use multiple strands on the same ws. For example I could have a read strand and a write strand, because I do not care if the completion handlers for write and read operations would execute concurrently. I only cared that the read completion handlers will not execute concurrently with each other and the same thing with the write completion handlers.

Now in 1.7x.0 things have changed, we are creating the ws like this:

boost::asio::ip::tcp::socket socket( ::boost::asio::make_strand( io_context ) );

boost::beast::tcp_stream stream(::std::move(socket));
 ::boost::beast::ssl_stream<::boost::beast::tcp_stream> ssl_stream(::std::move(stream));

::boost::beast::websocket::stream<
    ::boost::beast::ssl_stream<::boost::beast::tcp_stream>>
    ws(::std::move(ssl_stream));

And calling async methods like this:

ws.async_write( boost::asio::buffer( "Message"),
      std::bind( &ws_server::on_write_websocket, this, message_ptr,
        std::placeholders::_1,
        std::placeholders::_2 ) );

All clear, but after reading this issue I got a little bit confused. Notice that I have declared the ws as:

::boost::beast::websocket::stream<
    ::boost::beast::ssl_stream<::boost::beast::tcp_stream>>
    ws(::std::move(ssl_stream));

Not like you said:

You have to use a NextLayer that names the strand instead of the polymorphic executor, something like:

beast::websocket::stream<
    net::basic_stream_socket<
        net::ip::tcp,
        net::strand<net::io_context::executor_type>>> ws;

Does this mean that we are not using strand, even if we did create the socket using make_strand?

Which completion handlers does that strand (the one we created with make_strand and passed to the socket) apply to? Read? Write? Both? How can I manually specify which strand applies to which completion handlers?

Is this a valid case: to use read and write strands? I'm asking because I see using two strands as a performance improvement: the reads and writes are separated and can be done concurrently without any problem...

What is the purpose of asio::strand<net::io_context::executor_type> in the type of the ws? Is it just so we can access more methods, like running_in_this_thread (methods of the strand object)?

Later edit:

I forgot to mention, we are using tcp_stream from beast which is declared like this:

using tcp_stream = basic_stream<
    net::ip::tcp,
    net::executor,
    unlimited_rate_policy>;

To apply the strand layer, should we do something like this?

using tcp_stream_strand =
  basic_stream<net::ip::tcp,
  boost::asio::strand<boost::asio::io_context::executor_type>,
  unlimited_rate_policy>;

Thanks 😄

@vinniefalco
Member

Is this a valid case: to use read and write strands?

Nope, that will cause undefined behavior, because two different threads could touch the websocket::stream object at the same time. Generally speaking, I/O objects are not thread safe and may not be accessed concurrently.

The purpose of putting the strand executor on the stream is for convenience. It makes calls to initiating functions easier to read (because you don't have to bind the strand executor to the callback every time).

@chreniuc
Contributor

Hey @vinniefalco,
like I said in the previous comment:

I forgot to mention, we are using tcp_stream from beast which is declared like this..

I did replace tcp_stream from beast with:

using tcp_stream_strand = ::boost::beast::basic_stream<::boost::asio::ip::tcp,
  boost::asio::strand<boost::asio::io_context::executor_type>,
  ::boost::beast::unlimited_rate_policy>;

But I got an error and I do not know how to fix it:

/celibs/boost_1_71_0/boost/beast/core/impl/basic_stream.hpp:38:17: error: no matching function for call to 'boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::strand<boost::asio::io_context::executor_type> >::basic_stream_socket(boost::asio::basic_stream_socket<boost::asio::ip::tcp>)'

   38 |     , timer(ex())

Here's the source code, it's only 3 lines in main: https://godbolt.org/z/UwLdY1

using tcp_stream_strand = ::boost::beast::basic_stream<::boost::asio::ip::tcp,
  boost::asio::strand<boost::asio::io_context::executor_type>,
  ::boost::beast::unlimited_rate_policy>;

int main()
{
    boost::asio::io_context ioc;
    boost::asio::ip::tcp::socket socket(::boost::asio::make_strand(ioc));
    tcp_stream_strand stream(::std::move(socket)); // here is the error
   // boost::beast::tcp_stream stream(::std::move(socket)); // This works
    return 0;
}

I do not understand why it doesn't work... What am I doing wrong?

@vinniefalco
Member

vinniefalco commented Oct 15, 2019

I do not understand why it doesn't work... What am I doing wrong?

https://github.com/boostorg/beast/blob/develop/include/boost/beast/core/basic_stream.hpp#L240

Should read:

        net::basic_waitable_timer<
            chrono::steady_clock,
                Executor> timer; // rate timer;

I don't think that's your problem though, I'm working on it.

@vinniefalco
Member

I do not understand why it doesn't work... What am I doing wrong?

I see the problem. You are attempting to use this constructor overload:

https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference/basic_stream_socket/basic_stream_socket/overload10.html

However, this overload is only available if the following condition is met:

is_convertible< Protocol1, Protocol >::value &&is_convertible< Executor1, Executor >::value

Here, Executor1 is net::executor while Executor is net::strand<net::io_context::executor_type>. You cannot construct a net::strand<net::io_context::executor_type> from a net::executor (you can go the other way though).

If you want to use the concrete type in the beast::basic_stream rather than the polymorphic executor wrapper net::executor, then you must also use the concrete type on the socket. This should compile for you:

net::basic_stream_socket<
    net::ip::tcp,
    net::strand<
        net::io_context::executor_type>
            > sock(net::make_strand(ioc));
basic_stream<
    net::ip::tcp,
    net::strand<
        net::io_context::executor_type>,
    unlimited_rate_policy> stream(std::move(sock));
BOOST_STATIC_ASSERT(
    std::is_convertible<
        decltype(sock)::executor_type,
        decltype(stream)::executor_type>::value);

vinniefalco added a commit to vinniefalco/beast that referenced this issue Oct 15, 2019
@chreniuc
Contributor

Thanks, that works 😄

Today I reread your previous answer and realized there are some things in Boost.Asio that I only thought I understood...

I thought that the purpose of the strand is only to avoid having multiple completion handlers invoked concurrently (that is, to serialize the completion handlers of the async operations).

But the strand also serializes the async calls. Let me give you an example:

If I'm using a strand for a stream, when I'm calling two async methods from two different threads: async_read and async_write (on the same stream), those calls aren't done in parallel, those calls are serialized. Right?

It's as if, when you call async_read, the call is posted to the internal strand of the stream, and the same goes for async_write. Then the worker thread takes the async_read from the strand and executes it (which should return immediately), and then it calls async_write. (I'm not talking about the completion handlers; those will also be posted after the async calls are made.)

Otherwise there would be two calls on the stream object from two different threads, and there would be a data race. Because:

I/O objects are not thread safe and may not be accessed concurrently.

Can you confirm this? Or do the async calls not modify the stream, so there is no need to serialize them?

Thank you again, really. You've helped me a lot. 😄

@vinniefalco
Member

If I'm using a strand for a stream, when I'm calling two async methods from two different threads: async_read and async_write (on the same stream), those calls aren't done in parallel, those calls are serialized. Right?

This is undefined behavior. You cannot call any member functions concurrently. You must make sure that only one thread is accessing the socket at a time. The easiest way to do this is to use a strand. See:
[image]

@chreniuc
Contributor

Perfect. Thanks. This is something that I didn't know 😄

@uvguy

uvguy commented Jul 13, 2021

@vinniefalco, apologies for spamming a stupid question here. I have a dummy broadcast ws server here; my intention is that once the server has successfully broadcast the written data, it should read the client's request again. How can I avoid the soft_mutex try_lock assertion on async_read in my gist? Thanks.

@vinniefalco
Member

Please open a new issue, thanks !
