Skip to content

Commit

Permalink
Add Metadata to codec (envoyproxy#4796)
Browse files Browse the repository at this point in the history
Description: Add MetadataEncoder and MetadataDecoder to codec. I also corrected some of the design decisions in MetadataEnocder and MetadataDecoder so that they are more suitable for codec. Those corrections includes: 1. let StreamImpl owns the decoded metadata, not the decoder. 2. Remove stream_id field in decoder and encoder since stream id is stored in StreamImpl. 3. Remove set/getMaxMetadataSize(), because nghttp2 uses default max frame size.

This is the second step towards supporting metadata in envoy. The next step would be add filters to add/delete/modify metadata.

Risk Level: Low. Not used.
Testing: Unit test.
Docs Changes:
Release Notes:
envoyproxy#2394

Signed-off-by: Yang Song <[email protected]>
Signed-off-by: Fred Douglas <[email protected]>
  • Loading branch information
soya3129 authored and fredlas committed Mar 5, 2019
1 parent 55814fe commit b46128a
Show file tree
Hide file tree
Showing 25 changed files with 581 additions and 183 deletions.
6 changes: 6 additions & 0 deletions include/envoy/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ envoy_cc_library(
hdrs = ["codec.h"],
deps = [
":header_map_interface",
":metadata_interface",
":protocol_interface",
"//include/envoy/buffer:buffer_interface",
],
Expand Down Expand Up @@ -83,3 +84,8 @@ envoy_cc_library(
name = "query_params_interface",
hdrs = ["query_params.h"],
)

envoy_cc_library(
name = "metadata_interface",
hdrs = ["metadata_interface.h"],
)
16 changes: 16 additions & 0 deletions include/envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"
#include "envoy/http/header_map.h"
#include "envoy/http/metadata_interface.h"
#include "envoy/http/protocol.h"

namespace Envoy {
Expand Down Expand Up @@ -52,6 +53,12 @@ class StreamEncoder {
* @return Stream& the backing stream.
*/
virtual Stream& getStream() PURE;

/**
* Encode METADATA.
* @param metadata_map is the METADATA to encode.
*/
virtual void encodeMetadata(const MetadataMap& metadata_map) PURE;
};

/**
Expand Down Expand Up @@ -86,6 +93,12 @@ class StreamDecoder {
* @param trailers supplies the decoded trailers.
*/
virtual void decodeTrailers(HeaderMapPtr&& trailers) PURE;

/**
* Called with decoded METADATA.
* @param decoded METADATA.
*/
virtual void decodeMetadata(MetadataMapPtr&& metadata_map) PURE;
};

/**
Expand Down Expand Up @@ -210,6 +223,7 @@ struct Http2Settings {
uint32_t initial_stream_window_size_{DEFAULT_INITIAL_STREAM_WINDOW_SIZE};
uint32_t initial_connection_window_size_{DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE};
bool allow_connect_{DEFAULT_ALLOW_CONNECT};
bool allow_metadata_{DEFAULT_ALLOW_METADATA};

// disable HPACK compression
static const uint32_t MIN_HPACK_TABLE_SIZE = 0;
Expand Down Expand Up @@ -245,6 +259,8 @@ struct Http2Settings {
static const uint32_t MAX_INITIAL_CONNECTION_WINDOW_SIZE = (1U << 31) - 1;
// By default both nghttp2 and Envoy do not allow CONNECT over H2.
static const bool DEFAULT_ALLOW_CONNECT = false;
// By default Envoy does not allow METADATA support.
static const bool DEFAULT_ALLOW_METADATA = false;
};

/**
Expand Down
27 changes: 27 additions & 0 deletions include/envoy/http/metadata_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <functional>
#include <memory>
#include <unordered_map>

namespace Envoy {
namespace Http {

/**
* Please refer to #2394 for more info about Envoy METADATA.
* Envoy metadata docs can be found at source/docs/h2_metadata.md.
*/
constexpr uint8_t METADATA_FRAME_TYPE = 0x4d;
constexpr uint8_t END_METADATA_FLAG = 0x4;

// NGHTTP2_MAX_PAYLOADLEN in nghttp2.
// TODO(soya3129): Respect max_frame_size after nghttp2 #1250 is resolved.
constexpr uint64_t METADATA_MAX_PAYLOAD_SIZE = 16384;

using MetadataMap = std::unordered_map<std::string, std::string>;
using MetadataMapPtr = std::unique_ptr<MetadataMap>;

using MetadataCallback = std::function<void(std::unique_ptr<MetadataMap>)>;

} // namespace Http
} // namespace Envoy
8 changes: 8 additions & 0 deletions source/common/http/codec_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class StreamDecoderWrapper : public StreamDecoder {
onDecodeComplete();
}

void decodeMetadata(MetadataMapPtr&& metadata_map) override {
inner_.decodeMetadata(std::move(metadata_map));
}

protected:
StreamDecoderWrapper(StreamDecoder& inner) : inner_(inner) {}

Expand Down Expand Up @@ -87,6 +91,10 @@ class StreamEncoderWrapper : public StreamEncoder {
onEncodeComplete();
}

