Skip to content

Commit

Permalink
listener: move active_udp_listener out of conn handler (#15349)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Dai <[email protected]>
  • Loading branch information
lambdai authored Mar 8, 2021
1 parent 62bd82d commit f2a517c
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "common/protobuf/utility.h"
#include "common/runtime/runtime_protos.h"

#include "server/active_udp_listener.h"
#include "server/connection_handler_impl.h"

#include "extensions/quic_listeners/quiche/envoy_quic_dispatcher.h"
Expand Down
21 changes: 21 additions & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ envoy_cc_library(
envoy_cc_library(
name = "connection_handler_lib",
deps = [
":active_udp_listener",
":connection_handler_impl",
],
)
Expand Down Expand Up @@ -95,6 +96,26 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "active_udp_listener",
srcs = ["active_udp_listener.cc"],
hdrs = [
"active_udp_listener.h",
],
deps = [
"//include/envoy/event:dispatcher_interface",
"//include/envoy/network:connection_handler_interface",
"//include/envoy/network:exception_interface",
"//include/envoy/network:filter_interface",
"//include/envoy/network:listen_socket_interface",
"//include/envoy/network:listener_interface",
"//include/envoy/server:active_udp_listener_config_interface",
"//include/envoy/server:listener_manager_interface",
"//source/extensions/transport_sockets:well_known_names",
"//source/server:connection_handler_impl",
],
)

envoy_cc_library(
name = "drain_manager_lib",
srcs = ["drain_manager_impl.cc"],
Expand Down
1 change: 1 addition & 0 deletions source/server/active_raw_udp_listener_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "envoy/api/v2/listener/listener.pb.h"

#include "server/active_udp_listener.h"
#include "server/connection_handler_impl.h"
#include "server/well_known_names.h"

Expand Down
135 changes: 135 additions & 0 deletions source/server/active_udp_listener.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "server/active_udp_listener.h"

#include "envoy/network/exception.h"
#include "envoy/server/listener_manager.h"
#include "envoy/stats/scope.h"

#include "spdlog/spdlog.h"

namespace Envoy {
namespace Server {
ActiveUdpListenerBase::ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener,
Network::ListenerConfig* config)
: ConnectionHandlerImpl::ActiveListenerImplBase(parent, config), worker_index_(worker_index),
concurrency_(concurrency), parent_(parent), listen_socket_(listen_socket),
udp_listener_(std::move(listener)) {
ASSERT(worker_index_ < concurrency_);
config_->udpListenerWorkerRouter()->get().registerWorkerForListener(*this);
}

ActiveUdpListenerBase::~ActiveUdpListenerBase() {
config_->udpListenerWorkerRouter()->get().unregisterWorkerForListener(*this);
}

void ActiveUdpListenerBase::post(Network::UdpRecvData&& data) {
ASSERT(!udp_listener_->dispatcher().isThreadSafe(),
"Shouldn't be posting if thread safe; use onWorkerData() instead.");

// It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must
// bundle the socket inside a shared_ptr that can be captured.
// TODO(mattklein123): It may be possible to change the post() API such that the lambda is only
// moved, but this is non-trivial and needs investigation.
auto data_to_post = std::make_shared<Network::UdpRecvData>();
*data_to_post = std::move(data);

udp_listener_->dispatcher().post(
[data_to_post, tag = config_->listenerTag(), &parent = parent_]() {
Network::UdpListenerCallbacksOptRef listener = parent.getUdpListenerCallbacks(tag);
if (listener.has_value()) {
listener->get().onDataWorker(std::move(*data_to_post));
}
});
}

void ActiveUdpListenerBase::onData(Network::UdpRecvData&& data) {
uint32_t dest = worker_index_;

// For concurrency == 1, the packet will always go to the current worker.
if (concurrency_ > 1) {
dest = destination(data);
ASSERT(dest < concurrency_);
}

if (dest == worker_index_) {
onDataWorker(std::move(data));
} else {
config_->udpListenerWorkerRouter()->get().deliver(dest, std::move(data));
}
}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config)
: ActiveRawUdpListener(worker_index, concurrency, parent,
config.listenSocketFactory().getListenSocket(), dispatcher, config) {}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr listen_socket_ptr,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config)
: ActiveRawUdpListener(worker_index, concurrency, parent, *listen_socket_ptr, listen_socket_ptr,
dispatcher, config) {}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::Socket& listen_socket,
Network::SocketSharedPtr listen_socket_ptr,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config)
: ActiveRawUdpListener(worker_index, concurrency, parent, listen_socket,
dispatcher.createUdpListener(listen_socket_ptr, *this), config) {}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener,
Network::ListenerConfig& config)
: ActiveUdpListenerBase(worker_index, concurrency, parent, listen_socket, std::move(listener),
&config),
read_filter_(nullptr) {
// Create the filter chain on creating a new udp listener
config_->filterChainFactory().createUdpListenerFilterChain(*this, *this);

// If filter is nullptr, fail the creation of the listener
if (read_filter_ == nullptr) {
throw Network::CreateListenerException(
fmt::format("Cannot create listener as no read filter registered for the udp listener: {} ",
config_->name()));
}

// Create udp_packet_writer
udp_packet_writer_ = config.udpPacketWriterFactory()->get().createUdpPacketWriter(
listen_socket_.ioHandle(), config.listenerScope());
}

