Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: destroy on main thread #14954

Merged
merged 43 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
b474d23
destroy hosts on master
lambdai Nov 12, 2020
7b933b1
tobetrypost
lambdai Nov 15, 2020
65ed52e
to tryPost
lambdai Nov 19, 2020
df9beaa
revert hosts guard
lambdai Nov 20, 2020
3af5aac
Merge branch 'master' into completedestroyonmaster
lambdai Nov 20, 2020
b647d0c
fix cluster test
lambdai Nov 20, 2020
8c6266d
fix server fuzz test
lambdai Nov 20, 2020
d8d639c
Merge branch 'main' into clusterdestory
lambdai Feb 3, 2021
564b17a
cleanup
lambdai Feb 5, 2021
8e380ce
remove extra runPostCallbacks() in run
lambdai Feb 5, 2021
bcad62e
remove exit flag in dispatcher
lambdai Feb 5, 2021
50ec141
rename to movePost returning void, tests failing
lambdai Feb 5, 2021
4746d53
relax ssl manager required empty context
lambdai Feb 5, 2021
0ca27ec
format
lambdai Feb 5, 2021
ec6eb4a
revert accidentally touched files
lambdai Feb 5, 2021
2f26bf4
add preShutdown
lambdai Feb 6, 2021
63b063a
fix typo
lambdai Feb 6, 2021
75c257a
fixing server tests
lambdai Feb 6, 2021
dd4b151
clang-tidy
lambdai Feb 6, 2021
968a611
clang-tidy another try
lambdai Feb 8, 2021
d4dc8f8
fix format
lambdai Feb 8, 2021
ea86eb1
rename shutdown, add integration test
lambdai Feb 9, 2021
187179b
clean up movePost
lambdai Feb 9, 2021
7782047
fix ambigious
lambdai Feb 9, 2021
16e1aca
Revert "fix ambigious"
lambdai Feb 10, 2021
cf64576
Revert "clean up movePost"
lambdai Feb 10, 2021
40f2530
introduce DispatcherThreadDeletablePtr and replace movePost
lambdai Feb 10, 2021
a5c2c3c
add mising file
lambdai Feb 10, 2021
30c9751
remove assert dispatcher-thread-deletable, add comment, fix sds test …
lambdai Feb 10, 2021
13379da
use independent thread_local_delete_cb_ in dispatcher
lambdai Feb 11, 2021
cc0f944
revert server.h
lambdai Feb 11, 2021
8680fa7
check shutdown_called_
lambdai Feb 16, 2021
5af6d37
call worker dispatcher shutdown at worker thread
lambdai Feb 16, 2021
93e0b0f
fail to fix dispatcher shutdown
lambdai Feb 17, 2021
31fc603
DispatcherShutdownTest
lambdai Feb 17, 2021
f9cd68f
test, log cleanup
lambdai Feb 17, 2021
01a6557
revert source/common/config/grpc_mux_impl.h
lambdai Feb 17, 2021
eb01989
revert source/common/grpc/typed_async_client.h
lambdai Feb 17, 2021
32381e6
revert assert on nullptr
lambdai Feb 18, 2021
19309cb
call reserve() to massage clangtidy
lambdai Feb 18, 2021
2d603b8
Merge branch 'main' into clusterdestroy
lambdai Feb 18, 2021
171b6d4
trace level and delete assert nullptr before use
lambdai Feb 23, 2021
d1bb2d5
fix format
lambdai Feb 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/envoy/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ envoy_cc_library(
hdrs = ["deferred_deletable.h"],
)

envoy_cc_library(
name = "dispatcher_thread_deletable",
hdrs = ["dispatcher_thread_deletable.h"],
)

