Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] support span serializing and data routing #1878

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions core/constants/TagConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,27 @@ namespace logtail {
const std::string DEFAULT_METRIC_TAG_CONTAINER_IP = DEFAULT_TAG_CONTAINER_IP;
const std::string DEFAULT_METRIC_TAG_IMAGE_NAME = DEFAULT_TAG_IMAGE_NAME;

////////////////////////// TRACE ////////////////////////
const std::string DEFAULT_TRACE_TAG_TRACE_ID = "traceId";
const std::string DEFAULT_TRACE_TAG_SPAN_ID = "spanId";
const std::string DEFAULT_TRACE_TAG_PARENT_ID = "parentSpanId";
const std::string DEFAULT_TRACE_TAG_SPAN_NAME = "spanName";
const std::string DEFAULT_TRACE_TAG_SERVICE_NAME = "serviceName";
const std::string DEFAULT_TRACE_TAG_HOST_NAME = "hostName";
const std::string DEFAULT_TRACE_TAG_START_TIME_NANO = "startTime";
const std::string DEFAULT_TRACE_TAG_END_TIME_NANO = "endTime";
const std::string DEFAULT_TRACE_TAG_DURATION = "duration";
const std::string DEFAULT_TRACE_TAG_ATTRIBUTES = "attributes";
const std::string DEFAULT_TRACE_TAG_RESOURCE = "resources";
const std::string DEFAULT_TRACE_TAG_LINKS = "links";
const std::string DEFAULT_TRACE_TAG_EVENTS = "events";
const std::string DEFAULT_TRACE_TAG_TIMESTAMP = "timestamp";
const std::string DEFAULT_TRACE_TAG_STATUS_CODE = "statusCode";
const std::string DEFAULT_TRACE_TAG_STATUS_MESSAGE = "statusMessage";
const std::string DEFAULT_TRACE_TAG_SPAN_KIND = "kind";
const std::string DEFAULT_TRACE_TAG_TRACE_STATE = "traceState";
// for arms
const std::string DEFAULT_TRACE_TAG_APP_ID = "pid";
const std::string DEFAULT_TRACE_TAG_IP = "ip";

} // namespace logtail
25 changes: 25 additions & 0 deletions core/constants/TagConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,29 @@ namespace logtail {
extern const std::string DEFAULT_METRIC_TAG_CONTAINER_IP;
extern const std::string DEFAULT_METRIC_TAG_IMAGE_NAME;

////////////////////////// TRACE ////////////////////////

extern const std::string DEFAULT_TRACE_TAG_TRACE_ID;
extern const std::string DEFAULT_TRACE_TAG_SPAN_ID;
extern const std::string DEFAULT_TRACE_TAG_PARENT_ID;
extern const std::string DEFAULT_TRACE_TAG_SPAN_NAME;
extern const std::string DEFAULT_TRACE_TAG_SERVICE_NAME;
extern const std::string DEFAULT_TRACE_TAG_HOST_NAME;
extern const std::string DEFAULT_TRACE_TAG_START_TIME_NANO;
extern const std::string DEFAULT_TRACE_TAG_END_TIME_NANO;
extern const std::string DEFAULT_TRACE_TAG_DURATION;
extern const std::string DEFAULT_TRACE_TAG_ATTRIBUTES;
extern const std::string DEFAULT_TRACE_TAG_RESOURCE;
extern const std::string DEFAULT_TRACE_TAG_LINKS;
extern const std::string DEFAULT_TRACE_TAG_EVENTS;
extern const std::string DEFAULT_TRACE_TAG_TIMESTAMP;
extern const std::string DEFAULT_TRACE_TAG_STATUS_CODE;
extern const std::string DEFAULT_TRACE_TAG_STATUS_MESSAGE;
extern const std::string DEFAULT_TRACE_TAG_SPAN_KIND;
extern const std::string DEFAULT_TRACE_TAG_TRACE_STATE;
// for arms
extern const std::string DEFAULT_TRACE_TAG_APP_ID;
extern const std::string DEFAULT_TRACE_TAG_IP;


} // namespace logtail
34 changes: 31 additions & 3 deletions core/models/SpanEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ size_t SpanEvent::SpanLink::DataSize() const {
return mTraceId.size() + mSpanId.size() + mTraceState.size() + mTags.DataSize();
}

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value SpanEvent::SpanLink::ToJson() const {
Json::Value root;
root["traceId"] = mTraceId.to_string();
Expand All @@ -84,14 +83,29 @@ Json::Value SpanEvent::SpanLink::ToJson() const {
root["traceState"] = mTraceState.to_string();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

字段的标准要统一

}
if (!mTags.mInner.empty()) {
Json::Value& tags = root["tags"];
Json::Value& tags = root["attributes"];
for (const auto& tag : mTags.mInner) {
tags[tag.first.to_string()] = tag.second.to_string();
}
}
return root;
}

