Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lightstep gPRC generation for tracing #98

Merged
merged 34 commits into from
Oct 3, 2016
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a7dcc0d
Replace the json-generated span with a proto
jmacd Sep 14, 2016
47669b9
Supply the Runtime random generator. Generate lightste::collector::Re…
jmacd Sep 14, 2016
4b57e24
Merge branch 'master' into lightstep_tracer
RomanDzhabarov Sep 16, 2016
6e28c33
Fix format for existing code.
RomanDzhabarov Sep 16, 2016
e2b92ca
Make changes.
RomanDzhabarov Sep 16, 2016
1319bf6
Make existing code to compile with lightstep.
RomanDzhabarov Sep 21, 2016
a90fcf1
Move recorder to header, move orDash to StringUtil.
RomanDzhabarov Sep 21, 2016
b02fb42
Move things around.
RomanDzhabarov Sep 21, 2016
786e16a
Temp changes.
RomanDzhabarov Sep 22, 2016
46444b6
Add grpc/utility
RomanDzhabarov Sep 23, 2016
a3fc5ea
Move to utility class.
RomanDzhabarov Sep 23, 2016
f1a2bcc
Merge branch 'master' into lightstep_tracer
RomanDzhabarov Sep 23, 2016
95a5924
async rpc client.
RomanDzhabarov Sep 23, 2016
445c289
tmp
RomanDzhabarov Sep 23, 2016
05e80a2
Make envoy built.
RomanDzhabarov Sep 23, 2016
de1a15d
Few refinements.
RomanDzhabarov Sep 23, 2016
b6a88fe
Callback for async client call to lightstep.
RomanDzhabarov Sep 24, 2016
a467a8f
User service full name and method name to call lightstep.
RomanDzhabarov Sep 26, 2016
57115eb
RpcChannel to allow supplying of service and method names.
RomanDzhabarov Sep 26, 2016
19ed4a0
Addressing comments.
RomanDzhabarov Sep 28, 2016
640d49a
Some more refactoring.
RomanDzhabarov Sep 28, 2016
aa005c0
Try to build.
RomanDzhabarov Sep 28, 2016
44a1fec
Attempt to fix build.
RomanDzhabarov Sep 28, 2016
9320a07
Merge branch 'master' into lightstep
RomanDzhabarov Sep 28, 2016
d6f8353
Config changes, support of http2.
RomanDzhabarov Sep 30, 2016
6327e6f
Tests, runtime control for #spans.
RomanDzhabarov Sep 30, 2016
91ef68b
Refinement.
RomanDzhabarov Sep 30, 2016
371b06c
remove unused include.
RomanDzhabarov Sep 30, 2016
914c48b
Merge branch 'master' into lightstep
RomanDzhabarov Sep 30, 2016
a2b45d1
Pass ref to sink.
RomanDzhabarov Sep 30, 2016
189e1d5
tmp changes.
RomanDzhabarov Oct 2, 2016
0c9d9be
some modifications.
RomanDzhabarov Oct 3, 2016
912b6b8
Make 5 default.
RomanDzhabarov Oct 3, 2016
f3a80f8
Fix comments.
RomanDzhabarov Oct 3, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/do_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ $EXTRA_CMAKE_FLAGS -DENVOY_DEBUG:BOOL=OFF \
-DENVOY_TCLAP_INCLUDE_DIR:FILEPATH=/thirdparty/tclap-1.2.1/include \
-DENVOY_JANSSON_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_OPENSSL_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_PROTOBUF_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_PROTOBUF_PROTOC:FILEPATH=/thirdparty_build/bin/protoc \
-DENVOY_GCOVR:FILEPATH=/thirdparty/gcovr-3.3/scripts/gcovr \
Expand Down
16 changes: 16 additions & 0 deletions include/envoy/grpc/rpc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/common/optional.h"
#include "envoy/common/pure.h"
#include "envoy/http/async_client.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

#include "envoy/http/header_map.h"

#include "google/protobuf/service.h"
Expand Down Expand Up @@ -69,4 +70,19 @@ class RpcChannelFactory {
const Optional<std::chrono::milliseconds>& timeout) PURE;
};

/**
* Interface for sending async gRPC requests.
*/
class RpcAsyncClient {
public:
virtual ~RpcAsyncClient() {}

/**
* Send gRPC request to upstream cluster using method and callbacks.
*/
virtual void send(const std::string& upstream_cluster, const std::string& service_full_name,
const std::string& method_name, proto::Message&& grpc_request,
Http::AsyncClient::Callbacks& callbacks) PURE;
};

} // Grpc
2 changes: 2 additions & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ add_library(
grpc/common.cc
grpc/http1_bridge_filter.cc
grpc/rpc_channel_impl.cc
grpc/utility.cc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think just add this code to common.cc, probably don't need a new source file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, moving that to common.

http/access_log/access_log_formatter.cc
http/access_log/access_log_impl.cc
http/async_client_impl.cc
Expand Down Expand Up @@ -113,6 +114,7 @@ endif()
include_directories(${ENVOY_LIBEVENT_INCLUDE_DIR})
include_directories(${ENVOY_NGHTTP2_INCLUDE_DIR})
include_directories(SYSTEM ${ENVOY_OPENSSL_INCLUDE_DIR})
include_directories(SYSTEM ${ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR})

set_target_properties(envoy-common PROPERTIES COTIRE_CXX_PREFIX_HEADER_INIT
"../precompiled/precompiled.h")
Expand Down
5 changes: 5 additions & 0 deletions source/common/common/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,8 @@ bool StringUtil::startsWith(const std::string& source, const std::string& start,
return strncasecmp(source.c_str(), start.c_str(), start.size()) == 0;
}
}

