diff --git a/ci/do_ci.sh b/ci/do_ci.sh index 33e870706955..421b2496bf91 100755 --- a/ci/do_ci.sh +++ b/ci/do_ci.sh @@ -41,6 +41,7 @@ $EXTRA_CMAKE_FLAGS -DENVOY_DEBUG:BOOL=OFF \ -DENVOY_TCLAP_INCLUDE_DIR:FILEPATH=/thirdparty/tclap-1.2.1/include \ -DENVOY_JANSSON_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \ -DENVOY_OPENSSL_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \ +-DENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \ -DENVOY_PROTOBUF_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \ -DENVOY_PROTOBUF_PROTOC:FILEPATH=/thirdparty_build/bin/protoc \ -DENVOY_GCOVR:FILEPATH=/thirdparty/gcovr-3.3/scripts/gcovr \ diff --git a/configs/envoy_double_proxy.template.json b/configs/envoy_double_proxy.template.json index c7e3e3d6184f..b4453ec65510 100644 --- a/configs/envoy_double_proxy.template.json +++ b/configs/envoy_double_proxy.template.json @@ -137,6 +137,7 @@ }, { "name": "lightstep_saas", + "features": "http2", "ssl_context": { "ca_cert_file": "/etc/ssl/certs/ca-certificates.crt", "verify_subject_alt_name": "collector.lightstep.com" diff --git a/configs/envoy_front_proxy.template.json b/configs/envoy_front_proxy.template.json index 4238f362b570..829a7a38bc5b 100644 --- a/configs/envoy_front_proxy.template.json +++ b/configs/envoy_front_proxy.template.json @@ -126,6 +126,7 @@ }, { "name": "lightstep_saas", + "features": "http2", "ssl_context": { "ca_cert_file": "/etc/ssl/certs/ca-certificates.crt", "verify_subject_alt_name": "collector.lightstep.com" diff --git a/configs/envoy_service_to_service.template.json b/configs/envoy_service_to_service.template.json index 8de210c048d6..818c661a4394 100644 --- a/configs/envoy_service_to_service.template.json +++ b/configs/envoy_service_to_service.template.json @@ -370,6 +370,7 @@ }, { "name": "lightstep_saas", + "features": "http2", "ssl_context": { "ca_cert_file": "/etc/ssl/certs/ca-certificates.crt", "verify_subject_alt_name": "collector.lightstep.com" diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 970075e17366..49b3ea755db8 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -113,6 +113,7 @@ endif() include_directories(${ENVOY_LIBEVENT_INCLUDE_DIR}) include_directories(${ENVOY_NGHTTP2_INCLUDE_DIR}) include_directories(SYSTEM ${ENVOY_OPENSSL_INCLUDE_DIR}) +include_directories(SYSTEM ${ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR}) set_target_properties(envoy-common PROPERTIES COTIRE_CXX_PREFIX_HEADER_INIT "../precompiled/precompiled.h") diff --git a/source/common/common/utility.cc b/source/common/common/utility.cc index 580b9faa81d3..d21180e6c970 100644 --- a/source/common/common/utility.cc +++ b/source/common/common/utility.cc @@ -102,3 +102,8 @@ bool StringUtil::startsWith(const std::string& source, const std::string& start, return strncasecmp(source.c_str(), start.c_str(), start.size()) == 0; } } + +const std::string& StringUtil::valueOrDefault(const std::string& input, + const std::string& default_value) { + return input.empty() ? default_value : input; +} diff --git a/source/common/common/utility.h b/source/common/common/utility.h index 0b88f3d2f43a..280e1b9ec0ef 100644 --- a/source/common/common/utility.h +++ b/source/common/common/utility.h @@ -92,4 +92,10 @@ class StringUtil { */ static bool startsWith(const std::string& source, const std::string& start, bool case_sensitive = true); + + /** + * @return original @param input string if it's not empty or @param default_value otherwise. + */ + static const std::string& valueOrDefault(const std::string& input, + const std::string& default_value); }; diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 1c9c520d047c..72ea2811a357 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -1,5 +1,12 @@ #include "common.h" +#include "common/buffer/buffer_impl.h" +#include "common/common/enum_to_int.h" +#include "common/common/utility.h" +#include "common/http/headers.h" +#include "common/http/message_impl.h" +#include "common/http/utility.h" + namespace Grpc { const std::string Common::GRPC_CONTENT_TYPE{"application/grpc"}; @@ -15,4 +22,71 @@ void Common::chargeStat(Stats::Store& store, const std::string& cluster, .inc(); } +Buffer::InstancePtr Common::serializeBody(const google::protobuf::Message& message) { + // http://www.grpc.io/docs/guides/wire.html + Buffer::InstancePtr body(new Buffer::OwnedImpl()); + uint8_t compressed = 0; + body->add(&compressed, sizeof(compressed)); + uint32_t size = htonl(message.ByteSize()); + body->add(&size, sizeof(size)); + body->add(message.SerializeAsString()); + + return body; +} + +Http::MessagePtr Common::prepareHeaders(const std::string& upstream_cluster, + const std::string& service_full_name, + const std::string& method_name) { + Http::MessagePtr message(new Http::RequestMessageImpl()); + message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http"); + message->headers().addViaMoveValue(Http::Headers::get().Method, "POST"); + message->headers().addViaMoveValue(Http::Headers::get().Path, + fmt::format("/{}/{}", service_full_name, method_name)); + message->headers().addViaCopy(Http::Headers::get().Host, upstream_cluster); + message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE); + + return message; +} + +void Common::checkForHeaderOnlyError(Http::Message& http_response) { + // First check for grpc-status in headers. If it is here, we have an error. + const std::string& grpc_status_header = http_response.headers().get(Common::GRPC_STATUS_HEADER); + if (grpc_status_header.empty()) { + return; + } + + uint64_t grpc_status_code; + if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) { + throw Exception(Optional(), "bad grpc-status header"); + } + + const std::string& grpc_status_message = http_response.headers().get(Common::GRPC_MESSAGE_HEADER); + throw Exception(grpc_status_code, grpc_status_message); +} + +void Common::validateResponse(Http::Message& http_response) { + if (Http::Utility::getResponseStatus(http_response.headers()) != enumToInt(Http::Code::OK)) { + throw Exception(Optional(), "non-200 response code"); + } + + checkForHeaderOnlyError(http_response); + + // Check for existence of trailers. + if (!http_response.trailers()) { + throw Exception(Optional(), "no response trailers"); + } + + const std::string& grpc_status_header = http_response.trailers()->get(Common::GRPC_STATUS_HEADER); + const std::string& grpc_status_message = + http_response.trailers()->get(Common::GRPC_MESSAGE_HEADER); + uint64_t grpc_status_code; + if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) { + throw Exception(Optional(), "bad grpc-status trailer"); + } + + if (grpc_status_code != 0) { + throw Exception(grpc_status_code, grpc_status_message); + } +} + } // Grpc diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 7c894b8a507d..fec4d29ba085 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -1,10 +1,23 @@ #pragma once +#include "envoy/common/exception.h" +#include "envoy/common/optional.h" #include "envoy/http/header_map.h" +#include "envoy/http/message.h" #include "envoy/stats/stats.h" +#include "google/protobuf/message.h" + namespace Grpc { +class Exception : public EnvoyException { +public: + Exception(const Optional& grpc_status, const std::string& message) + : EnvoyException(message), grpc_status_(grpc_status) {} + + const Optional grpc_status_; +}; + class Common { public: /** @@ -18,10 +31,29 @@ class Common { static void chargeStat(Stats::Store& store, const std::string& cluster, const std::string& grpc_service, const std::string& grpc_method, bool success); + /** + * Serialize protobuf message. + */ + static Buffer::InstancePtr serializeBody(const google::protobuf::Message& message); + + /** + * Prepare headers for protobuf service. + */ + static Http::MessagePtr prepareHeaders(const std::string& upstream_cluster, + const std::string& service_full_name, + const std::string& method_name); + + /** + * Basic validation of gRPC response, @throws Grpc::Exception in case of non successful response. + */ + static void validateResponse(Http::Message& http_response); static const std::string GRPC_CONTENT_TYPE; static const Http::LowerCaseString GRPC_MESSAGE_HEADER; static const Http::LowerCaseString GRPC_STATUS_HEADER; + +private: + static void checkForHeaderOnlyError(Http::Message& http_response); }; } // Grpc diff --git a/source/common/grpc/rpc_channel_impl.cc b/source/common/grpc/rpc_channel_impl.cc index adb918d6a166..f7a31882acd6 100644 --- a/source/common/grpc/rpc_channel_impl.cc +++ b/source/common/grpc/rpc_channel_impl.cc @@ -30,15 +30,9 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp // here for clarity. ASSERT(cm_.get(cluster_)->features() & Upstream::Cluster::Features::HTTP2); - Http::MessagePtr message(new Http::RequestMessageImpl()); - message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http"); - message->headers().addViaMoveValue(Http::Headers::get().Method, "POST"); - message->headers().addViaMoveValue( - Http::Headers::get().Path, - fmt::format("/{}/{}", method->service()->full_name(), method->name())); - message->headers().addViaCopy(Http::Headers::get().Host, cluster_); - message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE); - message->body(serializeBody(*grpc_request)); + Http::MessagePtr message = + Common::prepareHeaders(cluster_, method->service()->full_name(), method->name()); + message->body(Common::serializeBody(*grpc_request)); callbacks_.onPreRequestCustomizeHeaders(message->headers()); http_request_ = cm_.httpAsyncClientForCluster(cluster_).send(std::move(message), *this, timeout_); @@ -49,61 +43,21 @@ void RpcChannelImpl::incStat(bool success) { grpc_method_->name(), success); } -void RpcChannelImpl::checkForHeaderOnlyError(Http::Message& http_response) { - // First check for grpc-status in headers. If it is here, we have an error. - const std::string& grpc_status_header = http_response.headers().get(Common::GRPC_STATUS_HEADER); - if (grpc_status_header.empty()) { - return; - } - - uint64_t grpc_status_code; - if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) { - throw Exception(Optional(), "bad grpc-status header"); - } - - const std::string& grpc_status_message = http_response.headers().get(Common::GRPC_MESSAGE_HEADER); - throw Exception(grpc_status_code, grpc_status_message); -} - -void RpcChannelImpl::onSuccessWorker(Http::Message& http_response) { - if (Http::Utility::getResponseStatus(http_response.headers()) != enumToInt(Http::Code::OK)) { - throw Exception(Optional(), "non-200 response code"); - } - - checkForHeaderOnlyError(http_response); - - // Check for existance of trailers. - if (!http_response.trailers()) { - throw Exception(Optional(), "no response trailers"); - } - - const std::string& grpc_status_header = http_response.trailers()->get(Common::GRPC_STATUS_HEADER); - const std::string& grpc_status_message = - http_response.trailers()->get(Common::GRPC_MESSAGE_HEADER); - uint64_t grpc_status_code; - if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) { - throw Exception(Optional(), "bad grpc-status trailer"); - } +void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) { + try { + Common::validateResponse(*http_response); - if (grpc_status_code != 0) { - throw Exception(grpc_status_code, grpc_status_message); - } + // A gRPC response contains a 5 byte header. Currently we only support unary responses so we + // ignore the header. @see serializeBody(). + if (!http_response->body() || !(http_response->body()->length() > 5)) { + throw Exception(Optional(), "bad serialized body"); + } - // A GRPC response contains a 5 byte header. Currently we only support unary responses so we - // ignore the header. @see serializeBody(). - if (!http_response.body() || !(http_response.body()->length() > 5)) { - throw Exception(Optional(), "bad serialized body"); - } + http_response->body()->drain(5); + if (!grpc_response_->ParseFromString(http_response->bodyAsString())) { + throw Exception(Optional(), "bad serialized body"); + } - http_response.body()->drain(5); - if (!grpc_response_->ParseFromString(http_response.bodyAsString())) { - throw Exception(Optional(), "bad serialized body"); - } -} - -void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) { - try { - onSuccessWorker(*http_response); callbacks_.onSuccess(); incStat(true); onComplete(); @@ -133,15 +87,4 @@ void RpcChannelImpl::onComplete() { grpc_response_ = nullptr; } -Buffer::InstancePtr RpcChannelImpl::serializeBody(const proto::Message& message) { - // http://www.grpc.io/docs/guides/wire.html - Buffer::InstancePtr body(new Buffer::OwnedImpl()); - uint8_t compressed = 0; - body->add(&compressed, sizeof(compressed)); - uint32_t size = htonl(message.ByteSize()); - body->add(&size, sizeof(size)); - body->add(message.SerializeAsString()); - return body; -} - } // Grpc diff --git a/source/common/grpc/rpc_channel_impl.h b/source/common/grpc/rpc_channel_impl.h index 72b74e0660a6..93d5da0d1aaf 100644 --- a/source/common/grpc/rpc_channel_impl.h +++ b/source/common/grpc/rpc_channel_impl.h @@ -42,15 +42,6 @@ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks { proto::Closure* done_callback) override; private: - class Exception : public EnvoyException { - public: - Exception(const Optional& grpc_status, const std::string& message) - : EnvoyException(message), grpc_status_(grpc_status) {} - - const Optional grpc_status_; - }; - - void checkForHeaderOnlyError(Http::Message& http_response); void incStat(bool success); void onComplete(); void onFailureWorker(const Optional& grpc_status, const std::string& message); diff --git a/source/common/tracing/http_tracer_impl.cc b/source/common/tracing/http_tracer_impl.cc index 7529b31cfa0d..c95cd670bfc4 100644 --- a/source/common/tracing/http_tracer_impl.cc +++ b/source/common/tracing/http_tracer_impl.cc @@ -1,18 +1,19 @@ #include "http_tracer_impl.h" #include "common/common/macros.h" -#include "common/http/codes.h" +#include "common/common/utility.h" +#include "common/grpc/common.h" #include "common/http/headers.h" #include "common/http/header_map_impl.h" #include "common/http/message_impl.h" #include "common/http/utility.h" #include "common/runtime/uuid_util.h" -#include "envoy/thread_local/thread_local.h" -#include "envoy/upstream/cluster_manager.h" - namespace Tracing { +const std::string LightStepSink::LIGHTSTEP_SERVICE = "lightstep.collector.CollectorService"; +const std::string LightStepSink::LIGHTSTEP_METHOD = "Report"; + void HttpTracerUtility::mutateHeaders(Http::HeaderMap& request_headers, Runtime::Loader& runtime) { std::string x_request_id = request_headers.get(Http::Headers::get().RequestId); @@ -113,46 +114,69 @@ void HttpTracerImpl::populateStats(const Decision& decision) { } } -Http::MessagePtr LightStepUtility::buildHeaders(const std::string& access_token) { - Http::MessagePtr msg{new Http::RequestMessageImpl()}; +LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink) + : builder_(tracer), sink_(sink) {} + +void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) { + builder_.addSpan(std::move(span)); + + uint64_t min_flush_spans = + sink_.runtime().snapshot().getInteger("tracing.lightstep.min_flush_spans", 5U); + if (builder_.pendingSpans() == min_flush_spans) { + lightstep::collector::ReportRequest request; + std::swap(request, builder_.pending()); + + Http::MessagePtr message = + Grpc::Common::prepareHeaders(sink_.collectorCluster(), LightStepSink::LIGHTSTEP_SERVICE, + LightStepSink::LIGHTSTEP_METHOD); - msg->headers().addViaCopy(Http::Headers::get().Scheme, "http"); - msg->headers().addViaCopy(Http::Headers::get().Method, "POST"); - msg->headers().addViaCopy(Http::Headers::get().Path, "/api/v0/reports"); - msg->headers().addViaCopy(Http::Headers::get().ContentType, "application/json"); - msg->headers().addViaCopy(Http::Headers::get().Host, "collector.lightstep.com"); - msg->headers().addViaCopy("LightStep-Access-Token", access_token); + message->body(Grpc::Common::serializeBody(std::move(request))); - return msg; + sink_.clusterManager() + .httpAsyncClientForCluster(sink_.collectorCluster()) + .send(std::move(message), *this, std::chrono::milliseconds(5000)); + } } -std::string LightStepUtility::buildJoiningIds(const Http::HeaderMap& request_headers) { - std::string join_ids; +bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) { + // Note: We don't expect this to be called, since the Tracer + // reference is private to its LightStepSink. + return true; +} - // Always populate x-request-id as joining id. - static const std::string x_request_id_format = R"EOF( - {{ - "TraceKey": "x-request-id", - "Value": "{}" - }})EOF"; - join_ids += fmt::format(x_request_id_format, request_headers.get(Http::Headers::get().RequestId)); +std::unique_ptr +LightStepRecorder::NewInstance(LightStepSink& sink, const lightstep::TracerImpl& tracer) { + return std::unique_ptr(new LightStepRecorder(tracer, sink)); +} - // Optionally populate x-client-trace-id if present. - if (request_headers.has(Http::Headers::get().ClientTraceId)) { - static const std::string x_client_trace_id_format = R"EOF( - ,{{ - "TraceKey": "x-client-trace-id", - "Value": "{}" - }})EOF"; - join_ids += fmt::format(x_client_trace_id_format, - request_headers.get(Http::Headers::get().ClientTraceId)); +LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager, + Stats::Store& stats, const std::string& service_node, + ThreadLocal::Instance& tls, Runtime::Loader& runtime, + std::unique_ptr options) + : collector_cluster_(config.getString("collector_cluster")), cm_(cluster_manager), + stats_store_(stats), service_node_(service_node), tls_(tls), runtime_(runtime), + options_(std::move(options)), tls_slot_(tls.allocateSlot()) { + if (!cm_.get(collector_cluster_)) { + throw EnvoyException(fmt::format("{} collector cluster is not defined on cluster manager level", + collector_cluster_)); + } + + if (!(cm_.get(collector_cluster_)->features() & Upstream::Cluster::Features::HTTP2)) { + throw EnvoyException( + fmt::format("{} collector cluster must support http2 for gRPC calls", collector_cluster_)); } - return join_ids; + tls_.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectPtr { + lightstep::Tracer tracer(lightstep::NewUserDefinedTransportLightStepTracer( + *options_, + std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::placeholders::_1))); + + return ThreadLocal::ThreadLocalObjectPtr{new TlsLightStepTracer(std::move(tracer), *this)}; + }); } -std::string LightStepUtility::buildRequestLine(const Http::HeaderMap& request_headers, - const Http::AccessLog::RequestInfo& info) { +std::string LightStepSink::buildRequestLine(const Http::HeaderMap& request_headers, + const Http::AccessLog::RequestInfo& info) { std::string method = request_headers.get(Http::Headers::get().Method); std::string path = request_headers.has(Http::Headers::get().EnvoyOriginalPath) ? request_headers.get(Http::Headers::get().EnvoyOriginalPath) @@ -166,136 +190,54 @@ std::string LightStepUtility::buildRequestLine(const Http::HeaderMap& request_he return fmt::format("{} {} {}", method, path, info.protocol()); } -std::string LightStepUtility::buildSpanAttributes(const Http::HeaderMap& request_headers, - const Http::AccessLog::RequestInfo& request_info, - const std::string& service_node) { - const std::string request_line = buildRequestLine(request_headers, request_info); - std::string downstream_cluster = - request_headers.get(Http::Headers::get().EnvoyDownstreamServiceCluster); - if (downstream_cluster.empty()) { - downstream_cluster = "-"; - } - - const std::string response_code = request_info.responseCode().valid() - ? std::to_string(request_info.responseCode().value()) - : "0"; - std::string user_agent = request_headers.get(Http::Headers::get().UserAgent); - if (user_agent.empty()) { - user_agent = "-"; - } - - static const std::string attributes_format = R"EOF( - {{ - "Key": "request line", - "Value": "{}" - }}, - {{ - "Key": "response code", - "Value": "{}" - }}, - {{ - "Key": "downstream cluster", - "Value": "{}" - }}, - {{ - "Key": "user agent", - "Value": "{}" - }}, - {{ - "Key": "node id", - "Value": "{}" - }})EOF"; - - return fmt::format(attributes_format, request_line, response_code, downstream_cluster, user_agent, - service_node); +std::string LightStepSink::buildResponseCode(const Http::AccessLog::RequestInfo& info) { + return info.responseCode().valid() ? std::to_string(info.responseCode().value()) : "0"; } -std::string LightStepUtility::buildJsonBody(const Http::HeaderMap& request_headers, - const Http::HeaderMap&, - const Http::AccessLog::RequestInfo& request_info, - Runtime::RandomGenerator& random, - const std::string& local_service_cluster, - const std::string& service_node) { - static const std::string json_format = R"EOF( -{{ - "runtime": {{ - "guid": "{}", - "group_name": "{}", - "start_micros": {} - }}, - "span_records": [ - {{ - "span_guid": "{}", - "span_name": "{}", - "oldest_micros": {}, - "youngest_micros": {}, - "join_ids": [{}], - "attributes": [{}] - }} - ] -}} - )EOF"; - - const std::string tracing_guid = random.uuid(); - static const std::string group_name = "Envoy-Tracing"; - uint64_t start_time = std::chrono::duration_cast( - request_info.startTime().time_since_epoch()).count(); - const std::string start_micros = std::to_string(start_time); - const std::string span_guid = random.uuid(); - const std::string& span_name = local_service_cluster; - const std::string oldest_micros = start_micros; - uint64_t end_time = - start_time + - std::chrono::duration_cast(request_info.duration()).count(); - const std::string youngest_micros = std::to_string(end_time); - const std::string joining_ids = buildJoiningIds(request_headers); - const std::string annotations = buildSpanAttributes(request_headers, request_info, service_node); - - return fmt::format(json_format, tracing_guid, group_name, start_micros, span_guid, span_name, - oldest_micros, youngest_micros, joining_ids, annotations); -} +void LightStepSink::flushTrace(const Http::HeaderMap& request_headers, const Http::HeaderMap&, + const Http::AccessLog::RequestInfo& request_info) { + lightstep::Span span = tls_.getTyped(tls_slot_).tracer_.StartSpan( + "full request", + { + lightstep::StartTimestamp(request_info.startTime()), + lightstep::SetTag("join:x-request-id", request_headers.get(Http::Headers::get().RequestId)), + lightstep::SetTag("request line", buildRequestLine(request_headers, request_info)), + lightstep::SetTag("response code", buildResponseCode(request_info)), + lightstep::SetTag( + "downstream cluster", + StringUtil::valueOrDefault( + request_headers.get(Http::Headers::get().EnvoyDownstreamServiceCluster), "-")), + lightstep::SetTag( + "user agent", + StringUtil::valueOrDefault(request_headers.get(Http::Headers::get().UserAgent), "-")), + lightstep::SetTag("node id", service_node_), + }); -LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager, - const std::string& stat_prefix, Stats::Store& stats, - Runtime::RandomGenerator& random, - const std::string& local_service_cluster, - const std::string& service_node, const std::string& access_token) - : collector_cluster_(config.getString("collector_cluster")), cm_(cluster_manager), - stats_{LIGHTSTEP_STATS(POOL_COUNTER_PREFIX(stats, stat_prefix + "tracing.lightstep."))}, - random_(random), local_service_cluster_(local_service_cluster), service_node_(service_node), - access_token_(access_token) { - if (!cm_.get(collector_cluster_)) { - throw EnvoyException(fmt::format("{} collector cluster is not defined on cluster manager level", - collector_cluster_)); + if (request_headers.has(Http::Headers::get().ClientTraceId)) { + span.SetTag("join:x-client-trace-id", request_headers.get(Http::Headers::get().ClientTraceId)); } -} - -void LightStepSink::flushTrace(const Http::HeaderMap& request_headers, - const Http::HeaderMap& response_headers, - const Http::AccessLog::RequestInfo& request_info) { - Http::MessagePtr msg = LightStepUtility::buildHeaders(access_token_); - Buffer::InstancePtr buffer(new Buffer::OwnedImpl( - LightStepUtility::buildJsonBody(request_headers, response_headers, request_info, random_, - local_service_cluster_, service_node_))); - msg->body(std::move(buffer)); - executeRequest(std::move(msg)); + span.Finish(); } -void LightStepSink::executeRequest(Http::MessagePtr&& msg) { - cm_.httpAsyncClientForCluster(collector_cluster_) - .send(std::move(msg), *this, std::chrono::milliseconds(5000)); +void LightStepRecorder::onFailure(Http::AsyncClient::FailureReason) { + Grpc::Common::chargeStat(sink_.statsStore(), sink_.collectorCluster(), + LightStepSink::LIGHTSTEP_SERVICE, LightStepSink::LIGHTSTEP_METHOD, + false); } -void LightStepSink::onFailure(Http::AsyncClient::FailureReason) { stats_.collector_failed_.inc(); } - -void LightStepSink::onSuccess(Http::MessagePtr&& response) { - uint64_t response_code = Http::Utility::getResponseStatus(response->headers()); - if (Http::CodeUtility::is2xx(response_code)) { - stats_.collector_success_.inc(); - } else { - stats_.collector_failed_.inc(); +void LightStepRecorder::onSuccess(Http::MessagePtr&& msg) { + try { + Grpc::Common::validateResponse(*msg); + + Grpc::Common::chargeStat(sink_.statsStore(), sink_.collectorCluster(), + LightStepSink::LIGHTSTEP_SERVICE, LightStepSink::LIGHTSTEP_METHOD, + true); + } catch (const Grpc::Exception& ex) { + Grpc::Common::chargeStat(sink_.statsStore(), sink_.collectorCluster(), + LightStepSink::LIGHTSTEP_SERVICE, LightStepSink::LIGHTSTEP_METHOD, + false); } } -} // Tracing \ No newline at end of file +} // Tracing diff --git a/source/common/tracing/http_tracer_impl.h b/source/common/tracing/http_tracer_impl.h index b7df2db3dd85..3de1402e0117 100644 --- a/source/common/tracing/http_tracer_impl.h +++ b/source/common/tracing/http_tracer_impl.h @@ -1,21 +1,16 @@ #pragma once #include "envoy/runtime/runtime.h" +#include "envoy/thread_local/thread_local.h" #include "envoy/tracing/http_tracer.h" #include "envoy/upstream/cluster_manager.h" #include "common/http/header_map_impl.h" #include "common/json/json_loader.h" -namespace Tracing { - -#define LIGHTSTEP_STATS(COUNTER) \ - COUNTER(collector_failed) \ - COUNTER(collector_success) +#include "lightstep/tracer.h" -struct LightStepStats { - LIGHTSTEP_STATS(GENERATE_COUNTER_STRUCT) -}; +namespace Tracing { #define HTTP_TRACER_STATS(COUNTER) \ COUNTER(global_switch_off) \ @@ -88,78 +83,73 @@ class HttpTracerImpl : public HttpTracer { std::vector sinks_; }; -class LightStepUtility { -public: - /** - * Sample json body format. - * - * {"runtime": { "guid": "039e9da7-2a07-4a54-9440-eaee8f3887ea", "group_name": "Envoy-Test", - * "start_micros": 1466704630010000 }, "span_records": [ { "span_guid": - * "745bfab3-4ba4-4a36-9133-ecf76108feb5", "span_name": "front-envoy", - * "oldest_micros":1466704630010000, "youngest_micros": 1466704630500000, "join_ids": [ { - * "TraceKey": "x-request-id", "Value": "5463a0cc-e469-454c-a8ac-950dd1a87c66" }, { - * "TraceKey": "x-client-request-id", "Value": "fcd405fd-657e-40e7-adf3-27c306df60a3" }]}]} - **/ - static std::string - buildJsonBody(const Http::HeaderMap& request_headers, const Http::HeaderMap& response_headers, - const Http::AccessLog::RequestInfo& request_info, Runtime::RandomGenerator& random, - const std::string& local_service_cluster, const std::string& service_node); - - /** - * Create LightStep specific headers. - * - * Note: We temporary keep access token to LightStep here hardcoded. - * This needs to be retrieved from Confidant, but we only can do so when we move LightStep - * collectors to our internal service. - * - * @param access token for light step access. - */ - static Http::MessagePtr buildHeaders(const std::string& access_token); - -private: - /** - * Build request line: Method Request-URI Protocol. - * Note: Request-URI will be truncated if it's longer than 256 chars. - */ - static std::string buildRequestLine(const Http::HeaderMap& request_headers, - const Http::AccessLog::RequestInfo& request_info); - static std::string buildJoiningIds(const Http::HeaderMap& request_headers); - static std::string buildSpanAttributes(const Http::HeaderMap& request_headers, - const Http::AccessLog::RequestInfo& request_info, - const std::string& service_node); -}; - /** * LightStep (http://lightstep.com/) provides tracing capabilities, aggregation, visualization of * application trace data. * * LightStepSink is for flushing data to LightStep collectors. */ -class LightStepSink : public HttpSink, public Http::AsyncClient::Callbacks { +class LightStepSink : public HttpSink { public: LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager, - const std::string& stat_prefix, Stats::Store& stats, - Runtime::RandomGenerator& random, const std::string& local_service_cluster, - const std::string& service_node, const std::string& access_token); + Stats::Store& stats, const std::string& service_node, ThreadLocal::Instance& tls, + Runtime::Loader& runtime, std::unique_ptr options); // Tracer::HttpSink void flushTrace(const Http::HeaderMap& request_headers, const Http::HeaderMap& response_headers, const Http::AccessLog::RequestInfo& request_info) override; - // Http::AsyncClient::Callbacks - void onSuccess(Http::MessagePtr&&) override; - void onFailure(Http::AsyncClient::FailureReason reason) override; + Upstream::ClusterManager& clusterManager() { return cm_; } + const std::string& collectorCluster() { return collector_cluster_; } + Runtime::Loader& runtime() { return runtime_; } + Stats::Store& statsStore() { return stats_store_; } + + static const std::string LIGHTSTEP_SERVICE; + static const std::string LIGHTSTEP_METHOD; private: - void executeRequest(Http::MessagePtr&& msg); + struct TlsLightStepTracer : ThreadLocal::ThreadLocalObject { + TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink) + : tracer_(tracer), sink_(sink) {} + + void shutdown() override {} + + lightstep::Tracer tracer_; + LightStepSink& sink_; + }; + + std::string buildRequestLine(const Http::HeaderMap& request_headers, + const Http::AccessLog::RequestInfo& info); + std::string buildResponseCode(const Http::AccessLog::RequestInfo& info); const std::string collector_cluster_; Upstream::ClusterManager& cm_; - LightStepStats stats_; - Runtime::RandomGenerator& random_; - const std::string local_service_cluster_; + Stats::Store& stats_store_; const std::string service_node_; - const std::string access_token_; + ThreadLocal::Instance& tls_; + Runtime::Loader& runtime_; + std::unique_ptr options_; + uint32_t tls_slot_; +}; + +class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbacks { +public: + LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink); + + // lightstep::Recorder + void RecordSpan(lightstep::collector::Span&& span) override; + bool FlushWithTimeout(lightstep::Duration) override; + + // Http::AsyncClient::Callbacks + void onSuccess(Http::MessagePtr&&) override; + void onFailure(Http::AsyncClient::FailureReason) override; + + static std::unique_ptr NewInstance(LightStepSink& sink, + const lightstep::TracerImpl& tracer); + +private: + lightstep::ReportBuilder builder_; + LightStepSink& sink_; }; -} // Tracing \ No newline at end of file +} // Tracing diff --git a/source/exe/CMakeLists.txt b/source/exe/CMakeLists.txt index dcc9ea5997b5..af350d9cdcde 100644 --- a/source/exe/CMakeLists.txt +++ b/source/exe/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(envoy jansson) target_link_libraries(envoy ssl) target_link_libraries(envoy crypto) target_link_libraries(envoy nghttp2) +target_link_libraries(envoy lightstep_core_cxx11) target_link_libraries(envoy protobuf) target_link_libraries(envoy pthread) target_link_libraries(envoy anl) diff --git a/source/server/CMakeLists.txt b/source/server/CMakeLists.txt index 26c6eb72ac1d..96b7716afe64 100644 --- a/source/server/CMakeLists.txt +++ b/source/server/CMakeLists.txt @@ -23,6 +23,7 @@ include_directories(SYSTEM ${ENVOY_TCLAP_INCLUDE_DIR}) include_directories(SYSTEM ${ENVOY_HTTP_PARSER_INCLUDE_DIR}) include_directories(${ENVOY_NGHTTP2_INCLUDE_DIR}) include_directories(SYSTEM ${ENVOY_OPENSSL_INCLUDE_DIR}) +include_directories(SYSTEM ${ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR}) set_target_properties(envoy-server PROPERTIES COTIRE_CXX_PREFIX_HEADER_INIT "../precompiled/precompiled.h") diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index c6ac349919d4..860fca16d6f3 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -1,6 +1,7 @@ #include "configuration_impl.h" #include "envoy/network/connection.h" +#include "envoy/runtime/runtime.h" #include "envoy/server/instance.h" #include "envoy/ssl/context_manager.h" @@ -77,14 +78,20 @@ void MainImpl::initializeTracers(const Json::Object& tracing_configuration_) { log().notice(fmt::format(" loading {}", type)); if (type == "lightstep") { - std::string access_token = - server_.api().fileReadToEnd(sink.getString("access_token_file")); - StringUtil::rtrim(access_token); + ::Runtime::RandomGenerator& rand = server_.random(); + std::unique_ptr opts(new lightstep::TracerOptions()); + opts->access_token = server_.api().fileReadToEnd(sink.getString("access_token_file")); + StringUtil::rtrim(opts->access_token); + + opts->tracer_attributes["lightstep.guid"] = rand.uuid(); + opts->tracer_attributes["lightstep.component_name"] = + server_.options().serviceClusterName(); + opts->guid_generator = [&rand]() { return rand.random(); }; http_tracer_->addSink(Tracing::HttpSinkPtr{new Tracing::LightStepSink( - sink.getObject("config"), *cluster_manager_, "", server_.stats(), server_.random(), - server_.options().serviceClusterName(), server_.options().serviceNodeName(), - access_token)}); + sink.getObject("config"), *cluster_manager_, server_.stats(), + server_.options().serviceNodeName(), server_.threadLocal(), server_.runtime(), + std::move(opts))}); } else { throw EnvoyException(fmt::format("Unsupported sink type: '{}'", type)); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7c0b3bd741e2..7e45ddba250b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -19,6 +19,7 @@ include_directories(${PROJECT_SOURCE_DIR}) include_directories(${PROJECT_BINARY_DIR}) include_directories(SYSTEM ${ENVOY_OPENSSL_INCLUDE_DIR}) include_directories(${ENVOY_NGHTTP2_INCLUDE_DIR}) +include_directories(SYSTEM ${ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR}) add_executable(envoy-test $ @@ -32,6 +33,7 @@ add_executable(envoy-test common/event/file_event_impl_test.cc common/filesystem/filesystem_impl_test.cc common/filesystem/watcher_impl_test.cc + common/grpc/common_test.cc common/grpc/http1_bridge_filter_test.cc common/grpc/rpc_channel_impl_test.cc common/http/access_log/access_log_formatter_test.cc @@ -136,6 +138,7 @@ target_link_libraries(envoy-test jansson) target_link_libraries(envoy-test ssl) target_link_libraries(envoy-test crypto) target_link_libraries(envoy-test nghttp2) +target_link_libraries(envoy-test lightstep_core_cxx11) target_link_libraries(envoy-test protobuf) target_link_libraries(envoy-test gmock) target_link_libraries(envoy-test pthread) diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc new file mode 100644 index 000000000000..dc95bf59fb53 --- /dev/null +++ b/test/common/grpc/common_test.cc @@ -0,0 +1,34 @@ +#include "common/grpc/common.h" +#include "common/http/headers.h" +#include "common/stats/stats_impl.h" + +#include "test/generated/helloworld.pb.h" +#include "test/mocks/stats/mocks.h" + +namespace Grpc { + +TEST(CommonTest, chargeStats) { + Stats::IsolatedStoreImpl stats; + + Common::chargeStat(stats, "cluster", "service", "method", true); + EXPECT_EQ(1U, stats.counter("cluster.cluster.grpc.service.method.success").value()); + EXPECT_EQ(0U, stats.counter("cluster.cluster.grpc.service.method.failure").value()); + EXPECT_EQ(1U, stats.counter("cluster.cluster.grpc.service.method.total").value()); + + Common::chargeStat(stats, "cluster", "service", "method", false); + EXPECT_EQ(1U, stats.counter("cluster.cluster.grpc.service.method.success").value()); + EXPECT_EQ(1U, stats.counter("cluster.cluster.grpc.service.method.failure").value()); + EXPECT_EQ(2U, stats.counter("cluster.cluster.grpc.service.method.total").value()); +} + +TEST(CommonTest, prepareHeaders) { + Http::MessagePtr message = Common::prepareHeaders("cluster", "service_name", "method_name"); + + EXPECT_EQ("http", message->headers().get(Http::Headers::get().Scheme)); + EXPECT_EQ("POST", message->headers().get(Http::Headers::get().Method)); + EXPECT_EQ("/service_name/method_name", message->headers().get(Http::Headers::get().Path)); + EXPECT_EQ("cluster", message->headers().get(Http::Headers::get().Host)); + EXPECT_EQ("application/grpc", message->headers().get(Http::Headers::get().ContentType)); +} + +} // Grpc diff --git a/test/common/grpc/rpc_channel_impl_test.cc b/test/common/grpc/rpc_channel_impl_test.cc index ec2b26ca2ebb..7fbaef073c28 100644 --- a/test/common/grpc/rpc_channel_impl_test.cc +++ b/test/common/grpc/rpc_channel_impl_test.cc @@ -1,3 +1,4 @@ +#include "common/grpc/common.h" #include "common/grpc/rpc_channel_impl.h" #include "test/generated/helloworld.pb.h" @@ -61,7 +62,7 @@ TEST_F(GrpcRequestImplTest, NoError) { Http::HeaderMapPtr{new Http::HeaderMapImpl{{":status", "200"}}})); helloworld::HelloReply inner_response; inner_response.set_message("hello a name"); - response_http_message->body(RpcChannelImpl::serializeBody(inner_response)); + response_http_message->body(Common::serializeBody(inner_response)); response_http_message->trailers( Http::HeaderMapPtr{new Http::HeaderMapImpl{{"grpc-status", "0"}}}); @@ -278,7 +279,7 @@ TEST_F(GrpcRequestImplTest, RequestTimeoutSet) { helloworld::HelloReply inner_response; inner_response.set_message("hello a name"); - response_http_message->body(RpcChannelImpl::serializeBody(inner_response)); + response_http_message->body(Common::serializeBody(inner_response)); response_http_message->trailers( Http::HeaderMapPtr{new Http::HeaderMapImpl{{"grpc-status", "0"}}}); diff --git a/test/common/tracing/http_tracer_impl_test.cc b/test/common/tracing/http_tracer_impl_test.cc index 074c264c8f0a..6652684c55a3 100644 --- a/test/common/tracing/http_tracer_impl_test.cc +++ b/test/common/tracing/http_tracer_impl_test.cc @@ -272,16 +272,19 @@ TEST(HttpNullTracerTest, NoFailures) { class LightStepSinkTest : public Test { public: - LightStepSinkTest() - : stats_{LIGHTSTEP_STATS(POOL_COUNTER_PREFIX(fake_stats_, "prefix.tracing.lightstep."))} {} - void setup(Json::Object& config) { - sink_.reset(new LightStepSink(config, cm_, "prefix.", fake_stats_, random_, "service_cluster", - "service_node", "token")); + std::unique_ptr opts(new lightstep::TracerOptions()); + opts->access_token = "sample_token"; + opts->tracer_attributes["lightstep.guid"] = "random_guid"; + opts->tracer_attributes["lightstep.component_name"] = "component"; + + sink_.reset( + new LightStepSink(config, cm_, stats_, "service_node", tls_, runtime_, std::move(opts))); } void setupValidSink() { - EXPECT_CALL(cm_, get("lightstep_saas")); + EXPECT_CALL(cm_, get("lightstep_saas")).WillRepeatedly(Return(&cluster_)); + ON_CALL(cluster_, features()).WillByDefault(Return(Upstream::Cluster::Features::HTTP2)); std::string valid_config = R"EOF( {"collector_cluster": "lightstep_saas"} @@ -293,11 +296,13 @@ class LightStepSinkTest : public Test { const Http::HeaderMapImpl empty_header_{}; - Stats::IsolatedStoreImpl fake_stats_; - LightStepStats stats_; + Stats::IsolatedStoreImpl stats_; NiceMock cm_; + NiceMock cluster_; NiceMock random_; + NiceMock runtime_; std::unique_ptr sink_; + NiceMock tls_; }; TEST_F(LightStepSinkTest, InitializeSink) { @@ -318,7 +323,7 @@ TEST_F(LightStepSinkTest, InitializeSink) { } { - // Valid config but not valid cluster + // Valid config but not valid cluster. EXPECT_CALL(cm_, get("lightstep_saas")).WillOnce(Return(nullptr)); std::string valid_config = R"EOF( @@ -330,7 +335,21 @@ TEST_F(LightStepSinkTest, InitializeSink) { } { - EXPECT_CALL(cm_, get("lightstep_saas")); + // Valid config, but upstream cluster does not support http2. + EXPECT_CALL(cm_, get("lightstep_saas")).WillRepeatedly(Return(&cluster_)); + ON_CALL(cluster_, features()).WillByDefault(Return(0)); + + std::string valid_config = R"EOF( + {"collector_cluster": "lightstep_saas"} + )EOF"; + Json::StringLoader loader(valid_config); + + EXPECT_THROW(setup(loader), EnvoyException); + } + + { + EXPECT_CALL(cm_, get("lightstep_saas")).WillRepeatedly(Return(&cluster_)); + ON_CALL(cluster_, features()).WillByDefault(Return(Upstream::Cluster::Features::HTTP2)); std::string valid_config = R"EOF( {"collector_cluster": "lightstep_saas"} @@ -341,249 +360,132 @@ TEST_F(LightStepSinkTest, InitializeSink) { } } -TEST_F(LightStepSinkTest, CallbacksCalled) { +TEST_F(LightStepSinkTest, FlushSeveralSpans) { setupValidSink(); NiceMock request_info; - EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas")) .WillOnce(ReturnRef(cm_.async_client_)); - Http::MockAsyncClientRequest request_1(&cm_.async_client_); - Http::AsyncClient::Callbacks* callback_1; + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback; const Optional timeout(std::chrono::seconds(5)); EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)) .WillOnce( - Invoke([&](Http::MessagePtr&, Http::AsyncClient::Callbacks& callbacks, + Invoke([&](Http::MessagePtr& message, Http::AsyncClient::Callbacks& callbacks, const Optional&) -> Http::AsyncClient::Request* { - callback_1 = &callbacks; - return &request_1; + callback = &callbacks; + + EXPECT_EQ("/lightstep.collector.CollectorService/Report", + message->headers().get(Http::Headers::get().Path)); + EXPECT_EQ("lightstep_saas", message->headers().get(Http::Headers::get().Host)); + EXPECT_EQ("application/grpc", message->headers().get(Http::Headers::get().ContentType)); + + return &request; })); - EXPECT_CALL(random_, uuid()).WillOnce(Return("1")).WillOnce(Return("2")); - SystemTime start_time_1; - EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time_1)); - std::chrono::seconds duration_1(1); - EXPECT_CALL(request_info, duration()).WillOnce(Return(duration_1)); + + SystemTime start_time; + EXPECT_CALL(request_info, startTime()).Times(2).WillRepeatedly(Return(start_time)); Optional code_1(200); - EXPECT_CALL(request_info, responseCode()).WillRepeatedly(ReturnRef(code_1)); + Optional code_2(503); + EXPECT_CALL(request_info, responseCode()) + .WillOnce(ReturnRef(code_1)) + .WillOnce(ReturnRef(code_1)) + .WillOnce(ReturnRef(code_2)) + .WillOnce(ReturnRef(code_2)); + const std::string protocol = "http/1"; - EXPECT_CALL(request_info, protocol()).WillRepeatedly(ReturnRef(protocol)); + EXPECT_CALL(request_info, protocol()).Times(2).WillRepeatedly(ReturnRef(protocol)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5)) + .Times(2) + .WillRepeatedly(Return(2)); sink_->flushTrace(empty_header_, empty_header_, request_info); - - Http::MockAsyncClientRequest request_2(&cm_.async_client_); - EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas")) - .WillOnce(ReturnRef(cm_.async_client_)); - Http::AsyncClient::Callbacks* callback_2; - - EXPECT_CALL(cm_.async_client_, send_(_, _, _)) - .WillOnce(Invoke([&](Http::MessagePtr&, Http::AsyncClient::Callbacks& callbacks, - Optional) -> Http::AsyncClient::Request* { - callback_2 = &callbacks; - return &request_2; - })); - EXPECT_CALL(random_, uuid()).WillOnce(Return("3")).WillOnce(Return("4")); - SystemTime start_time_2; - EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time_2)); - std::chrono::seconds duration_2(2); - EXPECT_CALL(request_info, duration()).WillOnce(Return(duration_2)); - Optional code_2(200); - EXPECT_CALL(request_info, responseCode()).WillRepeatedly(ReturnRef(code_2)); - sink_->flushTrace(empty_header_, empty_header_, request_info); - callback_2->onFailure(Http::AsyncClient::FailureReason::Reset); - EXPECT_EQ(1UL, stats_.collector_failed_.value()); - EXPECT_EQ(0UL, stats_.collector_success_.value()); + Http::MessagePtr msg(new Http::ResponseMessageImpl( + Http::HeaderMapPtr{new Http::HeaderMapImpl{{":status", "200"}}})); - callback_1->onSuccess(Http::MessagePtr{new Http::ResponseMessageImpl( - Http::HeaderMapPtr{new Http::HeaderMapImpl{{":status", "200"}}})}); - EXPECT_EQ(1UL, stats_.collector_failed_.value()); - EXPECT_EQ(1UL, stats_.collector_success_.value()); -} + msg->trailers(std::move(Http::HeaderMapPtr{new Http::HeaderMapImpl{{"grpc-status", "0"}}})); -TEST_F(LightStepSinkTest, ClientNotAvailable) { - setupValidSink(); + callback->onSuccess(std::move(msg)); - NiceMock request_info; + EXPECT_EQ( + 1U, + stats_.counter( + "cluster.lightstep_saas.grpc.lightstep.collector.CollectorService.Report.success") + .value()); - EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas")) - .WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, send_(_, _, _)) - .WillOnce( - Invoke([&](Http::MessagePtr&, Http::AsyncClient::Callbacks& callbacks, - const Optional&) -> Http::AsyncClient::Request* { - callbacks.onSuccess(Http::MessagePtr{new Http::ResponseMessageImpl( - Http::HeaderMapPtr{new Http::HeaderMapImpl{{":status", "503"}}})}); - return nullptr; - })); - SystemTime start_time_1; - EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time_1)); - std::chrono::seconds duration_1(1); - EXPECT_CALL(request_info, duration()).WillOnce(Return(duration_1)); - const std::string protocol = "http/1"; - EXPECT_CALL(request_info, protocol()).WillRepeatedly(ReturnRef(protocol)); - Optional code_1(200); - EXPECT_CALL(request_info, responseCode()).WillRepeatedly(ReturnRef(code_1)); - sink_->flushTrace(empty_header_, empty_header_, request_info); + callback->onFailure(Http::AsyncClient::FailureReason::Reset); + + EXPECT_EQ( + 1U, + stats_.counter( + "cluster.lightstep_saas.grpc.lightstep.collector.CollectorService.Report.failure") + .value()); - EXPECT_EQ(1UL, stats_.collector_failed_.value()); - EXPECT_EQ(0UL, stats_.collector_success_.value()); + EXPECT_EQ( + 2U, + stats_.counter( + "cluster.lightstep_saas.grpc.lightstep.collector.CollectorService.Report.total") + .value()); } -TEST_F(LightStepSinkTest, ShutdownWhenActiveRequests) { +TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) { setupValidSink(); + NiceMock request_info; EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas")) .WillOnce(ReturnRef(cm_.async_client_)); Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback; + const Optional timeout(std::chrono::seconds(5)); - NiceMock request_info; - const std::string protocol = "http/1"; - EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol)); - EXPECT_CALL(random_, uuid()).WillOnce(Return("1")).WillOnce(Return("2")); - SystemTime start_time(std::chrono::duration(1)); - EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time)); - std::chrono::seconds duration(1); - EXPECT_CALL(request_info, duration()).WillOnce(Return(duration)); - Optional code(200); - EXPECT_CALL(request_info, responseCode()).WillRepeatedly(ReturnRef(code)); - Http::HeaderMapImpl request_header{{"x-request-id", "id"}, - {":method", "GET"}, - {":path", "sample_path"}, - {"x-envoy-downstream-service-cluster", "downstream"}, - {"x-client-trace-id", "client-trace-id"}, - {"user-agent", "agent"}}; - - std::string expected_json = R"EOF( -{ - "runtime": { - "guid": "1", - "group_name": "Envoy-Tracing", - "start_micros": 1000000 - }, - "span_records": [ - { - "span_guid": "2", - "span_name": "service_cluster", - "oldest_micros": 1000000, - "youngest_micros": 2000000, - "join_ids": [ - { - "TraceKey": "x-request-id", - "Value": "id" - } - ,{ - "TraceKey": "x-client-trace-id", - "Value": "client-trace-id" - }], - "attributes": [ - { - "Key": "request line", - "Value": "GET sample_path http/1" - }, - { - "Key": "response code", - "Value": "200" - }, - { - "Key": "downstream cluster", - "Value": "downstream" - }, - { - "Key": "user agent", - "Value": "agent" - }, - { - "Key": "node id", - "Value": "service_node" - }] - } - ] -} - )EOF"; + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)) + .WillOnce( + Invoke([&](Http::MessagePtr& message, Http::AsyncClient::Callbacks& callbacks, + const Optional&) -> Http::AsyncClient::Request* { + callback = &callbacks; - Http::AsyncClient::Callbacks* callback; - EXPECT_CALL(cm_.async_client_, send_(_, _, _)) - .WillOnce(Invoke([&](Http::MessagePtr& msg, Http::AsyncClient::Callbacks& callbacks, - Optional) -> Http::AsyncClient::Request* { - callback = &callbacks; - EXPECT_EQ(expected_json, msg->bodyAsString()); - EXPECT_EQ("token", msg->headers().get("LightStep-Access-Token")); - return &request; - })); - - sink_->flushTrace(request_header, empty_header_, request_info); -} + EXPECT_EQ("/lightstep.collector.CollectorService/Report", + message->headers().get(Http::Headers::get().Path)); + EXPECT_EQ("lightstep_saas", message->headers().get(Http::Headers::get().Host)); + EXPECT_EQ("application/grpc", message->headers().get(Http::Headers::get().ContentType)); -TEST(LightStepUtilityTest, HeadersNotSet) { - std::string expected_json = R"EOF( -{ - "runtime": { - "guid": "1", - "group_name": "Envoy-Tracing", - "start_micros": 1000000 - }, - "span_records": [ - { - "span_guid": "2", - "span_name": "cluster", - "oldest_micros": 1000000, - "youngest_micros": 2000000, - "join_ids": [ - { - "TraceKey": "x-request-id", - "Value": "id" - }], - "attributes": [ - { - "Key": "request line", - "Value": "POST /locations http/1" - }, - { - "Key": "response code", - "Value": "300" - }, - { - "Key": "downstream cluster", - "Value": "-" - }, - { - "Key": "user agent", - "Value": "-" - }, - { - "Key": "node id", - "Value": "i485" - }] - } - ] -} - )EOF"; + return &request; + })); + + SystemTime start_time; + EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time)); + Optional code(200); + EXPECT_CALL(request_info, responseCode()).WillOnce(ReturnRef(code)).WillOnce(ReturnRef(code)); - NiceMock request_info; const std::string protocol = "http/1"; EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol)); - SystemTime start_time(std::chrono::duration(1)); - EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time)); - std::chrono::seconds duration(1); - EXPECT_CALL(request_info, duration()).WillOnce(Return(duration)); - Optional code(300); - EXPECT_CALL(request_info, responseCode()).WillRepeatedly(ReturnRef(code)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5)) + .WillOnce(Return(1)); + + sink_->flushTrace(empty_header_, empty_header_, request_info); - Runtime::MockRandomGenerator random; - EXPECT_CALL(random, uuid()).WillOnce(Return("1")).WillOnce(Return("2")); + Http::MessagePtr msg(new Http::ResponseMessageImpl( + Http::HeaderMapPtr{new Http::HeaderMapImpl{{":status", "200"}}})); - Http::HeaderMapImpl request_header{ - {"x-request-id", "id"}, {":method", "POST"}, {":path", "/locations"}}; - Http::HeaderMapImpl empty_header; + // No trailers, gRPC is considered failed. + callback->onSuccess(std::move(msg)); - const std::string actual_json = LightStepUtility::buildJsonBody( - request_header, empty_header, request_info, random, "cluster", "i485"); + EXPECT_EQ( + 1U, + stats_.counter( + "cluster.lightstep_saas.grpc.lightstep.collector.CollectorService.Report.failure") + .value()); - EXPECT_EQ(actual_json, expected_json); + EXPECT_EQ( + 1U, + stats_.counter( + "cluster.lightstep_saas.grpc.lightstep.collector.CollectorService.Report.total") + .value()); } } // Tracing diff --git a/test/config/integration/server.json b/test/config/integration/server.json index 89a80cb5e731..fa65c1a60ef1 100644 --- a/test/config/integration/server.json +++ b/test/config/integration/server.json @@ -240,6 +240,7 @@ }, { "name": "lightstep_saas", + "features": "http2", "connect_timeout_ms": 250, "type": "strict_dns", "lb_type": "round_robin", diff --git a/test/config/integration/server_http2.json b/test/config/integration/server_http2.json index d264a960aa67..992fd6e456ab 100644 --- a/test/config/integration/server_http2.json +++ b/test/config/integration/server_http2.json @@ -210,6 +210,7 @@ }, { "name": "lightstep_saas", + "features": "http2", "connect_timeout_ms": 250, "type": "strict_dns", "lb_type": "round_robin", diff --git a/thirdparty.cmake b/thirdparty.cmake index a640b2750c88..a97371830bf8 100644 --- a/thirdparty.cmake +++ b/thirdparty.cmake @@ -43,6 +43,10 @@ set(ENVOY_OPENSSL_INCLUDE_DIR "" CACHE FILEPATH "location of openssl includes") set(ENVOY_PROTOBUF_INCLUDE_DIR "" CACHE FILEPATH "location of protobuf includes") set(ENVOY_PROTOBUF_PROTOC "" CACHE FILEPATH "location of protoc") +# http://lightstep.com/ +# Last tested with lightstep-tracer-cpp-0.14 +set(ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR "" CACHE FILEPATH "location of lighstep tracer includes") + # Extra linker flags required to properly link envoy with all of the above libraries. set(ENVOY_EXE_EXTRA_LINKER_FLAGS "" CACHE STRING "envoy extra linker flags")