Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: removing the http1 and http2 connection pools #13967

Merged
merged 3 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ HttpConnPoolImplBase::HttpConnPoolImplBase(
}
}

HttpConnPoolImplBase::~HttpConnPoolImplBase() { destructAllConnections(); }

ConnectionPool::Cancellable*
HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
Http::ConnectionPool::Callbacks& callbacks) {
Expand Down
47 changes: 45 additions & 2 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@ class HttpPendingStream : public Envoy::ConnectionPool::PendingStream {
HttpAttachContext context_;
};

// An implementation of Envoy::ConnectionPool::ConnPoolImplBase for shared code
// between HTTP/1.1 and HTTP/2
class ActiveClient;

/* An implementation of Envoy::ConnectionPool::ConnPoolImplBase for shared code
* between HTTP/1.1 and HTTP/2
*
* NOTE: The connection pool does NOT do DNS resolution. It assumes it is being given a numeric IP
* address. Higher layer code should handle resolving DNS on error and creating a new pool
* bound to a different IP address.
*/
class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
public Http::ConnectionPool::Instance {
public:
Expand All @@ -45,6 +52,7 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Random::RandomGenerator& random_generator,
std::vector<Http::Protocol> protocol);
~HttpConnPoolImplBase() override;

// ConnectionPool::Instance
void addDrainedCallback(DrainedCb cb) override { addDrainedCallbackImpl(cb); }
Expand All @@ -71,8 +79,10 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
Envoy::ConnectionPool::AttachContext& context) override;

virtual CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE;
Random::RandomGenerator& randomGenerator() { return random_generator_; }

protected:
friend class ActiveClient;
Random::RandomGenerator& random_generator_;
Http::Protocol protocol_;
};
Expand Down Expand Up @@ -111,10 +121,43 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
}
size_t numActiveStreams() const override { return codec_client_->numActiveRequests(); }
uint64_t id() const override { return codec_client_->id(); }
HttpConnPoolImplBase& parent() { return *static_cast<HttpConnPoolImplBase*>(&parent_); }

Http::CodecClientPtr codec_client_;
};

/* An implementation of Envoy::ConnectionPool::ConnPoolImplBase for HTTP/1 and HTTP/2
*/
class FixedHttpConnPoolImpl : public HttpConnPoolImplBase {
public:
using CreateClientFn =
std::function<Envoy::ConnectionPool::ActiveClientPtr(HttpConnPoolImplBase* pool)>;
using CreateCodecFn = std::function<CodecClientPtr(Upstream::Host::CreateConnectionData& data,
HttpConnPoolImplBase* pool)>;

FixedHttpConnPoolImpl(Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
Event::Dispatcher& dispatcher,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Random::RandomGenerator& random_generator, CreateClientFn client_fn,
CreateCodecFn codec_fn, std::vector<Http::Protocol> protocol)
: HttpConnPoolImplBase(host, priority, dispatcher, options, transport_socket_options,
random_generator, protocol),
codec_fn_(codec_fn), client_fn_(client_fn) {}

CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override {
return codec_fn_(data, this);
}

Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
return client_fn_(this);
}

protected:
const CreateCodecFn codec_fn_;
const CreateClientFn client_fn_;
};

} // namespace Http

} // namespace Envoy
58 changes: 26 additions & 32 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,22 @@ namespace Envoy {
namespace Http {
namespace Http1 {

ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options)
: HttpConnPoolImplBase(std::move(host), std::move(priority), dispatcher, options,
transport_socket_options, random_generator, {Protocol::Http11}) {}

ConnPoolImpl::~ConnPoolImpl() { destructAllConnections(); }

Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
return std::make_unique<ActiveClient>(*this);
}

ConnPoolImpl::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent)
ActiveClient::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent)
: RequestEncoderWrapper(parent.codec_client_->newStream(*this)),
ResponseDecoderWrapper(response_decoder), parent_(parent) {
RequestEncoderWrapper::inner_.getStream().addCallbacks(*this);
}

ConnPoolImpl::StreamWrapper::~StreamWrapper() {
ActiveClient::StreamWrapper::~StreamWrapper() {
// Upstream connection might be closed right after response is complete. Setting delay=true
// here to attach pending requests in next dispatcher loop to handle that case.
// https://github.com/envoyproxy/envoy/issues/2715
parent_.parent().onStreamClosed(parent_, true);
}

void ConnPoolImpl::StreamWrapper::onEncodeComplete() { encode_complete_ = true; }
void ActiveClient::StreamWrapper::onEncodeComplete() { encode_complete_ = true; }

void ConnPoolImpl::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) {
void ActiveClient::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) {
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.fixed_connection_close")) {
close_connection_ =
HeaderUtility::shouldCloseConnection(parent_.codec_client_->protocol(), *headers);
Expand All @@ -76,7 +63,7 @@ void ConnPoolImpl::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers,
ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
}