envoy_cc_library(
name = "dispatcher_interface",
hdrs = ["dispatcher.h"],
deps = [
":deferred_deletable",
":dispatcher_thread_deletable",
":file_event_interface",
":scaled_timer",
":schedulable_cb_interface",
Expand Down
12 changes: 12 additions & 0 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "envoy/common/scope_tracker.h"
#include "envoy/common/time.h"
#include "envoy/event/dispatcher_thread_deletable.h"
#include "envoy/event/file_event.h"
#include "envoy/event/scaled_timer.h"
#include "envoy/event/schedulable_cb.h"
Expand Down Expand Up @@ -260,6 +261,12 @@ class Dispatcher : public DispatcherBase {
*/
virtual void post(PostCb callback) PURE;

/**
lambdai marked this conversation as resolved.
Show resolved Hide resolved
* Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on
* the dispatcher's thread before dispatcher destroy. This is safe cross thread.
lambdai marked this conversation as resolved.
Show resolved Hide resolved
*/
virtual void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE;

/**
* Runs the event loop. This will not return until exit() is called either from within a callback
* or from a different thread.
Expand Down Expand Up @@ -287,6 +294,11 @@ class Dispatcher : public DispatcherBase {
* Updates approximate monotonic time to current value.
*/
virtual void updateApproximateMonotonicTime() PURE;

/**
* Shutdown the dispatcher by clear dispatcher thread deletable.
lambdai marked this conversation as resolved.
Show resolved Hide resolved
*/
virtual void shutdown() PURE;
};

using DispatcherPtr = std::unique_ptr<Dispatcher>;
Expand Down
21 changes: 21 additions & 0 deletions include/envoy/event/dispatcher_thread_deletable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <memory>

namespace Envoy {
namespace Event {

/**
* If an object derives from this class, it can be passed to the destination dispatcher who
* guarantees to delete it in that dispatcher thread. The common use case is to ensure config
* related objects are deleted in the main thread.
*/
class DispatcherThreadDeletable {
public:
virtual ~DispatcherThreadDeletable() = default;
};

using DispatcherThreadDeletableConstPtr = std::unique_ptr<const DispatcherThreadDeletable>;

} // namespace Event
} // namespace Envoy
4 changes: 3 additions & 1 deletion source/common/config/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,

bool grpcStreamAvailable() const { return stream_ != nullptr; }

void sendMessage(const RequestProto& request) { stream_->sendMessage(request, false); }
void sendMessage(const RequestProto& request) {
stream_->sendMessage(request, false);
}

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override {
Expand Down
69 changes: 67 additions & 2 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
? watermark_factory
: std::make_shared<Buffer::WatermarkBufferFactory>()),
scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
thread_local_delete_cb_(
base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
[this]() -> void { clearDeferredDeleteList(); })),
post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
Expand All @@ -74,7 +76,12 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
}

DispatcherImpl::~DispatcherImpl() { FatalErrorHandler::removeFatalErrorHandler(*this); }
DispatcherImpl::~DispatcherImpl() {
ENVOY_LOG(debug, "destroying dispatcher {}", name_);
FatalErrorHandler::removeFatalErrorHandler(*this);
// TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
// ASSERT(deletable_in_dispatcher_thread_.empty())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ASSERT doesn't make sense for dispatchers that are deleted without calling shutdown first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. But we should enforce the shutdown(). Otherwise, a benign deferred deleted could insert new dispatch thread local obj to dispatcher, and vice versa. In ~dispatcher the obj list are destroyed instead of clear in shutdown.

We don't see the above failure because the connection-host-cluster-sslctx destroy chain is straightforward. IHMO dispatcher should call shutdown() to prevent itself

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be illegal to have objects deleted by deferred delete to schedule deferred deletions. Of course, that's not enforced today. You're uncovering a lot of issues in the dispatcher interface that will take some time to fully resolve. It may be worth documenting them in separate issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be illegal to have objects deleted by deferred delete to schedule deferred deletions.

That is too good to be true. There might be cascading deletion. I encountered this in listener (deferred) deletion and the connection attached to that listener. I almost introduced the third deferred delete in between.

I cannot resolve it for now but let me create a separate issue to track this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is for me to fix. File an issue and send it my way.

Copy link
Contributor Author

@lambdai lambdai Feb 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create #15111 to track

}

void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
std::chrono::milliseconds min_touch_interval) {
Expand Down Expand Up @@ -265,9 +272,23 @@ void DispatcherImpl::post(std::function<void()> callback) {
}
}

void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
bool need_schedule;
{
Thread::LockGuard lock(thread_local_deletable_lock_);
need_schedule = deletables_in_dispatcher_thread_.empty();
deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
// TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
// ASSERT(!shutdown_called_, "inserted after shutdown");
}

if (need_schedule) {
thread_local_delete_cb_->scheduleCallbackCurrentIteration();
}
}

void DispatcherImpl::run(RunType type) {
run_tid_ = api_.threadFactory().currentThreadId();

// Flush all post callbacks before we run the event loop. We do this because there are post
// callbacks that have to get run before the initial event loop starts running. libevent does
// not guarantee that events are run in any particular order. So even if we post() and call
Expand All @@ -280,12 +301,56 @@ MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
return approximate_monotonic_time_;
}

