Skip to content

Commit

Permalink
thrift/wangle support io_uring
Browse files Browse the repository at this point in the history
Summary: add some hooks through wangle that allow thrift servers to use io_uring socket by setting: server->setPreferIoUring(true);

Reviewed By: NiteshKant

Differential Revision: D37922426

fbshipit-source-id: 2cb216b2aaf94440993fdc2b1cd725ac88cb9c0d
  • Loading branch information
Dylan Yudaken authored and facebook-github-bot committed Sep 23, 2022
1 parent 23f3317 commit 84bfa30
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 20 deletions.
29 changes: 23 additions & 6 deletions thrift/lib/cpp2/server/Cpp2Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <folly/Overload.h>
#include <folly/String.h>
#include <folly/experimental/io/AsyncIoUringSocketFactory.h>
#include <folly/io/async/AsyncSSLSocket.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBaseLocal.h>
Expand Down Expand Up @@ -84,6 +85,16 @@ void Cpp2Worker::onNewConnection(
return;
}

PeekingManagerOptions const peekingManagerOptions =
PeekingManagerOptions().withPreferBufferMovable(
secureTransportType == wangle::SecureTransportType::NONE &&
server_->preferIoUring() &&
folly::AsyncIoUringSocketFactory::supports(sock->getEventBase()));
if (peekingManagerOptions.preferBufferMovable()) {
sock = folly::AsyncIoUringSocketFactory::create<
folly::AsyncTransport::UniquePtr>(std::move(sock));
}

