diff --git a/examples/grpc_greeter/greeter_server.cc b/examples/grpc_greeter/greeter_server.cc index 203e6b8..b0693a3 100644 --- a/examples/grpc_greeter/greeter_server.cc +++ b/examples/grpc_greeter/greeter_server.cc @@ -77,7 +77,7 @@ struct GreeterServiceImpl final : public Greeter::Service }; DEFINE_string(grpc_addr, "localhost:50051", "GRPC server address"); -DEFINE_string(collector_uri, "", "Collector URI for tracing"); +DEFINE_string(collector_uri, "http://localhost:9411/zipkin/api/v1", "Collector URI for tracing"); void RunServer(std::shared_ptr collector) { diff --git a/examples/simple_proxy/main.c b/examples/simple_proxy/main.c index f637fc0..c50c046 100644 --- a/examples/simple_proxy/main.c +++ b/examples/simple_proxy/main.c @@ -192,6 +192,7 @@ void forward_http_request(struct mg_connection *nc, struct http_message *hm, zip else if (0 == mg_vcmp(hn, ZIPKIN_X_SPAN_ID)) { zipkin_span_set_parent_id(span, strtoull(hv->p, NULL, 16)); + zipkin_span_set_shared(span, 1); } else { @@ -273,6 +274,7 @@ void reply_json_response(struct mg_connection *nc, struct http_message *hm, zipk else if (0 == mg_vcmp(hn, ZIPKIN_X_SPAN_ID)) { zipkin_span_set_parent_id(span, strtoull(hv->p, NULL, 16)); + zipkin_span_set_shared(span, 1); } else if (0 == mg_vcmp(hn, ZIPKIN_X_SAMPLED)) { @@ -573,4 +575,4 @@ int main(int argc, char **argv) } return 0; -} \ No newline at end of file +} diff --git a/include/zipkin.h b/include/zipkin.h index 8a9fe58..3a9dee3 100644 --- a/include/zipkin.h +++ b/include/zipkin.h @@ -124,6 +124,9 @@ zipkin_span_t zipkin_span_parse_trace_id(zipkin_span_t span, const char *str, si int zipkin_span_debug(zipkin_span_t span); zipkin_span_t zipkin_span_set_debug(zipkin_span_t span, int debug); +int zipkin_span_shared(zipkin_span_t span); +zipkin_span_t zipkin_span_set_shared(zipkin_span_t span, int shared); + int zipkin_span_sampled(zipkin_span_t span); zipkin_span_t zipkin_span_set_sampled(zipkin_span_t span, int sampled); @@ -297,4 +300,4 @@ struct curl_slist *zipkin_propagation_inject_curl_headers(struct curl_slist *hea #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/src/CApi.cpp b/src/CApi.cpp index 4c0a3da..484dd6e 100644 --- a/src/CApi.cpp +++ b/src/CApi.cpp @@ -154,6 +154,17 @@ zipkin_span_t zipkin_span_set_debug(zipkin_span_t span, int debug) return span; } +int zipkin_span_shared(zipkin_span_t span) +{ + return span ? static_cast(span)->shared() : false; +} +zipkin_span_t zipkin_span_set_shared(zipkin_span_t span, int shared) +{ + if (span) + static_cast(span)->with_shared(shared); + + return span; +} int zipkin_span_sampled(zipkin_span_t span) { return span ? static_cast(span)->sampled() : false; @@ -568,4 +579,4 @@ struct curl_slist *zipkin_propagation_inject_curl_headers(struct curl_slist *hea #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cdd8e16..0767c66 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,7 +14,24 @@ add_custom_command ( OUTPUT ${zipkin_core_INCS} ${zipkin_core_SRCS} COMMAND ${THRIFT_COMPILER} --gen cpp -o ${PROJECT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/zipkinCore.thrift DEPENDS zipkinCore.thrift - COMMENT "Generating zipkin Thrift binding files" + COMMENT "Generating zipkin core Thrift binding files" +) + +set (zipkin_deps_INCS + ${GENDIR}/zipkinDependencies_constants.h + ${GENDIR}/zipkinDependencies_types.h + ) + +set (zipkin_deps_SRCS + ${GENDIR}/zipkinDependencies_constants.cpp + ${GENDIR}/zipkinDependencies_types.cpp + ) + +add_custom_command ( + OUTPUT ${zipkin_deps_INCS} ${zipkin_deps_SRCS} + COMMAND ${THRIFT_COMPILER} --gen cpp -o ${PROJECT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/zipkinDependencies.thrift + DEPENDS zipkinDependencies.thrift + COMMENT "Generating zipkin dependencies Thrift binding files" ) set (scribe_INCS @@ -76,7 +93,7 @@ endif() message(STATUS "Library type: ${LIB_TYPE}") -add_library (zipkin ${LIB_TYPE} ${zipkin_SRCS} ${zipkin_core_SRCS} ${scribe_SRCS}) +add_library (zipkin ${LIB_TYPE} ${zipkin_SRCS} ${zipkin_core_SRCS} ${zipkin_deps_SRCS} ${scribe_SRCS}) target_link_libraries (zipkin ${zipkin_DEPENDENCIES}) set (PREFIX ${CMAKE_INSTALL_PREFIX}) @@ -103,7 +120,7 @@ install ( ) install ( - FILES ${zipkin_INCS} ${zipkin_core_INCS} ${scribe_INCS} + FILES ${zipkin_INCS} ${zipkin_core_INCS} ${zipkin_deps_INCS} ${scribe_INCS} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/zipkin ) diff --git a/src/Collector.cpp b/src/Collector.cpp index b887764..2a1e7cb 100644 --- a/src/Collector.cpp +++ b/src/Collector.cpp @@ -21,6 +21,8 @@ namespace zipkin std::shared_ptr MessageCodec::binary(new BinaryCodec()); std::shared_ptr MessageCodec::json(new JsonCodec()); std::shared_ptr MessageCodec::pretty_json(new PrettyJsonCodec()); +std::shared_ptr MessageCodec::json_v2(new JsonCodec(2)); +std::shared_ptr MessageCodec::pretty_json_v2(new PrettyJsonCodec(2)); CompressionCodec parse_compression_codec(const std::string &codec) { @@ -57,7 +59,10 @@ std::shared_ptr MessageCodec::parse(const std::string &codec) return json; if (codec == "pretty_json") return pretty_json; - + if (codec == "json_v2") + return json_v2; + if (codec == "pretty_json_v2") + return pretty_json_v2; return nullptr; } @@ -85,7 +90,13 @@ size_t JsonCodec::encode(boost::shared_ptrserialize_json(writer); + if (m_format_version > 1) { + for (auto &span2: __impl::Span2::from_span(span)) { + span2.serialize_json(writer); + } + } else { + span->serialize_json(writer); + } } writer.EndArray(); @@ -105,7 +116,13 @@ size_t PrettyJsonCodec::encode(boost::shared_ptrserialize_json(writer); + if (m_format_version > 1) { + for (auto &span2: __impl::Span2::from_span(span)) { + span2.serialize_json(writer); + } + } else { + span->serialize_json(writer); + } } writer.EndArray(); diff --git a/src/Collector.h b/src/Collector.h index 83938c3..0a0bce9 100644 --- a/src/Collector.h +++ b/src/Collector.h @@ -4,6 +4,7 @@ #include #include #include + #include #include @@ -54,6 +55,8 @@ class MessageCodec static std::shared_ptr binary; static std::shared_ptr json; static std::shared_ptr pretty_json; + static std::shared_ptr json_v2; + static std::shared_ptr pretty_json_v2; }; /** @@ -74,8 +77,13 @@ class BinaryCodec : public MessageCodec */ class JsonCodec : public MessageCodec { + int m_format_version; public: - virtual const std::string name(void) const override { return "json"; } + JsonCodec(int format_version = 1) : m_format_version(format_version) {} + + virtual const std::string name(void) const override { + return m_format_version > 1 ? "json_v2" : "json"; + } virtual const std::string mime_type(void) const override { return "application/json"; } @@ -87,8 +95,13 @@ class JsonCodec : public MessageCodec */ class PrettyJsonCodec : public MessageCodec { + int m_format_version; public: - virtual const std::string name(void) const override { return "pretty_json"; } + PrettyJsonCodec(int format_version = 1) : m_format_version(format_version) {} + + virtual const std::string name(void) const override { + return m_format_version > 1 ? "pretty_json_v2" : "pretty_json"; + } virtual const std::string mime_type(void) const override { return "application/json"; } diff --git a/src/Propagation.cpp b/src/Propagation.cpp index 4a00b78..89eef8a 100644 --- a/src/Propagation.cpp +++ b/src/Propagation.cpp @@ -112,7 +112,8 @@ void Propagation::extract(grpc::ServerContext &context, Span &span) } else if (item.first == ZIPKIN_X_SPAN_ID_LOWERCASE) { - span.with_parent_id(folly::to(value)); + span.with_parent_id(folly::to(value)) + .with_shared(true); } else if (item.first == ZIPKIN_X_SAMPLED_LOWERCASE) { @@ -127,4 +128,4 @@ void Propagation::extract(grpc::ServerContext &context, Span &span) #endif // WITH_GRPC -} // namespace zipkin \ No newline at end of file +} // namespace zipkin diff --git a/src/Span.cpp b/src/Span.cpp index 771e6c5..3da5941 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -24,6 +24,8 @@ DEFINE_TRACE_KEY(CLIENT_SEND) DEFINE_TRACE_KEY(CLIENT_RECV) DEFINE_TRACE_KEY(SERVER_SEND) DEFINE_TRACE_KEY(SERVER_RECV) +DEFINE_TRACE_KEY(MESSAGE_SEND) +DEFINE_TRACE_KEY(MESSAGE_RECV) DEFINE_TRACE_KEY(WIRE_SEND) DEFINE_TRACE_KEY(WIRE_RECV) DEFINE_TRACE_KEY(CLIENT_SEND_FRAGMENT) @@ -33,6 +35,7 @@ DEFINE_TRACE_KEY(SERVER_RECV_FRAGMENT) DEFINE_TRACE_KEY(LOCAL_COMPONENT) DEFINE_TRACE_KEY(CLIENT_ADDR) DEFINE_TRACE_KEY(SERVER_ADDR) +DEFINE_TRACE_KEY(MESSAGE_ADDR) DEFINE_TRACE_KEY(ERROR) DEFINE_TRACE_KEY(HTTP_HOST) DEFINE_TRACE_KEY(HTTP_METHOD) @@ -89,9 +92,9 @@ Endpoint &Endpoint::with_addr(const struct sockaddr *addr) { auto v4 = reinterpret_cast(addr); - m_host.__set_ipv4(ntohl(v4->sin_addr.s_addr)); - m_host.__set_port(ntohs(v4->sin_port)); - m_host.__isset.ipv6 = false; + set_ipv4(ntohl(v4->sin_addr.s_addr)); + set_port(ntohs(v4->sin_port)); + break; } @@ -99,8 +102,9 @@ Endpoint &Endpoint::with_addr(const struct sockaddr *addr) { auto v6 = reinterpret_cast(addr); - m_host.__set_ipv6(std::string(reinterpret_cast(v6->sin6_addr.s6_addr), sizeof(v6->sin6_addr))); - m_host.__set_port(ntohs(v6->sin6_port)); + set_ipv6(std::string(reinterpret_cast(v6->sin6_addr.s6_addr), sizeof(v6->sin6_addr))); + set_port(ntohs(v6->sin6_port)); + break; } } @@ -116,17 +120,16 @@ Endpoint &Endpoint::with_addr(const std::string &addr, port_t port) { auto bytes = ip.to_v6().to_bytes(); - m_host.__set_ipv6(std::string(reinterpret_cast(bytes.data()), bytes.size())); + set_ipv6(std::string(reinterpret_cast(bytes.data()), bytes.size())); } else { auto bytes = ip.to_v4().to_bytes(); - m_host.__set_ipv4(ntohl(*reinterpret_cast(bytes.data()))); - m_host.__isset.ipv6 = false; + set_ipv4(ntohl(*reinterpret_cast(bytes.data()))); } - m_host.__set_port(port); + set_port(port); return *this; } @@ -152,7 +155,7 @@ const char *to_string(AnnotationType type) } } -void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled) +void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled, bool shared) { m_span.debug = false; m_span.duration = 0; @@ -178,6 +181,9 @@ void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userda m_userdata = userdata; m_sampled = sampled; + m_shared = shared; + m_local_endpoint.reset(); + m_remote_endpoint.reset(); } void Span::submit(void) @@ -208,12 +214,18 @@ timestamp_t Span::now() return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()); } -Annotation Span::annotate(const std::string &value, const Endpoint *endpoint) +Annotation Span::annotate(const std::string &value, const Endpoint *endpoint, timestamp_t timestamp) { ::Annotation annotation; - annotation.__set_timestamp(now().count()); + if ((annotation.__isset.timestamp = timestamp != timestamp_t::zero())) { + annotation.__set_timestamp(timestamp.count()); + } else { + annotation.__set_timestamp(now().count()); + } + annotation.__set_value(value); + annotation.__isset.value = true; if (endpoint) { @@ -230,8 +242,13 @@ BinaryAnnotation Span::annotate(const std::string &key, const uint8_t *value, si ::BinaryAnnotation annotation; annotation.__set_key(key); + annotation.__isset.key = true; + annotation.__set_value(std::string(reinterpret_cast(value), size)); + annotation.__isset.value = true; + annotation.__set_annotation_type(AnnotationType::BYTES); + annotation.__isset.annotation_type = true; if (endpoint) { @@ -340,4 +357,271 @@ void CachedSpan::release(void) } } -} // namespace zipkin \ No newline at end of file +namespace __impl +{ + +inline bool is_set(const ::Endpoint &host) { + return host.__isset.service_name && (host.__isset.ipv4 || host.__isset.ipv6); +} + +inline bool close_enough(const ::Endpoint *lhs, const ::Endpoint *rhs) { + return lhs->__isset.service_name && rhs->__isset.service_name && + lhs->service_name == rhs->service_name; +} + +const std::vector Span2::from_span(const Span *span) { + std::vector spans = {{ Span2(span) }}; + + auto new_span = [span, &spans](const ::Endpoint *host, Kind kind=UNKNOWN) -> Span2& { + Span2 span2(span); + + span2.kind = kind; + span2.local_endpoint = host; + + spans.push_back(std::move(span2)); + + return spans.back(); + }; + + auto for_endpoint = [span, &spans, new_span](const ::Endpoint &host) -> Span2& { + if (!is_set(host)) { + return spans.front(); // allocate missing endpoint data to first span + } + + for (Span2& next : spans) { + if (!next.local_endpoint) { + next.local_endpoint = &host; + return next; + } + + if (close_enough(next.local_endpoint, &host)) { + return next; + } + } + + return new_span(&host); + }; + + auto maybe_timestamp_and_duration = [span, for_endpoint](const ::Annotation *begin, const ::Annotation *end) { + Span2& span2 = for_endpoint(begin->host); + + if (span->message().__isset.timestamp && span->message().__isset.duration) { + span2.timestamp = span->message().timestamp; + span2.duration = span->message().duration; + } else { + span2.timestamp = begin->timestamp; + span2.duration = end ? (end->timestamp - begin->timestamp) : 0; + } + }; + + const ::Annotation *cs = nullptr, *sr = nullptr, *ss = nullptr, *cr = nullptr, *ms = nullptr, *mr = nullptr, *ws = nullptr, *wr = nullptr; + + // add annotations unless they are "core" + for (const ::Annotation& annotation : span->message().annotations) + { + Span2& span2 = for_endpoint(annotation.host); + + if (annotation.value.size() == 2 && is_set(annotation.host)) { + uint16_t key = *reinterpret_cast(annotation.value.c_str()); + + // core annotations require an endpoint. Don't give special treatment when that's missing + switch (key) { + case 0x7363: // CLIENT_SEND + span2.kind = CLIENT; + cs = &annotation; + break; + + case 0x7273: // SERVER_RECV + span2.kind = SERVER; + sr = &annotation; + break; + + case 0x7373: // SERVER_SEND + span2.kind = SERVER; + ss = &annotation; + break; + + case 0x7263: // CLIENT_RECV + span2.kind = CLIENT; + cr = &annotation; + break; + + case 0x736d: // MESSAGE_SEND + span2.kind = PRODUCER; + ms = &annotation; + break; + + case 0x726d: // MESSAGE_RECV + span2.kind = CONSUMER; + mr = &annotation; + break; + + case 0x7377: // WIRE_SEND + ws = &annotation; + break; + + case 0x7277: // WIRE_RECV + wr = &annotation; + break; + + default: + span2.annotations.push_back(&annotation); + break; + } + } else { + span2.annotations.push_back(&annotation); + } + } + + if (cs && sr) { + // in a shared span, the client side owns span duration by annotations or explicit timestamp + maybe_timestamp_and_duration(cs, cr); + + // special-case loopback: We need to make sure on loopback there are two span2s + Span2& client = for_endpoint(cs->host); + bool is_client = close_enough(&cs->host, &sr->host); + Span2& server = is_client ? + new_span(&sr->host, SERVER) : // fork a new span for the server side + for_endpoint(sr->host); + + if (is_client) { + client.kind = CLIENT; + } + + // the server side is smaller than that, we have to read annotations to find out + server.shared = true; + server.timestamp = sr->timestamp; + + if (ss) server.duration = ss->timestamp - sr->timestamp; + if (!cr && !span->message().duration) client.duration = 0; // one-way has no duration + } else if (cs && cr) { + maybe_timestamp_and_duration(cs, cr); + } else if (sr && ss) { + maybe_timestamp_and_duration(sr, ss); + } else { + // otherwise, the span is incomplete. revert special-casing + for (Span2& next : spans) { + switch (next.kind) { + case CLIENT: + if (cs) next.timestamp = cs->timestamp; + break; + case SERVER: + if (sr) next.timestamp = sr->timestamp; + break; + case PRODUCER: + case CONSUMER: + case UNKNOWN: + break; + } + } + + if (span->message().timestamp) { + Span2& span2 = spans.front(); + + span2.timestamp = span->message().timestamp; + span2.duration = span->message().duration; + } + } + + // Span v1 format did not have a shared flag. By convention, span.timestamp being absent + // implied shared. When we only see the server-side, carry this signal over. + if (!cs && (sr && !span->message().timestamp)) { + for_endpoint(sr->host).shared = true; + } + + // ms and mr are not supposed to be in the same span, but in case they are.. + if (ms && mr) { + // special-case loopback: We need to make sure on loopback there are two span2s + Span2& producer = for_endpoint(ms->host); + bool is_producer = close_enough(&ms->host, &mr->host); + Span2& consumer = is_producer ? + new_span(&mr->host, CONSUMER) : // fork a new span for the consumer side + for_endpoint(mr->host); + + if (is_producer) { + producer.kind = PRODUCER; + } + + consumer.shared = true; + + if (wr) { + consumer.timestamp = wr->timestamp; + consumer.duration = mr->timestamp - wr->timestamp; + } else { + consumer.timestamp = mr->timestamp; + } + + producer.timestamp = ms->timestamp; + producer.duration = ws ? (ws->timestamp - ms->timestamp) : 0; + } else if (ms) { + maybe_timestamp_and_duration(ms, ws); + } else if (mr) { + if (wr) { + maybe_timestamp_and_duration(wr, mr); + } else { + maybe_timestamp_and_duration(mr, nullptr); + } + } else { + if (ws) for_endpoint(ws->host).annotations.push_back(ws); + if (wr) for_endpoint(wr->host).annotations.push_back(wr); + } + + const ::Endpoint *ca = nullptr, *sa = nullptr, *ma = nullptr; + + // convert binary annotations to tags and addresses + for (const ::BinaryAnnotation& annotation : span->message().binary_annotations) + { + if (annotation.key.size() == 2 && + annotation.annotation_type == AnnotationType::BOOL && + is_set(annotation.host)) + { + switch (* reinterpret_cast(annotation.key.c_str())) { + case 0x6163: // CLIENT_ADDR + ca = &annotation.host; + break; + + case 0x6173: // SERVER_ADDR + sa = &annotation.host; + break; + + case 0x616d: // MESSAGE_ADDR + ma = &annotation.host; + break; + + default: + for_endpoint(annotation.host).binary_annotations.push_back(&annotation); + break; + } + + continue; + } + + for_endpoint(annotation.host).binary_annotations.push_back(&annotation); + } + + if (cs && sa && !close_enough(sa, &cs->host)) { + for_endpoint(cs->host).remote_endpoint = sa; + } + + if (sr && ca && !close_enough(ca, &sr->host)) { + for_endpoint(sr->host).remote_endpoint = ca; + } + + if (ms && ma && !close_enough(ma, &ms->host)) { + for_endpoint(ms->host).remote_endpoint = ma; + } + + if (mr && ma && !close_enough(ma, &mr->host)) { + for_endpoint(mr->host).remote_endpoint = ma; + } + + if ((!cs && !sr) && (ca && sa)) { + for_endpoint(*ca).remote_endpoint = sa; + } + + return std::move(spans); +} + +} // namespace __impl + +} // namespace zipkin diff --git a/src/Span.h b/src/Span.h index fe6d076..56ba228 100644 --- a/src/Span.h +++ b/src/Span.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include @@ -35,6 +37,12 @@ typedef void *userdata_t; namespace zipkin { +struct Tracer; +class CachedTracer; +class Span; + +namespace __impl { class Span2; } + /** * \brief Indicates the network context of a service recording an annotation with two exceptions. */ @@ -42,6 +50,29 @@ class Endpoint { ::Endpoint m_host; + inline void set_service_name(const std::string& service_name) { + m_host.__set_service_name(service_name); + m_host.__isset.service_name = true; + } + + inline void set_ipv4(uint32_t ipv4) { + m_host.__set_ipv4(ipv4); + m_host.__isset.ipv4 = true; + m_host.__isset.ipv6 = false; + } + + inline void set_ipv6(const std::string& ipv6) { + m_host.__set_ipv6(ipv6); + m_host.__isset.ipv4 = false; + m_host.__isset.ipv6 = true; + } + + inline void set_port(uint16_t port) { + if ((m_host.__isset.port = port != 0)) { + m_host.__set_port(port); + } + } + public: Endpoint() { @@ -97,7 +128,11 @@ class Endpoint */ inline const std::string &service_name(void) const { return m_host.service_name; } - inline Endpoint &with_service_name(const std::string &service_name); + inline Endpoint &with_service_name(const std::string &service_name) { + set_service_name(service_name); + + return *this; + } std::unique_ptr sockaddr(void) const; @@ -110,12 +145,26 @@ class Endpoint /** * \brief with IPv4 address */ - inline Endpoint &with_addr(const struct sockaddr_in *addr); + inline Endpoint &with_addr(const struct sockaddr_in *addr) { + assert(addr); + + set_ipv4(ntohl(addr->sin_addr.s_addr)); + set_port(ntohs(addr->sin_port)); + + return *this; + } /** * \brief with IPv6 address */ - inline Endpoint &with_addr(const struct sockaddr_in6 *addr); + inline Endpoint &with_addr(const struct sockaddr_in6 *addr) { + assert(addr); + + set_ipv6(std::string(reinterpret_cast(addr->sin6_addr.s6_addr), sizeof(addr->sin6_addr))); + set_port(ntohs(addr->sin6_port)); + + return *this; + } /** * \brief with IP address @@ -125,21 +174,39 @@ class Endpoint /** * \brief with IPv4 address */ - inline Endpoint &with_ipv4(const std::string &ip); + inline Endpoint &with_ipv4(const std::string &ip) { + set_ipv4(ntohl(inet_addr(ip.c_str()))); + + return *this; + } /** * \brief with IPv6 address */ - inline Endpoint &with_ipv6(const std::string &ip); + inline Endpoint &with_ipv6(const std::string &ip) { + struct in6_addr addr; - inline Endpoint &with_port(port_t port); + if (inet_pton(AF_INET6, ip.c_str(), addr.s6_addr) > 0) { + set_ipv6(std::string(reinterpret_cast(addr.s6_addr), sizeof(addr))); + } - inline const ::Endpoint &host(void) const { return m_host; } -}; + return *this; + } -struct Tracer; -class CachedTracer; -class Span; + inline Endpoint &with_port(port_t port) { + set_port(port); + + return *this; + } + + const ::Endpoint& operator*(void) const { return m_host; } + + ::Endpoint& operator*(void) { return m_host; } + + const ::Endpoint& host(void) const { return m_host; } + + ::Endpoint& host(void) { return m_host; } +}; /** * \brief Associates an event that explains latency with a timestamp. @@ -154,7 +221,27 @@ class Annotation public: Annotation(Span &span, ::Annotation &annotation) : m_span(span), m_annotation(annotation) {} - Span &span(void) { return m_span; } + Span &span(void) const { return m_span; } + + /** + * \brief Raw thrift Annotation + */ + const ::Annotation& operator*() const { return m_annotation; } + + /** + * \brief Raw thrift Annotation + */ + ::Annotation& operator*() { return m_annotation; } + + /** + * \brief Raw thrift Annotation + */ + const ::Annotation& annotation() const { return m_annotation; } + + /** + * \brief Raw thrift Annotation + */ + ::Annotation& annotation() { return m_annotation; } /** * \brief Microseconds from epoch. @@ -189,7 +276,8 @@ class Annotation /** \sa Annotation#endpoint */ Annotation &with_endpoint(const Endpoint &endpoint) { - m_annotation.host = endpoint.host(); + m_annotation.host = *endpoint; + return *this; } }; @@ -262,6 +350,19 @@ struct TraceKeys */ DECLARE_TRACE_KEY(SERVER_RECV) /** + * \brief Message send ("ms") is a request to send a message to a destination, usually a broker. + * + * This may be the only annotation in a messaging span. If WIRE_SEND exists in the same span, + * it follows this moment and clarifies delays sending the message, such as batching. + */ + DECLARE_TRACE_KEY(MESSAGE_SEND) + /** + * \brief A consumer received ("mr") a message from a broker. This may be the only + * annotation in a messaging span. If WIRE_RECV exists in the same span, it + * precedes this moment and clarifies any local queuing delay. + */ + DECLARE_TRACE_KEY(MESSAGE_RECV) + /** * \brief Optionally logs an attempt to send a message on the wire. * * Multiple wire send events could indicate network retries. @@ -338,6 +439,10 @@ struct TraceKeys */ DECLARE_TRACE_KEY(SERVER_ADDR) /** + * \brief Indicates the remote address of a messaging span, usually the broker. + */ + DECLARE_TRACE_KEY(MESSAGE_ADDR) + /** * \brief When an {@link Annotation#value}, this indicates when an error occurred. When a {@link * BinaryAnnotation#key}, the value is a human readable message associated with an error. * @@ -405,7 +510,27 @@ class BinaryAnnotation public: BinaryAnnotation(Span &span, ::BinaryAnnotation &annotation) : m_span(span), m_annotation(annotation) {} - Span &span(void) { return m_span; } + Span &span(void) const { return m_span; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + const ::BinaryAnnotation& operator*() const { return m_annotation; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + ::BinaryAnnotation& operator*() { return m_annotation; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + const ::BinaryAnnotation& annotation() const { return m_annotation; } + + /** + * \brief Raw thrift BinaryAnnotation + */ + ::BinaryAnnotation& annotation() { return m_annotation; } /** * \brief The thrift type of value, most often AnnotationType#STRING. @@ -518,7 +643,8 @@ class BinaryAnnotation */ BinaryAnnotation &with_endpoint(const Endpoint &endpoint) { - m_annotation.host = endpoint.host(); + m_annotation.host = *endpoint; + return *this; } }; @@ -543,23 +669,26 @@ class Span ::Span m_span; userdata_t m_userdata; bool m_sampled; + bool m_shared; + std::shared_ptr m_local_endpoint, m_remote_endpoint; static const ::Endpoint host(const Endpoint *endpoint); + friend class __impl::Span2; public: /** * \brief Construct a span */ - Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true) + Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false) : m_tracer(tracer) { - reset(name, parent_id, userdata, sampled); + reset(name, parent_id, userdata, sampled, shared); } /** * \brief Reset a span */ - void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true); + void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false); /** * \brief Submit a Span to Tracer @@ -583,9 +712,22 @@ class Span /** * \brief Raw thrift message */ - inline const ::Span &message(void) const { return m_span; } + inline const ::Span& message(void) const { return m_span; } + + /** + * \brief Raw thrift message + */ + inline ::Span& message(void) { return m_span; } - inline ::Span &message(void) { return m_span; } + /** + * \brief Raw thrift message + */ + inline const ::Span& operator*(void) const { return m_span; } + + /** + * \brief Raw thrift message + */ + inline ::Span& operator*(void) { return m_span; } /** * \brief Unique 8-byte identifier for a trace, set on all spans within it. @@ -730,6 +872,41 @@ class Span return *this; } + /** + * \brief Report the span is sharing between the client and the server. + */ + inline bool shared(void) const { return m_shared; } + + /** \sa Span#shared */ + inline Span &with_shared(bool shared = true) + { + m_shared = shared; + return *this; + } + + /** + * \brief Local endpoint + */ + inline std::shared_ptr local_endpoint(void) const { return m_local_endpoint; } + + /** \sa Span#local_endpoint */ + inline Span &with_local_endpoint(std::shared_ptr local_endpoint) + { + m_local_endpoint = local_endpoint; + return *this; + } + /** + * \brief Remote endpoint + */ + inline std::shared_ptr remote_endpoint(void) const { return m_remote_endpoint; } + + /** \sa Span#remote_endpoint */ + inline Span &with_remote_endpoint(std::shared_ptr remote_endpoint) + { + remote_endpoint = remote_endpoint; + return *this; + } + virtual inline Span *span(const std::string &name, userdata_t userdata = nullptr) const { Span *span = new Span(m_tracer, name, id(), userdata); @@ -753,57 +930,67 @@ class Span /** * \brief Associates events that explain latency with a timestamp. */ - Annotation annotate(const std::string &value, const Endpoint *endpoint = nullptr); + Annotation annotate(const std::string &value, const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()); /// \brief Annotate TraceKeys#CLIENT_SEND event - inline Annotation client_send(const Endpoint *endpoint = nullptr) + inline Annotation client_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_SEND, endpoint); + return annotate(TraceKeys::CLIENT_SEND, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV event - inline Annotation client_recv(const Endpoint *endpoint = nullptr) + inline Annotation client_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_RECV, endpoint); + return annotate(TraceKeys::CLIENT_RECV, endpoint, timestamp); } /// \brief Annotate TraceKeys#SERVER_SEND event - inline Annotation server_send(const Endpoint *endpoint = nullptr) + inline Annotation server_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_SEND, endpoint); + return annotate(TraceKeys::SERVER_SEND, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV event - inline Annotation server_recv(const Endpoint *endpoint = nullptr) + inline Annotation server_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_RECV, endpoint); + return annotate(TraceKeys::SERVER_RECV, endpoint, timestamp); + } + /// \brief Annotate TraceKeys#MESSAGE_SEND event + inline Annotation message_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) + { + return annotate(TraceKeys::MESSAGE_SEND, endpoint, timestamp); + } + /// \brief Annotate TraceKeys#MESSAGE_RECV event + inline Annotation message_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) + { + return annotate(TraceKeys::MESSAGE_RECV, endpoint, timestamp); } /// \brief Annotate TraceKeys#WIRE_SEND event - inline Annotation wire_send(const Endpoint *endpoint = nullptr) + inline Annotation wire_send(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::WIRE_SEND, endpoint); + return annotate(TraceKeys::WIRE_SEND, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV event - inline Annotation wire_recv(const Endpoint *endpoint = nullptr) + inline Annotation wire_recv(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::WIRE_RECV, endpoint); + return annotate(TraceKeys::WIRE_RECV, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_SEND_FRAGMENT event - inline Annotation client_send_fragment(const Endpoint *endpoint = nullptr) + inline Annotation client_send_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_SEND_FRAGMENT, endpoint); + return annotate(TraceKeys::CLIENT_SEND_FRAGMENT, endpoint, timestamp); } /// \brief Annotate TraceKeys#CLIENT_RECV_FRAGMENT event - inline Annotation client_recv_fragment(const Endpoint *endpoint = nullptr) + inline Annotation client_recv_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::CLIENT_RECV_FRAGMENT, endpoint); + return annotate(TraceKeys::CLIENT_RECV_FRAGMENT, endpoint, timestamp); } /// \brief Annotate TraceKeys#SERVER_SEND_FRAGMENT event - inline Annotation server_send_fragment(const Endpoint *endpoint = nullptr) + inline Annotation server_send_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_SEND_FRAGMENT, endpoint); + return annotate(TraceKeys::SERVER_SEND_FRAGMENT, endpoint, timestamp); } /// \brief Annotate TraceKeys#SERVER_RECV_FRAGMENT event - inline Annotation server_recv_fragment(const Endpoint *endpoint = nullptr) + inline Annotation server_recv_fragment(const Endpoint *endpoint = nullptr, timestamp_t timestamp = timestamp_t::zero()) { - return annotate(TraceKeys::SERVER_RECV_FRAGMENT, endpoint); + return annotate(TraceKeys::SERVER_RECV_FRAGMENT, endpoint, timestamp); } /** @@ -888,14 +1075,19 @@ class Span return annotate(TraceKeys::LOCAL_COMPONENT, value, endpoint); } /// \brief Annotate TraceKeys#CLIENT_ADDR event - inline BinaryAnnotation client_addr(const std::string &value, const Endpoint *endpoint = nullptr) + inline BinaryAnnotation client_addr(const Endpoint *endpoint) { - return annotate(TraceKeys::CLIENT_ADDR, value, endpoint); + return annotate(TraceKeys::CLIENT_ADDR, true, endpoint); } /// \brief Annotate TraceKeys#SERVER_ADDR event - inline BinaryAnnotation server_addr(const std::string &value, const Endpoint *endpoint = nullptr) + inline BinaryAnnotation server_addr(const Endpoint *endpoint) + { + return annotate(TraceKeys::SERVER_ADDR, true, endpoint); + } + /// \brief Annotate TraceKeys#MESSAGE_ADDR event + inline BinaryAnnotation message_addr(const Endpoint *endpoint) { - return annotate(TraceKeys::SERVER_ADDR, value, endpoint); + return annotate(TraceKeys::MESSAGE_ADDR, true, endpoint); } /// \brief Annotate TraceKeys#ERROR event inline Annotation error(const Endpoint *endpoint = nullptr) @@ -916,6 +1108,9 @@ class Span template void serialize_json(RapidJsonWriter &writer) const; + template + void serialize_json_v2(RapidJsonWriter &writer) const; + class Scope { Span &m_span; @@ -1041,58 +1236,9 @@ class CachedSpan : public Span } __attribute__((aligned)); -Endpoint &Endpoint::with_service_name(const std::string &service_name) -{ - m_host.service_name = service_name; - return *this; -} - -Endpoint &Endpoint::with_addr(const struct sockaddr_in *addr) -{ - assert(addr); - - m_host.__isset.ipv6 = 0; - m_host.__set_ipv4(ntohl(addr->sin_addr.s_addr)); - m_host.__set_port(ntohs(addr->sin_port)); - - return *this; -} - -Endpoint &Endpoint::with_addr(const struct sockaddr_in6 *addr) -{ - assert(addr); - - m_host.__set_ipv6(std::string(reinterpret_cast(addr->sin6_addr.s6_addr), sizeof(addr->sin6_addr))); - m_host.__set_port(ntohs(addr->sin6_port)); - - return *this; -} - -Endpoint &Endpoint::with_ipv4(const std::string &ip) -{ - m_host.__set_ipv4(ntohl(inet_addr(ip.c_str()))); - - return *this; -} - -Endpoint &Endpoint::with_ipv6(const std::string &ip) -{ - struct in6_addr addr; - - if (inet_pton(AF_INET6, ip.c_str(), addr.s6_addr) > 0) - m_host.__set_ipv6(std::string(reinterpret_cast(addr.s6_addr), sizeof(addr))); - - return *this; -} - -inline Endpoint &Endpoint::with_port(port_t port) -{ - m_host.__set_port(port); - return *this; -} - namespace __impl { + inline uint16_t native_to_big(uint16_t value) { return htons(value); } inline uint32_t native_to_big(uint32_t value) { return htonl(value); } @@ -1232,8 +1378,17 @@ void Span::serialize_json(RapidJsonWriter &writer) const writer.Key("serviceName"); writer.String(host.service_name); - writer.Key("ipv4"); - writer.String(inet_ntoa({static_cast(htonl(host.ipv4))})); + if (host.__isset.ipv6) { + char buf[INET6_ADDRSTRLEN+1] = {0}; + + inet_ntop(AF_INET6, host.ipv6.c_str(), buf, INET6_ADDRSTRLEN); + + writer.Key("ipv6"); + writer.String(buf); + } else { + writer.Key("ipv4"); + writer.String(inet_ntoa({static_cast(htonl(host.ipv4))})); + } writer.Key("port"); writer.Int(host.port); @@ -1375,4 +1530,222 @@ void Span::serialize_json(RapidJsonWriter &writer) const writer.EndObject(); } -} // namespace zipkin \ No newline at end of file +namespace __impl +{ + +struct Span2 { + enum Kind { + UNKNOWN, + CLIENT, + SERVER, + PRODUCER, + CONSUMER, + }; + + const Span *span = nullptr; + Kind kind = UNKNOWN; + int64_t timestamp = 0, duration = 0; + bool shared = false; + const ::Endpoint *local_endpoint = nullptr; + const ::Endpoint *remote_endpoint = nullptr; + std::vector annotations; + std::vector binary_annotations; + + Span2(const Span *source = nullptr) : span(source) {} + + // Converts the input, parsing RPC annotations into Span2. + static const std::vector from_span(const Span *span); + + const Span& operator*() const { return *span; } + + template + void serialize_json(RapidJsonWriter &writer) const; +}; + +template +void Span2::serialize_json(RapidJsonWriter &writer) const +{ + auto serialize_endpoint = [&writer](const ::Endpoint *host) { + writer.StartObject(); + + writer.Key("serviceName"); + writer.String(host->service_name); + + if (host->__isset.ipv6) { + char buf[INET6_ADDRSTRLEN+1] = {0}; + + inet_ntop(AF_INET6, host->ipv6.c_str(), buf, INET6_ADDRSTRLEN); + + writer.Key("ipv6"); + writer.String(buf); + } else { + writer.Key("ipv4"); + writer.String(inet_ntoa({static_cast(htonl(host->ipv4))})); + } + + writer.Key("port"); + writer.Int(host->port); + + writer.EndObject(); + }; + + auto serialize_value = [&writer](const std::string &data, AnnotationType type) { + std::ostringstream oss; + + switch (type) + { + case AnnotationType::BOOL: + oss << std::boolalpha << *reinterpret_cast(data.c_str()); + break; + + case AnnotationType::I16: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::I32: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::I64: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::DOUBLE: + oss << *reinterpret_cast(data.c_str()); + break; + + case AnnotationType::BYTES: + oss << base64::encode(data); + break; + + case AnnotationType::STRING: + oss << data; + break; + } + + writer.String(oss.str()); + }; + + char str[64]; + + writer.StartObject(); + + writer.Key("traceId"); + if (span->trace_id_high()) + { + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT SPAN_ID_FMT, span->trace_id_high(), span->trace_id())); + } + else + { + writer.String(str, snprintf(str, sizeof(str), "0000000000000000" SPAN_ID_FMT, span->trace_id())); + } + + writer.Key("name"); + writer.String(span->name()); + + writer.Key("id"); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, span->id())); + + if (span->message().__isset.parent_id) + { + writer.Key("parentId"); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, span->parent_id())); + } + + switch (kind) { + case CLIENT: + writer.Key("kind"); + writer.String("CLIENT"); + break; + + case SERVER: + writer.Key("kind"); + writer.String("SERVER"); + break; + + case PRODUCER: + writer.Key("kind"); + writer.String("PRODUCER"); + break; + + case CONSUMER: + writer.Key("kind"); + writer.String("CONSUMER"); + break; + + case UNKNOWN: // ignore it + break; + } + + if (span->message().__isset.timestamp) { + writer.Key("timestamp"); + writer.Int64(span->message().timestamp); + } else if (timestamp) { + writer.Key("timestamp"); + writer.Int64(timestamp); + } + + if (span->message().__isset.duration) { + writer.Key("duration"); + writer.Int64(span->message().duration); + } else if (duration) { + writer.Key("duration"); + writer.Int64(duration); + } + + writer.Key("annotations"); + writer.StartArray(); + + for (auto annotation : annotations) + { + writer.StartObject(); + + writer.Key("timestamp"); + writer.Int64(annotation->timestamp); + + writer.Key("value"); + writer.String(annotation->value); + + writer.EndObject(); + } + + writer.EndArray(annotations.size()); + + writer.Key("tags"); + writer.StartObject(); + + for (auto &annotation : binary_annotations) + { + writer.Key(annotation->key.c_str()); + serialize_value(annotation->value, annotation->annotation_type); + } + + writer.EndObject(); + + if (local_endpoint) { + writer.Key("localEndpoint"); + serialize_endpoint(local_endpoint); + } + + if (remote_endpoint) { + writer.Key("remoteEndpoint"); + serialize_endpoint(remote_endpoint); + } + + if (span->debug()) + { + writer.Key("debug"); + writer.Bool(true); + } + + if (shared) { + writer.Key("shared"); + writer.Bool(true); + } + + writer.EndObject(); +} + +} // namespace __impl + +} // namespace zipkin diff --git a/src/zipkinCore.thrift b/src/zipkinCore.thrift index d872b07..dbd7f4a 100644 --- a/src/zipkinCore.thrift +++ b/src/zipkinCore.thrift @@ -74,6 +74,34 @@ const string SERVER_SEND = "ss" * should also log the CLIENT_ADDR. */ const string SERVER_RECV = "sr" +/** + * Message send ("ms") is a request to send a message to a destination, usually + * a broker. This may be the only annotation in a messaging span. If WIRE_SEND + * exists in the same span, it follows this moment and clarifies delays sending + * the message, such as batching. + * + * Unlike RPC annotations like CLIENT_SEND, messaging spans never share a span + * ID. For example, "ms" should always be the parent of "mr". + * + * Annotation.host is not the destination, it is the host which logged the send + * event: the producer. When annotating MESSAGE_SEND, instrumentation should + * also tag the MESSAGE_ADDR. + */ +const string MESSAGE_SEND = "ms" +/** + * A consumer received ("mr") a message from a broker. This may be the only + * annotation in a messaging span. If WIRE_RECV exists in the same span, it + * precedes this moment and clarifies any local queuing delay. + * + * Unlike RPC annotations like SERVER_RECV, messaging spans never share a span + * ID. For example, "mr" should always be a child of "ms" unless it is a root + * span. + * + * Annotation.host is not the broker, it is the host which logged the receive + * event: the consumer. When annotating MESSAGE_RECV, instrumentation should + * also tag the MESSAGE_ADDR. + */ +const string MESSAGE_RECV = "mr" /** * Optionally logs an attempt to send a message on the wire. Multiple wire send * events could indicate network retries. A lag between client or server send @@ -134,7 +162,7 @@ const string HTTP_METHOD = "http.method" * "/resource/{resource_id}". In systems where only equals queries are available, searching for * http/path=/resource won't match if the actual request was /resource/abcd-ff. * - * Historical note: This was commonly expressed as "http.uri" in zipkin, eventhough it was most + * Historical note: This was commonly expressed as "http.uri" in zipkin, even though it was most * often just a path. */ const string HTTP_PATH = "http.path" @@ -230,6 +258,10 @@ const string CLIENT_ADDR = "ca" * different server ip or port. */ const string SERVER_ADDR = "sa" +/** + * Indicates the remote address of a messaging span, usually the broker. + */ +const string MESSAGE_ADDR = "ma" /** * Indicates the network context of a service recording an annotation with two @@ -420,7 +452,7 @@ struct Span { * precise value possible. For example, gettimeofday or syncing nanoTime * against a tick of currentTimeMillis. * - * For compatibilty with instrumentation that precede this field, collectors + * For compatibility with instrumentation that precede this field, collectors * or span stores can derive this via Annotation.timestamp. * For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp. * diff --git a/src/zipkinDependencies.thrift b/src/zipkinDependencies.thrift new file mode 100644 index 0000000..30ce57d --- /dev/null +++ b/src/zipkinDependencies.thrift @@ -0,0 +1,38 @@ +# Copyright 2013 Twitter Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +namespace java com.twitter.zipkin.thriftjava +#@namespace scala com.twitter.zipkin.thriftscala +namespace rb Zipkin + +struct DependencyLink { + /** parent service name (caller) */ + 1: string parent + /** child service name (callee) */ + 2: string child + # 3: Moments OBSOLETE_duration_moments + /** total traced calls made from parent to child */ + 4: i64 callCount + /** how many calls are known to be errors */ + 5: i64 errorCount + # histogram? +} + +/* An aggregate representation of services paired with every service they call. */ +struct Dependencies { + /** milliseconds from epoch */ + 1: i64 start_ts + /** milliseconds from epoch */ + 2: i64 end_ts + 3: list links +} diff --git a/test/TestMain.cpp b/test/TestMain.cpp index 11035e5..dcdd92b 100644 --- a/test/TestMain.cpp +++ b/test/TestMain.cpp @@ -5,9 +5,10 @@ int main(int argc, char **argv) { + ::google::InitGoogleLogging(argv[0]); ::google::ParseCommandLineFlags(&argc, &argv, false); ::testing::GTEST_FLAG(throw_on_failure) = true; ::testing::InitGoogleMock(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index 2c44df7..9d3a04e 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -204,8 +204,8 @@ static const char *json_template = R"###({ "binaryAnnotations": [ { "endpoint": { - "serviceName": "host", - "ipv4": "127.0.0.1", + "serviceName": "ipv6_host", + "ipv6": "::1", "port": 80 }, "key": "bool", @@ -255,9 +255,10 @@ TEST(span, serialize_json) addr.sin_addr.s_addr = inet_addr("127.0.0.1"); addr.sin_port = htons(80); zipkin::Endpoint host("host", &addr); + zipkin::Endpoint ipv6_host("ipv6_host", "::1", 80); span.client_send(&host); - span.annotate("bool", true, &host); + span.annotate("bool", true, &ipv6_host); span.annotate("i16", (int16_t)123); span.annotate("i32", (int32_t)123); span.annotate("i64", (int64_t)123); @@ -274,8 +275,70 @@ TEST(span, serialize_json) span.serialize_json(writer); char str[2048] = {0}; - int str_len = snprintf(str, sizeof(str), json_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), - span.message().annotations[0].timestamp, span.message().timestamp); + int str_len = snprintf(str, sizeof(str), json_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().annotations[0].timestamp, span.message().timestamp); + + ASSERT_EQ(std::string(buffer.GetString(), buffer.GetSize()), std::string(str, str_len)); +} + +static const char *json_v2_template = R"###([ + { + "traceId": "%016llx%016llx", + "name": "test", + "id": "%016llx", + "parentId": "%016llx", + "kind": "CLIENT", + "timestamp": %lld, + "annotations": [], + "tags": { + "bool": "true", + "i16": "123", + "i32": "123", + "i64": "123", + "double": "12.3", + "string": "测试", + "bytes": "AQID" + }, + "localEndpoint": { + "serviceName": "host", + "ipv4": "127.0.0.1", + "port": 80 + } + } +])###"; + +TEST(span, serialize_json_v2) +{ + MockTracer tracer; + + zipkin::Span span(&tracer, "test", zipkin::Span::next_id()); + zipkin::Endpoint host("host", "127.0.0.1", 80); + + span.client_send(&host); + span.server_addr(&host); + span.annotate("bool", true, &host); + span.annotate("i16", (int16_t)123); + span.annotate("i32", (int32_t)123); + span.annotate("i64", (int64_t)123); + span.annotate("double", 12.3); + span.annotate("string", std::wstring(L"测试")); + + uint8_t bytes[] = {1, 2, 3}; + + span.annotate("bytes", bytes); + + rapidjson::StringBuffer buffer; + rapidjson::PrettyWriter writer(buffer); + + writer.StartArray(); + + for (auto &span2: zipkin::__impl::Span2::from_span(&span)) { + span2.serialize_json(writer); + } + + writer.EndArray(); + + char str[2048] = {0}; + int str_len = snprintf(str, sizeof(str), json_v2_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().timestamp, span.message().annotations[0].timestamp); ASSERT_EQ(std::string(buffer.GetString(), buffer.GetSize()), std::string(str, str_len)); } @@ -327,4 +390,123 @@ TEST(span, annotate_stream) ASSERT_EQ(span.message().annotations.size(), 2); ASSERT_EQ(span.message().binary_annotations.size(), 8); -} \ No newline at end of file +} + +class Span2ConverterTest : public ::testing::Test { +protected: + zipkin::Endpoint frontend, backend, kafka; + + Span2ConverterTest() + : frontend("frontend", "127.0.0.1"), + backend("backend", "192.168.99.101", 9000), + kafka("kafka") + { + } + + virtual void SetUp() {} + + virtual void TearDown() {} +}; + +using zipkin::__impl::Span2; + +TEST_F(Span2ConverterTest, client) +{ + zipkin::Span client(nullptr, "get"); + + client + .with_trace_id("7180c278b62e8f6a216a2aea45d08fc9") + .with_parent_id(0x6b221d5bc9e6496c) + .with_id(0x5b4185666d50f68b) + .with_timestamp(std::chrono::microseconds(1472470996199000)) + .with_duration(std::chrono::microseconds(207000)); + + client.client_send(&frontend, std::chrono::microseconds(1472470996199000)); + auto wire_send = client.wire_send(&frontend, std::chrono::microseconds(1472470996238000)); + auto wire_recv = client.wire_recv(&frontend, std::chrono::microseconds(1472470996403000)); + client.client_recv(&frontend, std::chrono::microseconds(1472470996406000)); + auto http_path = client.http_path("/api", &frontend); + auto client_version = client.annotate("clnt/finagle.version", "6.45.0", &frontend); + client.server_addr(&backend); + + auto spans = Span2::from_span(&client); + + EXPECT_EQ(spans.size(), 1); + + auto simpleClient = spans.front(); + + EXPECT_EQ(simpleClient.kind, Span2::CLIENT); + EXPECT_EQ(*simpleClient.local_endpoint, frontend.host()); + EXPECT_EQ(*simpleClient.remote_endpoint, backend.host()); + EXPECT_EQ(simpleClient.timestamp, 1472470996199000); + EXPECT_EQ(simpleClient.duration, 207000); + + EXPECT_EQ(simpleClient.annotations.size(), 2); + EXPECT_EQ(*simpleClient.annotations[0], *wire_send); + EXPECT_EQ(*simpleClient.annotations[1], *wire_recv); + + EXPECT_EQ(simpleClient.binary_annotations.size(), 2); + EXPECT_EQ(*simpleClient.binary_annotations[0], *http_path); + EXPECT_EQ(*simpleClient.binary_annotations[1], *client_version); +} + +TEST_F(Span2ConverterTest, client_unfinished) +{ + zipkin::Span client(nullptr, "get"); + + client + .with_trace_id("7180c278b62e8f6a216a2aea45d08fc9") + .with_parent_id(0x6b221d5bc9e6496c) + .with_id(0x5b4185666d50f68b) + .with_timestamp(std::chrono::microseconds(1472470996199000)); + + client.client_send(&frontend, std::chrono::microseconds(1472470996199000)); + auto wire_send = client.wire_send(&frontend, std::chrono::microseconds(1472470996238000)); + + auto spans = Span2::from_span(&client); + + EXPECT_EQ(spans.size(), 1); + + auto simpleClient = spans.front(); + + EXPECT_EQ(simpleClient.kind, Span2::CLIENT); + EXPECT_EQ(*simpleClient.local_endpoint, frontend.host()); + EXPECT_EQ(simpleClient.remote_endpoint, nullptr); + EXPECT_EQ(simpleClient.timestamp, 1472470996199000); + EXPECT_EQ(simpleClient.duration, 0); + + EXPECT_EQ(simpleClient.annotations.size(), 1); + EXPECT_EQ(*simpleClient.annotations[0], *wire_send); + + EXPECT_TRUE(simpleClient.binary_annotations.empty()); +} + +TEST_F(Span2ConverterTest, client_kindInferredFromAnnotation) +{ + zipkin::Span client(nullptr, "get"); + + client + .with_trace_id("7180c278b62e8f6a216a2aea45d08fc9") + .with_parent_id(0x6b221d5bc9e6496c) + .with_id(0x5b4185666d50f68b) + .with_timestamp(std::chrono::microseconds(1472470996199000)) + .with_duration(std::chrono::microseconds(1472470996238000 - 1472470996199000)); + + auto client_send = client.client_send(&frontend, std::chrono::microseconds(1472470996199000)); + client.client_recv(&frontend, std::chrono::microseconds(1472470996238000)); + + auto spans = Span2::from_span(&client); + + EXPECT_EQ(spans.size(), 1); + + auto simpleClient = spans.front(); + + EXPECT_EQ(simpleClient.kind, Span2::CLIENT); + EXPECT_EQ(*simpleClient.local_endpoint, frontend.host()); + EXPECT_EQ(simpleClient.remote_endpoint, nullptr); + EXPECT_EQ(simpleClient.timestamp, 1472470996199000); + EXPECT_EQ(simpleClient.duration, 1472470996238000 - 1472470996199000); + + EXPECT_TRUE(simpleClient.annotations.empty()); + EXPECT_TRUE(simpleClient.binary_annotations.empty()); +}