void ActiveRawUdpListener::onDataWorker(Network::UdpRecvData&& data) { read_filter_->onData(data); }

void ActiveRawUdpListener::onReadReady() {}

void ActiveRawUdpListener::onWriteReady(const Network::Socket&) {
// TODO(sumukhs): This is not used now. When write filters are implemented, this is a
// trigger to invoke the on write ready API on the filters which is when they can write
// data.

// Clear write_blocked_ status for udpPacketWriter.
udp_packet_writer_->setWritable();
}

void ActiveRawUdpListener::onReceiveError(Api::IoError::IoErrorCode error_code) {
read_filter_->onReceiveError(error_code);
}

void ActiveRawUdpListener::addReadFilter(Network::UdpListenerReadFilterPtr&& filter) {
ASSERT(read_filter_ == nullptr, "Cannot add a 2nd UDP read filter");
read_filter_ = std::move(filter);
}

Network::UdpListener& ActiveRawUdpListener::udpListener() { return *udp_listener_; }

} // namespace Server
} // namespace Envoy
102 changes: 102 additions & 0 deletions source/server/active_udp_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#pragma once

#include <cstdint>
#include <memory>

#include "envoy/network/connection_handler.h"
#include "envoy/network/filter.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener.h"
#include "envoy/server/active_udp_listener_config.h"

// TODO(lambdai): remove connection_handler_impl after ActiveListenerImplBase is extracted from it.
#include "server/connection_handler_impl.h"

namespace Envoy {
namespace Server {

class ActiveUdpListenerBase : public ConnectionHandlerImpl::ActiveListenerImplBase,
public Network::ConnectionHandler::ActiveUdpListener {
public:
ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener, Network::ListenerConfig* config);
~ActiveUdpListenerBase() override;

// Network::UdpListenerCallbacks
void onData(Network::UdpRecvData&& data) final;
uint32_t workerIndex() const final { return worker_index_; }
void post(Network::UdpRecvData&& data) final;

// ActiveListenerImplBase
Network::Listener* listener() override { return udp_listener_.get(); }

protected:
uint32_t destination(const Network::UdpRecvData& /*data*/) const override {
// By default, route to the current worker.
return worker_index_;
}

const uint32_t worker_index_;
const uint32_t concurrency_;
Network::UdpConnectionHandler& parent_;
Network::Socket& listen_socket_;
Network::UdpListenerPtr udp_listener_;
};

/**
* Wrapper for an active udp listener owned by this handler.
*/
class ActiveRawUdpListener : public ActiveUdpListenerBase,
public Network::UdpListenerFilterManager,
public Network::UdpReadFilterCallbacks {
public:
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config);
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config);
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Network::Socket& listen_socket,
Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config);
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener, Network::ListenerConfig& config);

// Network::UdpListenerCallbacks
void onReadReady() override;
void onWriteReady(const Network::Socket& socket) override;
void onReceiveError(Api::IoError::IoErrorCode error_code) override;
Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; }

// Network::UdpWorker
void onDataWorker(Network::UdpRecvData&& data) override;

// ActiveListenerImplBase
void pauseListening() override { udp_listener_->disable(); }
void resumeListening() override { udp_listener_->enable(); }
void shutdownListener() override {
// The read filter should be deleted before the UDP listener is deleted.
// The read filter refers to the UDP listener to send packets to downstream.
// If the UDP listener is deleted before the read filter, the read filter may try to use it
// after deletion.
read_filter_.reset();
udp_listener_.reset();
}

// Network::UdpListenerFilterManager
void addReadFilter(Network::UdpListenerReadFilterPtr&& filter) override;

// Network::UdpReadFilterCallbacks
Network::UdpListener& udpListener() override;

private:
Network::UdpListenerReadFilterPtr read_filter_;
Network::UdpPacketWriterPtr udp_packet_writer_;
};

} // namespace Server
} // namespace Envoy
Loading

0 comments on commit f2a517c

Please sign in to comment.