std::string SpanEvent::SerializeLinksToString() const {
if (mLinks.empty()) {
return "";
}
Json::Value root;
Json::Value jsonLinks(Json::arrayValue);
for (auto& link : mLinks) {
jsonLinks.append(link.ToJson());
}
root["links"] = jsonLinks;
Json::StreamWriterBuilder writer;
return Json::writeString(writer, root);
}

#ifdef APSARA_UNIT_TEST_MAIN
void SpanEvent::SpanLink::FromJson(const Json::Value& value) {
SetTraceId(value["traceId"].asString());
SetSpanId(value["spanId"].asString());
Expand Down Expand Up @@ -155,7 +169,6 @@ size_t SpanEvent::InnerEvent::DataSize() const {
return sizeof(decltype(mTimestampNs)) + mName.size() + mTags.DataSize();
}

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value SpanEvent::InnerEvent::ToJson() const {
Json::Value root;
root["name"] = mName.to_string();
Expand All @@ -169,6 +182,21 @@ Json::Value SpanEvent::InnerEvent::ToJson() const {
return root;
}

std::string SpanEvent::SerializeEventsToString() const {
if (mEvents.empty()) {
return "";
}
Json::Value root;
Json::Value jsonLinks(Json::arrayValue);
for (auto& link : mEvents) {
jsonLinks.append(link.ToJson());
}
root["events"] = jsonLinks;
Json::StreamWriterBuilder writer;
return Json::writeString(writer, root);
}

#ifdef APSARA_UNIT_TEST_MAIN
void SpanEvent::InnerEvent::FromJson(const Json::Value& value) {
SetName(value["name"].asString());
SetTimestampNs(value["timestampNs"].asUInt64());
Expand Down
26 changes: 24 additions & 2 deletions core/models/SpanEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <map>
#include <string>
#include <vector>
#include <json/json.h>

#include "common/memory/SourceBuffer.h"
#include "models/PipelineEvent.h"
Expand Down Expand Up @@ -67,8 +68,8 @@ class SpanEvent : public PipelineEvent {

size_t DataSize() const;

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value ToJson() const;
#ifdef APSARA_UNIT_TEST_MAIN
void FromJson(const Json::Value& value);
#endif

Expand Down Expand Up @@ -107,8 +108,8 @@ class SpanEvent : public PipelineEvent {

size_t DataSize() const;

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value ToJson() const;
#ifdef APSARA_UNIT_TEST_MAIN
void FromJson(const Json::Value& value);
#endif

Expand Down Expand Up @@ -146,6 +147,17 @@ class SpanEvent : public PipelineEvent {
void SetName(const std::string& name);

Kind GetKind() const { return mKind; }
constexpr StringView GetKindString() const {
switch (mKind) {
case Kind::Unspecified: return "unspecified";
case Kind::Internal: return "internal";
case Kind::Server: return "server";
case Kind::Client: return "client";
case Kind::Producer: return "producer";
case Kind::Consumer: return "consumer";
default: return "Unknown";
}
}
void SetKind(Kind kind) { mKind = kind; }

uint64_t GetStartTimeNs() const { return mStartTimeNs; }
Expand All @@ -167,12 +179,22 @@ class SpanEvent : public PipelineEvent {

const std::vector<InnerEvent>& GetEvents() const { return mEvents; }
InnerEvent* AddEvent();
std::string SerializeEventsToString() const;

const std::vector<SpanLink>& GetLinks() const { return mLinks; }
SpanLink* AddLink();
std::string SerializeLinksToString() const;

StatusCode GetStatus() const { return mStatus; }
void SetStatus(StatusCode status) { mStatus = status; }
constexpr StringView GetStatusString() const {
switch (mStatus) {
case StatusCode::Unset: return "UNSET";
case StatusCode::Ok: return "OK";
case StatusCode::Error: return "ERROR";
default: return "UNSET";
}
}

StringView GetScopeTag(StringView key) const;
bool HasScopeTag(StringView key) const;
Expand Down
102 changes: 100 additions & 2 deletions core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
#include "pipeline/serializer/SLSSerializer.h"

#include "common/Flags.h"
#include "constants/TagConstants.h"
#include "common/compression/CompressType.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "protobuf/sls/LogGroupSerializer.h"

#include <json/json.h>
#include <array>

DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024);

using namespace std;
Expand Down Expand Up @@ -50,6 +54,16 @@ bool Serializer<vector<CompressedLogGroup>>::DoSerialize(vector<CompressedLogGro
return res;
}

enum SpanCacheIdx {
ATTR_KEY_IDX,
LINK_KEY_IDX,
EVENT_KEY_IDX,
START_TS_KEY_IDX,
END_TS_KEY_IDX,
DURATION_KEY_IDX,
IDX_KEY_MAX,
};

bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
if (group.mEvents.empty()) {
errorMsg = "empty event group";
Expand All @@ -68,9 +82,10 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
// caculate serialized logGroup size first, where some critical results can be cached
vector<size_t> logSZ(group.mEvents.size());
vector<pair<string, size_t>> metricEventContentCache(group.mEvents.size());
vector<array<string, IDX_KEY_MAX>> spanEventContentCache(group.mEvents.size());
size_t logGroupSZ = 0;
switch (eventType) {
case PipelineEvent::Type::LOG:
case PipelineEvent::Type::LOG:{
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<LogEvent>();
if (e.Empty()) {
Expand All @@ -83,7 +98,8 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]);
}
break;
case PipelineEvent::Type::METRIC:
}
case PipelineEvent::Type::METRIC:{
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<MetricEvent>();
if (e.Is<UntypedSingleValue>()) {
Expand All @@ -107,7 +123,50 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
}
break;
}
case PipelineEvent::Type::SPAN:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<SpanEvent>();
size_t contentSZ = 0;
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_ID.size(), e.GetTraceId().size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_ID.size(), e.GetSpanId().size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_PARENT_ID.size(), e.GetParentSpanId().size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_NAME.size(), e.GetName().size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_KIND.size(), e.GetKindString().size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), e.GetStatusString().size());
//
// set tags and scope tags
Json::Value jsonVal;
for (auto it = e.TagsBegin(); it != e.TagsEnd(); ++it) {
jsonVal[it->first.to_string()] = it->second.to_string();
}
for (auto it = e.ScopeTagsBegin(); it != e.ScopeTagsEnd(); ++it) {
jsonVal[it->first.to_string()] = it->second.to_string();
}
Json::StreamWriterBuilder writer;
std::string attrString = Json::writeString(writer, jsonVal);
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size());
spanEventContentCache[i][ATTR_KEY_IDX] = std::move(attrString);

auto linkString = e.SerializeLinksToString();
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size());
spanEventContentCache[i][LINK_KEY_IDX] = std::move(linkString);
auto eventString = e.SerializeEventsToString();
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_EVENTS.size(), eventString.size());
spanEventContentCache[i][EVENT_KEY_IDX] = std::move(eventString);

// time related
auto startTsNs = std::to_string(e.GetStartTimeNs());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_START_TIME_NANO.size(), startTsNs.size());
spanEventContentCache[i][START_TS_KEY_IDX] = std::move(startTsNs);
auto endTsNs = std::to_string(e.GetEndTimeNs());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_END_TIME_NANO.size(), endTsNs.size());
spanEventContentCache[i][END_TS_KEY_IDX] = std::move(endTsNs);
auto durationNs = std::to_string(e.GetEndTimeNs() - e.GetStartTimeNs());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_DURATION.size(), durationNs.size());
spanEventContentCache[i][DURATION_KEY_IDX] = std::move(durationNs);
logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
}
break;
default:
break;
Expand Down Expand Up @@ -164,6 +223,45 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
}
break;
case PipelineEvent::Type::SPAN:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& spanEvent = group.mEvents[i].Cast<SpanEvent>();

