Skip to content

Commit

Permalink
http: moving functions and member variables out of http1 pool (#13867)
Browse files Browse the repository at this point in the history
As part of #3431 we need to move the logic from HTTP/1 and HTTP/2 connection pools to the active client,
so the new mixed connection pool can just create an active client of the right type.

This removes member variables from the HTTP/1.1 connection pool in preparation for that move.
a follow-up PR will do the same for HTTP/2, then the two classes will be removed in favor of the base http connection
pool being created with an instantiate_active_client_ closure which creates the right type of clients.

The alpn pool will eventually create an active client of the right type based on initial ALPN.

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Nov 4, 2020
1 parent 7620e7d commit 91638e6
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 82 deletions.
3 changes: 2 additions & 1 deletion source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {

virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context) PURE;

void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client, AttachContext& context);
virtual void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
AttachContext& context);

virtual void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
absl::string_view failure_reason,
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ HttpConnPoolImplBase::HttpConnPoolImplBase(
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Http::Protocol protocol)
Random::RandomGenerator& random_generator, Http::Protocol protocol)
: Envoy::ConnectionPool::ConnPoolImplBase(
host, priority, dispatcher, options,
wrapTransportSocketOptions(transport_socket_options, protocol)) {}
wrapTransportSocketOptions(transport_socket_options, protocol)),
random_generator_(random_generator), protocol_(protocol) {}

ConnectionPool::Cancellable*
HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
Expand Down
19 changes: 16 additions & 3 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
Event::Dispatcher& dispatcher,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Http::Protocol protocol);
Random::RandomGenerator& random_generator, Http::Protocol protocol);

// ConnectionPool::Instance
void addDrainedCallback(DrainedCb cb) override { addDrainedCallbackImpl(cb); }
Expand All @@ -55,6 +55,7 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
return Envoy::ConnectionPool::ConnPoolImplBase::maybePrefetch(ratio);
}
bool hasActiveConnections() const override;
Http::Protocol protocol() const override { return protocol_; }

// Creates a new PendingStream and enqueues it into the queue.
ConnectionPool::Cancellable*
Expand All @@ -69,6 +70,10 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
Envoy::ConnectionPool::AttachContext& context) override;

virtual CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE;

protected:
Random::RandomGenerator& random_generator_;
Http::Protocol protocol_;
};

// An implementation of Envoy::ConnectionPool::ActiveClient for HTTP/1.1 and HTTP/2
Expand All @@ -78,8 +83,15 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
uint64_t concurrent_stream_limit)
: Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit,
concurrent_stream_limit) {
Upstream::Host::CreateConnectionData data = parent_.host()->createConnection(
parent_.dispatcher(), parent_.socketOptions(), parent_.transportSocketOptions());
// The static cast makes sure we call the base class host() and not
// HttpConnPoolImplBase::host which is of a different type.
Upstream::Host::CreateConnectionData data =
static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
initialize(data, parent);
}

void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
real_host_description_ = data.host_description_;
codec_client_ = parent.createCodecClient(data);
codec_client_->addConnectionCallbacks(*this);
Expand All @@ -90,6 +102,7 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
parent_.host()->cluster().stats().upstream_cx_tx_bytes_buffered_,
&parent_.host()->cluster().stats().bind_errors_, nullptr});
}

void close() override { codec_client_->close(); }
virtual Http::RequestEncoder& newStreamEncoder(Http::ResponseDecoder& response_decoder) PURE;
void onEvent(Network::ConnectionEvent event) override {
Expand Down
55 changes: 20 additions & 35 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,45 +28,14 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Random::RandomGenerato
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options)
: HttpConnPoolImplBase(std::move(host), std::move(priority), dispatcher, options,
transport_socket_options, Protocol::Http11),
upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() {
upstream_ready_enabled_ = false;
onUpstreamReady();
})),
random_generator_(random_generator) {}
transport_socket_options, random_generator, Protocol::Http11) {}

ConnPoolImpl::~ConnPoolImpl() { destructAllConnections(); }

Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
return std::make_unique<ActiveClient>(*this);
}

void ConnPoolImpl::onDownstreamReset(ActiveClient& client) {
// If we get a downstream reset to an attached client, we just blow it away.
client.codec_client_->close();
}

void ConnPoolImpl::onResponseComplete(ActiveClient& client) {
ENVOY_CONN_LOG(debug, "response complete", *client.codec_client_);

if (!client.stream_wrapper_->encode_complete_) {
ENVOY_CONN_LOG(debug, "response before request complete", *client.codec_client_);
onDownstreamReset(client);
} else if (client.stream_wrapper_->close_connection_ || client.codec_client_->remoteClosed()) {
ENVOY_CONN_LOG(debug, "saw upstream close connection", *client.codec_client_);
onDownstreamReset(client);
} else {
client.stream_wrapper_.reset();

if (!pending_streams_.empty() && !upstream_ready_enabled_) {
upstream_ready_enabled_ = true;
upstream_ready_cb_->scheduleCallbackCurrentIteration();
}

checkForDrained();
}
}

