From 52c8f7872ebf14e697501756addc366f6d4079d7 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Fri, 11 Aug 2017 13:49:54 +0800 Subject: [PATCH 01/12] support Zipkin v2 Span format, resolve #11 --- src/Collector.cpp | 19 +++- src/Collector.h | 9 ++ src/Span.cpp | 7 +- src/Span.h | 224 +++++++++++++++++++++++++++++++++++++++++++++- test/TestSpan.cpp | 62 ++++++++++++- 5 files changed, 309 insertions(+), 12 deletions(-) diff --git a/src/Collector.cpp b/src/Collector.cpp index b887764..888fa49 100644 --- a/src/Collector.cpp +++ b/src/Collector.cpp @@ -21,6 +21,8 @@ namespace zipkin std::shared_ptr MessageCodec::binary(new BinaryCodec()); std::shared_ptr MessageCodec::json(new JsonCodec()); std::shared_ptr MessageCodec::pretty_json(new PrettyJsonCodec()); +std::shared_ptr MessageCodec::json_v2(new JsonCodec(2)); +std::shared_ptr MessageCodec::pretty_json_v2(new PrettyJsonCodec(2)); CompressionCodec parse_compression_codec(const std::string &codec) { @@ -57,7 +59,10 @@ std::shared_ptr MessageCodec::parse(const std::string &codec) return json; if (codec == "pretty_json") return pretty_json; - + if (codec == "json_v2") + return json_v2; + if (codec == "pretty_json_v2") + return pretty_json_v2; return nullptr; } @@ -85,7 +90,11 @@ size_t JsonCodec::encode(boost::shared_ptrserialize_json(writer); + if (m_format_version == 2) { + span->serialize_json_v2(writer); + } else { + span->serialize_json(writer); + } } writer.EndArray(); @@ -105,7 +114,11 @@ size_t PrettyJsonCodec::encode(boost::shared_ptrserialize_json(writer); + if (m_format_version == 2) { + span->serialize_json_v2(writer); + } else { + span->serialize_json(writer); + } } writer.EndArray(); diff --git a/src/Collector.h b/src/Collector.h index 83938c3..79f76df 100644 --- a/src/Collector.h +++ b/src/Collector.h @@ -4,6 +4,7 @@ #include #include #include + #include #include @@ -54,6 +55,8 @@ class MessageCodec static std::shared_ptr binary; static std::shared_ptr json; static std::shared_ptr pretty_json; + static std::shared_ptr json_v2; + static std::shared_ptr pretty_json_v2; }; /** @@ -74,7 +77,10 @@ class BinaryCodec : public MessageCodec */ class JsonCodec : public MessageCodec { + int m_format_version; public: + JsonCodec(int format_version = 1) : m_format_version(format_version) {} + virtual const std::string name(void) const override { return "json"; } virtual const std::string mime_type(void) const override { return "application/json"; } @@ -87,7 +93,10 @@ class JsonCodec : public MessageCodec */ class PrettyJsonCodec : public MessageCodec { + int m_format_version; public: + PrettyJsonCodec(int format_version = 1) : m_format_version(format_version) {} + virtual const std::string name(void) const override { return "pretty_json"; } virtual const std::string mime_type(void) const override { return "application/json"; } diff --git a/src/Span.cpp b/src/Span.cpp index 771e6c5..b6d3ab5 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -152,7 +152,7 @@ const char *to_string(AnnotationType type) } } -void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled) +void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled, bool shared) { m_span.debug = false; m_span.duration = 0; @@ -178,6 +178,9 @@ void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userda m_userdata = userdata; m_sampled = sampled; + m_shared = shared; + m_local_endpoint.reset(); + m_remote_endpoint.reset(); } void Span::submit(void) @@ -340,4 +343,4 @@ void CachedSpan::release(void) } } -} // namespace zipkin \ No newline at end of file +} // namespace zipkin diff --git a/src/Span.h b/src/Span.h index fe6d076..cbd4a94 100644 --- a/src/Span.h +++ b/src/Span.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -543,6 +544,8 @@ class Span ::Span m_span; userdata_t m_userdata; bool m_sampled; + bool m_shared; + std::shared_ptr m_local_endpoint, m_remote_endpoint; static const ::Endpoint host(const Endpoint *endpoint); @@ -550,16 +553,16 @@ class Span /** * \brief Construct a span */ - Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true) + Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false) : m_tracer(tracer) { - reset(name, parent_id, userdata, sampled); + reset(name, parent_id, userdata, sampled, shared); } /** * \brief Reset a span */ - void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true); + void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false); /** * \brief Submit a Span to Tracer @@ -730,6 +733,41 @@ class Span return *this; } + /** + * \brief Report the span is sharing between the client and the server. + */ + inline bool shared(void) const { return m_shared; } + + /** \sa Span#shared */ + inline Span &with_shared(bool shared = true) + { + m_shared = shared; + return *this; + } + + /** + * \brief Local endpoint + */ + inline std::shared_ptr local_endpoint(void) const { return m_local_endpoint; } + + /** \sa Span#local_endpoint */ + inline Span &with_local_endpoint(std::shared_ptr local_endpoint) + { + m_local_endpoint = local_endpoint; + return *this; + } + /** + * \brief Remote endpoint + */ + inline std::shared_ptr remote_endpoint(void) const { return m_remote_endpoint; } + + /** \sa Span#remote_endpoint */ + inline Span &with_remote_endpoint(std::shared_ptr remote_endpoint) + { + remote_endpoint = remote_endpoint; + return *this; + } + virtual inline Span *span(const std::string &name, userdata_t userdata = nullptr) const { Span *span = new Span(m_tracer, name, id(), userdata); @@ -916,6 +954,9 @@ class Span template void serialize_json(RapidJsonWriter &writer) const; + template + void serialize_json_v2(RapidJsonWriter &writer) const; + class Scope { Span &m_span; @@ -1375,4 +1416,179 @@ void Span::serialize_json(RapidJsonWriter &writer) const writer.EndObject(); } -} // namespace zipkin \ No newline at end of file +template +void Span::serialize_json_v2(RapidJsonWriter &writer) const +{ + auto serialize_endpoint = [&writer](const Endpoint *host) { + writer.StartObject(); + + writer.Key("serviceName"); + writer.String(host->service_name()); + + writer.Key("ipv4"); + writer.String(host->addr().to_v4().to_string()); + + writer.Key("port"); + writer.Int(host->port()); + + writer.EndObject(); + }; + + auto serialize_value = [&writer](const std::string &data, AnnotationType type) { + std::ostringstream oss; + + switch (type) + { + case AnnotationType::BOOL: + oss << *reinterpret_cast(data.c_str()); + break; + + case AnnotationType::I16: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::I32: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::I64: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::DOUBLE: + oss << *reinterpret_cast(data.c_str()); + break; + + case AnnotationType::BYTES: + oss << base64::encode(data); + break; + + case AnnotationType::STRING: + oss << data; + break; + } + + writer.String(oss.str()); + }; + + char str[64]; + + writer.StartObject(); + + writer.Key("traceId"); + if (m_span.trace_id_high) + { + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT SPAN_ID_FMT, m_span.trace_id_high, m_span.trace_id)); + } + else + { + writer.String(str, snprintf(str, sizeof(str), "0000000000000000" SPAN_ID_FMT, m_span.trace_id)); + } + + writer.Key("name"); + writer.String(m_span.name); + + writer.Key("id"); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.id)); + + if (m_span.__isset.parent_id) + { + writer.Key("parentId"); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.parent_id)); + } + + for (auto &annotation : m_span.annotations) + { + if (annotation.value == TraceKeys::CLIENT_SEND || annotation.value == TraceKeys::CLIENT_RECV) { + writer.Key("kind"); + writer.String("CLIENT"); + break; + } + if (annotation.value == TraceKeys::SERVER_SEND || annotation.value == TraceKeys::SERVER_RECV) { + writer.Key("kind"); + writer.String("SERVER"); + break; + } + } + + if (m_span.__isset.timestamp) + { + writer.Key("timestamp"); + writer.Int64(m_span.timestamp); + } + + if (m_span.__isset.duration) + { + writer.Key("duration"); + writer.Int64(m_span.duration); + } + + auto local_endpoint = m_local_endpoint; + auto remote_endpoint = m_remote_endpoint; + + for (auto &annotation : m_span.binary_annotations) + { + if (!local_endpoint && annotation.value == TraceKeys::CLIENT_ADDR && + (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { + local_endpoint.reset(new Endpoint(annotation.host)); + } + if (!remote_endpoint && annotation.value == TraceKeys::SERVER_ADDR && + (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { + remote_endpoint.reset(new Endpoint(annotation.host)); + } + } + + if (local_endpoint) { + writer.Key("localEndpoint"); + serialize_endpoint(m_local_endpoint.get()); + } + + if (remote_endpoint) { + writer.Key("remoteEndpoint"); + serialize_endpoint(m_remote_endpoint.get()); + } + + writer.Key("annotations"); + writer.StartArray(); + + for (auto &annotation : m_span.annotations) + { + writer.StartObject(); + + writer.Key("timestamp"); + writer.Int64(annotation.timestamp); + + writer.Key("value"); + writer.String(annotation.value); + + writer.EndObject(); + } + + writer.EndArray(m_span.annotations.size()); + + writer.Key("tags"); + writer.StartObject(); + + for (auto &annotation : m_span.binary_annotations) + { + writer.Key(annotation.key.c_str()); + serialize_value(annotation.value, annotation.annotation_type); + } + + writer.EndObject(); + + if (m_span.debug) + { + writer.Key("debug"); + writer.Bool(m_span.debug); + } + + if (m_shared) { + writer.Key("shared"); + writer.Bool(m_shared); + } + + writer.EndObject(); +} + +} // namespace zipkin diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index 2c44df7..03d9a8a 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -245,6 +245,31 @@ static const char *json_template = R"###({ "timestamp": %lld })###"; +static const char *json_v2_template = R"###({ + "traceId": "%016llx%016llx", + "name": "test", + "id": "%016llx", + "parentId": "%016llx", + "kind": "CLIENT", + "timestamp": %lld, + "annotations": [ + { + "timestamp": %lld, + "value": "cs" + } + ], + "tags": { + "sa": "8.8.8.8", + "bool": "1", + "i16": "123", + "i32": "123", + "i64": "123", + "double": "12.3", + "string": "测试", + "bytes": "AQID" + } +})###"; + TEST(span, serialize_json) { MockTracer tracer; @@ -274,8 +299,39 @@ TEST(span, serialize_json) span.serialize_json(writer); char str[2048] = {0}; - int str_len = snprintf(str, sizeof(str), json_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), - span.message().annotations[0].timestamp, span.message().timestamp); + int str_len = snprintf(str, sizeof(str), json_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().annotations[0].timestamp, span.message().timestamp); + + ASSERT_EQ(std::string(buffer.GetString(), buffer.GetSize()), std::string(str, str_len)); +} + +TEST(span, serialize_json_v2) +{ + MockTracer tracer; + + zipkin::Span span(&tracer, "test", zipkin::Span::next_id()); + zipkin::Endpoint host("host", "127.0.0.1", 80); + zipkin::Endpoint remote("remote", "8.8.8.8"); + + span.client_send(&host); + span.server_addr("8.8.8.8", &remote); + span.annotate("bool", true, &host); + span.annotate("i16", (int16_t)123); + span.annotate("i32", (int32_t)123); + span.annotate("i64", (int64_t)123); + span.annotate("double", 12.3); + span.annotate("string", std::wstring(L"测试")); + + uint8_t bytes[] = {1, 2, 3}; + + span.annotate("bytes", bytes); + + rapidjson::StringBuffer buffer; + rapidjson::PrettyWriter writer(buffer); + + span.serialize_json_v2(writer); + + char str[2048] = {0}; + int str_len = snprintf(str, sizeof(str), json_v2_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().timestamp, span.message().annotations[0].timestamp); ASSERT_EQ(std::string(buffer.GetString(), buffer.GetSize()), std::string(str, str_len)); } @@ -327,4 +383,4 @@ TEST(span, annotate_stream) ASSERT_EQ(span.message().annotations.size(), 2); ASSERT_EQ(span.message().binary_annotations.size(), 8); -} \ No newline at end of file +} From 1be7a3377b9b10ffe2485dbc6b75b6b45f39f6b8 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Fri, 11 Aug 2017 13:59:46 +0800 Subject: [PATCH 02/12] format bool tag as true/false --- src/Span.h | 2 +- test/TestSpan.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Span.h b/src/Span.h index cbd4a94..18159cf 100644 --- a/src/Span.h +++ b/src/Span.h @@ -1440,7 +1440,7 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const switch (type) { case AnnotationType::BOOL: - oss << *reinterpret_cast(data.c_str()); + oss << std::boolalpha << *reinterpret_cast(data.c_str()); break; case AnnotationType::I16: diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index 03d9a8a..6bf5ba8 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -260,7 +260,7 @@ static const char *json_v2_template = R"###({ ], "tags": { "sa": "8.8.8.8", - "bool": "1", + "bool": "true", "i16": "123", "i32": "123", "i64": "123", From 5eb34ede43873853adca1668309ca3b7a1e8646f Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Fri, 11 Aug 2017 17:13:44 +0800 Subject: [PATCH 03/12] set span shared after extract from context --- examples/grpc_greeter/greeter_server.cc | 2 +- examples/simple_proxy/main.c | 4 +++- include/zipkin.h | 5 ++++- src/CApi.cpp | 13 ++++++++++++- src/Propagation.cpp | 5 +++-- 5 files changed, 23 insertions(+), 6 deletions(-) diff --git a/examples/grpc_greeter/greeter_server.cc b/examples/grpc_greeter/greeter_server.cc index 203e6b8..b0693a3 100644 --- a/examples/grpc_greeter/greeter_server.cc +++ b/examples/grpc_greeter/greeter_server.cc @@ -77,7 +77,7 @@ struct GreeterServiceImpl final : public Greeter::Service }; DEFINE_string(grpc_addr, "localhost:50051", "GRPC server address"); -DEFINE_string(collector_uri, "", "Collector URI for tracing"); +DEFINE_string(collector_uri, "http://localhost:9411/zipkin/api/v1", "Collector URI for tracing"); void RunServer(std::shared_ptr collector) { diff --git a/examples/simple_proxy/main.c b/examples/simple_proxy/main.c index f637fc0..c50c046 100644 --- a/examples/simple_proxy/main.c +++ b/examples/simple_proxy/main.c @@ -192,6 +192,7 @@ void forward_http_request(struct mg_connection *nc, struct http_message *hm, zip else if (0 == mg_vcmp(hn, ZIPKIN_X_SPAN_ID)) { zipkin_span_set_parent_id(span, strtoull(hv->p, NULL, 16)); + zipkin_span_set_shared(span, 1); } else { @@ -273,6 +274,7 @@ void reply_json_response(struct mg_connection *nc, struct http_message *hm, zipk else if (0 == mg_vcmp(hn, ZIPKIN_X_SPAN_ID)) { zipkin_span_set_parent_id(span, strtoull(hv->p, NULL, 16)); + zipkin_span_set_shared(span, 1); } else if (0 == mg_vcmp(hn, ZIPKIN_X_SAMPLED)) { @@ -573,4 +575,4 @@ int main(int argc, char **argv) } return 0; -} \ No newline at end of file +} diff --git a/include/zipkin.h b/include/zipkin.h index 8a9fe58..3a9dee3 100644 --- a/include/zipkin.h +++ b/include/zipkin.h @@ -124,6 +124,9 @@ zipkin_span_t zipkin_span_parse_trace_id(zipkin_span_t span, const char *str, si int zipkin_span_debug(zipkin_span_t span); zipkin_span_t zipkin_span_set_debug(zipkin_span_t span, int debug); +int zipkin_span_shared(zipkin_span_t span); +zipkin_span_t zipkin_span_set_shared(zipkin_span_t span, int shared); + int zipkin_span_sampled(zipkin_span_t span); zipkin_span_t zipkin_span_set_sampled(zipkin_span_t span, int sampled); @@ -297,4 +300,4 @@ struct curl_slist *zipkin_propagation_inject_curl_headers(struct curl_slist *hea #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/src/CApi.cpp b/src/CApi.cpp index 4c0a3da..484dd6e 100644 --- a/src/CApi.cpp +++ b/src/CApi.cpp @@ -154,6 +154,17 @@ zipkin_span_t zipkin_span_set_debug(zipkin_span_t span, int debug) return span; } +int zipkin_span_shared(zipkin_span_t span) +{ + return span ? static_cast(span)->shared() : false; +} +zipkin_span_t zipkin_span_set_shared(zipkin_span_t span, int shared) +{ + if (span) + static_cast(span)->with_shared(shared); + + return span; +} int zipkin_span_sampled(zipkin_span_t span) { return span ? static_cast(span)->sampled() : false; @@ -568,4 +579,4 @@ struct curl_slist *zipkin_propagation_inject_curl_headers(struct curl_slist *hea #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/src/Propagation.cpp b/src/Propagation.cpp index 4a00b78..89eef8a 100644 --- a/src/Propagation.cpp +++ b/src/Propagation.cpp @@ -112,7 +112,8 @@ void Propagation::extract(grpc::ServerContext &context, Span &span) } else if (item.first == ZIPKIN_X_SPAN_ID_LOWERCASE) { - span.with_parent_id(folly::to(value)); + span.with_parent_id(folly::to(value)) + .with_shared(true); } else if (item.first == ZIPKIN_X_SAMPLED_LOWERCASE) { @@ -127,4 +128,4 @@ void Propagation::extract(grpc::ServerContext &context, Span &span) #endif // WITH_GRPC -} // namespace zipkin \ No newline at end of file +} // namespace zipkin From a2b14016591a73b13c033ae7dce727af1736b56e Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Fri, 11 Aug 2017 17:43:44 +0800 Subject: [PATCH 04/12] serialize endpoint's IPv6 address to JSON --- src/Span.h | 54 +++++++++++++++++++++++++++++++++++------------ test/TestSpan.cpp | 14 ++++++++---- 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/src/Span.h b/src/Span.h index 18159cf..ffde819 100644 --- a/src/Span.h +++ b/src/Span.h @@ -1273,8 +1273,17 @@ void Span::serialize_json(RapidJsonWriter &writer) const writer.Key("serviceName"); writer.String(host.service_name); - writer.Key("ipv4"); - writer.String(inet_ntoa({static_cast(htonl(host.ipv4))})); + if (host.__isset.ipv6) { + char buf[INET6_ADDRSTRLEN+1] = {0}; + + inet_ntop(AF_INET6, host.ipv6.c_str(), buf, INET6_ADDRSTRLEN); + + writer.Key("ipv6"); + writer.String(buf); + } else { + writer.Key("ipv4"); + writer.String(inet_ntoa({static_cast(htonl(host.ipv4))})); + } writer.Key("port"); writer.Int(host.port); @@ -1425,8 +1434,13 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const writer.Key("serviceName"); writer.String(host->service_name()); - writer.Key("ipv4"); - writer.String(host->addr().to_v4().to_string()); + if (host->addr().is_v6()) { + writer.Key("ipv6"); + writer.String(host->addr().to_v6().to_string()); + } else { + writer.Key("ipv4"); + writer.String(host->addr().to_v4().to_string()); + } writer.Key("port"); writer.Int(host->port()); @@ -1473,6 +1487,9 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const char str[64]; + auto local_endpoint = m_local_endpoint; + auto remote_endpoint = m_remote_endpoint; + writer.StartObject(); writer.Key("traceId"); @@ -1499,14 +1516,28 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const for (auto &annotation : m_span.annotations) { - if (annotation.value == TraceKeys::CLIENT_SEND || annotation.value == TraceKeys::CLIENT_RECV) { + if (annotation.value == TraceKeys::CLIENT_SEND || annotation.value == TraceKeys::CLIENT_RECV) + { writer.Key("kind"); writer.String("CLIENT"); + + if (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6) + { + local_endpoint.reset(new Endpoint(annotation.host)); + } + break; } - if (annotation.value == TraceKeys::SERVER_SEND || annotation.value == TraceKeys::SERVER_RECV) { + if (annotation.value == TraceKeys::SERVER_SEND || annotation.value == TraceKeys::SERVER_RECV) + { writer.Key("kind"); writer.String("SERVER"); + + if (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6) + { + local_endpoint.reset(new Endpoint(annotation.host)); + } + break; } } @@ -1523,16 +1554,13 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const writer.Int64(m_span.duration); } - auto local_endpoint = m_local_endpoint; - auto remote_endpoint = m_remote_endpoint; - for (auto &annotation : m_span.binary_annotations) { - if (!local_endpoint && annotation.value == TraceKeys::CLIENT_ADDR && + if (!local_endpoint && annotation.key == TraceKeys::CLIENT_ADDR && (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { local_endpoint.reset(new Endpoint(annotation.host)); } - if (!remote_endpoint && annotation.value == TraceKeys::SERVER_ADDR && + if (!remote_endpoint && annotation.key == TraceKeys::SERVER_ADDR && (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { remote_endpoint.reset(new Endpoint(annotation.host)); } @@ -1540,12 +1568,12 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const if (local_endpoint) { writer.Key("localEndpoint"); - serialize_endpoint(m_local_endpoint.get()); + serialize_endpoint(local_endpoint.get()); } if (remote_endpoint) { writer.Key("remoteEndpoint"); - serialize_endpoint(m_remote_endpoint.get()); + serialize_endpoint(remote_endpoint.get()); } writer.Key("annotations"); diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index 6bf5ba8..b491ea3 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -204,8 +204,8 @@ static const char *json_template = R"###({ "binaryAnnotations": [ { "endpoint": { - "serviceName": "host", - "ipv4": "127.0.0.1", + "serviceName": "ipv6_host", + "ipv6": "::1", "port": 80 }, "key": "bool", @@ -252,6 +252,11 @@ static const char *json_v2_template = R"###({ "parentId": "%016llx", "kind": "CLIENT", "timestamp": %lld, + "remoteEndpoint": { + "serviceName": "remote", + "ipv6": "::1", + "port": 80 + }, "annotations": [ { "timestamp": %lld, @@ -280,9 +285,10 @@ TEST(span, serialize_json) addr.sin_addr.s_addr = inet_addr("127.0.0.1"); addr.sin_port = htons(80); zipkin::Endpoint host("host", &addr); + zipkin::Endpoint ipv6_host("ipv6_host", "::1", 80); span.client_send(&host); - span.annotate("bool", true, &host); + span.annotate("bool", true, &ipv6_host); span.annotate("i16", (int16_t)123); span.annotate("i32", (int32_t)123); span.annotate("i64", (int64_t)123); @@ -310,7 +316,7 @@ TEST(span, serialize_json_v2) zipkin::Span span(&tracer, "test", zipkin::Span::next_id()); zipkin::Endpoint host("host", "127.0.0.1", 80); - zipkin::Endpoint remote("remote", "8.8.8.8"); + zipkin::Endpoint remote("remote", "::1", 80); span.client_send(&host); span.server_addr("8.8.8.8", &remote); From c6c4723e3838af5403f298a359add556d5b64753 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Fri, 11 Aug 2017 18:12:30 +0800 Subject: [PATCH 05/12] decide local/remote endpoint base on span kind --- src/Span.h | 81 +++++++++++++++++++++++++++++++---------------- test/TestSpan.cpp | 11 +++---- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/src/Span.h b/src/Span.h index ffde819..0e94880 100644 --- a/src/Span.h +++ b/src/Span.h @@ -1425,6 +1425,14 @@ void Span::serialize_json(RapidJsonWriter &writer) const writer.EndObject(); } +inline bool endpoint_is_set(const ::Annotation &annotation) { + return annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6; +} + +inline bool endpoint_is_set(const ::BinaryAnnotation &annotation) { + return annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6; +} + template void Span::serialize_json_v2(RapidJsonWriter &writer) const { @@ -1487,8 +1495,9 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const char str[64]; - auto local_endpoint = m_local_endpoint; - auto remote_endpoint = m_remote_endpoint; + bool client_mode = false; + std::shared_ptr local_endpoint = m_local_endpoint; + std::shared_ptr remote_endpoint = m_remote_endpoint; writer.StartObject(); @@ -1521,11 +1530,13 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const writer.Key("kind"); writer.String("CLIENT"); - if (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6) + if (endpoint_is_set(annotation)) { local_endpoint.reset(new Endpoint(annotation.host)); } + client_mode = true; + break; } if (annotation.value == TraceKeys::SERVER_SEND || annotation.value == TraceKeys::SERVER_RECV) @@ -1533,11 +1544,13 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const writer.Key("kind"); writer.String("SERVER"); - if (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6) + if (endpoint_is_set(annotation)) { local_endpoint.reset(new Endpoint(annotation.host)); } + client_mode = false; + break; } } @@ -1554,28 +1567,6 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const writer.Int64(m_span.duration); } - for (auto &annotation : m_span.binary_annotations) - { - if (!local_endpoint && annotation.key == TraceKeys::CLIENT_ADDR && - (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { - local_endpoint.reset(new Endpoint(annotation.host)); - } - if (!remote_endpoint && annotation.key == TraceKeys::SERVER_ADDR && - (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { - remote_endpoint.reset(new Endpoint(annotation.host)); - } - } - - if (local_endpoint) { - writer.Key("localEndpoint"); - serialize_endpoint(local_endpoint.get()); - } - - if (remote_endpoint) { - writer.Key("remoteEndpoint"); - serialize_endpoint(remote_endpoint.get()); - } - writer.Key("annotations"); writer.StartArray(); @@ -1599,12 +1590,46 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const for (auto &annotation : m_span.binary_annotations) { - writer.Key(annotation.key.c_str()); - serialize_value(annotation.value, annotation.annotation_type); + auto endpoint = [&annotation]() { + return std::shared_ptr( + endpoint_is_set(annotation) ? + new Endpoint(annotation.host) : + new Endpoint(annotation.value, annotation.value) + ); + }; + + if (annotation.key == TraceKeys::CLIENT_ADDR) { + if (client_mode) { + if (!local_endpoint) { local_endpoint = endpoint(); } + } else { + if (!remote_endpoint) { remote_endpoint = endpoint(); } + } + } else if (annotation.key == TraceKeys::SERVER_ADDR) { + if (endpoint_is_set(annotation)) { + if (client_mode) { + if (!remote_endpoint) { remote_endpoint = endpoint(); } + } else { + if (!local_endpoint) { local_endpoint = endpoint(); } + } + } + } else { + writer.Key(annotation.key.c_str()); + serialize_value(annotation.value, annotation.annotation_type); + } } writer.EndObject(); + if (local_endpoint) { + writer.Key("localEndpoint"); + serialize_endpoint(local_endpoint.get()); + } + + if (remote_endpoint) { + writer.Key("remoteEndpoint"); + serialize_endpoint(remote_endpoint.get()); + } + if (m_span.debug) { writer.Key("debug"); diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index b491ea3..da2507a 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -252,11 +252,6 @@ static const char *json_v2_template = R"###({ "parentId": "%016llx", "kind": "CLIENT", "timestamp": %lld, - "remoteEndpoint": { - "serviceName": "remote", - "ipv6": "::1", - "port": 80 - }, "annotations": [ { "timestamp": %lld, @@ -264,7 +259,6 @@ static const char *json_v2_template = R"###({ } ], "tags": { - "sa": "8.8.8.8", "bool": "true", "i16": "123", "i32": "123", @@ -272,6 +266,11 @@ static const char *json_v2_template = R"###({ "double": "12.3", "string": "测试", "bytes": "AQID" + }, + "remoteEndpoint": { + "serviceName": "remote", + "ipv6": "::1", + "port": 80 } })###"; From f1c55818f4ab4f0939c3a6df4a8d0d72139e8a33 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Tue, 15 Aug 2017 18:43:26 +0800 Subject: [PATCH 06/12] extract Span2 converter --- src/Collector.cpp | 12 +- src/Collector.h | 8 +- src/Span.h | 327 ++++++++++++++++++++++++++++++++++------------ test/TestSpan.cpp | 4 +- 4 files changed, 264 insertions(+), 87 deletions(-) diff --git a/src/Collector.cpp b/src/Collector.cpp index 888fa49..2a1e7cb 100644 --- a/src/Collector.cpp +++ b/src/Collector.cpp @@ -90,8 +90,10 @@ size_t JsonCodec::encode(boost::shared_ptrserialize_json_v2(writer); + if (m_format_version > 1) { + for (auto &span2: __impl::Span2::from_span(span)) { + span2.serialize_json(writer); + } } else { span->serialize_json(writer); } @@ -114,8 +116,10 @@ size_t PrettyJsonCodec::encode(boost::shared_ptrserialize_json_v2(writer); + if (m_format_version > 1) { + for (auto &span2: __impl::Span2::from_span(span)) { + span2.serialize_json(writer); + } } else { span->serialize_json(writer); } diff --git a/src/Collector.h b/src/Collector.h index 79f76df..0a0bce9 100644 --- a/src/Collector.h +++ b/src/Collector.h @@ -81,7 +81,9 @@ class JsonCodec : public MessageCodec public: JsonCodec(int format_version = 1) : m_format_version(format_version) {} - virtual const std::string name(void) const override { return "json"; } + virtual const std::string name(void) const override { + return m_format_version > 1 ? "json_v2" : "json"; + } virtual const std::string mime_type(void) const override { return "application/json"; } @@ -97,7 +99,9 @@ class PrettyJsonCodec : public MessageCodec public: PrettyJsonCodec(int format_version = 1) : m_format_version(format_version) {} - virtual const std::string name(void) const override { return "pretty_json"; } + virtual const std::string name(void) const override { + return m_format_version > 1 ? "pretty_json_v2" : "pretty_json"; + } virtual const std::string mime_type(void) const override { return "application/json"; } diff --git a/src/Span.h b/src/Span.h index 0e94880..83783fd 100644 --- a/src/Span.h +++ b/src/Span.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,12 @@ typedef void *userdata_t; namespace zipkin { +struct Tracer; +class CachedTracer; +class Span; + +namespace __impl { class Span2; } + /** * \brief Indicates the network context of a service recording an annotation with two exceptions. */ @@ -43,6 +50,7 @@ class Endpoint { ::Endpoint m_host; + friend class __impl::Span2; public: Endpoint() { @@ -138,10 +146,6 @@ class Endpoint inline const ::Endpoint &host(void) const { return m_host; } }; -struct Tracer; -class CachedTracer; -class Span; - /** * \brief Associates an event that explains latency with a timestamp. * @@ -549,6 +553,7 @@ class Span static const ::Endpoint host(const Endpoint *endpoint); + friend class __impl::Span2; public: /** * \brief Construct a span @@ -1134,6 +1139,7 @@ inline Endpoint &Endpoint::with_port(port_t port) namespace __impl { + inline uint16_t native_to_big(uint16_t value) { return htons(value); } inline uint32_t native_to_big(uint32_t value) { return htonl(value); } @@ -1425,6 +1431,9 @@ void Span::serialize_json(RapidJsonWriter &writer) const writer.EndObject(); } +namespace __impl +{ + inline bool endpoint_is_set(const ::Annotation &annotation) { return annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6; } @@ -1433,8 +1442,205 @@ inline bool endpoint_is_set(const ::BinaryAnnotation &annotation) { return annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6; } +inline bool endpoint_close_enough(const ::Endpoint &lhs, const ::Endpoint &rhs) { + return lhs.__isset.service_name && rhs.__isset.service_name && + lhs.service_name == rhs.service_name; +} + +inline bool endpoint_close_enough(const zipkin::Endpoint *lhs, const zipkin::Endpoint *rhs) { + return lhs->host().__isset.service_name && rhs->host().__isset.service_name && + lhs->host().service_name == rhs->host().service_name; +} + +struct Span2 { + enum Kind { + UNKNOWN, + CLIENT, + SERVER, + }; + + const Span *span; + Kind kind; + int64_t timestamp, duration; + const zipkin::Endpoint *local_endpoint; + const zipkin::Endpoint *remote_endpoint; + std::vector annotations; + std::vector binary_annotations; + + static const std::vector from_span(const Span *span); + + template + void serialize_json(RapidJsonWriter &writer) const; +}; + +inline const std::vector Span2::from_span(const Span *span) { + const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ws = nullptr, *wr = nullptr; + + std::vector spans; + Kind kind = UNKNOWN; + int64_t timestamp, duration; + const zipkin::Endpoint *local_endpoint = NULL; + const zipkin::Endpoint *remote_endpoint = NULL; + std::vector annotations; + std::vector binary_annotations; + + auto new_span = [span, &spans](const Endpoint *host) { + spans.push_back(Span2 { .span = span, .local_endpoint = host }); + + return spans.back(); + }; + + auto for_endpoint = [span, &spans, new_span](const Endpoint *host) { + if (!host) return spans[0]; // allocate missing endpoint data to first span + + for (auto &next : spans) { + if (!next.local_endpoint) { + next.local_endpoint = host; + return next; + } + + if (endpoint_close_enough(next.local_endpoint, host)) { + return next; + } + } + + return new_span(host); + }; + + auto maybe_timestamp_and_duration = [&](const ::Annotation *begin, const ::Annotation *end) { + if (span->m_span.__isset.timestamp && span->m_span.__isset.duration) { + timestamp = span->m_span.timestamp; + duration = span->m_span.duration; + } else { + timestamp = begin->timestamp; + duration = end ? (end->timestamp - begin->timestamp) : 0; + } + }; + + for (auto &annotation : span->m_span.annotations) + { + if (annotation.value.size() == 2 && endpoint_is_set(annotation)) { + // core annotations require an endpoint. Don't give special treatment when that's missing + switch (* reinterpret_cast(annotation.value.c_str())) { + case 0x7363: // CLIENT_SEND + kind = CLIENT; + cs = &annotation; + break; + + case 0x7273: // SERVER_RECV + kind = SERVER; + sr = &annotation; + break; + + case 0x7373: // SERVER_SEND + kind = SERVER; + ss = &annotation; + break; + + case 0x7263: // CLIENT_RECV + kind = CLIENT; + cr = &annotation; + break; + + case 0x7377: // WIRE_SEND + ws = &annotation; + break; + + case 0x7277: // WIRE_RECV + wr = &annotation; + break; + + default: + annotations.push_back(&annotation); + break; + } + } else { + annotations.push_back(&annotation); + } + } + + if (cs && sr) { + // in a shared span, the client side owns span duration by annotations or explicit timestamp + maybe_timestamp_and_duration(cs, cr); + + // special-case loopback: We need to make sure on loopback there are two span2s + auto client = for_endpoint(&cs->host); + auto server; + + if (endpoint_close_enough(cs->host, sr->host)) { + client.kind = CLIENT; + + // fork a new span for the server side + server = new_span(sr->host); + server.kind = SERVER; + } else { + server = for_endpoint(&sr->host); + } + } else if (cs && cr) { + maybe_timestamp_and_duration(cs, cr); + } else if (sr && ss) { + maybe_timestamp_and_duration(sr, ss); + } else { + // otherwise, the span is incomplete. revert special-casing + + } + + for (auto &annotation : span->m_span.binary_annotations) + { + auto endpoint = [&annotation]() { + return std::shared_ptr( + endpoint_is_set(annotation) ? + new Endpoint(annotation.host) : + new Endpoint(annotation.value, annotation.value) + ); + }; + + if (annotation.key == TraceKeys::CLIENT_ADDR) { + if (endpoint_is_set(annotation)) { + switch (kind) { + case CLIENT: + if (!local_endpoint) { local_endpoint = endpoint(); } + break; + + case SERVER: + if (!remote_endpoint) { remote_endpoint = endpoint(); } + break; + + case UNKNOWN: // ignore it + break; + } + } + } else if (annotation.key == TraceKeys::SERVER_ADDR) { + if (endpoint_is_set(annotation)) { + switch (kind) { + case CLIENT: + if (!remote_endpoint) { remote_endpoint = endpoint(); } + break; + + case SERVER: + if (!local_endpoint) { local_endpoint = endpoint(); } + break; + + case UNKNOWN: // ignore it + break; + } + } + } else { + binary_annotations.push_back(&annotation); + } + } + + spans.push_back(Span2 { + span, kind, timestamp, duration, + local_endpoint, remote_endpoint, + std::move(annotations), std::move(binary_annotations) + }); + + return std::move(spans); +} + template -void Span::serialize_json_v2(RapidJsonWriter &writer) const +void Span2::serialize_json(RapidJsonWriter &writer) const { auto serialize_endpoint = [&writer](const Endpoint *host) { writer.StartObject(); @@ -1495,127 +1701,86 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const char str[64]; - bool client_mode = false; - std::shared_ptr local_endpoint = m_local_endpoint; - std::shared_ptr remote_endpoint = m_remote_endpoint; - writer.StartObject(); writer.Key("traceId"); - if (m_span.trace_id_high) + if (span->trace_id_high()) { - writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT SPAN_ID_FMT, m_span.trace_id_high, m_span.trace_id)); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT SPAN_ID_FMT, span->trace_id_high(), span->trace_id())); } else { - writer.String(str, snprintf(str, sizeof(str), "0000000000000000" SPAN_ID_FMT, m_span.trace_id)); + writer.String(str, snprintf(str, sizeof(str), "0000000000000000" SPAN_ID_FMT, span->trace_id())); } writer.Key("name"); - writer.String(m_span.name); + writer.String(span->name()); writer.Key("id"); - writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.id)); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, span->id())); - if (m_span.__isset.parent_id) + if (span->m_span.__isset.parent_id) { writer.Key("parentId"); - writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.parent_id)); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, span->parent_id())); } - for (auto &annotation : m_span.annotations) - { - if (annotation.value == TraceKeys::CLIENT_SEND || annotation.value == TraceKeys::CLIENT_RECV) - { + switch (kind) { + case CLIENT: writer.Key("kind"); writer.String("CLIENT"); - - if (endpoint_is_set(annotation)) - { - local_endpoint.reset(new Endpoint(annotation.host)); - } - - client_mode = true; - break; - } - if (annotation.value == TraceKeys::SERVER_SEND || annotation.value == TraceKeys::SERVER_RECV) - { + + case SERVER: writer.Key("kind"); writer.String("SERVER"); + break; - if (endpoint_is_set(annotation)) - { - local_endpoint.reset(new Endpoint(annotation.host)); - } - - client_mode = false; - + case UNKNOWN: // ignore it break; - } } - if (m_span.__isset.timestamp) - { + if (span->m_span.__isset.timestamp) { writer.Key("timestamp"); - writer.Int64(m_span.timestamp); + writer.Int64(span->m_span.timestamp); + } else if (timestamp) { + writer.Key("timestamp"); + writer.Int64(timestamp); } - if (m_span.__isset.duration) - { + if (span->m_span.__isset.duration) { writer.Key("duration"); - writer.Int64(m_span.duration); + writer.Int64(span->m_span.duration); + } else if (duration) { + writer.Key("duration"); + writer.Int64(duration); } writer.Key("annotations"); writer.StartArray(); - for (auto &annotation : m_span.annotations) + for (auto annotation : annotations) { writer.StartObject(); writer.Key("timestamp"); - writer.Int64(annotation.timestamp); + writer.Int64(annotation->timestamp); writer.Key("value"); - writer.String(annotation.value); + writer.String(annotation->value); writer.EndObject(); } - writer.EndArray(m_span.annotations.size()); + writer.EndArray(annotations.size()); writer.Key("tags"); writer.StartObject(); - for (auto &annotation : m_span.binary_annotations) + for (auto &annotation : binary_annotations) { - auto endpoint = [&annotation]() { - return std::shared_ptr( - endpoint_is_set(annotation) ? - new Endpoint(annotation.host) : - new Endpoint(annotation.value, annotation.value) - ); - }; - - if (annotation.key == TraceKeys::CLIENT_ADDR) { - if (client_mode) { - if (!local_endpoint) { local_endpoint = endpoint(); } - } else { - if (!remote_endpoint) { remote_endpoint = endpoint(); } - } - } else if (annotation.key == TraceKeys::SERVER_ADDR) { - if (endpoint_is_set(annotation)) { - if (client_mode) { - if (!remote_endpoint) { remote_endpoint = endpoint(); } - } else { - if (!local_endpoint) { local_endpoint = endpoint(); } - } - } - } else { - writer.Key(annotation.key.c_str()); - serialize_value(annotation.value, annotation.annotation_type); - } + writer.Key(annotation->key.c_str()); + serialize_value(annotation->value, annotation->annotation_type); } writer.EndObject(); @@ -1630,18 +1795,20 @@ void Span::serialize_json_v2(RapidJsonWriter &writer) const serialize_endpoint(remote_endpoint.get()); } - if (m_span.debug) + if (span->debug()) { writer.Key("debug"); - writer.Bool(m_span.debug); + writer.Bool(span->debug()); } - if (m_shared) { + if (span->shared()) { writer.Key("shared"); - writer.Bool(m_shared); + writer.Bool(span->shared()); } writer.EndObject(); } +} // namespace __impl + } // namespace zipkin diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index da2507a..70621dc 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -333,7 +333,9 @@ TEST(span, serialize_json_v2) rapidjson::StringBuffer buffer; rapidjson::PrettyWriter writer(buffer); - span.serialize_json_v2(writer); + for (auto &span2: zipkin::__impl::Span2::from_span(&span)) { + span2.serialize_json(writer); + } char str[2048] = {0}; int str_len = snprintf(str, sizeof(str), json_v2_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().timestamp, span.message().annotations[0].timestamp); From a4fd171130cff255eb12900461ba1c083a8ffdb5 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Wed, 16 Aug 2017 12:21:30 +0800 Subject: [PATCH 07/12] migrate to use Span2 convert --- src/Span.cpp | 185 +++++++++++++++++++++++++++++++++++++++++++++ src/Span.h | 208 ++++++--------------------------------------------- 2 files changed, 206 insertions(+), 187 deletions(-) diff --git a/src/Span.cpp b/src/Span.cpp index b6d3ab5..bde1a2c 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -343,4 +343,189 @@ void CachedSpan::release(void) } } + +namespace __impl +{ + +const std::vector Span2::from_span(const Span *span) { + const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ws = nullptr, *wr = nullptr; + + std::vector spans; + + auto new_span = [span, &spans](const ::Endpoint *host, Kind kind=UNKNOWN) -> Span2& { + spans.push_back(Span2 { .span = span, .kind = kind, .local_endpoint = host }); + + return spans.back(); + }; + + auto for_endpoint = [span, &spans, new_span](const ::Endpoint *host) -> Span2& { + if (!host) return spans.front(); // allocate missing endpoint data to first span + + for (auto &next : spans) { + if (!next.local_endpoint) { + next.local_endpoint = host; + return next; + } + + if (endpoint_close_enough(next.local_endpoint, host)) { + return next; + } + } + + return new_span(host); + }; + + auto maybe_timestamp_and_duration = [span, for_endpoint](const ::Annotation *begin, const ::Annotation *end) { + auto span2 = for_endpoint(&begin->host); + + if (span->m_span.__isset.timestamp && span->m_span.__isset.duration) { + span2.timestamp = span->m_span.timestamp; + span2.duration = span->m_span.duration; + } else { + span2.timestamp = begin->timestamp; + span2.duration = end ? (end->timestamp - begin->timestamp) : 0; + } + }; + + for (auto &annotation : span->m_span.annotations) + { + auto span2 = for_endpoint(&annotation.host); + + if (annotation.value.size() == 2 && endpoint_is_set(annotation)) { + // core annotations require an endpoint. Don't give special treatment when that's missing + switch (* reinterpret_cast(annotation.value.c_str())) { + case 0x7363: // CLIENT_SEND + span2.kind = CLIENT; + cs = &annotation; + break; + + case 0x7273: // SERVER_RECV + span2.kind = SERVER; + sr = &annotation; + break; + + case 0x7373: // SERVER_SEND + span2.kind = SERVER; + ss = &annotation; + break; + + case 0x7263: // CLIENT_RECV + span2.kind = CLIENT; + cr = &annotation; + break; + + case 0x7377: // WIRE_SEND + ws = &annotation; + break; + + case 0x7277: // WIRE_RECV + wr = &annotation; + break; + + default: + span2.annotations.push_back(&annotation); + break; + } + } else { + span2.annotations.push_back(&annotation); + } + } + + if (cs && sr) { + // in a shared span, the client side owns span duration by annotations or explicit timestamp + maybe_timestamp_and_duration(cs, cr); + + // special-case loopback: We need to make sure on loopback there are two span2s + auto client = for_endpoint(&cs->host); + bool is_client = endpoint_close_enough(&cs->host, &sr->host); + auto server = is_client ? + new_span(&sr->host, SERVER) : // fork a new span for the server side + for_endpoint(&sr->host); + + if (is_client) { + client.kind = CLIENT; + } + + // the server side is smaller than that, we have to read annotations to find out + server.shared = true; + server.timestamp = sr->timestamp; + + if (ss) server.duration = ss->timestamp - sr->timestamp; + if (!cr && !span->m_span.duration) client.duration = 0; // one-way has no duration + } else if (cs && cr) { + maybe_timestamp_and_duration(cs, cr); + } else if (sr && ss) { + maybe_timestamp_and_duration(sr, ss); + } else { + // otherwise, the span is incomplete. revert special-casing + for (auto &next : spans) { + switch (next.kind) { + case CLIENT: + if (cs) next.timestamp = cs->timestamp; + break; + case SERVER: + if (sr) next.timestamp = sr->timestamp; + break; + case UNKNOWN: + break; + } + } + + if (span->m_span.timestamp) { + spans.front().timestamp = span->m_span.timestamp; + spans.front().duration = span->m_span.duration; + } + } + + // Span v1 format did not have a shared flag. By convention, span.timestamp being absent + // implied shared. When we only see the server-side, carry this signal over. + if (!cs && (sr && !span->m_span.timestamp)) { + for_endpoint(&sr->host).shared = true; + } + + if (ws) for_endpoint(&ws->host).annotations.push_back(ws); + if (wr) for_endpoint(&wr->host).annotations.push_back(wr); + + const ::Endpoint *ca = nullptr, *sa = nullptr; + + for (auto &annotation : span->m_span.binary_annotations) + { + if (annotation.annotation_type == AnnotationType::BOOL) { + switch (* reinterpret_cast(annotation.key.c_str())) { + case 0x6163: // CLIENT_ADDR + ca = &annotation.host; + break; + + case 0x6173: // SERVER_ADDR + sa = &annotation.host; + break; + + default: + for_endpoint(&annotation.host).binary_annotations.push_back(&annotation); + break; + } + + continue; + } + + for_endpoint(&annotation.host).binary_annotations.push_back(&annotation); + } + + if (cs && sa && !endpoint_close_enough(sa, &cs->host)) { + for_endpoint(&cs->host).remote_endpoint = sa; + } + + if (sr && ca && !endpoint_close_enough(ca, &sr->host)) { + for_endpoint(&sr->host).remote_endpoint = ca; + } + + if ((!cs && !sr) && (ca && sa)) { + for_endpoint(ca).remote_endpoint = sa; + } + + return std::move(spans); +} + +} // namespace __impl + } // namespace zipkin diff --git a/src/Span.h b/src/Span.h index 83783fd..962eb52 100644 --- a/src/Span.h +++ b/src/Span.h @@ -1442,14 +1442,9 @@ inline bool endpoint_is_set(const ::BinaryAnnotation &annotation) { return annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6; } -inline bool endpoint_close_enough(const ::Endpoint &lhs, const ::Endpoint &rhs) { - return lhs.__isset.service_name && rhs.__isset.service_name && - lhs.service_name == rhs.service_name; -} - -inline bool endpoint_close_enough(const zipkin::Endpoint *lhs, const zipkin::Endpoint *rhs) { - return lhs->host().__isset.service_name && rhs->host().__isset.service_name && - lhs->host().service_name == rhs->host().service_name; +inline bool endpoint_close_enough(const ::Endpoint *lhs, const ::Endpoint *rhs) { + return lhs->__isset.service_name && rhs->__isset.service_name && + lhs->service_name == rhs->service_name; } struct Span2 { @@ -1462,8 +1457,9 @@ struct Span2 { const Span *span; Kind kind; int64_t timestamp, duration; - const zipkin::Endpoint *local_endpoint; - const zipkin::Endpoint *remote_endpoint; + bool shared; + const ::Endpoint *local_endpoint; + const ::Endpoint *remote_endpoint; std::vector annotations; std::vector binary_annotations; @@ -1473,191 +1469,29 @@ struct Span2 { void serialize_json(RapidJsonWriter &writer) const; }; -inline const std::vector Span2::from_span(const Span *span) { - const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ws = nullptr, *wr = nullptr; - - std::vector spans; - Kind kind = UNKNOWN; - int64_t timestamp, duration; - const zipkin::Endpoint *local_endpoint = NULL; - const zipkin::Endpoint *remote_endpoint = NULL; - std::vector annotations; - std::vector binary_annotations; - - auto new_span = [span, &spans](const Endpoint *host) { - spans.push_back(Span2 { .span = span, .local_endpoint = host }); - - return spans.back(); - }; - - auto for_endpoint = [span, &spans, new_span](const Endpoint *host) { - if (!host) return spans[0]; // allocate missing endpoint data to first span - - for (auto &next : spans) { - if (!next.local_endpoint) { - next.local_endpoint = host; - return next; - } - - if (endpoint_close_enough(next.local_endpoint, host)) { - return next; - } - } - - return new_span(host); - }; - - auto maybe_timestamp_and_duration = [&](const ::Annotation *begin, const ::Annotation *end) { - if (span->m_span.__isset.timestamp && span->m_span.__isset.duration) { - timestamp = span->m_span.timestamp; - duration = span->m_span.duration; - } else { - timestamp = begin->timestamp; - duration = end ? (end->timestamp - begin->timestamp) : 0; - } - }; - - for (auto &annotation : span->m_span.annotations) - { - if (annotation.value.size() == 2 && endpoint_is_set(annotation)) { - // core annotations require an endpoint. Don't give special treatment when that's missing - switch (* reinterpret_cast(annotation.value.c_str())) { - case 0x7363: // CLIENT_SEND - kind = CLIENT; - cs = &annotation; - break; - - case 0x7273: // SERVER_RECV - kind = SERVER; - sr = &annotation; - break; - - case 0x7373: // SERVER_SEND - kind = SERVER; - ss = &annotation; - break; - - case 0x7263: // CLIENT_RECV - kind = CLIENT; - cr = &annotation; - break; - - case 0x7377: // WIRE_SEND - ws = &annotation; - break; - - case 0x7277: // WIRE_RECV - wr = &annotation; - break; - - default: - annotations.push_back(&annotation); - break; - } - } else { - annotations.push_back(&annotation); - } - } - - if (cs && sr) { - // in a shared span, the client side owns span duration by annotations or explicit timestamp - maybe_timestamp_and_duration(cs, cr); - - // special-case loopback: We need to make sure on loopback there are two span2s - auto client = for_endpoint(&cs->host); - auto server; - - if (endpoint_close_enough(cs->host, sr->host)) { - client.kind = CLIENT; - - // fork a new span for the server side - server = new_span(sr->host); - server.kind = SERVER; - } else { - server = for_endpoint(&sr->host); - } - } else if (cs && cr) { - maybe_timestamp_and_duration(cs, cr); - } else if (sr && ss) { - maybe_timestamp_and_duration(sr, ss); - } else { - // otherwise, the span is incomplete. revert special-casing - - } - - for (auto &annotation : span->m_span.binary_annotations) - { - auto endpoint = [&annotation]() { - return std::shared_ptr( - endpoint_is_set(annotation) ? - new Endpoint(annotation.host) : - new Endpoint(annotation.value, annotation.value) - ); - }; - - if (annotation.key == TraceKeys::CLIENT_ADDR) { - if (endpoint_is_set(annotation)) { - switch (kind) { - case CLIENT: - if (!local_endpoint) { local_endpoint = endpoint(); } - break; - - case SERVER: - if (!remote_endpoint) { remote_endpoint = endpoint(); } - break; - - case UNKNOWN: // ignore it - break; - } - } - } else if (annotation.key == TraceKeys::SERVER_ADDR) { - if (endpoint_is_set(annotation)) { - switch (kind) { - case CLIENT: - if (!remote_endpoint) { remote_endpoint = endpoint(); } - break; - - case SERVER: - if (!local_endpoint) { local_endpoint = endpoint(); } - break; - - case UNKNOWN: // ignore it - break; - } - } - } else { - binary_annotations.push_back(&annotation); - } - } - - spans.push_back(Span2 { - span, kind, timestamp, duration, - local_endpoint, remote_endpoint, - std::move(annotations), std::move(binary_annotations) - }); - - return std::move(spans); -} - template void Span2::serialize_json(RapidJsonWriter &writer) const { - auto serialize_endpoint = [&writer](const Endpoint *host) { + auto serialize_endpoint = [&writer](const ::Endpoint *host) { writer.StartObject(); writer.Key("serviceName"); - writer.String(host->service_name()); + writer.String(host->service_name); + + if (host->__isset.ipv6) { + char buf[INET6_ADDRSTRLEN+1] = {0}; + + inet_ntop(AF_INET6, host->ipv6.c_str(), buf, INET6_ADDRSTRLEN); - if (host->addr().is_v6()) { writer.Key("ipv6"); - writer.String(host->addr().to_v6().to_string()); + writer.String(buf); } else { writer.Key("ipv4"); - writer.String(host->addr().to_v4().to_string()); + writer.String(inet_ntoa({static_cast(htonl(host->ipv4))})); } writer.Key("port"); - writer.Int(host->port()); + writer.Int(host->port); writer.EndObject(); }; @@ -1787,23 +1621,23 @@ void Span2::serialize_json(RapidJsonWriter &writer) const if (local_endpoint) { writer.Key("localEndpoint"); - serialize_endpoint(local_endpoint.get()); + serialize_endpoint(local_endpoint); } if (remote_endpoint) { writer.Key("remoteEndpoint"); - serialize_endpoint(remote_endpoint.get()); + serialize_endpoint(remote_endpoint); } if (span->debug()) { writer.Key("debug"); - writer.Bool(span->debug()); + writer.Bool(true); } - if (span->shared()) { + if (shared) { writer.Key("shared"); - writer.Bool(span->shared()); + writer.Bool(true); } writer.EndObject(); From bf61d2633b5b58d2b1d409d03f1847d79e553a00 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Wed, 16 Aug 2017 18:23:27 +0800 Subject: [PATCH 08/12] improve code style --- src/Span.cpp | 71 +++++++++++++++++++++++++++++++--------------------- src/Span.h | 14 +---------- 2 files changed, 44 insertions(+), 41 deletions(-) diff --git a/src/Span.cpp b/src/Span.cpp index bde1a2c..1d09a2d 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -347,9 +347,16 @@ void CachedSpan::release(void) namespace __impl { -const std::vector Span2::from_span(const Span *span) { - const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ws = nullptr, *wr = nullptr; +inline bool is_set(const ::Endpoint &host) { + return host.__isset.service_name && (host.__isset.ipv4 || host.__isset.ipv6); +} +inline bool close_enough(const ::Endpoint *lhs, const ::Endpoint *rhs) { + return lhs->__isset.service_name && rhs->__isset.service_name && + lhs->service_name == rhs->service_name; +} + +const std::vector Span2::from_span(const Span *span) { std::vector spans; auto new_span = [span, &spans](const ::Endpoint *host, Kind kind=UNKNOWN) -> Span2& { @@ -358,25 +365,29 @@ const std::vector Span2::from_span(const Span *span) { return spans.back(); }; - auto for_endpoint = [span, &spans, new_span](const ::Endpoint *host) -> Span2& { - if (!host) return spans.front(); // allocate missing endpoint data to first span - - for (auto &next : spans) { - if (!next.local_endpoint) { - next.local_endpoint = host; - return next; + auto for_endpoint = [span, &spans, new_span](const ::Endpoint &host) -> Span2& { + if (!spans.empty()) { + if (!is_set(host)) { + return spans.front(); // allocate missing endpoint data to first span } - if (endpoint_close_enough(next.local_endpoint, host)) { - return next; + for (auto &next : spans) { + if (!next.local_endpoint) { + next.local_endpoint = &host; + return next; + } + + if (close_enough(next.local_endpoint, &host)) { + return next; + } } } - return new_span(host); + return new_span(&host); }; auto maybe_timestamp_and_duration = [span, for_endpoint](const ::Annotation *begin, const ::Annotation *end) { - auto span2 = for_endpoint(&begin->host); + auto span2 = for_endpoint(begin->host); if (span->m_span.__isset.timestamp && span->m_span.__isset.duration) { span2.timestamp = span->m_span.timestamp; @@ -387,11 +398,14 @@ const std::vector Span2::from_span(const Span *span) { } }; + const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ws = nullptr, *wr = nullptr; + + // add annotations unless they are "core" for (auto &annotation : span->m_span.annotations) { - auto span2 = for_endpoint(&annotation.host); + auto span2 = for_endpoint(annotation.host); - if (annotation.value.size() == 2 && endpoint_is_set(annotation)) { + if (annotation.value.size() == 2 && is_set(annotation.host)) { // core annotations require an endpoint. Don't give special treatment when that's missing switch (* reinterpret_cast(annotation.value.c_str())) { case 0x7363: // CLIENT_SEND @@ -436,11 +450,11 @@ const std::vector Span2::from_span(const Span *span) { maybe_timestamp_and_duration(cs, cr); // special-case loopback: We need to make sure on loopback there are two span2s - auto client = for_endpoint(&cs->host); - bool is_client = endpoint_close_enough(&cs->host, &sr->host); + auto client = for_endpoint(cs->host); + bool is_client = close_enough(&cs->host, &sr->host); auto server = is_client ? new_span(&sr->host, SERVER) : // fork a new span for the server side - for_endpoint(&sr->host); + for_endpoint(sr->host); if (is_client) { client.kind = CLIENT; @@ -480,14 +494,15 @@ const std::vector Span2::from_span(const Span *span) { // Span v1 format did not have a shared flag. By convention, span.timestamp being absent // implied shared. When we only see the server-side, carry this signal over. if (!cs && (sr && !span->m_span.timestamp)) { - for_endpoint(&sr->host).shared = true; + for_endpoint(sr->host).shared = true; } - if (ws) for_endpoint(&ws->host).annotations.push_back(ws); - if (wr) for_endpoint(&wr->host).annotations.push_back(wr); + if (ws) for_endpoint(ws->host).annotations.push_back(ws); + if (wr) for_endpoint(wr->host).annotations.push_back(wr); const ::Endpoint *ca = nullptr, *sa = nullptr; + // convert binary annotations to tags and addresses for (auto &annotation : span->m_span.binary_annotations) { if (annotation.annotation_type == AnnotationType::BOOL) { @@ -501,26 +516,26 @@ const std::vector Span2::from_span(const Span *span) { break; default: - for_endpoint(&annotation.host).binary_annotations.push_back(&annotation); + for_endpoint(annotation.host).binary_annotations.push_back(&annotation); break; } continue; } - for_endpoint(&annotation.host).binary_annotations.push_back(&annotation); + for_endpoint(annotation.host).binary_annotations.push_back(&annotation); } - if (cs && sa && !endpoint_close_enough(sa, &cs->host)) { - for_endpoint(&cs->host).remote_endpoint = sa; + if (cs && sa && !close_enough(sa, &cs->host)) { + for_endpoint(cs->host).remote_endpoint = sa; } - if (sr && ca && !endpoint_close_enough(ca, &sr->host)) { - for_endpoint(&sr->host).remote_endpoint = ca; + if (sr && ca && !close_enough(ca, &sr->host)) { + for_endpoint(sr->host).remote_endpoint = ca; } if ((!cs && !sr) && (ca && sa)) { - for_endpoint(ca).remote_endpoint = sa; + for_endpoint(*ca).remote_endpoint = sa; } return std::move(spans); diff --git a/src/Span.h b/src/Span.h index 962eb52..227f2f4 100644 --- a/src/Span.h +++ b/src/Span.h @@ -1434,19 +1434,6 @@ void Span::serialize_json(RapidJsonWriter &writer) const namespace __impl { -inline bool endpoint_is_set(const ::Annotation &annotation) { - return annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6; -} - -inline bool endpoint_is_set(const ::BinaryAnnotation &annotation) { - return annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6; -} - -inline bool endpoint_close_enough(const ::Endpoint *lhs, const ::Endpoint *rhs) { - return lhs->__isset.service_name && rhs->__isset.service_name && - lhs->service_name == rhs->service_name; -} - struct Span2 { enum Kind { UNKNOWN, @@ -1463,6 +1450,7 @@ struct Span2 { std::vector annotations; std::vector binary_annotations; + // Converts the input, parsing RPC annotations into Span2. static const std::vector from_span(const Span *span); template From 35fac35f4a26bb216ae6ec18385bd292b45c125c Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Thu, 17 Aug 2017 11:55:16 +0800 Subject: [PATCH 09/12] fix test case --- src/Span.cpp | 71 +++++++++++++------------ src/Span.h | 128 ++++++++++++++++++++++++---------------------- test/TestMain.cpp | 3 +- test/TestSpan.cpp | 62 +++++++++++----------- 4 files changed, 138 insertions(+), 126 deletions(-) diff --git a/src/Span.cpp b/src/Span.cpp index 1d09a2d..b05a90d 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -89,9 +89,9 @@ Endpoint &Endpoint::with_addr(const struct sockaddr *addr) { auto v4 = reinterpret_cast(addr); - m_host.__set_ipv4(ntohl(v4->sin_addr.s_addr)); - m_host.__set_port(ntohs(v4->sin_port)); - m_host.__isset.ipv6 = false; + set_ipv4(ntohl(v4->sin_addr.s_addr)); + set_port(ntohs(v4->sin_port)); + break; } @@ -99,8 +99,9 @@ Endpoint &Endpoint::with_addr(const struct sockaddr *addr) { auto v6 = reinterpret_cast(addr); - m_host.__set_ipv6(std::string(reinterpret_cast(v6->sin6_addr.s6_addr), sizeof(v6->sin6_addr))); - m_host.__set_port(ntohs(v6->sin6_port)); + set_ipv6(std::string(reinterpret_cast(v6->sin6_addr.s6_addr), sizeof(v6->sin6_addr))); + set_port(ntohs(v6->sin6_port)); + break; } } @@ -116,17 +117,16 @@ Endpoint &Endpoint::with_addr(const std::string &addr, port_t port) { auto bytes = ip.to_v6().to_bytes(); - m_host.__set_ipv6(std::string(reinterpret_cast(bytes.data()), bytes.size())); + set_ipv6(std::string(reinterpret_cast(bytes.data()), bytes.size())); } else { auto bytes = ip.to_v4().to_bytes(); - m_host.__set_ipv4(ntohl(*reinterpret_cast(bytes.data()))); - m_host.__isset.ipv6 = false; + set_ipv4(ntohl(*reinterpret_cast(bytes.data()))); } - m_host.__set_port(port); + set_port(port); return *this; } @@ -357,7 +357,7 @@ inline bool close_enough(const ::Endpoint *lhs, const ::Endpoint *rhs) { } const std::vector Span2::from_span(const Span *span) { - std::vector spans; + std::vector spans = {{ Span2 { .span = span }}}; auto new_span = [span, &spans](const ::Endpoint *host, Kind kind=UNKNOWN) -> Span2& { spans.push_back(Span2 { .span = span, .kind = kind, .local_endpoint = host }); @@ -366,20 +366,18 @@ const std::vector Span2::from_span(const Span *span) { }; auto for_endpoint = [span, &spans, new_span](const ::Endpoint &host) -> Span2& { - if (!spans.empty()) { - if (!is_set(host)) { - return spans.front(); // allocate missing endpoint data to first span - } + if (!is_set(host)) { + return spans.front(); // allocate missing endpoint data to first span + } - for (auto &next : spans) { - if (!next.local_endpoint) { - next.local_endpoint = &host; - return next; - } + for (Span2& next : spans) { + if (!next.local_endpoint) { + next.local_endpoint = &host; + return next; + } - if (close_enough(next.local_endpoint, &host)) { - return next; - } + if (close_enough(next.local_endpoint, &host)) { + return next; } } @@ -387,7 +385,7 @@ const std::vector Span2::from_span(const Span *span) { }; auto maybe_timestamp_and_duration = [span, for_endpoint](const ::Annotation *begin, const ::Annotation *end) { - auto span2 = for_endpoint(begin->host); + Span2& span2 = for_endpoint(begin->host); if (span->m_span.__isset.timestamp && span->m_span.__isset.duration) { span2.timestamp = span->m_span.timestamp; @@ -401,13 +399,15 @@ const std::vector Span2::from_span(const Span *span) { const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ws = nullptr, *wr = nullptr; // add annotations unless they are "core" - for (auto &annotation : span->m_span.annotations) + for (const ::Annotation& annotation : span->m_span.annotations) { - auto span2 = for_endpoint(annotation.host); + Span2& span2 = for_endpoint(annotation.host); if (annotation.value.size() == 2 && is_set(annotation.host)) { + uint16_t key = *reinterpret_cast(annotation.value.c_str()); + // core annotations require an endpoint. Don't give special treatment when that's missing - switch (* reinterpret_cast(annotation.value.c_str())) { + switch (key) { case 0x7363: // CLIENT_SEND span2.kind = CLIENT; cs = &annotation; @@ -450,9 +450,9 @@ const std::vector Span2::from_span(const Span *span) { maybe_timestamp_and_duration(cs, cr); // special-case loopback: We need to make sure on loopback there are two span2s - auto client = for_endpoint(cs->host); + Span2& client = for_endpoint(cs->host); bool is_client = close_enough(&cs->host, &sr->host); - auto server = is_client ? + Span2& server = is_client ? new_span(&sr->host, SERVER) : // fork a new span for the server side for_endpoint(sr->host); @@ -472,7 +472,7 @@ const std::vector Span2::from_span(const Span *span) { maybe_timestamp_and_duration(sr, ss); } else { // otherwise, the span is incomplete. revert special-casing - for (auto &next : spans) { + for (Span2& next : spans) { switch (next.kind) { case CLIENT: if (cs) next.timestamp = cs->timestamp; @@ -486,8 +486,10 @@ const std::vector Span2::from_span(const Span *span) { } if (span->m_span.timestamp) { - spans.front().timestamp = span->m_span.timestamp; - spans.front().duration = span->m_span.duration; + Span2& span2 = spans.front(); + + span2.timestamp = span->m_span.timestamp; + span2.duration = span->m_span.duration; } } @@ -503,9 +505,12 @@ const std::vector Span2::from_span(const Span *span) { const ::Endpoint *ca = nullptr, *sa = nullptr; // convert binary annotations to tags and addresses - for (auto &annotation : span->m_span.binary_annotations) + for (const ::BinaryAnnotation& annotation : span->m_span.binary_annotations) { - if (annotation.annotation_type == AnnotationType::BOOL) { + if (annotation.key.size() == 2 && + annotation.annotation_type == AnnotationType::BOOL && + is_set(annotation.host)) + { switch (* reinterpret_cast(annotation.key.c_str())) { case 0x6163: // CLIENT_ADDR ca = &annotation.host; diff --git a/src/Span.h b/src/Span.h index 227f2f4..1be2f8e 100644 --- a/src/Span.h +++ b/src/Span.h @@ -50,7 +50,29 @@ class Endpoint { ::Endpoint m_host; - friend class __impl::Span2; + inline void set_service_name(const std::string& service_name) { + m_host.__set_service_name(service_name); + m_host.__isset.service_name = true; + } + + inline void set_ipv4(uint32_t ipv4) { + m_host.__set_ipv4(ipv4); + m_host.__isset.ipv4 = true; + m_host.__isset.ipv6 = false; + } + + inline void set_ipv6(const std::string& ipv6) { + m_host.__set_ipv6(ipv6); + m_host.__isset.ipv4 = false; + m_host.__isset.ipv6 = true; + } + + inline void set_port(uint16_t port) { + if ((m_host.__isset.port = port != 0)) { + m_host.__set_port(port); + } + } + public: Endpoint() { @@ -106,7 +128,11 @@ class Endpoint */ inline const std::string &service_name(void) const { return m_host.service_name; } - inline Endpoint &with_service_name(const std::string &service_name); + inline Endpoint &with_service_name(const std::string &service_name) { + set_service_name(service_name); + + return *this; + } std::unique_ptr sockaddr(void) const; @@ -119,12 +145,26 @@ class Endpoint /** * \brief with IPv4 address */ - inline Endpoint &with_addr(const struct sockaddr_in *addr); + inline Endpoint &with_addr(const struct sockaddr_in *addr) { + assert(addr); + + set_ipv4(ntohl(addr->sin_addr.s_addr)); + set_port(ntohs(addr->sin_port)); + + return *this; + } /** * \brief with IPv6 address */ - inline Endpoint &with_addr(const struct sockaddr_in6 *addr); + inline Endpoint &with_addr(const struct sockaddr_in6 *addr) { + assert(addr); + + set_ipv6(std::string(reinterpret_cast(addr->sin6_addr.s6_addr), sizeof(addr->sin6_addr))); + set_port(ntohs(addr->sin6_port)); + + return *this; + } /** * \brief with IP address @@ -134,14 +174,30 @@ class Endpoint /** * \brief with IPv4 address */ - inline Endpoint &with_ipv4(const std::string &ip); + inline Endpoint &with_ipv4(const std::string &ip) { + set_ipv4(ntohl(inet_addr(ip.c_str()))); + + return *this; + } /** * \brief with IPv6 address */ - inline Endpoint &with_ipv6(const std::string &ip); + inline Endpoint &with_ipv6(const std::string &ip) { + struct in6_addr addr; - inline Endpoint &with_port(port_t port); + if (inet_pton(AF_INET6, ip.c_str(), addr.s6_addr) > 0) { + set_ipv6(std::string(reinterpret_cast(addr.s6_addr), sizeof(addr))); + } + + return *this; + } + + inline Endpoint &with_port(port_t port) { + set_port(port); + + return *this; + } inline const ::Endpoint &host(void) const { return m_host; } }; @@ -931,14 +987,14 @@ class Span return annotate(TraceKeys::LOCAL_COMPONENT, value, endpoint); } /// \brief Annotate TraceKeys#CLIENT_ADDR event - inline BinaryAnnotation client_addr(const std::string &value, const Endpoint *endpoint = nullptr) + inline BinaryAnnotation client_addr(const Endpoint *endpoint) { - return annotate(TraceKeys::CLIENT_ADDR, value, endpoint); + return annotate(TraceKeys::CLIENT_ADDR, true, endpoint); } /// \brief Annotate TraceKeys#SERVER_ADDR event - inline BinaryAnnotation server_addr(const std::string &value, const Endpoint *endpoint = nullptr) + inline BinaryAnnotation server_addr(const Endpoint *endpoint) { - return annotate(TraceKeys::SERVER_ADDR, value, endpoint); + return annotate(TraceKeys::SERVER_ADDR, true, endpoint); } /// \brief Annotate TraceKeys#ERROR event inline Annotation error(const Endpoint *endpoint = nullptr) @@ -1087,56 +1143,6 @@ class CachedSpan : public Span } __attribute__((aligned)); -Endpoint &Endpoint::with_service_name(const std::string &service_name) -{ - m_host.service_name = service_name; - return *this; -} - -Endpoint &Endpoint::with_addr(const struct sockaddr_in *addr) -{ - assert(addr); - - m_host.__isset.ipv6 = 0; - m_host.__set_ipv4(ntohl(addr->sin_addr.s_addr)); - m_host.__set_port(ntohs(addr->sin_port)); - - return *this; -} - -Endpoint &Endpoint::with_addr(const struct sockaddr_in6 *addr) -{ - assert(addr); - - m_host.__set_ipv6(std::string(reinterpret_cast(addr->sin6_addr.s6_addr), sizeof(addr->sin6_addr))); - m_host.__set_port(ntohs(addr->sin6_port)); - - return *this; -} - -Endpoint &Endpoint::with_ipv4(const std::string &ip) -{ - m_host.__set_ipv4(ntohl(inet_addr(ip.c_str()))); - - return *this; -} - -Endpoint &Endpoint::with_ipv6(const std::string &ip) -{ - struct in6_addr addr; - - if (inet_pton(AF_INET6, ip.c_str(), addr.s6_addr) > 0) - m_host.__set_ipv6(std::string(reinterpret_cast(addr.s6_addr), sizeof(addr))); - - return *this; -} - -inline Endpoint &Endpoint::with_port(port_t port) -{ - m_host.__set_port(port); - return *this; -} - namespace __impl { diff --git a/test/TestMain.cpp b/test/TestMain.cpp index 11035e5..dcdd92b 100644 --- a/test/TestMain.cpp +++ b/test/TestMain.cpp @@ -5,9 +5,10 @@ int main(int argc, char **argv) { + ::google::InitGoogleLogging(argv[0]); ::google::ParseCommandLineFlags(&argc, &argv, false); ::testing::GTEST_FLAG(throw_on_failure) = true; ::testing::InitGoogleMock(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index 70621dc..fcce28d 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -245,35 +245,6 @@ static const char *json_template = R"###({ "timestamp": %lld })###"; -static const char *json_v2_template = R"###({ - "traceId": "%016llx%016llx", - "name": "test", - "id": "%016llx", - "parentId": "%016llx", - "kind": "CLIENT", - "timestamp": %lld, - "annotations": [ - { - "timestamp": %lld, - "value": "cs" - } - ], - "tags": { - "bool": "true", - "i16": "123", - "i32": "123", - "i64": "123", - "double": "12.3", - "string": "测试", - "bytes": "AQID" - }, - "remoteEndpoint": { - "serviceName": "remote", - "ipv6": "::1", - "port": 80 - } -})###"; - TEST(span, serialize_json) { MockTracer tracer; @@ -309,16 +280,41 @@ TEST(span, serialize_json) ASSERT_EQ(std::string(buffer.GetString(), buffer.GetSize()), std::string(str, str_len)); } +static const char *json_v2_template = R"###([ + { + "traceId": "%016llx%016llx", + "name": "test", + "id": "%016llx", + "parentId": "%016llx", + "kind": "CLIENT", + "timestamp": %lld, + "annotations": [], + "tags": { + "bool": "true", + "i16": "123", + "i32": "123", + "i64": "123", + "double": "12.3", + "string": "测试", + "bytes": "AQID" + }, + "localEndpoint": { + "serviceName": "host", + "ipv4": "127.0.0.1", + "port": 80 + } + } +])###"; + TEST(span, serialize_json_v2) { MockTracer tracer; zipkin::Span span(&tracer, "test", zipkin::Span::next_id()); zipkin::Endpoint host("host", "127.0.0.1", 80); - zipkin::Endpoint remote("remote", "::1", 80); span.client_send(&host); - span.server_addr("8.8.8.8", &remote); + span.server_addr(&host); span.annotate("bool", true, &host); span.annotate("i16", (int16_t)123); span.annotate("i32", (int32_t)123); @@ -333,10 +329,14 @@ TEST(span, serialize_json_v2) rapidjson::StringBuffer buffer; rapidjson::PrettyWriter writer(buffer); + writer.StartArray(); + for (auto &span2: zipkin::__impl::Span2::from_span(&span)) { span2.serialize_json(writer); } + writer.EndArray(); + char str[2048] = {0}; int str_len = snprintf(str, sizeof(str), json_v2_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().timestamp, span.message().annotations[0].timestamp); From c99f45bc3a987537db663514f34e61aad47a2700 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Thu, 17 Aug 2017 15:21:53 +0800 Subject: [PATCH 10/12] add message related annotations --- src/CMakeLists.txt | 23 ++++++++++++++++++--- src/Span.cpp | 4 +++- src/Span.h | 32 +++++++++++++++++++++++++++++ src/zipkinCore.thrift | 36 +++++++++++++++++++++++++++++++-- src/zipkinDependencies.thrift | 38 +++++++++++++++++++++++++++++++++++ 5 files changed, 127 insertions(+), 6 deletions(-) create mode 100644 src/zipkinDependencies.thrift diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cdd8e16..0767c66 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,7 +14,24 @@ add_custom_command ( OUTPUT ${zipkin_core_INCS} ${zipkin_core_SRCS} COMMAND ${THRIFT_COMPILER} --gen cpp -o ${PROJECT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/zipkinCore.thrift DEPENDS zipkinCore.thrift - COMMENT "Generating zipkin Thrift binding files" + COMMENT "Generating zipkin core Thrift binding files" +) + +set (zipkin_deps_INCS + ${GENDIR}/zipkinDependencies_constants.h + ${GENDIR}/zipkinDependencies_types.h + ) + +set (zipkin_deps_SRCS + ${GENDIR}/zipkinDependencies_constants.cpp + ${GENDIR}/zipkinDependencies_types.cpp + ) + +add_custom_command ( + OUTPUT ${zipkin_deps_INCS} ${zipkin_deps_SRCS} + COMMAND ${THRIFT_COMPILER} --gen cpp -o ${PROJECT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/zipkinDependencies.thrift + DEPENDS zipkinDependencies.thrift + COMMENT "Generating zipkin dependencies Thrift binding files" ) set (scribe_INCS @@ -76,7 +93,7 @@ endif() message(STATUS "Library type: ${LIB_TYPE}") -add_library (zipkin ${LIB_TYPE} ${zipkin_SRCS} ${zipkin_core_SRCS} ${scribe_SRCS}) +add_library (zipkin ${LIB_TYPE} ${zipkin_SRCS} ${zipkin_core_SRCS} ${zipkin_deps_SRCS} ${scribe_SRCS}) target_link_libraries (zipkin ${zipkin_DEPENDENCIES}) set (PREFIX ${CMAKE_INSTALL_PREFIX}) @@ -103,7 +120,7 @@ install ( ) install ( - FILES ${zipkin_INCS} ${zipkin_core_INCS} ${scribe_INCS} + FILES ${zipkin_INCS} ${zipkin_core_INCS} ${zipkin_deps_INCS} ${scribe_INCS} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/zipkin ) diff --git a/src/Span.cpp b/src/Span.cpp index b05a90d..139a947 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -24,6 +24,8 @@ DEFINE_TRACE_KEY(CLIENT_SEND) DEFINE_TRACE_KEY(CLIENT_RECV) DEFINE_TRACE_KEY(SERVER_SEND) DEFINE_TRACE_KEY(SERVER_RECV) +DEFINE_TRACE_KEY(MESSAGE_SEND) +DEFINE_TRACE_KEY(MESSAGE_RECV) DEFINE_TRACE_KEY(WIRE_SEND) DEFINE_TRACE_KEY(WIRE_RECV) DEFINE_TRACE_KEY(CLIENT_SEND_FRAGMENT) @@ -33,6 +35,7 @@ DEFINE_TRACE_KEY(SERVER_RECV_FRAGMENT) DEFINE_TRACE_KEY(LOCAL_COMPONENT) DEFINE_TRACE_KEY(CLIENT_ADDR) DEFINE_TRACE_KEY(SERVER_ADDR) +DEFINE_TRACE_KEY(MESSAGE_ADDR) DEFINE_TRACE_KEY(ERROR) DEFINE_TRACE_KEY(HTTP_HOST) DEFINE_TRACE_KEY(HTTP_METHOD) @@ -343,7 +346,6 @@ void CachedSpan::release(void) } } - namespace __impl { diff --git a/src/Span.h b/src/Span.h index 1be2f8e..590b844 100644 --- a/src/Span.h +++ b/src/Span.h @@ -323,6 +323,19 @@ struct TraceKeys */ DECLARE_TRACE_KEY(SERVER_RECV) /** + * \brief Message send ("ms") is a request to send a message to a destination, usually a broker. + * + * This may be the only annotation in a messaging span. If WIRE_SEND exists in the same span, + * it follows this moment and clarifies delays sending the message, such as batching. + */ + DECLARE_TRACE_KEY(MESSAGE_SEND) + /** + * \brief A consumer received ("mr") a message from a broker. This may be the only + * annotation in a messaging span. If WIRE_RECV exists in the same span, it + * precedes this moment and clarifies any local queuing delay. + */ + DECLARE_TRACE_KEY(MESSAGE_RECV) + /** * \brief Optionally logs an attempt to send a message on the wire. * * Multiple wire send events could indicate network retries. @@ -399,6 +412,10 @@ struct TraceKeys */ DECLARE_TRACE_KEY(SERVER_ADDR) /** + * \brief Indicates the remote address of a messaging span, usually the broker. + */ + DECLARE_TRACE_KEY(MESSAGE_ADDR) + /** * \brief When an {@link Annotation#value}, this indicates when an error occurred. When a {@link * BinaryAnnotation#key}, the value is a human readable message associated with an error. * @@ -874,6 +891,16 @@ class Span { return annotate(TraceKeys::SERVER_RECV, endpoint); } + /// \brief Annotate TraceKeys#MESSAGE_SEND event + inline Annotation message_send(const Endpoint *endpoint = nullptr) + { + return annotate(TraceKeys::MESSAGE_SEND, endpoint); + } + /// \brief Annotate TraceKeys#MESSAGE_RECV event + inline Annotation message_recv(const Endpoint *endpoint = nullptr) + { + return annotate(TraceKeys::MESSAGE_RECV, endpoint); + } /// \brief Annotate TraceKeys#WIRE_SEND event inline Annotation wire_send(const Endpoint *endpoint = nullptr) { @@ -996,6 +1023,11 @@ class Span { return annotate(TraceKeys::SERVER_ADDR, true, endpoint); } + /// \brief Annotate TraceKeys#MESSAGE_ADDR event + inline BinaryAnnotation message_addr(const Endpoint *endpoint) + { + return annotate(TraceKeys::MESSAGE_ADDR, true, endpoint); + } /// \brief Annotate TraceKeys#ERROR event inline Annotation error(const Endpoint *endpoint = nullptr) { diff --git a/src/zipkinCore.thrift b/src/zipkinCore.thrift index d872b07..dbd7f4a 100644 --- a/src/zipkinCore.thrift +++ b/src/zipkinCore.thrift @@ -74,6 +74,34 @@ const string SERVER_SEND = "ss" * should also log the CLIENT_ADDR. */ const string SERVER_RECV = "sr" +/** + * Message send ("ms") is a request to send a message to a destination, usually + * a broker. This may be the only annotation in a messaging span. If WIRE_SEND + * exists in the same span, it follows this moment and clarifies delays sending + * the message, such as batching. + * + * Unlike RPC annotations like CLIENT_SEND, messaging spans never share a span + * ID. For example, "ms" should always be the parent of "mr". + * + * Annotation.host is not the destination, it is the host which logged the send + * event: the producer. When annotating MESSAGE_SEND, instrumentation should + * also tag the MESSAGE_ADDR. + */ +const string MESSAGE_SEND = "ms" +/** + * A consumer received ("mr") a message from a broker. This may be the only + * annotation in a messaging span. If WIRE_RECV exists in the same span, it + * precedes this moment and clarifies any local queuing delay. + * + * Unlike RPC annotations like SERVER_RECV, messaging spans never share a span + * ID. For example, "mr" should always be a child of "ms" unless it is a root + * span. + * + * Annotation.host is not the broker, it is the host which logged the receive + * event: the consumer. When annotating MESSAGE_RECV, instrumentation should + * also tag the MESSAGE_ADDR. + */ +const string MESSAGE_RECV = "mr" /** * Optionally logs an attempt to send a message on the wire. Multiple wire send * events could indicate network retries. A lag between client or server send @@ -134,7 +162,7 @@ const string HTTP_METHOD = "http.method" * "/resource/{resource_id}". In systems where only equals queries are available, searching for * http/path=/resource won't match if the actual request was /resource/abcd-ff. * - * Historical note: This was commonly expressed as "http.uri" in zipkin, eventhough it was most + * Historical note: This was commonly expressed as "http.uri" in zipkin, even though it was most * often just a path. */ const string HTTP_PATH = "http.path" @@ -230,6 +258,10 @@ const string CLIENT_ADDR = "ca" * different server ip or port. */ const string SERVER_ADDR = "sa" +/** + * Indicates the remote address of a messaging span, usually the broker. + */ +const string MESSAGE_ADDR = "ma" /** * Indicates the network context of a service recording an annotation with two @@ -420,7 +452,7 @@ struct Span { * precise value possible. For example, gettimeofday or syncing nanoTime * against a tick of currentTimeMillis. * - * For compatibilty with instrumentation that precede this field, collectors + * For compatibility with instrumentation that precede this field, collectors * or span stores can derive this via Annotation.timestamp. * For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp. * diff --git a/src/zipkinDependencies.thrift b/src/zipkinDependencies.thrift new file mode 100644 index 0000000..30ce57d --- /dev/null +++ b/src/zipkinDependencies.thrift @@ -0,0 +1,38 @@ +# Copyright 2013 Twitter Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +namespace java com.twitter.zipkin.thriftjava +#@namespace scala com.twitter.zipkin.thriftscala +namespace rb Zipkin + +struct DependencyLink { + /** parent service name (caller) */ + 1: string parent + /** child service name (callee) */ + 2: string child + # 3: Moments OBSOLETE_duration_moments + /** total traced calls made from parent to child */ + 4: i64 callCount + /** how many calls are known to be errors */ + 5: i64 errorCount + # histogram? +} + +/* An aggregate representation of services paired with every service they call. */ +struct Dependencies { + /** milliseconds from epoch */ + 1: i64 start_ts + /** milliseconds from epoch */ + 2: i64 end_ts + 3: list links +} From b2e27b32262e20d75ad66b876cd869ed2ca4cdf7 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Thu, 17 Aug 2017 15:51:36 +0800 Subject: [PATCH 11/12] convert message related annotations --- src/Span.cpp | 75 +++++++++++++++++++++++++++++++++++++++++++++++----- src/Span.h | 26 +++++++++++++----- 2 files changed, 89 insertions(+), 12 deletions(-) diff --git a/src/Span.cpp b/src/Span.cpp index 139a947..a25c2ff 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -359,10 +359,15 @@ inline bool close_enough(const ::Endpoint *lhs, const ::Endpoint *rhs) { } const std::vector Span2::from_span(const Span *span) { - std::vector spans = {{ Span2 { .span = span }}}; + std::vector spans = {{ Span2(span) }}; auto new_span = [span, &spans](const ::Endpoint *host, Kind kind=UNKNOWN) -> Span2& { - spans.push_back(Span2 { .span = span, .kind = kind, .local_endpoint = host }); + Span2 span2(span); + + span2.kind = kind; + span2.local_endpoint = host; + + spans.push_back(std::move(span2)); return spans.back(); }; @@ -398,7 +403,7 @@ const std::vector Span2::from_span(const Span *span) { } }; - const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ws = nullptr, *wr = nullptr; + const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ms = nullptr, *mr = nullptr, *ws = nullptr, *wr = nullptr; // add annotations unless they are "core" for (const ::Annotation& annotation : span->m_span.annotations) @@ -430,6 +435,16 @@ const std::vector Span2::from_span(const Span *span) { cr = &annotation; break; + case 0x736d: // MESSAGE_SEND + span2.kind = PRODUCER; + ms = &annotation; + break; + + case 0x726d: // MESSAGE_RECV + span2.kind = CONSUMER; + mr = &annotation; + break; + case 0x7377: // WIRE_SEND ws = &annotation; break; @@ -482,6 +497,8 @@ const std::vector Span2::from_span(const Span *span) { case SERVER: if (sr) next.timestamp = sr->timestamp; break; + case PRODUCER: + case CONSUMER: case UNKNOWN: break; } @@ -501,10 +518,44 @@ const std::vector Span2::from_span(const Span *span) { for_endpoint(sr->host).shared = true; } - if (ws) for_endpoint(ws->host).annotations.push_back(ws); - if (wr) for_endpoint(wr->host).annotations.push_back(wr); + // ms and mr are not supposed to be in the same span, but in case they are.. + if (ms && mr) { + // special-case loopback: We need to make sure on loopback there are two span2s + Span2& producer = for_endpoint(ms->host); + bool is_producer = close_enough(&ms->host, &mr->host); + Span2& consumer = is_producer ? + new_span(&mr->host, CONSUMER) : // fork a new span for the consumer side + for_endpoint(mr->host); + + if (is_producer) { + producer.kind = PRODUCER; + } + + consumer.shared = true; - const ::Endpoint *ca = nullptr, *sa = nullptr; + if (wr) { + consumer.timestamp = wr->timestamp; + consumer.duration = mr->timestamp - wr->timestamp; + } else { + consumer.timestamp = mr->timestamp; + } + + producer.timestamp = ms->timestamp; + producer.duration = ws ? (ws->timestamp - ms->timestamp) : 0; + } else if (ms) { + maybe_timestamp_and_duration(ms, ws); + } else if (mr) { + if (wr) { + maybe_timestamp_and_duration(wr, mr); + } else { + maybe_timestamp_and_duration(mr, nullptr); + } + } else { + if (ws) for_endpoint(ws->host).annotations.push_back(ws); + if (wr) for_endpoint(wr->host).annotations.push_back(wr); + } + + const ::Endpoint *ca = nullptr, *sa = nullptr, *ma = nullptr; // convert binary annotations to tags and addresses for (const ::BinaryAnnotation& annotation : span->m_span.binary_annotations) @@ -522,6 +573,10 @@ const std::vector Span2::from_span(const Span *span) { sa = &annotation.host; break; + case 0x616d: // MESSAGE_ADDR + ma = &annotation.host; + break; + default: for_endpoint(annotation.host).binary_annotations.push_back(&annotation); break; @@ -541,6 +596,14 @@ const std::vector Span2::from_span(const Span *span) { for_endpoint(sr->host).remote_endpoint = ca; } + if (ms && ma && !close_enough(ma, &ms->host)) { + for_endpoint(ms->host).remote_endpoint = ma; + } + + if (mr && ma && !close_enough(ma, &mr->host)) { + for_endpoint(mr->host).remote_endpoint = ma; + } + if ((!cs && !sr) && (ca && sa)) { for_endpoint(*ca).remote_endpoint = sa; } diff --git a/src/Span.h b/src/Span.h index 590b844..101c3be 100644 --- a/src/Span.h +++ b/src/Span.h @@ -1477,17 +1477,21 @@ struct Span2 { UNKNOWN, CLIENT, SERVER, + PRODUCER, + CONSUMER, }; - const Span *span; - Kind kind; - int64_t timestamp, duration; - bool shared; - const ::Endpoint *local_endpoint; - const ::Endpoint *remote_endpoint; + const Span *span = nullptr; + Kind kind = UNKNOWN; + int64_t timestamp = 0, duration = 0; + bool shared = false; + const ::Endpoint *local_endpoint = nullptr; + const ::Endpoint *remote_endpoint = nullptr; std::vector annotations; std::vector binary_annotations; + Span2(const Span *source) : span(source) {} + // Converts the input, parsing RPC annotations into Span2. static const std::vector from_span(const Span *span); @@ -1596,6 +1600,16 @@ void Span2::serialize_json(RapidJsonWriter &writer) const writer.String("SERVER"); break; + case PRODUCER: + writer.Key("kind"); + writer.String("PRODUCER"); + break; + + case CONSUMER: + writer.Key("kind"); + writer.String("CONSUMER"); + break; + case UNKNOWN: // ignore it break; } From 737f6887ce02caa06eb3c2fa255517f8e236447c Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Thu, 17 Aug 2017 18:29:58 +0800 Subject: [PATCH 12/12] add test cases for Span2 converter --- src/Span.cpp | 35 ++++++++---- src/Span.h | 139 +++++++++++++++++++++++++++++++++------------- test/TestSpan.cpp | 119 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 243 insertions(+), 50 deletions(-) diff --git a/src/Span.cpp b/src/Span.cpp index a25c2ff..3da5941 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -214,12 +214,18 @@ timestamp_t Span::now() return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()); } -Annotation Span::annotate(const std::string &value, const Endpoint *endpoint) +Annotation Span::annotate(const std::string &value, const Endpoint *endpoint, timestamp_t timestamp) { ::Annotation annotation; - annotation.__set_timestamp(now().count()); + if ((annotation.__isset.timestamp = timestamp != timestamp_t::zero())) { + annotation.__set_timestamp(timestamp.count()); + } else { + annotation.__set_timestamp(now().count()); + } + annotation.__set_value(value); + annotation.__isset.value = true; if (endpoint) { @@ -236,8 +242,13 @@ BinaryAnnotation Span::annotate(const std::string &key, const uint8_t *value, si ::BinaryAnnotation annotation; annotation.__set_key(key); + annotation.__isset.key = true; + annotation.__set_value(std::string(reinterpret_cast(value), size)); + annotation.__isset.value = true; + annotation.__set_annotation_type(AnnotationType::BYTES); + annotation.__isset.annotation_type = true; if (endpoint) { @@ -394,9 +405,9 @@ const std::vector Span2::from_span(const Span *span) { auto maybe_timestamp_and_duration = [span, for_endpoint](const ::Annotation *begin, const ::Annotation *end) { Span2& span2 = for_endpoint(begin->host); - if (span->m_span.__isset.timestamp && span->m_span.__isset.duration) { - span2.timestamp = span->m_span.timestamp; - span2.duration = span->m_span.duration; + if (span->message().__isset.timestamp && span->message().__isset.duration) { + span2.timestamp = span->message().timestamp; + span2.duration = span->message().duration; } else { span2.timestamp = begin->timestamp; span2.duration = end ? (end->timestamp - begin->timestamp) : 0; @@ -406,7 +417,7 @@ const std::vector Span2::from_span(const Span *span) { const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ms = nullptr, *mr = nullptr, *ws = nullptr, *wr = nullptr; // add annotations unless they are "core" - for (const ::Annotation& annotation : span->m_span.annotations) + for (const ::Annotation& annotation : span->message().annotations) { Span2& span2 = for_endpoint(annotation.host); @@ -482,7 +493,7 @@ const std::vector Span2::from_span(const Span *span) { server.timestamp = sr->timestamp; if (ss) server.duration = ss->timestamp - sr->timestamp; - if (!cr && !span->m_span.duration) client.duration = 0; // one-way has no duration + if (!cr && !span->message().duration) client.duration = 0; // one-way has no duration } else if (cs && cr) { maybe_timestamp_and_duration(cs, cr); } else if (sr && ss) { @@ -504,17 +515,17 @@ const std::vector Span2::from_span(const Span *span) { } } - if (span->m_span.timestamp) { + if (span->message().timestamp) { Span2& span2 = spans.front(); - span2.timestamp = span->m_span.timestamp; - span2.duration = span->m_span.duration; + span2.timestamp = span->message().timestamp; + span2.duration = span->message().duration; } } // Span v1 format did not have a shared flag. By convention, span.timestamp being absent // implied shared. When we only see the server-side, carry this signal over. - if (!cs && (sr && !span->m_span.timestamp)) { + if (!cs && (sr && !span->message().timestamp)) { for_endpoint(sr->host).shared = true; } @@ -558,7 +569,7 @@ const std::vector Span2::from_span(const Span *span) { const ::Endpoint *ca = nullptr, *sa = nullptr, *ma = nullptr; // convert binary annotations to tags and addresses - for (const ::BinaryAnnotation& annotation : span->m_span.binary_annotations) + for (const ::BinaryAnnotation& annotation : span->message().binary_annotations) { if (annotation.key.size() == 2 && annotation.annotation_type == AnnotationType::BOOL && diff --git a/src/Span.h b/src/Span.h index 101c3be..56ba228 100644 --- a/src/Span.h +++ b/src/Span.h @@ -199,7 +199,13 @@ class Endpoint return *this; } - inline const ::Endpoint &host(void) const { return m_host; } + const ::Endpoint& operator*(void) const { return m_host; } + + ::Endpoint& operator*(void) { return m_host; } + + const ::Endpoint& host(void) const { return m_host; } + + ::Endpoint& host(void) { return m_host; } }; /** @@ -215,7 +221,27 @@ class Annotation public: Annotation(Span &span, ::Annotation &annotation) : m_span(span), m_annotation(annotation) {} - Span &span(void) { return m_span; } + Span &span(void) const { return m_span; } + + /** + * \brief Raw thrift Annotation + */ + const ::Annotation& operator*() const { return m_annotation; } + + /** + * \brief Raw thrift Annotation + */ + ::Annotation& operator*() { return m_annotation; } + + /** + * \brief Raw thrift Annotation + */ + const ::Annotation& annotation() const { return m_annotation; } + + /** + * \brief Raw thrift Annotation + */ + ::Annotation& annotation() { return m_annotation; } /** * \brief Microseconds from epoch. @@ -250,7 +276,8 @@ class Annotation /** \sa Annotation#endpoint */ Annotation &with_endpoint(const Endpoint &endpoint) { - m_annotation.host = endpoint.host(); + m_annotation.host = *endpoint; + return *this; } }; @@ -483,7 +510,27 @@ class BinaryAnnotation public: BinaryAnnotation(Span &span, ::BinaryAnnotation &annotation) : m_span(span), m_annotation(annotation) {} - Span &span(void) { return m_span; } + Span &span(void) const { return m_span; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + const ::BinaryAnnotation& operator*() const { return m_annotation; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + ::BinaryAnnotation& operator*() { return m_annotation; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + const ::BinaryAnnotation& annotation() const { return m_annotation; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + ::BinaryAnnotation& annotation() { return m_annotation; } /** * \brief The thrift type of value, most often AnnotationType#STRING. @@ -596,7 +643,8 @@ class BinaryAnnotation */ BinaryAnnotation &with_endpoint(const Endpoint &endpoint) { - m_annotation.host = endpoint.host(); + m_annotation.host = *endpoint; + return *this; } }; @@ -664,9 +712,22 @@ class Span /** * \brief Raw thrift message */ - inline const ::Span &message(void) const { return m_span; } + inline const ::Span& message(void) const { return m_span; } + + /** + * \brief Raw thrift message + */ + inline ::Span& message(void) { return m_span; } + + /** + * \brief Raw thrift message + */ + inline const ::Span& operator*(void) const { return m_span; } - inline ::Span &message(void) { return m_span; } + /** + * \brief Raw thrift message + */ + inline ::Span& operator*(void) { return m_span; } /** * \brief Unique 8-byte identifier for a trace, set on all spans within it. @@ -869,67 +930,67 @@ class Span /** * \brief Associates events that explain latency with a timestamp. */ - Annotation annotate(const std::string &value, const Endpoint *endpoint = nullptr); + Annotation annotate(const std::string &value, const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()); /// \brief Annotate TraceKeys#CLIENT_SEND event - inline Annotation client_send(const Endpoint *endpoint = nullptr) + inline Annotation client_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_SEND, endpoint); + return annotate(TraceKeys::CLIENT_SEND, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV event - inline Annotation client_recv(const Endpoint *endpoint = nullptr) + inline Annotation client_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_RECV, endpoint); + return annotate(TraceKeys::CLIENT_RECV, endpoint, timestamp); } /// \brief Annotate TraceKeys#SERVER_SEND event - inline Annotation server_send(const Endpoint *endpoint = nullptr) + inline Annotation server_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_SEND, endpoint); + return annotate(TraceKeys::SERVER_SEND, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV event - inline Annotation server_recv(const Endpoint *endpoint = nullptr) + inline Annotation server_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_RECV, endpoint); + return annotate(TraceKeys::SERVER_RECV, endpoint, timestamp); } /// \brief Annotate TraceKeys#MESSAGE_SEND event - inline Annotation message_send(const Endpoint *endpoint = nullptr) + inline Annotation message_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::MESSAGE_SEND, endpoint); + return annotate(TraceKeys::MESSAGE_SEND, endpoint, timestamp); } /// \brief Annotate TraceKeys#MESSAGE_RECV event - inline Annotation message_recv(const Endpoint *endpoint = nullptr) + inline Annotation message_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::MESSAGE_RECV, endpoint); + return annotate(TraceKeys::MESSAGE_RECV, endpoint, timestamp); } /// \brief Annotate TraceKeys#WIRE_SEND event - inline Annotation wire_send(const Endpoint *endpoint = nullptr) + inline Annotation wire_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::WIRE_SEND, endpoint); + return annotate(TraceKeys::WIRE_SEND, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV event - inline Annotation wire_recv(const Endpoint *endpoint = nullptr) + inline Annotation wire_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::WIRE_RECV, endpoint); + return annotate(TraceKeys::WIRE_RECV, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_SEND_FRAGMENT event - inline Annotation client_send_fragment(const Endpoint *endpoint = nullptr) + inline Annotation client_send_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_SEND_FRAGMENT, endpoint); + return annotate(TraceKeys::CLIENT_SEND_FRAGMENT, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV_FRAGMENT event - inline Annotation client_recv_fragment(const Endpoint *endpoint = nullptr) + inline Annotation client_recv_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_RECV_FRAGMENT, endpoint); + return annotate(TraceKeys::CLIENT_RECV_FRAGMENT, endpoint, timestamp); } /// \brief Annotate TraceKeys#SERVER_SEND_FRAGMENT event - inline Annotation server_send_fragment(const Endpoint *endpoint = nullptr) + inline Annotation server_send_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_SEND_FRAGMENT, endpoint); + return annotate(TraceKeys::SERVER_SEND_FRAGMENT, endpoint, timestamp); } /// \brief Annotate TraceKeys#SERVER_RECV_FRAGMENT event - inline Annotation server_recv_fragment(const Endpoint *endpoint = nullptr) + inline Annotation server_recv_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_RECV_FRAGMENT, endpoint); + return annotate(TraceKeys::SERVER_RECV_FRAGMENT, endpoint, timestamp); } /** @@ -1490,11 +1551,13 @@ struct Span2 { std::vector annotations; std::vector binary_annotations; - Span2(const Span *source) : span(source) {} + Span2(const Span *source = nullptr) : span(source) {} // Converts the input, parsing RPC annotations into Span2. static const std::vector from_span(const Span *span); + const Span& operator*() const { return *span; } + template void serialize_json(RapidJsonWriter &writer) const; }; @@ -1583,7 +1646,7 @@ void Span2::serialize_json(RapidJsonWriter &writer) const writer.Key("id"); writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, span->id())); - if (span->m_span.__isset.parent_id) + if (span->message().__isset.parent_id) { writer.Key("parentId"); writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, span->parent_id())); @@ -1614,17 +1677,17 @@ void Span2::serialize_json(RapidJsonWriter &writer) const break; } - if (span->m_span.__isset.timestamp) { + if (span->message().__isset.timestamp) { writer.Key("timestamp"); - writer.Int64(span->m_span.timestamp); + writer.Int64(span->message().timestamp); } else if (timestamp) { writer.Key("timestamp"); writer.Int64(timestamp); } - if (span->m_span.__isset.duration) { + if (span->message().__isset.duration) { writer.Key("duration"); - writer.Int64(span->m_span.duration); + writer.Int64(span->message().duration); } else if (duration) { writer.Key("duration"); writer.Int64(duration); diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index fcce28d..9d3a04e 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -391,3 +391,122 @@ TEST(span, annotate_stream) ASSERT_EQ(span.message().annotations.size(), 2); ASSERT_EQ(span.message().binary_annotations.size(), 8); } + +class Span2ConverterTest : public ::testing::Test { +protected: + zipkin::Endpoint frontend, backend, kafka; + + Span2ConverterTest() + : frontend("frontend", "127.0.0.1"), + backend("backend", "192.168.99.101", 9000), + kafka("kafka") + { + } + + virtual void SetUp() {} + + virtual void TearDown() {} +}; + +using zipkin::__impl::Span2; + +TEST_F(Span2ConverterTest, client) +{ + zipkin::Span client(nullptr, "get"); + + client + .with_trace_id("7180c278b62e8f6a216a2aea45d08fc9") + .with_parent_id(0x6b221d5bc9e6496c) + .with_id(0x5b4185666d50f68b) + .with_timestamp(std::chrono::microseconds(1472470996199000)) + .with_duration(std::chrono::microseconds(207000)); + + client.client_send(&frontend, std::chrono::microseconds(1472470996199000)); + auto wire_send = client.wire_send(&frontend, std::chrono::microseconds(1472470996238000)); + auto wire_recv = client.wire_recv(&frontend, std::chrono::microseconds(1472470996403000)); + client.client_recv(&frontend, std::chrono::microseconds(1472470996406000)); + auto http_path = client.http_path("/api", &frontend); + auto client_version = client.annotate("clnt/finagle.version", "6.45.0", &frontend); + client.server_addr(&backend); + + auto spans = Span2::from_span(&client); + + EXPECT_EQ(spans.size(), 1); + + auto simpleClient = spans.front(); + + EXPECT_EQ(simpleClient.kind, Span2::CLIENT); + EXPECT_EQ(*simpleClient.local_endpoint, frontend.host()); + EXPECT_EQ(*simpleClient.remote_endpoint, backend.host()); + EXPECT_EQ(simpleClient.timestamp, 1472470996199000); + EXPECT_EQ(simpleClient.duration, 207000); + + EXPECT_EQ(simpleClient.annotations.size(), 2); + EXPECT_EQ(*simpleClient.annotations[0], *wire_send); + EXPECT_EQ(*simpleClient.annotations[1], *wire_recv); + + EXPECT_EQ(simpleClient.binary_annotations.size(), 2); + EXPECT_EQ(*simpleClient.binary_annotations[0], *http_path); + EXPECT_EQ(*simpleClient.binary_annotations[1], *client_version); +} + +TEST_F(Span2ConverterTest, client_unfinished) +{ + zipkin::Span client(nullptr, "get"); + + client + .with_trace_id("7180c278b62e8f6a216a2aea45d08fc9") + .with_parent_id(0x6b221d5bc9e6496c) + .with_id(0x5b4185666d50f68b) + .with_timestamp(std::chrono::microseconds(1472470996199000)); + + client.client_send(&frontend, std::chrono::microseconds(1472470996199000)); + auto wire_send = client.wire_send(&frontend, std::chrono::microseconds(1472470996238000)); + + auto spans = Span2::from_span(&client); + + EXPECT_EQ(spans.size(), 1); + + auto simpleClient = spans.front(); + + EXPECT_EQ(simpleClient.kind, Span2::CLIENT); + EXPECT_EQ(*simpleClient.local_endpoint, frontend.host()); + EXPECT_EQ(simpleClient.remote_endpoint, nullptr); + EXPECT_EQ(simpleClient.timestamp, 1472470996199000); + EXPECT_EQ(simpleClient.duration, 0); + + EXPECT_EQ(simpleClient.annotations.size(), 1); + EXPECT_EQ(*simpleClient.annotations[0], *wire_send); + + EXPECT_TRUE(simpleClient.binary_annotations.empty()); +} + +TEST_F(Span2ConverterTest, client_kindInferredFromAnnotation) +{ + zipkin::Span client(nullptr, "get"); + + client + .with_trace_id("7180c278b62e8f6a216a2aea45d08fc9") + .with_parent_id(0x6b221d5bc9e6496c) + .with_id(0x5b4185666d50f68b) + .with_timestamp(std::chrono::microseconds(1472470996199000)) + .with_duration(std::chrono::microseconds(1472470996238000 - 1472470996199000)); + + auto client_send = client.client_send(&frontend, std::chrono::microseconds(1472470996199000)); + client.client_recv(&frontend, std::chrono::microseconds(1472470996238000)); + + auto spans = Span2::from_span(&client); + + EXPECT_EQ(spans.size(), 1); + + auto simpleClient = spans.front(); + + EXPECT_EQ(simpleClient.kind, Span2::CLIENT); + EXPECT_EQ(*simpleClient.local_endpoint, frontend.host()); + EXPECT_EQ(simpleClient.remote_endpoint, nullptr); + EXPECT_EQ(simpleClient.timestamp, 1472470996199000); + EXPECT_EQ(simpleClient.duration, 1472470996238000 - 1472470996199000); + + EXPECT_TRUE(simpleClient.annotations.empty()); + EXPECT_TRUE(simpleClient.binary_annotations.empty()); +}