Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: moving functions and member variables out of http1 pool #13867

Merged
merged 4 commits into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {

virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context) PURE;

void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client, AttachContext& context);
virtual void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
AttachContext& context);

virtual void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
absl::string_view failure_reason,
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ HttpConnPoolImplBase::HttpConnPoolImplBase(
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Http::Protocol protocol)
Random::RandomGenerator& random_generator, Http::Protocol protocol)
: Envoy::ConnectionPool::ConnPoolImplBase(
host, priority, dispatcher, options,
wrapTransportSocketOptions(transport_socket_options, protocol)) {}
wrapTransportSocketOptions(transport_socket_options, protocol)),
random_generator_(random_generator), protocol_(protocol) {}

ConnectionPool::Cancellable*
HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
Expand Down
19 changes: 16 additions & 3 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
Event::Dispatcher& dispatcher,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Http::Protocol protocol);
Random::RandomGenerator& random_generator, Http::Protocol protocol);

// ConnectionPool::Instance
void addDrainedCallback(DrainedCb cb) override { addDrainedCallbackImpl(cb); }
Expand All @@ -55,6 +55,7 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
return Envoy::ConnectionPool::ConnPoolImplBase::maybePrefetch(ratio);
}
bool hasActiveConnections() const override;
Http::Protocol protocol() const override { return protocol_; }

// Creates a new PendingStream and enqueues it into the queue.
ConnectionPool::Cancellable*
Expand All @@ -69,6 +70,10 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
Envoy::ConnectionPool::AttachContext& context) override;

virtual CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE;

protected:
Random::RandomGenerator& random_generator_;
Http::Protocol protocol_;
ggreenway marked this conversation as resolved.
Show resolved Hide resolved
};

// An implementation of Envoy::ConnectionPool::ActiveClient for HTTP/1.1 and HTTP/2
Expand All @@ -78,8 +83,15 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
uint64_t concurrent_stream_limit)
: Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit,
concurrent_stream_limit) {
Upstream::Host::CreateConnectionData data = parent_.host()->createConnection(
parent_.dispatcher(), parent_.socketOptions(), parent_.transportSocketOptions());
// The static cast makes sure we call the base class host() and not
// HttpConnPoolImplBase::host which is of a different type.
Upstream::Host::CreateConnectionData data =
static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's going on with this cast? Isn't this an upcast, and thus would happen implicitly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annoyingly,
HttpConnPoolImplBase has
Upstream::HostDescriptionConstSharedPtr host() const override { return host_; }

and the base class has
const Upstream::HostConstSharedPtr& host() const { return host_; }

so without the cast it snags the wrong type and complains there's no createConnection function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yuck. Can you add a comment to that effect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah - will add it to my clean up list too - may be able to do away with it with one of the later refactors!

parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
initialize(data, parent);
}

void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
ggreenway marked this conversation as resolved.
Show resolved Hide resolved
real_host_description_ = data.host_description_;
codec_client_ = parent.createCodecClient(data);
codec_client_->addConnectionCallbacks(*this);
Expand All @@ -90,6 +102,7 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
parent_.host()->cluster().stats().upstream_cx_tx_bytes_buffered_,
&parent_.host()->cluster().stats().bind_errors_, nullptr});
}

void close() override { codec_client_->close(); }
virtual Http::RequestEncoder& newStreamEncoder(Http::ResponseDecoder& response_decoder) PURE;
void onEvent(Network::ConnectionEvent event) override {
Expand Down
55 changes: 20 additions & 35 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,45 +28,14 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Random::RandomGenerato
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options)
: HttpConnPoolImplBase(std::move(host), std::move(priority), dispatcher, options,
transport_socket_options, Protocol::Http11),
upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() {
upstream_ready_enabled_ = false;
onUpstreamReady();
})),
random_generator_(random_generator) {}
transport_socket_options, random_generator, Protocol::Http11) {}

ConnPoolImpl::~ConnPoolImpl() { destructAllConnections(); }

Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
return std::make_unique<ActiveClient>(*this);
}

void ConnPoolImpl::onDownstreamReset(ActiveClient& client) {
// If we get a downstream reset to an attached client, we just blow it away.
client.codec_client_->close();
}

void ConnPoolImpl::onResponseComplete(ActiveClient& client) {
ENVOY_CONN_LOG(debug, "response complete", *client.codec_client_);

if (!client.stream_wrapper_->encode_complete_) {
ENVOY_CONN_LOG(debug, "response before request complete", *client.codec_client_);
onDownstreamReset(client);
} else if (client.stream_wrapper_->close_connection_ || client.codec_client_->remoteClosed()) {
ENVOY_CONN_LOG(debug, "saw upstream close connection", *client.codec_client_);
onDownstreamReset(client);
} else {
client.stream_wrapper_.reset();

if (!pending_streams_.empty() && !upstream_ready_enabled_) {
upstream_ready_enabled_ = true;
upstream_ready_cb_->scheduleCallbackCurrentIteration();
}

checkForDrained();
}
}

