Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lightstep gPRC generation for tracing #98

Merged
merged 34 commits into from
Oct 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a7dcc0d
Replace the json-generated span with a proto
jmacd Sep 14, 2016
47669b9
Supply the Runtime random generator. Generate lightste::collector::Re…
jmacd Sep 14, 2016
4b57e24
Merge branch 'master' into lightstep_tracer
RomanDzhabarov Sep 16, 2016
6e28c33
Fix format for existing code.
RomanDzhabarov Sep 16, 2016
e2b92ca
Make changes.
RomanDzhabarov Sep 16, 2016
1319bf6
Make existing code to compile with lightstep.
RomanDzhabarov Sep 21, 2016
a90fcf1
Move recorder to header, move orDash to StringUtil.
RomanDzhabarov Sep 21, 2016
b02fb42
Move things around.
RomanDzhabarov Sep 21, 2016
786e16a
Temp changes.
RomanDzhabarov Sep 22, 2016
46444b6
Add grpc/utility
RomanDzhabarov Sep 23, 2016
a3fc5ea
Move to utility class.
RomanDzhabarov Sep 23, 2016
f1a2bcc
Merge branch 'master' into lightstep_tracer
RomanDzhabarov Sep 23, 2016
95a5924
async rpc client.
RomanDzhabarov Sep 23, 2016
445c289
tmp
RomanDzhabarov Sep 23, 2016
05e80a2
Make envoy built.
RomanDzhabarov Sep 23, 2016
de1a15d
Few refinements.
RomanDzhabarov Sep 23, 2016
b6a88fe
Callback for async client call to lightstep.
RomanDzhabarov Sep 24, 2016
a467a8f
User service full name and method name to call lightstep.
RomanDzhabarov Sep 26, 2016
57115eb
RpcChannel to allow supplying of service and method names.
RomanDzhabarov Sep 26, 2016
19ed4a0
Addressing comments.
RomanDzhabarov Sep 28, 2016
640d49a
Some more refactoring.
RomanDzhabarov Sep 28, 2016
aa005c0
Try to build.
RomanDzhabarov Sep 28, 2016
44a1fec
Attempt to fix build.
RomanDzhabarov Sep 28, 2016
9320a07
Merge branch 'master' into lightstep
RomanDzhabarov Sep 28, 2016
d6f8353
Config changes, support of http2.
RomanDzhabarov Sep 30, 2016
6327e6f
Tests, runtime control for #spans.
RomanDzhabarov Sep 30, 2016
91ef68b
Refinement.
RomanDzhabarov Sep 30, 2016
371b06c
remove unused include.
RomanDzhabarov Sep 30, 2016
914c48b
Merge branch 'master' into lightstep
RomanDzhabarov Sep 30, 2016
a2b45d1
Pass ref to sink.
RomanDzhabarov Sep 30, 2016
189e1d5
tmp changes.
RomanDzhabarov Oct 2, 2016
0c9d9be
some modifications.
RomanDzhabarov Oct 3, 2016
912b6b8
Make 5 default.
RomanDzhabarov Oct 3, 2016
f3a80f8
Fix comments.
RomanDzhabarov Oct 3, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/do_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ $EXTRA_CMAKE_FLAGS -DENVOY_DEBUG:BOOL=OFF \
-DENVOY_TCLAP_INCLUDE_DIR:FILEPATH=/thirdparty/tclap-1.2.1/include \
-DENVOY_JANSSON_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_OPENSSL_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_PROTOBUF_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_PROTOBUF_PROTOC:FILEPATH=/thirdparty_build/bin/protoc \
-DENVOY_GCOVR:FILEPATH=/thirdparty/gcovr-3.3/scripts/gcovr \
Expand Down
1 change: 1 addition & 0 deletions configs/envoy_double_proxy.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
},
{
"name": "lightstep_saas",
"features": "http2",
"ssl_context": {
"ca_cert_file": "/etc/ssl/certs/ca-certificates.crt",
"verify_subject_alt_name": "collector.lightstep.com"
Expand Down
1 change: 1 addition & 0 deletions configs/envoy_front_proxy.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
},
{
"name": "lightstep_saas",
"features": "http2",
"ssl_context": {
"ca_cert_file": "/etc/ssl/certs/ca-certificates.crt",
"verify_subject_alt_name": "collector.lightstep.com"
Expand Down
1 change: 1 addition & 0 deletions configs/envoy_service_to_service.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@
},
{
"name": "lightstep_saas",
"features": "http2",
"ssl_context": {
"ca_cert_file": "/etc/ssl/certs/ca-certificates.crt",
"verify_subject_alt_name": "collector.lightstep.com"
Expand Down
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ endif()
include_directories(${ENVOY_LIBEVENT_INCLUDE_DIR})
include_directories(${ENVOY_NGHTTP2_INCLUDE_DIR})
include_directories(SYSTEM ${ENVOY_OPENSSL_INCLUDE_DIR})
include_directories(SYSTEM ${ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR})

set_target_properties(envoy-common PROPERTIES COTIRE_CXX_PREFIX_HEADER_INIT
"../precompiled/precompiled.h")
Expand Down
5 changes: 5 additions & 0 deletions source/common/common/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,8 @@ bool StringUtil::startsWith(const std::string& source, const std::string& start,
return strncasecmp(source.c_str(), start.c_str(), start.size()) == 0;
}
}

const std::string& StringUtil::valueOrDefault(const std::string& input,
const std::string& default_value) {
return input.empty() ? default_value : input;
}
6 changes: 6 additions & 0 deletions source/common/common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,10 @@ class StringUtil {
*/
static bool startsWith(const std::string& source, const std::string& start,
bool case_sensitive = true);

/**
* @return original @param input string if it's not empty or @param default_value otherwise.
*/
static const std::string& valueOrDefault(const std::string& input,
const std::string& default_value);
};
74 changes: 74 additions & 0 deletions source/common/grpc/common.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "common.h"

#include "common/buffer/buffer_impl.h"
#include "common/common/enum_to_int.h"
#include "common/common/utility.h"
#include "common/http/headers.h"
#include "common/http/message_impl.h"
#include "common/http/utility.h"

namespace Grpc {

const std::string Common::GRPC_CONTENT_TYPE{"application/grpc"};
Expand All @@ -15,4 +22,71 @@ void Common::chargeStat(Stats::Store& store, const std::string& cluster,
.inc();
}

Buffer::InstancePtr Common::serializeBody(const google::protobuf::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
Buffer::InstancePtr body(new Buffer::OwnedImpl());
uint8_t compressed = 0;
body->add(&compressed, sizeof(compressed));
uint32_t size = htonl(message.ByteSize());
body->add(&size, sizeof(size));
body->add(message.SerializeAsString());

return body;
}

Http::MessagePtr Common::prepareHeaders(const std::string& upstream_cluster,
const std::string& service_full_name,
const std::string& method_name) {
Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
message->headers().addViaMoveValue(Http::Headers::get().Path,
fmt::format("/{}/{}", service_full_name, method_name));
message->headers().addViaCopy(Http::Headers::get().Host, upstream_cluster);
message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);

return message;
}

void Common::checkForHeaderOnlyError(Http::Message& http_response) {
// First check for grpc-status in headers. If it is here, we have an error.
const std::string& grpc_status_header = http_response.headers().get(Common::GRPC_STATUS_HEADER);
if (grpc_status_header.empty()) {
return;
}

uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status header");
}

const std::string& grpc_status_message = http_response.headers().get(Common::GRPC_MESSAGE_HEADER);
throw Exception(grpc_status_code, grpc_status_message);
}

void Common::validateResponse(Http::Message& http_response) {
if (Http::Utility::getResponseStatus(http_response.headers()) != enumToInt(Http::Code::OK)) {
throw Exception(Optional<uint64_t>(), "non-200 response code");
}

checkForHeaderOnlyError(http_response);

// Check for existence of trailers.
if (!http_response.trailers()) {
throw Exception(Optional<uint64_t>(), "no response trailers");
}

const std::string& grpc_status_header = http_response.trailers()->get(Common::GRPC_STATUS_HEADER);
const std::string& grpc_status_message =
http_response.trailers()->get(Common::GRPC_MESSAGE_HEADER);
uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status trailer");
}

if (grpc_status_code != 0) {
throw Exception(grpc_status_code, grpc_status_message);
}
}

} // Grpc
32 changes: 32 additions & 0 deletions source/common/grpc/common.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
#pragma once