serializer.StartToAddLog(logSZ[i]);
// set trace_id span_id span_kind status etc
serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_ID, spanEvent.GetTraceId());
serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_ID, spanEvent.GetSpanId());
serializer.AddLogContent(DEFAULT_TRACE_TAG_PARENT_ID, spanEvent.GetParentSpanId());
// span_name
serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_NAME, spanEvent.GetName());
// span_kind
serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_KIND, spanEvent.GetKindString());
// status_code
serializer.AddLogContent(DEFAULT_TRACE_TAG_STATUS_CODE, spanEvent.GetStatusString());

//// TODO @qianlu.kk enterprise
// ip
// auto ipView = spanEvent.GetTag("ip");
// if (ipView.size()) {
// serializer.AddLogContent(DEFAULT_TRACE_TAG_IP, ipView.to_string());
// }
// pid
// auto appIdItr = group.mTags.mInner.find("pid");
// if (appIdItr != group.mTags.mInner.end()) {
// serializer.AddLogContent(DEFAULT_TRACE_TAG_APP_ID, appIdItr->second.to_string());
// }
serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][ATTR_KEY_IDX]);

serializer.AddLogTime(spanEvent.GetTimestamp());
serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][LINK_KEY_IDX]);
serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][EVENT_KEY_IDX]);