ConnPoolImpl::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent)
: RequestEncoderWrapper(parent.codec_client_->newStream(*this)),
ResponseDecoderWrapper(response_decoder), parent_(parent) {
Expand All @@ -87,7 +56,7 @@ void ConnPoolImpl::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers,
close_connection_ =
HeaderUtility::shouldCloseConnection(parent_.codec_client_->protocol(), *headers);
if (close_connection_) {
parent_.parent_.host()->cluster().stats().upstream_cx_close_notify_.inc();
parent_.parent().host()->cluster().stats().upstream_cx_close_notify_.inc();
}
} else {
// If Connection: close OR
Expand All @@ -100,16 +69,32 @@ void ConnPoolImpl::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers,
Headers::get().ConnectionValues.KeepAlive)) ||
(absl::EqualsIgnoreCase(headers->getProxyConnectionValue(),
Headers::get().ConnectionValues.Close))) {
parent_.parent_.host()->cluster().stats().upstream_cx_close_notify_.inc();
parent_.parent().host()->cluster().stats().upstream_cx_close_notify_.inc();
close_connection_ = true;
}
}
ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
}

void ConnPoolImpl::StreamWrapper::onDecodeComplete() {
ASSERT(!decode_complete_);
decode_complete_ = encode_complete_;
parent_.parent().onResponseComplete(parent_);

ENVOY_CONN_LOG(debug, "response complete", *parent_.codec_client_);

if (!parent_.stream_wrapper_->encode_complete_) {
ENVOY_CONN_LOG(debug, "response before request complete", *parent_.codec_client_);
parent_.codec_client_->close();
} else if (parent_.stream_wrapper_->close_connection_ || parent_.codec_client_->remoteClosed()) {
ENVOY_CONN_LOG(debug, "saw upstream close connection", *parent_.codec_client_);
parent_.codec_client_->close();
} else {
auto* pool = &parent_.parent();
pool->dispatcher_.post([pool]() -> void { pool->onUpstreamReady(); });
parent_.stream_wrapper_.reset();

pool->checkForDrained();
}
}

ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
Expand Down
14 changes: 2 additions & 12 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {

~ConnPoolImpl() override;

// ConnectionPool::Instance
Http::Protocol protocol() const override { return Http::Protocol::Http11; }

// ConnPoolImplBase
Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;

Expand All @@ -51,7 +48,7 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {

// Http::StreamCallbacks
void onResetStream(StreamResetReason, absl::string_view) override {
parent_.parent().onDownstreamReset(parent_);
parent_.codec_client_->close();
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
Expand All @@ -68,21 +65,14 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {
public:
ActiveClient(ConnPoolImpl& parent);

ConnPoolImpl& parent() { return static_cast<ConnPoolImpl&>(parent_); }
ConnPoolImpl& parent() { return *static_cast<ConnPoolImpl*>(&parent_); }

// ConnPoolImplBase::ActiveClient
bool closingWithIncompleteStream() const override;
RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override;

StreamWrapperPtr stream_wrapper_;
};

void onDownstreamReset(ActiveClient& client);
void onResponseComplete(ActiveClient& client);

Event::SchedulableCallbackPtr upstream_ready_cb_;
bool upstream_ready_enabled_{false};
Random::RandomGenerator& random_generator_;
};

/**
Expand Down
3 changes: 1 addition & 2 deletions source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Random::RandomGenerato
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options)
: HttpConnPoolImplBase(std::move(host), std::move(priority), dispatcher, options,
transport_socket_options, Protocol::Http2),
random_generator_(random_generator) {}
transport_socket_options, random_generator, Protocol::Http2) {}

ConnPoolImpl::~ConnPoolImpl() { destructAllConnections(); }

Expand Down
5 changes: 0 additions & 5 deletions source/common/http/http2/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ class ConnPoolImpl : public Envoy::Http::HttpConnPoolImplBase {

~ConnPoolImpl() override;

// Http::ConnectionPool::Instance
Http::Protocol protocol() const override { return Http::Protocol::Http2; }

// ConnPoolImplBase
Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;

Expand Down Expand Up @@ -55,8 +52,6 @@ class ConnPoolImpl : public Envoy::Http::HttpConnPoolImplBase {

bool closed_with_active_rq_{};
};

Random::RandomGenerator& random_generator_;
};

/**
Expand Down
33 changes: 11 additions & 22 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ namespace {
class ConnPoolImplForTest : public ConnPoolImpl {
public:
ConnPoolImplForTest(Event::MockDispatcher& dispatcher,
Upstream::ClusterInfoConstSharedPtr cluster,
NiceMock<Event::MockSchedulableCallback>* upstream_ready_cb)
Upstream::ClusterInfoConstSharedPtr cluster)
: ConnPoolImpl(dispatcher, random_, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"),
Upstream::ResourcePriority::Default, nullptr, nullptr),
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher),
mock_upstream_ready_cb_(upstream_ready_cb) {}
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher) {}

~ConnPoolImplForTest() override {
EXPECT_EQ(0U, ready_clients_.size());
Expand Down Expand Up @@ -111,22 +109,15 @@ class ConnPoolImplForTest : public ConnPoolImpl {
}

void expectEnableUpstreamReady() {
EXPECT_FALSE(upstream_ready_enabled_);
EXPECT_CALL(*mock_upstream_ready_cb_, scheduleCallbackCurrentIteration())
.Times(1)
.RetiresOnSaturation();
EXPECT_CALL(mock_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb_));
}

void expectAndRunUpstreamReady() {
EXPECT_TRUE(upstream_ready_enabled_);
mock_upstream_ready_cb_->invokeCallback();
EXPECT_FALSE(upstream_ready_enabled_);
}
void expectAndRunUpstreamReady() { post_cb_(); }

Api::ApiPtr api_;
Event::MockDispatcher& mock_dispatcher_;
NiceMock<Random::MockRandomGenerator> random_;
NiceMock<Event::MockSchedulableCallback>* mock_upstream_ready_cb_;
Event::PostCb post_cb_;
std::vector<TestCodecClient> test_clients_;
};

Expand All @@ -136,17 +127,14 @@ class ConnPoolImplForTest : public ConnPoolImpl {
class Http1ConnPoolImplTest : public testing::Test {
public:
Http1ConnPoolImplTest()
: upstream_ready_cb_(new NiceMock<Event::MockSchedulableCallback>(&dispatcher_)),
conn_pool_(
std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, upstream_ready_cb_)) {}
: conn_pool_(std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_)) {}

~Http1ConnPoolImplTest() override {
EXPECT_EQ("", TestUtility::nonZeroedGauges(cluster_->stats_store_.gauges()));
}

NiceMock<Event::MockDispatcher> dispatcher_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
NiceMock<Event::MockSchedulableCallback>* upstream_ready_cb_;
std::unique_ptr<ConnPoolImplForTest> conn_pool_;
NiceMock<Runtime::MockLoader> runtime_;
};
Expand Down Expand Up @@ -291,11 +279,9 @@ TEST_F(Http1ConnPoolImplTest, VerifyAlpnFallback) {
cluster_->transport_socket_matcher_ =
std::make_unique<NiceMock<Upstream::MockTransportSocketMatcher>>(std::move(factory));

new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);

// Recreate the conn pool so that the host re-evaluates the transport socket match, arriving at
// our test transport socket factory.
conn_pool_ = std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, upstream_ready_cb_);
conn_pool_ = std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_);
NiceMock<MockResponseDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
conn_pool_->expectClientCreate(Protocol::Http11);
Expand Down Expand Up @@ -646,6 +632,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
inner_decoder->decodeHeaders(std::move(response_headers), true);

conn_pool_->expectAndRunUpstreamReady();
conn_pool_->expectEnableUpstreamReady();
callbacks2.outer_encoder_->encodeHeaders(
TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true);
// N.B. clang_tidy insists that we use std::make_unique which can not infer std::initialize_list.
Expand Down Expand Up @@ -693,7 +680,6 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {

// Finishing request 1 will schedule binding the connection to request 2.
conn_pool_->expectEnableUpstreamReady();

callbacks.outer_encoder_->encodeHeaders(
TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true);
Http::ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
Expand All @@ -712,6 +698,7 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {
EXPECT_CALL(callbacks2.pool_ready_, ready());
conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

conn_pool_->expectEnableUpstreamReady();
callbacks2.outer_encoder_->encodeHeaders(
TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true);
// N.B. clang_tidy insists that we use std::make_unique which can not infer std::initialize_list.
Expand Down Expand Up @@ -959,7 +946,9 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
r3.startRequest();
EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value());

conn_pool_->expectEnableUpstreamReady();
r2.completeResponse(false);
conn_pool_->expectEnableUpstreamReady();
r3.completeResponse(false);

// Disconnect both clients.
Expand Down