ConnPoolImpl::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent)
: RequestEncoderWrapper(parent.codec_client_->newStream(*this)),
ResponseDecoderWrapper(response_decoder), parent_(parent) {
Expand All @@ -87,7 +56,7 @@ void ConnPoolImpl::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers,
close_connection_ =
HeaderUtility::shouldCloseConnection(parent_.codec_client_->protocol(), *headers);
if (close_connection_) {
parent_.parent_.host()->cluster().stats().upstream_cx_close_notify_.inc();
parent_.parent().host()->cluster().stats().upstream_cx_close_notify_.inc();
}
} else {
// If Connection: close OR
Expand All @@ -100,16 +69,32 @@ void ConnPoolImpl::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers,
Headers::get().ConnectionValues.KeepAlive)) ||
(absl::EqualsIgnoreCase(headers->getProxyConnectionValue(),
Headers::get().ConnectionValues.Close))) {
parent_.parent_.host()->cluster().stats().upstream_cx_close_notify_.inc();
parent_.parent().host()->cluster().stats().upstream_cx_close_notify_.inc();
close_connection_ = true;
}
}
ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
}

void ConnPoolImpl::StreamWrapper::onDecodeComplete() {
ASSERT(!decode_complete_);
decode_complete_ = encode_complete_;
parent_.parent().onResponseComplete(parent_);

ENVOY_CONN_LOG(debug, "response complete", *parent_.codec_client_);

if (!parent_.stream_wrapper_->encode_complete_) {
ENVOY_CONN_LOG(debug, "response before request complete", *parent_.codec_client_);
parent_.codec_client_->close();
} else if (parent_.stream_wrapper_->close_connection_ || parent_.codec_client_->remoteClosed()) {
ENVOY_CONN_LOG(debug, "saw upstream close connection", *parent_.codec_client_);
parent_.codec_client_->close();
} else {
auto* pool = &parent_.parent();
pool->dispatcher_.post([pool]() -> void { pool->onUpstreamReady(); });
parent_.stream_wrapper_.reset();

pool->checkForDrained();
}
}

ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
Expand Down
14 changes: 2 additions & 12 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {

~ConnPoolImpl() override;

// ConnectionPool::Instance
Http::Protocol protocol() const override { return Http::Protocol::Http11; }

// ConnPoolImplBase
Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;

Expand All @@ -51,7 +48,7 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {

// Http::StreamCallbacks
void onResetStream(StreamResetReason, absl::string_view) override {
parent_.parent().onDownstreamReset(parent_);
parent_.codec_client_->close();
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
Expand All @@ -68,21 +65,14 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {
public:
ActiveClient(ConnPoolImpl& parent);

ConnPoolImpl& parent() { return static_cast<ConnPoolImpl&>(parent_); }
ConnPoolImpl& parent() { return *static_cast<ConnPoolImpl*>(&parent_); }

// ConnPoolImplBase::ActiveClient
bool closingWithIncompleteStream() const override;
RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override;

StreamWrapperPtr stream_wrapper_;
};

void onDownstreamReset(ActiveClient& client);
void onResponseComplete(ActiveClient& client);

Event::SchedulableCallbackPtr upstream_ready_cb_;
bool upstream_ready_enabled_{false};
Random::RandomGenerator& random_generator_;
};

/**
Expand Down
3 changes: 1 addition & 2 deletions source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Random::RandomGenerato
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options)
: HttpConnPoolImplBase(std::move(host), std::move(priority), dispatcher, options,
transport_socket_options, Protocol::Http2),
random_generator_(random_generator) {}
transport_socket_options, random_generator, Protocol::Http2) {}

ConnPoolImpl::~ConnPoolImpl() { destructAllConnections(); }

Expand Down
5 changes: 0 additions & 5 deletions source/common/http/http2/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ class ConnPoolImpl : public Envoy::Http::HttpConnPoolImplBase {

~ConnPoolImpl() override;

// Http::ConnectionPool::Instance
Http::Protocol protocol() const override { return Http::Protocol::Http2; }

// ConnPoolImplBase
Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;

Expand Down Expand Up @@ -55,8 +52,6 @@ class ConnPoolImpl : public Envoy::Http::HttpConnPoolImplBase {

bool closed_with_active_rq_{};
};

Random::RandomGenerator& random_generator_;
};

/**
Expand Down
33 changes: 11 additions & 22 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ namespace {
class ConnPoolImplForTest : public ConnPoolImpl {
public:
ConnPoolImplForTest(Event::MockDispatcher& dispatcher,
Upstream::ClusterInfoConstSharedPtr cluster,
NiceMock<Event::MockSchedulableCallback>* upstream_ready_cb)
Upstream::ClusterInfoConstSharedPtr cluster)
: ConnPoolImpl(dispatcher, random_, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"),
Upstream::ResourcePriority::Default, nullptr, nullptr),
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher),
mock_upstream_ready_cb_(upstream_ready_cb) {}
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher) {}

~ConnPoolImplForTest() override {
EXPECT_EQ(0U, ready_clients_.size());
Expand Down Expand Up @@ -111,22 +109,15 @@ class ConnPoolImplForTest : public ConnPoolImpl {
}

void expectEnableUpstreamReady() {
EXPECT_FALSE(upstream_ready_enabled_);
EXPECT_CALL(*mock_upstream_ready_cb_, scheduleCallbackCurrentIteration())
.Times(1)
.RetiresOnSaturation();
EXPECT_CALL(mock_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb_));
}

void expectAndRunUpstreamReady() {
EXPECT_TRUE(upstream_ready_enabled_);
mock_upstream_ready_cb_->invokeCallback();
EXPECT_FALSE(upstream_ready_enabled_);
}
void expectAndRunUpstreamReady() { post_cb_(); }

Api::ApiPtr api_;
Event::MockDispatcher& mock_dispatcher_;
NiceMock<Random::MockRandomGenerator> random_;
NiceMock<Event::MockSchedulableCallback>* mock_upstream_ready_cb_;
Event::PostCb post_cb_;
std::vector<TestCodecClient> test_clients_;
};

Expand All @@ -136,17 +127,14 @@ class ConnPoolImplForTest : public ConnPoolImpl {
class Http1ConnPoolImplTest : public testing::Test {
public:
Http1ConnPoolImplTest()
: upstream_ready_cb_(new NiceMock<Event::MockSchedulableCallback>(&dispatcher_)),
conn_pool_(
std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, upstream_ready_cb_)) {}
: conn_pool_(std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_)) {}

~Http1ConnPoolImplTest() override {
EXPECT_EQ("", TestUtility::nonZeroedGauges(cluster_->stats_store_.gauges()));
}

NiceMock<Event::MockDispatcher> dispatcher_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
NiceMock<Event::MockSchedulableCallback>* upstream_ready_cb_;
std::unique_ptr<ConnPoolImplForTest> conn_pool_;
NiceMock<Runtime::MockLoader> runtime_;
};
Expand Down Expand Up @@ -291,11 +279,9 @@ TEST_F(Http1ConnPoolImplTest, VerifyAlpnFallback) {
cluster_->transport_socket_matcher_ =
std::make_unique<NiceMock<Upstream::MockTransportSocketMatcher>>(std::move(factory));

new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);

// Recreate the conn pool so that the host re-evaluates the transport socket match, arriving at
// our test transport socket factory.
conn_pool_ = std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, upstream_ready_cb_);
conn_pool_ = std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_);
NiceMock<MockResponseDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
conn_pool_->expectClientCreate(Protocol::Http11);
Expand Down Expand Up @@ -646,6 +632,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
inner_decoder->decodeHeaders(std::move(response_headers), true);

conn_pool_->expectAndRunUpstreamReady();
conn_pool_->expectEnableUpstreamReady();
callbacks2.outer_encoder_->encodeHeaders(
TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true);
// N.B. clang_tidy insists that we use std::make_unique which can not infer std::initialize_list.
Expand Down Expand Up @@ -693,7 +680,6 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {

// Finishing request 1 will schedule binding the connection to request 2.
conn_pool_->expectEnableUpstreamReady();

callbacks.outer_encoder_->encodeHeaders(
TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true);
Http::ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
Expand All @@ -712,6 +698,7 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {
EXPECT_CALL(callbacks2.pool_ready_, ready());
conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

conn_pool_->expectEnableUpstreamReady();
callbacks2.outer_encoder_->encodeHeaders(
TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true);
// N.B. clang_tidy insists that we use std::make_unique which can not infer std::initialize_list.
Expand Down Expand Up @@ -959,7 +946,9 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
r3.startRequest();
EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value());

conn_pool_->expectEnableUpstreamReady();
r2.completeResponse(false);
conn_pool_->expectEnableUpstreamReady();
r3.completeResponse(false);

// Disconnect both clients.
Expand Down

0 comments on commit 91638e6

Please sign in to comment.