void DispatcherImpl::shutdown() {
// TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below
// below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
// callbacks and dispatcher thread deletable objects.
ASSERT(isThreadSafe());
auto deferred_deletables_size = current_to_delete_->size();
lambdai marked this conversation as resolved.
Show resolved Hide resolved
std::list<std::function<void()>>::size_type post_callbacks_size;
{
Thread::LockGuard lock(post_lock_);
post_callbacks_size = post_callbacks_.size();
}

std::list<DispatcherThreadDeletableConstPtr> local_deletables;
{
Thread::LockGuard lock(thread_local_deletable_lock_);
local_deletables = std::move(deletables_in_dispatcher_thread_);
}
auto thread_local_deletables_size = local_deletables.size();
while (!local_deletables.empty()) {
local_deletables.pop_front();
}
ASSERT(!shutdown_called_);
shutdown_called_ = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to set "shutdown_called_ = true;" at the top of the shutdown method. I guess I can make that change as part of #15072

Copy link
Contributor Author

@lambdai lambdai Feb 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes or no.

We should flip shutdown_called_ to true only if we are confident no more delete/post. However, at the begining of the shutdown(), deferred delete could call deleteInDispatcherThread().

Below is the over complicated version of shutdown() IMHO

shutdown() {
  is_shutting_down =true; // so that a further reschedule is a warning but not error. 
  cycling cleaning up 3 lists
  shutdown_complete = true; // any reschedule is error!
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into it as part of the followup issues.

ENVOY_LOG(
lambdai marked this conversation as resolved.
Show resolved Hide resolved
trace,
"{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
__FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size);
}

void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }

void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
approximate_monotonic_time_ = api_.timeSource().monotonicTime();
}

void DispatcherImpl::runThreadLocalDelete() {
std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
{
Thread::LockGuard lock(thread_local_deletable_lock_);
to_be_delete = std::move(deletables_in_dispatcher_thread_);
ASSERT(deletables_in_dispatcher_thread_.empty());
}
while (!to_be_delete.empty()) {
// Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
// executing complicated destruction.
touchWatchdog();
// Delete in FIFO order.
to_be_delete.pop_front();
}
}
void DispatcherImpl::runPostCallbacks() {
// Clear the deferred delete list before running post callbacks to reduce non-determinism in
// callback processing, and more easily detect if a scheduled post callback refers to one of the
Expand Down
19 changes: 17 additions & 2 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
void exit() override;
SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override;
void post(std::function<void()> callback) override;
void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override;
void run(RunType type) override;
Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; }
void pushTrackedObject(const ScopeTrackedObject* object) override;
void popTrackedObject(const ScopeTrackedObject* expected_object) override;
MonotonicTime approximateMonotonicTime() const override;
void updateApproximateMonotonicTime() override;
void shutdown() override;
lambdai marked this conversation as resolved.
Show resolved Hide resolved

// FatalErrorInterface
void onFatalError(std::ostream& os) const override;
Expand Down Expand Up @@ -127,6 +129,8 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
TimerPtr createTimerInternal(TimerCb cb);
void updateApproximateMonotonicTimeInternal();
void runPostCallbacks();
void runThreadLocalDelete();

// Helper used to touch the watchdog after most schedulable, fd, and timer callbacks.
void touchWatchdog();

Expand All @@ -145,13 +149,24 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
Buffer::WatermarkFactorySharedPtr buffer_factory_;
LibeventScheduler base_scheduler_;
SchedulerPtr scheduler_;

SchedulableCallbackPtr thread_local_delete_cb_;
Thread::MutexBasicLockable thread_local_deletable_lock_;
// `deletables_in_dispatcher_thread` must be destroyed last to allow other callbacks populate.
std::list<DispatcherThreadDeletableConstPtr>
deletables_in_dispatcher_thread_ ABSL_GUARDED_BY(thread_local_deletable_lock_);
bool shutdown_called_{false};

SchedulableCallbackPtr deferred_delete_cb_;

SchedulableCallbackPtr post_cb_;
Thread::MutexBasicLockable post_lock_;
lambdai marked this conversation as resolved.
Show resolved Hide resolved
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);

std::vector<DeferredDeletablePtr> to_delete_1_;
std::vector<DeferredDeletablePtr> to_delete_2_;
std::vector<DeferredDeletablePtr>* current_to_delete_;
Thread::MutexBasicLockable post_lock_;
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);

absl::InlinedVector<const ScopeTrackedObject*, ExpectedMaxTrackedObjectStackDepth>
tracked_object_stack_;
bool deferred_deleting_{};
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ void AsyncStreamImpl::cleanup() {
// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
if (LinkedObject<AsyncStreamImpl>::inserted()) {
ASSERT(dispatcher_->isThreadSafe());
dispatcher_->deferredDelete(
LinkedObject<AsyncStreamImpl>::removeFromList(parent_.active_streams_));
}
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) {
}

void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
ASSERT(dispatcher().isThreadSafe());
// Map send calls after local closure to no-ops. The send call could have been queued prior to
// remote reset or closure, and/or closure could have occurred synchronously in response to a
// previous send. In these cases the router will have already cleaned up stream state. This
Expand All @@ -169,6 +170,7 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
}

