Skip to content

Commit

Permalink
Add upstream and downstream info in parent read callbacks in tcp too (#…
Browse files Browse the repository at this point in the history
…9949)

Signed-off-by: gargnupur <[email protected]>
  • Loading branch information
gargnupur authored Feb 14, 2020
1 parent b0a1850 commit fade668
Show file tree
Hide file tree
Showing 19 changed files with 97 additions and 56 deletions.
2 changes: 1 addition & 1 deletion source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
TransportSocketPtr&& transport_socket, bool connected)
: ConnectionImplBase(dispatcher, next_global_id_++),
transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
filter_manager_(*this), stream_info_(dispatcher.timeSource()),
stream_info_(dispatcher.timeSource()), filter_manager_(*this),
write_buffer_(
dispatcher.getWatermarkFactory().create([this]() -> void { this->onLowWatermark(); },
[this]() -> void { this->onHighWatermark(); })),
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback

TransportSocketPtr transport_socket_;
ConnectionSocketPtr socket_;
FilterManagerImpl filter_manager_;
StreamInfo::StreamInfoImpl stream_info_;
FilterManagerImpl filter_manager_;

Buffer::OwnedImpl read_buffer_;
// This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has
Expand Down
9 changes: 6 additions & 3 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,9 @@ UpstreamDrainManager& Config::drainManager() {
return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}

Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager,
TimeSource& time_source)
Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager)
: config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
upstream_callbacks_(new UpstreamCallbacks(this)), stream_info_(time_source) {
upstream_callbacks_(new UpstreamCallbacks(this)) {
ASSERT(config != nullptr);
}

Expand Down Expand Up @@ -292,6 +291,10 @@ void Filter::readDisableDownstream(bool disable) {
}
}

StreamInfo::StreamInfo& Filter::getStreamInfo() {
return read_callbacks_->connection().streamInfo();
}

void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
ASSERT(!on_high_watermark_called_);
on_high_watermark_called_ = true;
Expand Down
6 changes: 2 additions & 4 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ class Filter : public Network::ReadFilter,
Tcp::ConnectionPool::Callbacks,
protected Logger::Loggable<Logger::Id::filter> {
public:
Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager,
TimeSource& time_source);
Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager);
~Filter() override;

// Network::ReadFilter
Expand Down Expand Up @@ -302,7 +301,7 @@ class Filter : public Network::ReadFilter,
bool on_high_watermark_called_{false};
};

virtual StreamInfo::StreamInfo& getStreamInfo() { return stream_info_; }
virtual StreamInfo::StreamInfo& getStreamInfo();

protected:
struct DownstreamCallbacks : public Network::ConnectionCallbacks {
Expand Down Expand Up @@ -354,7 +353,6 @@ class Filter : public Network::ReadFilter,
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
std::unique_ptr<GenericUpstream> upstream_;
StreamInfo::StreamInfoImpl stream_info_;
RouteConstSharedPtr route_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
uint32_t connect_attempts_{};
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/network/tcp_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ Network::FilterFactoryCb ConfigFactory::createFilterFactoryFromProtoTyped(
Envoy::TcpProxy::ConfigSharedPtr filter_config(
std::make_shared<Envoy::TcpProxy::Config>(proto_config, context));
return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<Envoy::TcpProxy::Filter>(
filter_config, context.clusterManager(), context.dispatcher().timeSource()));
filter_manager.addReadFilter(
std::make_shared<Envoy::TcpProxy::Filter>(filter_config, context.clusterManager()));
};
}

Expand Down
2 changes: 1 addition & 1 deletion test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class HttpConnectionManagerImplTest : public testing::Test, public ConnectionMan
bool shouldNormalizePath() const override { return normalize_path_; }
bool shouldMergeSlashes() const override { return merge_slashes_; }

DangerousDeprecatedTestTime test_time_;
Envoy::Event::SimulatedTimeSystem test_time_;
NiceMock<Router::MockRouteConfigProvider> route_config_provider_;
std::shared_ptr<Router::MockConfig> route_config_{new NiceMock<Router::MockConfig>()};
NiceMock<Router::MockScopedRouteConfigProvider> scoped_route_config_provider_;
Expand Down
3 changes: 1 addition & 2 deletions test/common/network/filter_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,7 @@ stat_prefix: name
tcp_proxy.set_cluster("fake_cluster");
TcpProxy::ConfigSharedPtr tcp_proxy_config(new TcpProxy::Config(tcp_proxy, factory_context));
manager.addReadFilter(
std::make_shared<TcpProxy::Filter>(tcp_proxy_config, factory_context.cluster_manager_,
factory_context.dispatcher().timeSource()));
std::make_shared<TcpProxy::Filter>(tcp_proxy_config, factory_context.cluster_manager_));