const std::string& StringUtil::valueOrDefault(const std::string& input,
const std::string& default_value) {
return input.empty() ? default_value : input;
}
6 changes: 6 additions & 0 deletions source/common/common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,10 @@ class StringUtil {
*/
static bool startsWith(const std::string& source, const std::string& start,
bool case_sensitive = true);

/**
* @return original @param input string if it's not empty or @param default_value otherwise.
*/
static const std::string& valueOrDefault(const std::string& input,
const std::string& default_value = "-");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: having a default param of "-" for a common utility function is a little strange, I would just have no default param.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

};
39 changes: 21 additions & 18 deletions source/common/grpc/rpc_channel_impl.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "common.h"
#include "rpc_channel_impl.h"
#include "utility.h"

#include "common/common/enum_to_int.h"
#include "common/common/utility.h"
Expand Down Expand Up @@ -30,15 +31,9 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp
// here for clarity.
ASSERT(cm_.get(cluster_)->features() & Upstream::Cluster::Features::HTTP2);

Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
message->headers().addViaMoveValue(
Http::Headers::get().Path,
fmt::format("/{}/{}", method->service()->full_name(), method->name()));
message->headers().addViaCopy(Http::Headers::get().Host, cluster_);
message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);
message->body(serializeBody(*grpc_request));
Http::MessagePtr message =
Utility::prepareHeaders(cluster_, method->service()->full_name(), method->name());
message->body(Utility::serializeBody(*grpc_request));

callbacks_.onPreRequestCustomizeHeaders(message->headers());
http_request_ = cm_.httpAsyncClientForCluster(cluster_).send(std::move(message), *this, timeout_);
Expand Down Expand Up @@ -133,15 +128,23 @@ void RpcChannelImpl::onComplete() {
grpc_response_ = nullptr;
}

Buffer::InstancePtr RpcChannelImpl::serializeBody(const proto::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
Buffer::InstancePtr body(new Buffer::OwnedImpl());
uint8_t compressed = 0;
body->add(&compressed, sizeof(compressed));
uint32_t size = htonl(message.ByteSize());
body->add(&size, sizeof(size));
body->add(message.SerializeAsString());
return body;
void RpcAsyncClientImpl::send(const std::string& upstream_cluster,
const std::string& service_full_name, const std::string& method_name,
proto::Message&& grpc_request,
Http::AsyncClient::Callbacks& callbacks) {
// For proto3 messages this should always return true.
ASSERT(grpc_request->IsInitialized());
// This should be caught in configuration, and a request will fail normally anyway, but assert
// here for clarity.
ASSERT(cm_.get(cluster_)->features() & Upstream::Cluster::Features::HTTP2);

Http::MessagePtr message =
Utility::prepareHeaders(upstream_cluster, service_full_name, method_name);
message->body(Utility::serializeBody(grpc_request));

// TODO: pass timeout to send method.
cm_.httpAsyncClientForCluster(upstream_cluster)
.send(std::move(message), callbacks, std::chrono::milliseconds(5000));
}

} // Grpc
12 changes: 12 additions & 0 deletions source/common/grpc/rpc_channel_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,16 @@ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks {
Optional<std::chrono::milliseconds> timeout_;
};

class RpcAsyncClientImpl : public RpcAsyncClient {
public:
RpcAsyncClientImpl(Upstream::ClusterManager& cm) : cm_(cm) {}

void send(const std::string& upstream_cluster, const std::string& service_full_name,
const std::string& method_name, proto::Message&& grpc_request,
Http::AsyncClient::Callbacks& callbacks) override;

private:
Upstream::ClusterManager& cm_;
};

} // Grpc
34 changes: 34 additions & 0 deletions source/common/grpc/utility.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "common.h"
#include "utility.h"

#include "common/http/headers.h"

namespace Grpc {

Buffer::InstancePtr Utility::serializeBody(const google::protobuf::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
Buffer::InstancePtr body(new Buffer::OwnedImpl());
uint8_t compressed = 0;
body->add(&compressed, sizeof(compressed));
uint32_t size = htonl(message.ByteSize());
body->add(&size, sizeof(size));
body->add(message.SerializeAsString());

return body;
}

Http::MessagePtr Utility::prepareHeaders(const std::string& upstream_cluster,
const std::string& service_full_name,
const std::string& method_name) {
Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
message->headers().addViaMoveValue(Http::Headers::get().Path,
fmt::format("/{}/{}", service_full_name, method_name));
message->headers().addViaCopy(Http::Headers::get().Host, upstream_cluster);
message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);

return message;
}

} // Grpc
17 changes: 17 additions & 0 deletions source/common/grpc/utility.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include "common/http/message_impl.h"

#include "google/protobuf/message.h"

namespace Grpc {

class Utility {
public:
static Buffer::InstancePtr serializeBody(const google::protobuf::Message& message);
static Http::MessagePtr prepareHeaders(const std::string& upstream_cluster,
const std::string& service_full_name,
const std::string& method_name);
};

} // Grpc
Loading