void ConnPoolImpl::StreamWrapper::onDecodeComplete() {
void ActiveClient::StreamWrapper::onDecodeComplete() {
ASSERT(!decode_complete_);
decode_complete_ = encode_complete_;

Expand All @@ -90,44 +77,51 @@ void ConnPoolImpl::StreamWrapper::onDecodeComplete() {
parent_.codec_client_->close();
} else {
auto* pool = &parent_.parent();
pool->dispatcher_.post([pool]() -> void { pool->onUpstreamReady(); });
pool->dispatcher().post([pool]() -> void { pool->onUpstreamReady(); });
parent_.stream_wrapper_.reset();

pool->checkForDrained();
}
}

ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
void ActiveClient::StreamWrapper::onResetStream(StreamResetReason, absl::string_view) {
parent_.codec_client_->close();
}

ActiveClient::ActiveClient(HttpConnPoolImplBase& parent)
: Envoy::Http::ActiveClient(
parent, parent.host_->cluster().maxRequestsPerConnection(),
parent, parent.host()->cluster().maxRequestsPerConnection(),
1 // HTTP1 always has a concurrent-request-limit of 1 per connection.
) {
parent.host_->cluster().stats().upstream_cx_http1_total_.inc();
parent.host()->cluster().stats().upstream_cx_http1_total_.inc();
}

bool ConnPoolImpl::ActiveClient::closingWithIncompleteStream() const {
bool ActiveClient::closingWithIncompleteStream() const {
return (stream_wrapper_ != nullptr) && (!stream_wrapper_->decode_complete_);
}

RequestEncoder& ConnPoolImpl::ActiveClient::newStreamEncoder(ResponseDecoder& response_decoder) {
RequestEncoder& ActiveClient::newStreamEncoder(ResponseDecoder& response_decoder) {
ASSERT(!stream_wrapper_);
stream_wrapper_ = std::make_unique<StreamWrapper>(response_decoder, *this);
return *stream_wrapper_;
}

CodecClientPtr ProdConnPoolImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
CodecClientPtr codec{new CodecClientProd(CodecClient::Type::HTTP1, std::move(data.connection_),
data.host_description_, dispatcher_, random_generator_)};
return codec;
}

ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options) {
return std::make_unique<Http::Http1::ProdConnPoolImpl>(
dispatcher, random_generator, host, priority, options, transport_socket_options);
return std::make_unique<FixedHttpConnPoolImpl>(
std::move(host), std::move(priority), dispatcher, options, transport_socket_options,
random_generator,
[](HttpConnPoolImplBase* pool) { return std::make_unique<ActiveClient>(*pool); },
[](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) {
CodecClientPtr codec{new CodecClientProd(
CodecClient::Type::HTTP1, std::move(data.connection_), data.host_description_,
pool->dispatcher(), pool->randomGenerator())};
return codec;
},
std::vector<Protocol>{Protocol::Http11});
}

} // namespace Http1
Expand Down
54 changes: 10 additions & 44 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,20 @@ namespace Http {
namespace Http1 {

/**
* A connection pool implementation for HTTP/1.1 connections.
* NOTE: The connection pool does NOT do DNS resolution. It assumes it is being given a numeric IP
* address. Higher layer code should handle resolving DNS on error and creating a new pool
* bound to a different IP address.
* An active client for HTTP/1.1 connections.
*/
class ConnPoolImpl : public Http::HttpConnPoolImplBase {
class ActiveClient : public Envoy::Http::ActiveClient {
public:
ConnPoolImpl(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options);
ActiveClient(HttpConnPoolImplBase& parent);

~ConnPoolImpl() override;

// ConnPoolImplBase
Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;

protected:
class ActiveClient;
// ConnPoolImplBase::ActiveClient
bool closingWithIncompleteStream() const override;
RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override;

struct StreamWrapper : public RequestEncoderWrapper,
public ResponseDecoderWrapper,
public StreamCallbacks {
public StreamCallbacks,
protected Logger::Loggable<Logger::Id::pool> {
StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent);
~StreamWrapper() override;

Expand All @@ -47,9 +38,7 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {
void onDecodeComplete() override;

// Http::StreamCallbacks
void onResetStream(StreamResetReason, absl::string_view) override {
parent_.codec_client_->close();
}
void onResetStream(StreamResetReason, absl::string_view) override;
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

Expand All @@ -58,32 +47,9 @@ class ConnPoolImpl : public Http::HttpConnPoolImplBase {
bool close_connection_{};
bool decode_complete_{};
};

using StreamWrapperPtr = std::unique_ptr<StreamWrapper>;

class ActiveClient : public Envoy::Http::ActiveClient {
public:
ActiveClient(ConnPoolImpl& parent);

ConnPoolImpl& parent() { return *static_cast<ConnPoolImpl*>(&parent_); }

// ConnPoolImplBase::ActiveClient
bool closingWithIncompleteStream() const override;
RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override;

StreamWrapperPtr stream_wrapper_;
};
};

/**
* Production implementation of the ConnPoolImpl.
*/
class ProdConnPoolImpl : public ConnPoolImpl {
public:
using ConnPoolImpl::ConnPoolImpl;

// ConnPoolImpl
CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override;
StreamWrapperPtr stream_wrapper_;
};

ConnectionPool::InstancePtr
Expand Down
45 changes: 16 additions & 29 deletions source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,7 @@ namespace Http2 {
// side we do 2^29.
static const uint64_t DEFAULT_MAX_STREAMS = (1 << 29);

ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options)
: HttpConnPoolImplBase(std::move(host), std::move(priority), dispatcher, options,
transport_socket_options, random_generator, {Protocol::Http2}) {}

ConnPoolImpl::~ConnPoolImpl() { destructAllConnections(); }

Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
return std::make_unique<ActiveClient>(*this);
}

void ConnPoolImpl::ActiveClient::onGoAway(Http::GoAwayErrorCode) {
void ActiveClient::onGoAway(Http::GoAwayErrorCode) {
ENVOY_CONN_LOG(debug, "remote goaway", *codec_client_);
parent_.host()->cluster().stats().upstream_cx_close_notify_.inc();
if (state_ != ActiveClient::State::DRAINING) {
Expand All @@ -41,7 +28,7 @@ void ConnPoolImpl::ActiveClient::onGoAway(Http::GoAwayErrorCode) {
}
}

void ConnPoolImpl::ActiveClient::onStreamDestroy() {
void ActiveClient::onStreamDestroy() {
parent().onStreamClosed(*this, false);

// If we are destroying this stream because of a disconnect, do not check for drain here. We will
Expand All @@ -52,7 +39,7 @@ void ConnPoolImpl::ActiveClient::onStreamDestroy() {
}
}

void ConnPoolImpl::ActiveClient::onStreamReset(Http::StreamResetReason reason) {
void ActiveClient::onStreamReset(Http::StreamResetReason reason) {
if (reason == StreamResetReason::ConnectionTermination ||
reason == StreamResetReason::ConnectionFailure) {
parent_.host()->cluster().stats().upstream_rq_pending_failure_eject_.inc();
Expand All @@ -68,7 +55,7 @@ uint64_t maxStreamsPerConnection(uint64_t max_streams_config) {
return (max_streams_config != 0) ? max_streams_config : DEFAULT_MAX_STREAMS;
}

ConnPoolImpl::ActiveClient::ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent)
ActiveClient::ActiveClient(HttpConnPoolImplBase& parent)
: Envoy::Http::ActiveClient(
parent, maxStreamsPerConnection(parent.host()->cluster().maxRequestsPerConnection()),
parent.host()->cluster().http2Options().max_concurrent_streams().value()) {
Expand All @@ -77,27 +64,27 @@ ConnPoolImpl::ActiveClient::ActiveClient(Envoy::Http::HttpConnPoolImplBase& pare
parent.host()->cluster().stats().upstream_cx_http2_total_.inc();
}

bool ConnPoolImpl::ActiveClient::closingWithIncompleteStream() const {
return closed_with_active_rq_;
}
bool ActiveClient::closingWithIncompleteStream() const { return closed_with_active_rq_; }

RequestEncoder& ConnPoolImpl::ActiveClient::newStreamEncoder(ResponseDecoder& response_decoder) {
RequestEncoder& ActiveClient::newStreamEncoder(ResponseDecoder& response_decoder) {
return codec_client_->newStream(response_decoder);
}

CodecClientPtr ProdConnPoolImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
CodecClientPtr codec{new CodecClientProd(CodecClient::Type::HTTP2, std::move(data.connection_),
data.host_description_, dispatcher_, random_generator_)};
return codec;
}

ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options) {
return std::make_unique<Http::Http2::ProdConnPoolImpl>(
dispatcher, random_generator, host, priority, options, transport_socket_options);
return std::make_unique<FixedHttpConnPoolImpl>(
host, priority, dispatcher, options, transport_socket_options, random_generator,
[](HttpConnPoolImplBase* pool) { return std::make_unique<ActiveClient>(*pool); },
[](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) {
CodecClientPtr codec{new CodecClientProd(
CodecClient::Type::HTTP2, std::move(data.connection_), data.host_description_,
pool->dispatcher(), pool->randomGenerator())};
return codec;
},
std::vector<Protocol>{Protocol::Http2});
}

} // namespace Http2
Expand Down
Loading