Extensions::Filters::Common::RateLimit::RequestCallbacks* request_callbacks{};
EXPECT_CALL(*rl_client, limit(_, "foo",
Expand Down
2 changes: 1 addition & 1 deletion test/common/stream_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ envoy_cc_test_library(
"//include/envoy/stream_info:stream_info_interface",
"//source/common/common:assert_lib",
"//source/common/stream_info:filter_state_lib",
"//test/test_common:test_time_lib",
"//test/test_common:simulated_time_system_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
Expand Down
4 changes: 2 additions & 2 deletions test/common/stream_info/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "common/common/assert.h"
#include "common/stream_info/filter_state_impl.h"

#include "test/test_common/test_time.h"
#include "test/test_common/simulated_time_system.h"

namespace Envoy {

Expand Down Expand Up @@ -242,7 +242,7 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
std::string requested_server_name_;
std::string upstream_transport_failure_reason_;
const Http::HeaderMap* request_headers_{};
DangerousDeprecatedTestTime test_time_;
Envoy::Event::SimulatedTimeSystem test_time_;
};

} // namespace Envoy
68 changes: 40 additions & 28 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,11 @@ class TcpProxyTest : public testing::Test {
TcpProxyTest() {
ON_CALL(*factory_context_.access_log_manager_.file_, write(_))
.WillByDefault(SaveArg<0>(&access_log_data_));
ON_CALL(filter_callbacks_.connection_.stream_info_, onUpstreamHostSelected(_))
.WillByDefault(Invoke(
[this](Upstream::HostDescriptionConstSharedPtr host) { upstream_host_ = host; }));
ON_CALL(filter_callbacks_.connection_.stream_info_, upstreamHost())
.WillByDefault(ReturnPointee(&upstream_host_));
}

~TcpProxyTest() override {
Expand Down Expand Up @@ -905,7 +910,7 @@ class TcpProxyTest : public testing::Test {
}

{
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true));
EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
filter_->initializeReadFilterCallbacks(filter_callbacks_);
Expand Down Expand Up @@ -953,8 +958,8 @@ class TcpProxyTest : public testing::Test {

NiceMock<Server::Configuration::MockFactoryContext> factory_context_;
ConfigSharedPtr config_;
std::unique_ptr<Filter> filter_;
NiceMock<Network::MockReadFilterCallbacks> filter_callbacks_;
std::unique_ptr<Filter> filter_;
std::vector<std::shared_ptr<NiceMock<Upstream::MockHost>>> upstream_hosts_{};
std::vector<std::unique_ptr<NiceMock<Network::MockClientConnection>>> upstream_connections_{};
std::vector<std::unique_ptr<NiceMock<Tcp::ConnectionPool::MockConnectionData>>>
Expand All @@ -968,6 +973,7 @@ class TcpProxyTest : public testing::Test {
Network::Address::InstanceConstSharedPtr upstream_remote_address_;
std::list<std::function<Tcp::ConnectionPool::Cancellable*(Tcp::ConnectionPool::Cancellable*)>>
new_connection_functions_;
Upstream::HostDescriptionConstSharedPtr upstream_host_{};
};

TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(DefaultRoutes)) {
Expand Down Expand Up @@ -1253,7 +1259,7 @@ TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(RouteWithMetadataMatch)) {
{Envoy::Config::MetadataFilters::get().ENVOY_LB, metadata_struct});

configure(config);
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());

Expand Down Expand Up @@ -1301,7 +1307,7 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) {
v2.set_string_value("v2");
HashedValue hv0(v0), hv1(v1), hv2(v2);

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);

// Expect filter to try to open a connection to cluster1.
Expand Down Expand Up @@ -1355,7 +1361,7 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) {

TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(DisconnectBeforeData)) {
configure(defaultConfig());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);

filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
Expand Down Expand Up @@ -1394,7 +1400,7 @@ TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(UpstreamConnectionLimit)) {
0, 0, 0, 0, 0);

// setup sets up expectation for tcpConnForCluster but this test is expected to NOT call that
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
// The downstream connection closes if the proxy can't make an upstream connection.
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush));
filter_->initializeReadFilterCallbacks(filter_callbacks_);
Expand Down Expand Up @@ -1746,6 +1752,23 @@ TEST_F(TcpProxyTest, ShareFilterState) {
.value());
}

// Tests that filter callback can access downstream and upstream address and ssl properties.
TEST_F(TcpProxyTest, AccessDownstreamAndUpstreamProperties) {
setup(1);

raiseEventUpstreamConnected(0);
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamLocalAddress(),
filter_callbacks_.connection().localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamRemoteAddress(),
filter_callbacks_.connection().remoteAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamSslConnection(),
filter_callbacks_.connection().ssl());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamLocalAddress(),
upstream_connections_.at(0)->localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamSslConnection(),
upstream_connections_.at(0)->streamInfo().downstreamSslConnection());
}