void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) {
ASSERT(dispatcher().isThreadSafe());
// See explanation in sendData.
if (local_closed_) {
return;
Expand Down Expand Up @@ -226,6 +228,7 @@ void AsyncStreamImpl::reset() {
}

void AsyncStreamImpl::cleanup() {
ASSERT(dispatcher().isThreadSafe());
local_closed_ = remote_closed_ = true;
// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
Expand Down
4 changes: 4 additions & 0 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ Connection::State ConnectionImpl::state() const {
void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }

void ConnectionImpl::setTransportSocketIsReadable() {
ASSERT(dispatcher_.isThreadSafe());
// Remember that the transport requested read resumption, in case the resumption event is not
// scheduled immediately or is "lost" because read was disabled.
transport_wants_read_ = true;
Expand Down Expand Up @@ -301,6 +302,7 @@ void ConnectionImpl::noDelay(bool enable) {
}

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
ASSERT(dispatcher_.isThreadSafe());
if (inDelayedClose() || !filterChainWantsData()) {
return;
}
Expand Down Expand Up @@ -420,6 +422,7 @@ void ConnectionImpl::raiseEvent(ConnectionEvent event) {
bool ConnectionImpl::readEnabled() const {
// Calls to readEnabled on a closed socket are considered to be an error.
ASSERT(state() == State::Open);
ASSERT(dispatcher_.isThreadSafe());
return read_disable_count_ == 0;
}

Expand All @@ -437,6 +440,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {

void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
ASSERT(!end_stream || enable_half_close_);
ASSERT(dispatcher_.isThreadSafe());

if (write_end_stream_) {
// It is an API violation to write more data after writing end_stream, but a duplicate
Expand Down
19 changes: 13 additions & 6 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,16 @@ ClusterImplBase::ClusterImplBase(

auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
cluster.transport_socket_matches(), factory_context, socket_factory, *stats_scope);
info_ = std::make_unique<ClusterInfoImpl>(cluster, factory_context.clusterManager().bindConfig(),
runtime, std::move(socket_matcher),
std::move(stats_scope), added_via_api, factory_context);
auto& dispatcher = factory_context.dispatcher();
info_ = std::shared_ptr<const ClusterInfoImpl>(
lambdai marked this conversation as resolved.
Show resolved Hide resolved
new ClusterInfoImpl(cluster, factory_context.clusterManager().bindConfig(), runtime,
std::move(socket_matcher), std::move(stats_scope), added_via_api,
factory_context),
[&dispatcher](const ClusterInfoImpl* self) {
ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
dispatcher.deleteInDispatcherThread(
std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
});

if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN) &&
!raw_factory_pointer->supportsAlpn()) {
Expand Down Expand Up @@ -1117,7 +1124,7 @@ void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
for (size_t priority = 0; priority < host_sets.size(); ++priority) {
const auto& host_set = host_sets[priority];
// TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts()));
HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());

HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
prioritySet().updateHosts(priority,
Expand Down Expand Up @@ -1308,10 +1315,10 @@ void PriorityStateManager::registerHostForPriority(
auto metadata = lb_endpoint.has_metadata()
? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
: nullptr;
const HostSharedPtr host(new HostImpl(
const auto host = std::make_shared<HostImpl>(
parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(),
locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(),
locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source));
locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source);
registerHostForPriority(host, locality_lb_endpoint);
}

Expand Down
4 changes: 3 additions & 1 deletion source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,9 @@ class PrioritySetImpl : public PrioritySet {
/**
* Implementation of ClusterInfo that reads from JSON.
*/
class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::Id::upstream> {
class ClusterInfoImpl : public ClusterInfo,
public Event::DispatcherThreadDeletable,
protected Logger::Loggable<Logger::Id::upstream> {
public:
using HttpProtocolOptionsConfigImpl =
Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl;
Expand Down
1 change: 1 addition & 0 deletions source/server/config_validation/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ void ValidationInstance::shutdown() {
config_.clusterManager()->shutdown();
}
thread_local_.shutdownThread();
dispatcher_->shutdown();
}

} // namespace Server
Expand Down
1 change: 1 addition & 0 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ InstanceImpl::~InstanceImpl() {
ENVOY_LOG(debug, "destroying listener manager");
listener_manager_.reset();
ENVOY_LOG(debug, "destroyed listener manager");
dispatcher_->shutdown();
}

Upstream::ClusterManager& InstanceImpl::clusterManager() {
Expand Down
1 change: 1 addition & 0 deletions source/server/worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ void WorkerImpl::threadRoutine(GuardDog& guard_dog) {
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(debug, "worker exited dispatch loop");
guard_dog.stopWatching(watch_dog_);
dispatcher_->shutdown();
lambdai marked this conversation as resolved.
Show resolved Hide resolved

// We must close all active connections before we actually exit the thread. This prevents any
// destructors from running on the main thread which might reference thread locals. Destroying
Expand Down
Loading