const auto& func = server_->getZeroCopyEnableFunc();
if (func && sock) {
sock->setZeroCopy(true);
Expand All @@ -95,7 +106,12 @@ void Cpp2Worker::onNewConnection(
// If no security, peek into the socket to determine type
case wangle::SecureTransportType::NONE: {
new TransportPeekingManager(
shared_from_this(), *addr, tinfo, server_, std::move(sock));
shared_from_this(),
*addr,
tinfo,
server_,
std::move(sock),
peekingManagerOptions);
break;
}
case wangle::SecureTransportType::TLS:
Expand Down Expand Up @@ -132,9 +148,9 @@ void Cpp2Worker::handleHeader(
folly::AsyncTransport::UniquePtr sock,
const folly::SocketAddress* addr,
const wangle::TransportInfo& tinfo) {
auto fd = sock->getUnderlyingTransport<folly::AsyncSocket>()
->getNetworkSocket()
.toFd();
folly::AsyncSocket* underlying =
sock->getUnderlyingTransport<folly::AsyncSocket>();
auto fd = underlying ? underlying->getNetworkSocket().toFd() : -1;
VLOG(4) << "Cpp2Worker: Creating connection for socket " << fd;

auto thriftTransport = createThriftTransport(std::move(sock));
Expand Down Expand Up @@ -170,8 +186,9 @@ std::shared_ptr<folly::AsyncTransport> Cpp2Worker::createThriftTransport(

folly::AsyncSocket* tsock =
sock->getUnderlyingTransport<folly::AsyncSocket>();
CHECK(tsock);
markSocketAccepted(tsock);
if (tsock) {
markSocketAccepted(tsock);
}
// use custom deleter for std::shared_ptr<folly::AsyncTransport> to allow
// socket transfer from header to rocket (if enabled by ThriftFlags)
return apache::thrift::transport::detail::convertToShared(std::move(sock));
Expand Down
7 changes: 7 additions & 0 deletions thrift/lib/cpp2/server/ThriftServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class ThriftServer : public apache::thrift::BaseThriftServer,
// not be useful, e.g. non-C++ languages.
bool allowCheckUnimplementedExtraInterfaces_ = true;

bool preferIoUring_ = false;

std::weak_ptr<folly::ShutdownSocketSet> wShutdownSocketSet_;

//! Listen socket
Expand Down Expand Up @@ -632,6 +634,10 @@ class ThriftServer : public apache::thrift::BaseThriftServer,

void updateTLSCert();

void setPreferIoUring(bool b) { preferIoUring_ = b; }

bool preferIoUring() const { return preferIoUring_; }

/**
* Tells the thrift server to update ticket seeds with the contents of the
* file ticketPath when modified and initialized the seeds with the contents
Expand Down Expand Up @@ -697,6 +703,7 @@ class ThriftServer : public apache::thrift::BaseThriftServer,
config.socketMaxReadsPerEvent = socketMaxReadsPerEvent_;

config.useZeroCopy = !!zeroCopyEnableFunc_;
config.preferIoUring = preferIoUring_;
return config;
}

Expand Down
55 changes: 41 additions & 14 deletions thrift/lib/cpp2/server/peeking/PeekingManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ class PeekingManagerBase : public wangle::ManagedConnection {
ThriftServer* const server_;
};

class PeekingManagerOptions {
public:
explicit PeekingManagerOptions(bool preferBufferMovable = false)
: preferBufferMovable_(preferBufferMovable) {}

PeekingManagerOptions& withPreferBufferMovable(bool b) {
preferBufferMovable_ = b;
return *this;
}
bool preferBufferMovable() const { return preferBufferMovable_; }

private:
bool preferBufferMovable_;
};

class CheckTLSPeekingManager : public PeekingManagerBase,
public wangle::SocketPeeker::Callback {
public:
Expand Down Expand Up @@ -160,10 +175,11 @@ class PreReceivedDataAsyncTransportWrapper

static UniquePtr create(
folly::AsyncTransport::UniquePtr socket,
std::vector<uint8_t> preReceivedData) {
std::vector<uint8_t> preReceivedData,
PeekingManagerOptions options) {
DCHECK(!socket->getReadCallback());
return UniquePtr(new PreReceivedDataAsyncTransportWrapper(
std::move(socket), std::move(preReceivedData)));
std::move(socket), std::move(preReceivedData), options));
}

ReadCallback* getReadCallback() const override { return readCallback_; }
Expand All @@ -176,13 +192,18 @@ class PreReceivedDataAsyncTransportWrapper
return;
}
const auto preReceivedData = std::exchange(preReceivedData_, {});
void* buf;
size_t bufSize;
callback->getReadBuffer(&buf, &bufSize);
CHECK(callback == readCallback_);
CHECK(bufSize >= preReceivedData->size());
std::memcpy(buf, preReceivedData->data(), preReceivedData->size());
callback->readDataAvailable(preReceivedData->size());
if (options_.preferBufferMovable() && readCallback_->isBufferMovable()) {
callback->readBufferAvailable(IOBuf::copyBuffer(
preReceivedData->data(), preReceivedData->size()));
} else {
void* buf;
size_t bufSize;
callback->getReadBuffer(&buf, &bufSize);
CHECK(callback == readCallback_);
CHECK(bufSize >= preReceivedData->size());
std::memcpy(buf, preReceivedData->data(), preReceivedData->size());
callback->readDataAvailable(preReceivedData->size());
}
}
if (readCallback_ == callback) {
Base::setReadCB(callback);
Expand All @@ -209,15 +230,18 @@ class PreReceivedDataAsyncTransportWrapper
private:
PreReceivedDataAsyncTransportWrapper(
folly::AsyncTransport::UniquePtr socket,
std::vector<uint8_t> preReceivedData)
std::vector<uint8_t> preReceivedData,
PeekingManagerOptions options)
: Base(std::move(socket)),
preReceivedData_(
preReceivedData.size() ? std::make_unique<std::vector<uint8_t>>(
std::move(preReceivedData))
: std::unique_ptr<std::vector<uint8_t>>()) {}
: std::unique_ptr<std::vector<uint8_t>>()),
options_(options) {}

std::unique_ptr<std::vector<uint8_t>> preReceivedData_;
folly::AsyncTransport::ReadCallback* readCallback_{};
PeekingManagerOptions const options_;
};

class TransportPeekingManager : public PeekingManagerBase,
Expand All @@ -228,11 +252,13 @@ class TransportPeekingManager : public PeekingManagerBase,
const folly::SocketAddress& clientAddr,
wangle::TransportInfo tinfo,
apache::thrift::ThriftServer* server,
folly::AsyncTransport::UniquePtr socket)
folly::AsyncTransport::UniquePtr socket,
PeekingManagerOptions options = PeekingManagerOptions())
: PeekingManagerBase(
std::move(acceptor), clientAddr, std::move(tinfo), server),
socket_(std::move(socket)),
peeker_(new wangle::TransportPeeker(*socket_, this, kPeekBytes)) {
peeker_(new wangle::TransportPeeker(*socket_, this, kPeekBytes)),
options_(options) {
peeker_->start();
}

Expand All @@ -253,7 +279,7 @@ class TransportPeekingManager : public PeekingManagerBase,
}

auto transport = PreReceivedDataAsyncTransportWrapper::create(
std::move(socket_), peekBytes);
std::move(socket_), peekBytes, options_);

try {
// Check for new transports
Expand Down Expand Up @@ -294,6 +320,7 @@ class TransportPeekingManager : public PeekingManagerBase,
private:
folly::AsyncTransport::UniquePtr socket_;
typename wangle::TransportPeeker::UniquePtr peeker_;
PeekingManagerOptions const options_;
};

} // namespace thrift
Expand Down

0 comments on commit 84bfa30

Please sign in to comment.