void encodeMetadata(const MetadataMap& metadata_map) override {
inner_.encodeMetadata(metadata_map);
}

Stream& getStream() override { return inner_.getStream(); }

protected:
Expand Down
1 change: 1 addition & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void decodeData(Buffer::Instance& data, bool end_stream) override;
void decodeTrailers(HeaderMapPtr&& trailers) override;
void decodeMetadata(MetadataMapPtr&&) override { NOT_REACHED_GCOVR_EXCL_LINE; }

// Http::FilterChainFactoryCallbacks
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override {
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class StreamEncoderImpl : public StreamEncoder,
void encodeHeaders(const HeaderMap& headers, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(const HeaderMap& trailers) override;
void encodeMetadata(const MetadataMap&) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Stream& getStream() override { return *this; }

// Http::Stream
Expand Down
11 changes: 4 additions & 7 deletions source/common/http/http2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ envoy_cc_library(
"abseil_optional",
],
deps = [
":metadata_decoder_lib",
":metadata_encoder_lib",
"//include/envoy/event:deferred_deletable",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/http:codec_interface",
Expand Down Expand Up @@ -67,7 +69,7 @@ envoy_cc_library(
"nghttp2",
],
deps = [
":metadata_interface_lib",
"//include/envoy/http:codec_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
Expand All @@ -83,15 +85,10 @@ envoy_cc_library(
"nghttp2",
],
deps = [
":metadata_interface_lib",
"//include/envoy/http:codec_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:stack_array",
],
)

envoy_cc_library(
name = "metadata_interface_lib",
hdrs = ["metadata_interface.h"],
)
122 changes: 122 additions & 0 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,25 @@ void ConnectionImpl::StreamImpl::encodeTrailers(const HeaderMap& trailers) {
parent_.sendPendingFrames();
}
}

void ConnectionImpl::StreamImpl::encodeMetadata(const MetadataMap& metadata_map) {
ASSERT(parent_.allow_metadata_);

getMetadataEncoder().createPayload(metadata_map);

// Estimates the number of frames to generate, and breaks the while loop when the size is reached
// in case submitting succeeds and packing fails, and we don't get error from packing.
const size_t frame_count = metadata_encoder_->payload().length() / METADATA_MAX_PAYLOAD_SIZE + 1;
size_t count = 0;
// Keep submitting extension frames if there is payload left in the encoder.
while (metadata_encoder_->hasNextFrame() && count++ <= frame_count) {
submitMetadata();
parent_.sendPendingFrames();
}

ASSERT(!metadata_encoder_->hasNextFrame());
}

void ConnectionImpl::StreamImpl::readDisable(bool disable) {
ENVOY_CONN_LOG(debug, "Stream {} {}, unconsumed_bytes {} read_disable_count {}",
parent_.connection_, stream_id_, (disable ? "disabled" : "enabled"),
Expand Down Expand Up @@ -194,6 +213,15 @@ void ConnectionImpl::StreamImpl::submitTrailers(const HeaderMap& trailers) {
ASSERT(rc == 0);
}

void ConnectionImpl::StreamImpl::submitMetadata() {
ASSERT(stream_id_ > 0);
uint64_t payload_size = metadata_encoder_->payload().length();
const uint8_t flag = payload_size > METADATA_MAX_PAYLOAD_SIZE ? 0 : END_METADATA_FLAG;
int result =
nghttp2_submit_extension(parent_.session_, METADATA_FRAME_TYPE, flag, stream_id_, nullptr);
ASSERT(result == 0);
}

ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* data_flags) {
if (pending_send_data_.length() == 0 && !local_end_stream_) {
ASSERT(!data_deferred_);
Expand Down Expand Up @@ -286,6 +314,27 @@ void ConnectionImpl::StreamImpl::resetStreamWorker(StreamResetReason reason) {
ASSERT(rc == 0);
}

MetadataEncoder& ConnectionImpl::StreamImpl::getMetadataEncoder() {
if (metadata_encoder_ == nullptr) {
metadata_encoder_ = std::make_unique<MetadataEncoder>();
}
return *metadata_encoder_;
}

MetadataDecoder& ConnectionImpl::StreamImpl::getMetadataDecoder() {
if (metadata_decoder_ == nullptr) {
auto cb = [this](std::unique_ptr<MetadataMap> metadata_map) {
this->onMetadataDecoded(std::move(metadata_map));
};
metadata_decoder_ = std::make_unique<MetadataDecoder>(cb);
}
return *metadata_decoder_;
}

void ConnectionImpl::StreamImpl::onMetadataDecoded(std::unique_ptr<MetadataMap> metadata_map) {
decoder_->decodeMetadata(std::move(metadata_map));
}

ConnectionImpl::~ConnectionImpl() { nghttp2_session_del(session_); }

void ConnectionImpl::dispatch(Buffer::Instance& data) {
Expand Down Expand Up @@ -544,6 +593,50 @@ int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) {
return 0;
}

int ConnectionImpl::onMetadataReceived(int32_t stream_id, const uint8_t* data, size_t len) {
ENVOY_CONN_LOG(trace, "recv {} bytes METADATA", connection_, len);

StreamImpl* stream = getStream(stream_id);
if (!stream) {
return 0;
}

bool success = stream->getMetadataDecoder().receiveMetadata(data, len);
return success ? 0 : NGHTTP2_ERR_CALLBACK_FAILURE;
}

int ConnectionImpl::onMetadataFrameComplete(int32_t stream_id, bool end_metadata) {
ENVOY_CONN_LOG(trace, "recv METADATA frame on stream {}, end_metadata: {}", connection_,
stream_id, end_metadata);

StreamImpl* stream = getStream(stream_id);
ASSERT(stream != nullptr);

bool result = stream->getMetadataDecoder().onMetadataFrameComplete(end_metadata);
return result ? 0 : NGHTTP2_ERR_CALLBACK_FAILURE;
}

ssize_t ConnectionImpl::packMetadata(int32_t stream_id, uint8_t* buf, size_t len) {
ENVOY_CONN_LOG(trace, "pack METADATA frame on stream {}", connection_, stream_id);

StreamImpl* stream = getStream(stream_id);
ASSERT(stream != nullptr);

MetadataEncoder& encoder = stream->getMetadataEncoder();
const uint64_t size_to_copy = std::min(METADATA_MAX_PAYLOAD_SIZE, encoder.payload().length());
// nghttp2 guarantees len is at least 16KiB. If the check fails, please verify
// NGHTTP2_MAX_PAYLOADLEN is consistent with METADATA_MAX_PAYLOAD_SIZE.
ASSERT(len >= size_to_copy);

Buffer::OwnedImpl& p = encoder.payload();
p.copyOut(0, size_to_copy, buf);

// Releases the payload that has been copied to nghttp2.
encoder.releasePayload(size_to_copy);

return static_cast<ssize_t>(size_to_copy);
}

int ConnectionImpl::saveHeader(const nghttp2_frame* frame, HeaderString&& name,
HeaderString&& value) {
StreamImpl* stream = getStream(frame->hd.stream_id);
Expand Down Expand Up @@ -736,6 +829,29 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() {
return static_cast<ConnectionImpl*>(user_data)->onInvalidFrame(frame->hd.stream_id,
error_code);
});

nghttp2_session_callbacks_set_on_extension_chunk_recv_callback(
callbacks_,
[](nghttp2_session*, const nghttp2_frame_hd* hd, const uint8_t* data, size_t len,
void* user_data) -> int {
ASSERT(hd->length >= len);
return static_cast<ConnectionImpl*>(user_data)->onMetadataReceived(hd->stream_id, data,
len);
});

nghttp2_session_callbacks_set_unpack_extension_callback(
callbacks_, [](nghttp2_session*, void**, const nghttp2_frame_hd* hd, void* user_data) -> int {
return static_cast<ConnectionImpl*>(user_data)->onMetadataFrameComplete(
hd->stream_id, hd->flags == END_METADATA_FLAG);
});

nghttp2_session_callbacks_set_pack_extension_callback(
callbacks_,
[](nghttp2_session*, uint8_t* buf, size_t len, const nghttp2_frame* frame,
void* user_data) -> ssize_t {
ASSERT(frame->hd.length <= len);
return static_cast<ConnectionImpl*>(user_data)->packMetadata(frame->hd.stream_id, buf, len);
});
}

ConnectionImpl::Http2Callbacks::~Http2Callbacks() { nghttp2_session_callbacks_del(callbacks_); }
Expand All @@ -752,6 +868,10 @@ ConnectionImpl::Http2Options::Http2Options(const Http2Settings& http2_settings)
if (http2_settings.hpack_table_size_ != NGHTTP2_DEFAULT_HEADER_TABLE_SIZE) {
nghttp2_option_set_max_deflate_dynamic_table_size(options_, http2_settings.hpack_table_size_);
}

if (http2_settings.allow_metadata_) {
nghttp2_option_set_user_recv_extension_type(options_, METADATA_FRAME_TYPE);
}
}

ConnectionImpl::Http2Options::~Http2Options() { nghttp2_option_del(options_); }
Expand All @@ -774,6 +894,7 @@ ClientConnectionImpl::ClientConnectionImpl(Network::Connection& connection,
nghttp2_session_client_new2(&session_, http2_callbacks_.callbacks(), base(),
client_http2_options.options());
sendSettings(http2_settings, true);
allow_metadata_ = http2_settings.allow_metadata_;
}

Http::StreamEncoder& ClientConnectionImpl::newStream(StreamDecoder& decoder) {
Expand Down Expand Up @@ -820,6 +941,7 @@ ServerConnectionImpl::ServerConnectionImpl(Network::Connection& connection,
nghttp2_session_server_new2(&session_, http2_callbacks_.callbacks(), base(),
http2_options.options());
sendSettings(http2_settings, false);
allow_metadata_ = http2_settings.allow_metadata_;
}

int ServerConnectionImpl::onBeginHeaders(const nghttp2_frame* frame) {
Expand Down
Loading

0 comments on commit b46128a

Please sign in to comment.