#include "envoy/common/exception.h"
#include "envoy/common/optional.h"
#include "envoy/http/header_map.h"
#include "envoy/http/message.h"
#include "envoy/stats/stats.h"

#include "google/protobuf/message.h"

namespace Grpc {

class Exception : public EnvoyException {
public:
Exception(const Optional<uint64_t>& grpc_status, const std::string& message)
: EnvoyException(message), grpc_status_(grpc_status) {}

const Optional<uint64_t> grpc_status_;
};

class Common {
public:
/**
Expand All @@ -18,10 +31,29 @@ class Common {
static void chargeStat(Stats::Store& store, const std::string& cluster,
const std::string& grpc_service, const std::string& grpc_method,
bool success);
/**
* Serialize protobuf message.
*/
static Buffer::InstancePtr serializeBody(const google::protobuf::Message& message);

/**
* Prepare headers for protobuf service.
*/
static Http::MessagePtr prepareHeaders(const std::string& upstream_cluster,
const std::string& service_full_name,
const std::string& method_name);

/**
* Basic validation of gRPC response, @throws Grpc::Exception in case of non successful response.
*/
static void validateResponse(Http::Message& http_response);

static const std::string GRPC_CONTENT_TYPE;
static const Http::LowerCaseString GRPC_MESSAGE_HEADER;
static const Http::LowerCaseString GRPC_STATUS_HEADER;

private:
static void checkForHeaderOnlyError(Http::Message& http_response);
};

} // Grpc
87 changes: 15 additions & 72 deletions source/common/grpc/rpc_channel_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,9 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp
// here for clarity.
ASSERT(cm_.get(cluster_)->features() & Upstream::Cluster::Features::HTTP2);

Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
message->headers().addViaMoveValue(
Http::Headers::get().Path,
fmt::format("/{}/{}", method->service()->full_name(), method->name()));
message->headers().addViaCopy(Http::Headers::get().Host, cluster_);
message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);
message->body(serializeBody(*grpc_request));
Http::MessagePtr message =
Common::prepareHeaders(cluster_, method->service()->full_name(), method->name());
message->body(Common::serializeBody(*grpc_request));