// start_time
serializer.AddLogContent(DEFAULT_TRACE_TAG_START_TIME_NANO, spanEventContentCache[i][START_TS_KEY_IDX]);
// end_time
serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][END_TS_KEY_IDX]);
// duration
serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][DURATION_KEY_IDX]);
}
break;
default:
break;
Expand Down
9 changes: 5 additions & 4 deletions core/plugin/flusher/sls/DiskBufferWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
bufferMeta.set_shardhashkey(data->mShardHashKey);
bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType()));
bufferMeta.set_telemetrytype(flusher->mTelemetryType);
bufferMeta.set_subpath(flusher->mSubpath);
string encodedInfo;
bufferMeta.SerializeToString(&encodedInfo);

Expand Down Expand Up @@ -753,23 +754,23 @@ SendResult DiskBufferWriter::SendToNetSync(sdk::Client* sendClient,
bufferMeta.compresstype(),
logData,
bufferMeta.rawsize(),
bufferMeta.shardhashkey());
bufferMeta.shardhashkey(), false, bufferMeta.has_subpath() ? bufferMeta.subpath(): "");
else
sendClient->PostLogStoreLogs(bufferMeta.project(),
bufferMeta.logstore(),
bufferMeta.compresstype(),
logData,
bufferMeta.rawsize());
bufferMeta.rawsize(), "", false, bufferMeta.has_subpath() ? bufferMeta.subpath(): "");
} else {
if (bufferMeta.has_shardhashkey() && !bufferMeta.shardhashkey().empty())
sendClient->PostLogStoreLogPackageList(bufferMeta.project(),
bufferMeta.logstore(),
bufferMeta.compresstype(),
logData,
bufferMeta.shardhashkey());
bufferMeta.shardhashkey(), bufferMeta.has_subpath() ? bufferMeta.subpath(): "");
else
sendClient->PostLogStoreLogPackageList(
bufferMeta.project(), bufferMeta.logstore(), bufferMeta.compresstype(), logData);
bufferMeta.project(), bufferMeta.logstore(), bufferMeta.compresstype(), logData, "", bufferMeta.has_subpath() ? bufferMeta.subpath(): "");
}
return SEND_OK;
} catch (sdk::LOGException& ex) {
Expand Down
Loading