Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dynamic cluster immediate resolution initialization bugs. #472

Merged
merged 2 commits into from
Feb 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage
new ThreadLocalClusterManagerImpl(*this, dispatcher, local_cluster_name)};
});

// To avoid threading issues, for those clusters that start with hosts already in them (like
// the static cluster), we need to post an update onto each thread to notify them of the update.
// To avoid threading issues, for those clusters that start with hosts already in them (like the
// static cluster), we need to post an update onto each thread to notify them of the update. We
// also require this for dynamic clusters where an immediate resolve occurred in the cluster
// constructor, prior to the member update callback being configured.
for (auto& cluster : primary_clusters_) {
postInitializeCluster(*cluster.second.cluster_);
}
Expand Down
8 changes: 6 additions & 2 deletions source/common/upstream/logical_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ LogicalDnsCluster::LogicalDnsCluster(const Json::Object& config, Runtime::Loader
: ClusterImplBase(config, runtime, stats, ssl_context_manager), dns_resolver_(dns_resolver),
dns_refresh_rate_ms_(
std::chrono::milliseconds(config.getInteger("dns_refresh_rate_ms", 5000))),
tls_(tls), tls_slot_(tls.allocateSlot()),
tls_(tls), tls_slot_(tls.allocateSlot()), initialized_(false),
resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) {

std::vector<Json::ObjectPtr> hosts_json = config.getObjectArray("hosts");
Expand All @@ -23,11 +23,14 @@ LogicalDnsCluster::LogicalDnsCluster(const Json::Object& config, Runtime::Loader
dns_url_ = hosts_json[0]->getString("url");
Network::Utility::hostFromTcpUrl(dns_url_);
Network::Utility::portFromTcpUrl(dns_url_);
startResolve();

// This must come before startResolve(), since the resolve callback relies on
// tls_slot_ being initialized.
tls.set(tls_slot_, [](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectPtr {
return ThreadLocal::ThreadLocalObjectPtr{new PerThreadCurrentHostData()};
});

startResolve();
}

LogicalDnsCluster::~LogicalDnsCluster() {
Expand Down Expand Up @@ -80,6 +83,7 @@ void LogicalDnsCluster::startResolve() {
initialize_callback_();
initialize_callback_ = nullptr;
}
initialized_ = true;

resolve_timer_->enableTimer(dns_refresh_rate_ms_);
});
Expand Down
8 changes: 7 additions & 1 deletion source/common/upstream/logical_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ class LogicalDnsCluster : public ClusterImplBase {
void initialize() override {}
InitializePhase initializePhase() const override { return InitializePhase::Primary; }
void setInitializedCb(std::function<void()> callback) override {
initialize_callback_ = callback;
if (initialized_) {
callback();
} else {
initialize_callback_ = callback;
}
}

private:
Expand Down Expand Up @@ -80,6 +84,8 @@ class LogicalDnsCluster : public ClusterImplBase {
ThreadLocal::Instance& tls_;
uint32_t tls_slot_;
std::function<void()> initialize_callback_;
// Set once the first resolve completes.
bool initialized_;
Event::TimerPtr resolve_timer_;
std::string dns_url_;
Network::Address::InstancePtr current_resolved_address_;
Expand Down
12 changes: 8 additions & 4 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ StrictDnsClusterImpl::StrictDnsClusterImpl(const Json::Object& config, Runtime::
for (Json::ObjectPtr& host : config.getObjectArray("hosts")) {
resolve_targets_.emplace_back(new ResolveTarget(*this, dispatcher, host->getString("url")));
}
// We have to first construct resolve_targets_ before invoking startResolve(),
// since startResolve() might resolve immediately and relies on
// resolve_targets_ indirectly for performing host updates on resolution.
for (const ResolveTargetPtr& target : resolve_targets_) {
target->startResolve();
}
}

void StrictDnsClusterImpl::updateAllHosts(const std::vector<HostPtr>& hosts_added,
Expand All @@ -388,10 +394,7 @@ StrictDnsClusterImpl::ResolveTarget::ResolveTarget(StrictDnsClusterImpl& parent,
const std::string& url)
: parent_(parent), dns_address_(Network::Utility::hostFromTcpUrl(url)),
port_(Network::Utility::portFromTcpUrl(url)),
resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) {

startResolve();
}
resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) {}

StrictDnsClusterImpl::ResolveTarget::~ResolveTarget() {
if (active_query_) {
Expand Down Expand Up @@ -436,6 +439,7 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
parent_.initialize_callback_();
parent_.initialize_callback_ = nullptr;
}
parent_.initialized_ = true;

resolve_timer_->enableTimer(parent_.dns_refresh_rate_ms_);
});
Expand Down
8 changes: 7 additions & 1 deletion source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ class BaseDynamicClusterImpl : public ClusterImplBase {
public:
// Upstream::Cluster
void setInitializedCb(std::function<void()> callback) override {
initialize_callback_ = callback;
if (initialized_) {
callback();
} else {
initialize_callback_ = callback;
}
}

protected:
Expand All @@ -282,6 +286,8 @@ class BaseDynamicClusterImpl : public ClusterImplBase {
std::vector<HostPtr>& hosts_removed, bool depend_on_hc);

std::function<void()> initialize_callback_;
// Set once the first resolve completes.
bool initialized_ = false;
};

/**
Expand Down
29 changes: 29 additions & 0 deletions test/common/upstream/logical_dns_cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,35 @@ TEST_F(LogicalDnsClusterTest, BadConfig) {
EXPECT_THROW(setup(json), EnvoyException);
}

// Validate that if the DNS resolves immediately, during the LogicalDnsCluster
// constructor, we have the expected host state and initialization callback
// invocation.
TEST_F(LogicalDnsClusterTest, ImmediateResolve) {
std::string json = R"EOF(
{
"name": "name",
"connect_timeout_ms": 250,
"type": "logical_dns",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://foo.bar.com:443"}]
}
)EOF";

EXPECT_CALL(initialized_, ready());
EXPECT_CALL(dns_resolver_, resolve("foo.bar.com", _))
.WillOnce(Invoke([&](const std::string&, Network::DnsResolver::ResolveCb cb)
-> Network::ActiveDnsQuery& {
EXPECT_CALL(*resolve_timer_, enableTimer(_));
cb(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"}));
return active_dns_query_;
}));
setup(json);
EXPECT_EQ(1UL, cluster_->hosts().size());
EXPECT_EQ(1UL, cluster_->healthyHosts().size());
EXPECT_CALL(active_dns_query_, cancel());
tls_.shutdownThread();
}

TEST_F(LogicalDnsClusterTest, Basic) {
std::string json = R"EOF(
{
Expand Down
35 changes: 35 additions & 0 deletions test/common/upstream/upstream_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,41 @@ struct ResolverData {
Network::MockActiveDnsQuery active_dns_query_;
};

TEST(StrictDnsClusterImplTest, ImmediateResolve) {
Stats::IsolatedStoreImpl stats;
Ssl::MockContextManager ssl_context_manager;
Network::MockActiveDnsQuery active_dns_query;
NiceMock<Network::MockDnsResolver> dns_resolver;
NiceMock<Event::MockDispatcher> dispatcher;
NiceMock<Runtime::MockLoader> runtime;
ReadyWatcher initialized;

std::string json = R"EOF(
{
"name": "name",
"connect_timeout_ms": 250,
"type": "strict_dns",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://foo.bar.com:443"}]
}
)EOF";

EXPECT_CALL(initialized, ready());
EXPECT_CALL(dns_resolver, resolve("foo.bar.com", _))
.WillOnce(Invoke([&](const std::string&, Network::DnsResolver::ResolveCb cb)
-> Network::ActiveDnsQuery& {
cb(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"}));
return active_dns_query;
}));
Json::ObjectPtr loader = Json::Factory::LoadFromString(json);
StrictDnsClusterImpl cluster(*loader, runtime, stats, ssl_context_manager, dns_resolver,
dispatcher);
cluster.setInitializedCb([&]() -> void { initialized.ready(); });
EXPECT_EQ(2UL, cluster.hosts().size());
EXPECT_EQ(2UL, cluster.healthyHosts().size());
EXPECT_CALL(active_dns_query, cancel());
}

TEST(StrictDnsClusterImplTest, Basic) {
Stats::IsolatedStoreImpl stats;
Ssl::MockContextManager ssl_context_manager;
Expand Down
2 changes: 1 addition & 1 deletion test/config/integration/server_http2.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@
{
"name": "cluster_2",
"connect_timeout_ms": 250,
"type": "strict_dns",
"type": "logical_dns",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://localhost:11001"}]
},
Expand Down