class TcpProxyRoutingTest : public testing::Test {
public:
TcpProxyRoutingTest() = default;
Expand All @@ -1766,7 +1789,7 @@ class TcpProxyRoutingTest : public testing::Test {
void initializeFilter() {
EXPECT_CALL(filter_callbacks_, connection()).WillRepeatedly(ReturnRef(connection_));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down Expand Up @@ -1826,13 +1849,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UseClusterFromPerConnectionC
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.tcp_proxy.cluster",
std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);
ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.tcp_proxy.cluster", std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to specified cluster.
EXPECT_CALL(factory_context_.cluster_manager_,
Expand All @@ -1847,14 +1867,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UpstreamServerName)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.network.upstream_server_name",
std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.network.upstream_server_name", std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-server-name
Expand Down Expand Up @@ -1882,16 +1898,12 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(ApplicationProtocols)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData(
connection_.streamInfo().filterState()->setData(
Network::ApplicationProtocols::key(),
std::make_unique<Network::ApplicationProtocols>(std::vector<std::string>{"foo", "bar"}),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-application-protocol
EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _))
Expand Down Expand Up @@ -1933,7 +1945,7 @@ class TcpProxyHashingTest : public testing::Test {
void initializeFilter() {
EXPECT_CALL(filter_callbacks_, connection()).WillRepeatedly(ReturnRef(connection_));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down
7 changes: 4 additions & 3 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,6 @@ TEST_F(HttpHealthCheckerImplTest, SuccessServiceCheckWithAdditionalHeaders) {
key: value
)EOF");

std::string current_start_time;
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
makeTestHost(cluster_->info_, "tcp://127.0.0.1:80", metadata)};
cluster_->info_->stats().upstream_cx_total_.inc();
Expand All @@ -1348,8 +1347,10 @@ TEST_F(HttpHealthCheckerImplTest, SuccessServiceCheckWithAdditionalHeaders) {
EXPECT_EQ(headers.get(downstream_local_address_without_port)->value().getStringView(),
value_downstream_local_address_without_port);

EXPECT_NE(headers.get(start_time)->value().getStringView(), current_start_time);
current_start_time = std::string(headers.get(start_time)->value().getStringView());
Envoy::DateFormatter date_formatter("%s.%9f");
std::string current_start_time =
date_formatter.fromTime(dispatcher_.timeSource().systemTime());
EXPECT_EQ(headers.get(start_time)->value().getStringView(), current_start_time);
}));
health_checker_->start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class HttpGrpcAccessLogTest : public testing::Test {
void expectLogRequestMethod(const std::string& request_method) {
NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.host_ = nullptr;
stream_info.start_time_ = SystemTime(1h);

Http::TestHeaderMapImpl request_headers{
{":method", request_method},
Expand All @@ -104,7 +105,8 @@ class HttpGrpcAccessLogTest : public testing::Test {
socket_address:
address: "127.0.0.2"
port_value: 0
start_time: {{}}
start_time:
seconds: 3600
request:
request_method: {}
request_headers_bytes: {}
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/http/lua/lua_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ TEST_F(LuaHttpFilterTest, SetGetDynamicMetadata) {
setup(SCRIPT);

Http::TestHeaderMapImpl request_headers{{":path", "/"}};
DangerousDeprecatedTestTime test_time;
Event::SimulatedTimeSystem test_time;
StreamInfo::StreamInfoImpl stream_info(Http::Protocol::Http2, test_time.timeSystem());
EXPECT_EQ(0, stream_info.dynamicMetadata().filter_metadata_size());
EXPECT_CALL(decoder_callbacks_, streamInfo()).WillOnce(ReturnRef(stream_info));
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/http/lua/wrappers_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class LuaStreamInfoWrapperTest
return metadata;
}

DangerousDeprecatedTestTime test_time_;
Event::SimulatedTimeSystem test_time_;
};

// Return the current request protocol.
Expand Down
13 changes: 11 additions & 2 deletions test/extensions/filters/network/tcp_proxy/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ TEST_P(RouteIpListConfigTest, DEPRECATED_FEATURE_TEST(TcpProxy)) {
ConfigFactory factory;
Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, context);
Network::MockConnection connection;
EXPECT_CALL(connection, addReadFilter(_));
NiceMock<Network::MockReadFilterCallbacks> readFilterCallback;
EXPECT_CALL(connection, addReadFilter(_))
.WillRepeatedly(Invoke([&readFilterCallback](Network::ReadFilterSharedPtr filter) {
filter->initializeReadFilterCallbacks(readFilterCallback);
}));
cb(connection);
}

Expand All @@ -119,9 +123,14 @@ TEST(ConfigTest, ConfigTest) {
config.set_cluster("cluster");

EXPECT_TRUE(factory.isTerminalFilter());

Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(config, context);
Network::MockConnection connection;
EXPECT_CALL(connection, addReadFilter(_));
NiceMock<Network::MockReadFilterCallbacks> readFilterCallback;
EXPECT_CALL(connection, addReadFilter(_))
.WillRepeatedly(Invoke([&readFilterCallback](Network::ReadFilterSharedPtr filter) {
filter->initializeReadFilterCallbacks(readFilterCallback);
}));
cb(connection);
}

Expand Down
2 changes: 1 addition & 1 deletion test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class ZipkinDriverTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;

NiceMock<Tracing::MockConfig> config_;
DangerousDeprecatedTestTime test_time_;
Event::SimulatedTimeSystem test_time_;
TimeSource& time_source_;
};

Expand Down
Loading

0 comments on commit fade668

Please sign in to comment.