callbacks_.onPreRequestCustomizeHeaders(message->headers());
http_request_ = cm_.httpAsyncClientForCluster(cluster_).send(std::move(message), *this, timeout_);
Expand All @@ -49,61 +43,21 @@ void RpcChannelImpl::incStat(bool success) {
grpc_method_->name(), success);
}

void RpcChannelImpl::checkForHeaderOnlyError(Http::Message& http_response) {
// First check for grpc-status in headers. If it is here, we have an error.
const std::string& grpc_status_header = http_response.headers().get(Common::GRPC_STATUS_HEADER);
if (grpc_status_header.empty()) {
return;
}

uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status header");
}

const std::string& grpc_status_message = http_response.headers().get(Common::GRPC_MESSAGE_HEADER);
throw Exception(grpc_status_code, grpc_status_message);
}

void RpcChannelImpl::onSuccessWorker(Http::Message& http_response) {
if (Http::Utility::getResponseStatus(http_response.headers()) != enumToInt(Http::Code::OK)) {
throw Exception(Optional<uint64_t>(), "non-200 response code");
}

checkForHeaderOnlyError(http_response);

// Check for existance of trailers.
if (!http_response.trailers()) {
throw Exception(Optional<uint64_t>(), "no response trailers");
}

const std::string& grpc_status_header = http_response.trailers()->get(Common::GRPC_STATUS_HEADER);
const std::string& grpc_status_message =
http_response.trailers()->get(Common::GRPC_MESSAGE_HEADER);
uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status trailer");
}
void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) {
try {
Common::validateResponse(*http_response);

if (grpc_status_code != 0) {
throw Exception(grpc_status_code, grpc_status_message);
}
// A gRPC response contains a 5 byte header. Currently we only support unary responses so we
// ignore the header. @see serializeBody().
if (!http_response->body() || !(http_response->body()->length() > 5)) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}

// A GRPC response contains a 5 byte header. Currently we only support unary responses so we
// ignore the header. @see serializeBody().
if (!http_response.body() || !(http_response.body()->length() > 5)) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}
http_response->body()->drain(5);
if (!grpc_response_->ParseFromString(http_response->bodyAsString())) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}

http_response.body()->drain(5);
if (!grpc_response_->ParseFromString(http_response.bodyAsString())) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}
}

void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) {
try {
onSuccessWorker(*http_response);
callbacks_.onSuccess();
incStat(true);
onComplete();
Expand Down Expand Up @@ -133,15 +87,4 @@ void RpcChannelImpl::onComplete() {
grpc_response_ = nullptr;
}

Buffer::InstancePtr RpcChannelImpl::serializeBody(const proto::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
Buffer::InstancePtr body(new Buffer::OwnedImpl());
uint8_t compressed = 0;
body->add(&compressed, sizeof(compressed));
uint32_t size = htonl(message.ByteSize());
body->add(&size, sizeof(size));
body->add(message.SerializeAsString());
return body;
}

} // Grpc
9 changes: 0 additions & 9 deletions source/common/grpc/rpc_channel_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,6 @@ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks {
proto::Closure* done_callback) override;

private:
class Exception : public EnvoyException {
public:
Exception(const Optional<uint64_t>& grpc_status, const std::string& message)
: EnvoyException(message), grpc_status_(grpc_status) {}

const Optional<uint64_t> grpc_status_;
};

void checkForHeaderOnlyError(Http::Message& http_response);
void incStat(bool success);
void onComplete();
void onFailureWorker(const Optional<uint64_t>& grpc_status, const std::string& message);
Expand Down
Loading