From 95188b5f20b83f65346f2f358bdaacddf520d511 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Wed, 6 Nov 2024 12:08:39 +0800 Subject: [PATCH 1/5] feat: support span serializer Signed-off-by: qianlu.kk --- core/constants/TagConstants.cpp | 23 ++++ core/constants/TagConstants.h | 25 +++++ core/models/SpanEvent.cpp | 34 +++++- core/models/SpanEvent.h | 26 ++++- core/pipeline/serializer/SLSSerializer.cpp | 105 +++++++++++++++++- .../serializer/SLSSerializerUnittest.cpp | 55 ++++++++- 6 files changed, 258 insertions(+), 10 deletions(-) diff --git a/core/constants/TagConstants.cpp b/core/constants/TagConstants.cpp index edd7ea4e55..4115eeda52 100644 --- a/core/constants/TagConstants.cpp +++ b/core/constants/TagConstants.cpp @@ -63,4 +63,27 @@ namespace logtail { const std::string DEFAULT_METRIC_TAG_CONTAINER_IP = DEFAULT_TAG_CONTAINER_IP; const std::string DEFAULT_METRIC_TAG_IMAGE_NAME = DEFAULT_TAG_IMAGE_NAME; +////////////////////////// TRACE //////////////////////// + const std::string DEFAULT_TRACE_TAG_TRACE_ID = "traceId"; + const std::string DEFAULT_TRACE_TAG_SPAN_ID = "spanId"; + const std::string DEFAULT_TRACE_TAG_PARENT_ID = "parentSpanId"; + const std::string DEFAULT_TRACE_TAG_SPAN_NAME = "spanName"; + const std::string DEFAULT_TRACE_TAG_SERVICE_NAME = "serviceName"; + const std::string DEFAULT_TRACE_TAG_HOST_NAME = "hostName"; + const std::string DEFAULT_TRACE_TAG_START_TIME_NANO = "startTime"; + const std::string DEFAULT_TRACE_TAG_END_TIME_NANO = "endTime"; + const std::string DEFAULT_TRACE_TAG_DURATION = "duration"; + const std::string DEFAULT_TRACE_TAG_ATTRIBUTES = "attributes"; + const std::string DEFAULT_TRACE_TAG_RESOURCE = "resources"; + const std::string DEFAULT_TRACE_TAG_LINKS = "links"; + const std::string DEFAULT_TRACE_TAG_EVENTS = "events"; + const std::string DEFAULT_TRACE_TAG_TIMESTAMP = "timestamp"; + const std::string DEFAULT_TRACE_TAG_STATUS_CODE = "statusCode"; + const std::string DEFAULT_TRACE_TAG_STATUS_MESSAGE = "statusMessage"; + const std::string DEFAULT_TRACE_TAG_SPAN_KIND = "kind"; + const std::string DEFAULT_TRACE_TAG_TRACE_STATE = "traceState"; + // for arms + const std::string DEFAULT_TRACE_TAG_APP_ID = "pid"; + const std::string DEFAULT_TRACE_TAG_IP = "ip"; + } // namespace logtail \ No newline at end of file diff --git a/core/constants/TagConstants.h b/core/constants/TagConstants.h index 52d4213961..52e125761e 100644 --- a/core/constants/TagConstants.h +++ b/core/constants/TagConstants.h @@ -44,4 +44,29 @@ namespace logtail { extern const std::string DEFAULT_METRIC_TAG_CONTAINER_IP; extern const std::string DEFAULT_METRIC_TAG_IMAGE_NAME; +////////////////////////// TRACE //////////////////////// + + extern const std::string DEFAULT_TRACE_TAG_TRACE_ID; + extern const std::string DEFAULT_TRACE_TAG_SPAN_ID; + extern const std::string DEFAULT_TRACE_TAG_PARENT_ID; + extern const std::string DEFAULT_TRACE_TAG_SPAN_NAME; + extern const std::string DEFAULT_TRACE_TAG_SERVICE_NAME; + extern const std::string DEFAULT_TRACE_TAG_HOST_NAME; + extern const std::string DEFAULT_TRACE_TAG_START_TIME_NANO; + extern const std::string DEFAULT_TRACE_TAG_END_TIME_NANO; + extern const std::string DEFAULT_TRACE_TAG_DURATION; + extern const std::string DEFAULT_TRACE_TAG_ATTRIBUTES; + extern const std::string DEFAULT_TRACE_TAG_RESOURCE; + extern const std::string DEFAULT_TRACE_TAG_LINKS; + extern const std::string DEFAULT_TRACE_TAG_EVENTS; + extern const std::string DEFAULT_TRACE_TAG_TIMESTAMP; + extern const std::string DEFAULT_TRACE_TAG_STATUS_CODE; + extern const std::string DEFAULT_TRACE_TAG_STATUS_MESSAGE; + extern const std::string DEFAULT_TRACE_TAG_SPAN_KIND; + extern const std::string DEFAULT_TRACE_TAG_TRACE_STATE; + // for arms + extern const std::string DEFAULT_TRACE_TAG_APP_ID; + extern const std::string DEFAULT_TRACE_TAG_IP; + + } // namespace logtail \ No newline at end of file diff --git a/core/models/SpanEvent.cpp b/core/models/SpanEvent.cpp index 812ccbe54b..d9e2067685 100644 --- a/core/models/SpanEvent.cpp +++ b/core/models/SpanEvent.cpp @@ -75,7 +75,6 @@ size_t SpanEvent::SpanLink::DataSize() const { return mTraceId.size() + mSpanId.size() + mTraceState.size() + mTags.DataSize(); } -#ifdef APSARA_UNIT_TEST_MAIN Json::Value SpanEvent::SpanLink::ToJson() const { Json::Value root; root["traceId"] = mTraceId.to_string(); @@ -84,7 +83,7 @@ Json::Value SpanEvent::SpanLink::ToJson() const { root["traceState"] = mTraceState.to_string(); } if (!mTags.mInner.empty()) { - Json::Value& tags = root["tags"]; + Json::Value& tags = root["attributes"]; for (const auto& tag : mTags.mInner) { tags[tag.first.to_string()] = tag.second.to_string(); } @@ -92,6 +91,21 @@ Json::Value SpanEvent::SpanLink::ToJson() const { return root; } +std::string SpanEvent::SerializeLinksToString() const { + if (mLinks.empty()) { + return ""; + } + Json::Value root; + Json::Value jsonLinks(Json::arrayValue); + for (auto& link : mLinks) { + jsonLinks.append(link.ToJson()); + } + root["links"] = jsonLinks; + Json::StreamWriterBuilder writer; + return Json::writeString(writer, root); +} + +#ifdef APSARA_UNIT_TEST_MAIN void SpanEvent::SpanLink::FromJson(const Json::Value& value) { SetTraceId(value["traceId"].asString()); SetSpanId(value["spanId"].asString()); @@ -155,7 +169,6 @@ size_t SpanEvent::InnerEvent::DataSize() const { return sizeof(decltype(mTimestampNs)) + mName.size() + mTags.DataSize(); } -#ifdef APSARA_UNIT_TEST_MAIN Json::Value SpanEvent::InnerEvent::ToJson() const { Json::Value root; root["name"] = mName.to_string(); @@ -169,6 +182,21 @@ Json::Value SpanEvent::InnerEvent::ToJson() const { return root; } +std::string SpanEvent::SerializeEventsToString() const { + if (mEvents.empty()) { + return ""; + } + Json::Value root; + Json::Value jsonLinks(Json::arrayValue); + for (auto& link : mEvents) { + jsonLinks.append(link.ToJson()); + } + root["events"] = jsonLinks; + Json::StreamWriterBuilder writer; + return Json::writeString(writer, root); +} + +#ifdef APSARA_UNIT_TEST_MAIN void SpanEvent::InnerEvent::FromJson(const Json::Value& value) { SetName(value["name"].asString()); SetTimestampNs(value["timestampNs"].asUInt64()); diff --git a/core/models/SpanEvent.h b/core/models/SpanEvent.h index 584d64a5d6..b2aa2160c0 100644 --- a/core/models/SpanEvent.h +++ b/core/models/SpanEvent.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "common/memory/SourceBuffer.h" #include "models/PipelineEvent.h" @@ -67,8 +68,8 @@ class SpanEvent : public PipelineEvent { size_t DataSize() const; -#ifdef APSARA_UNIT_TEST_MAIN Json::Value ToJson() const; +#ifdef APSARA_UNIT_TEST_MAIN void FromJson(const Json::Value& value); #endif @@ -107,8 +108,8 @@ class SpanEvent : public PipelineEvent { size_t DataSize() const; -#ifdef APSARA_UNIT_TEST_MAIN Json::Value ToJson() const; +#ifdef APSARA_UNIT_TEST_MAIN void FromJson(const Json::Value& value); #endif @@ -146,6 +147,17 @@ class SpanEvent : public PipelineEvent { void SetName(const std::string& name); Kind GetKind() const { return mKind; } + constexpr StringView GetKindString() const { + switch (mKind) { + case Kind::Unspecified: return "unspecified"; + case Kind::Internal: return "internal"; + case Kind::Server: return "server"; + case Kind::Client: return "client"; + case Kind::Producer: return "producer"; + case Kind::Consumer: return "consumer"; + default: return "Unknown"; + } + } void SetKind(Kind kind) { mKind = kind; } uint64_t GetStartTimeNs() const { return mStartTimeNs; } @@ -167,12 +179,22 @@ class SpanEvent : public PipelineEvent { const std::vector& GetEvents() const { return mEvents; } InnerEvent* AddEvent(); + std::string SerializeEventsToString() const; const std::vector& GetLinks() const { return mLinks; } SpanLink* AddLink(); + std::string SerializeLinksToString() const; StatusCode GetStatus() const { return mStatus; } void SetStatus(StatusCode status) { mStatus = status; } + constexpr StringView GetStatusString() const { + switch (mStatus) { + case StatusCode::Unset: return "UNSET"; + case StatusCode::Ok: return "OK"; + case StatusCode::Error: return "ERROR"; + default: return "UNSET"; + } + } StringView GetScopeTag(StringView key) const; bool HasScopeTag(StringView key) const; diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index 3f48dc961a..5d1663a88a 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -15,10 +15,14 @@ #include "pipeline/serializer/SLSSerializer.h" #include "common/Flags.h" +#include "constants/TagConstants.h" #include "common/compression/CompressType.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "protobuf/sls/LogGroupSerializer.h" +#include +#include + DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024); using namespace std; @@ -68,9 +72,10 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri // caculate serialized logGroup size first, where some critical results can be cached vector logSZ(group.mEvents.size()); vector> metricEventContentCache(group.mEvents.size()); + vector> spanEventContentCache(group.mEvents.size()); size_t logGroupSZ = 0; switch (eventType) { - case PipelineEvent::Type::LOG: + case PipelineEvent::Type::LOG:{ for (size_t i = 0; i < group.mEvents.size(); ++i) { const auto& e = group.mEvents[i].Cast(); if (e.Empty()) { @@ -83,7 +88,8 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]); } break; - case PipelineEvent::Type::METRIC: + } + case PipelineEvent::Type::METRIC:{ for (size_t i = 0; i < group.mEvents.size(); ++i) { const auto& e = group.mEvents[i].Cast(); if (e.Is()) { @@ -107,7 +113,50 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]); } break; + } case PipelineEvent::Type::SPAN: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + size_t contentSZ = 0; + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_ID.size(), e.GetTraceId().size()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_ID.size(), e.GetSpanId().size()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_PARENT_ID.size(), e.GetParentSpanId().size()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_NAME.size(), e.GetName().size()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_KIND.size(), e.GetKindString().size()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), e.GetStatusString().size()); + // + // set tags and scope tags + Json::Value jsonVal; + for (auto it = e.TagsBegin(); it != e.TagsEnd(); ++it) { + jsonVal[it->first.to_string()] = it->second.to_string(); + } + for (auto it = e.ScopeTagsBegin(); it != e.ScopeTagsEnd(); ++it) { + jsonVal[it->first.to_string()] = it->second.to_string(); + } + Json::StreamWriterBuilder writer; + std::string attrString = Json::writeString(writer, jsonVal); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size()); + spanEventContentCache[i][0] = std::move(attrString); + + auto linkString = e.SerializeLinksToString(); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size()); + spanEventContentCache[i][1] = std::move(linkString); + auto eventString = e.SerializeEventsToString(); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_EVENTS.size(), eventString.size()); + spanEventContentCache[i][2] = std::move(eventString); + + // time related + auto startTsNs = std::to_string(e.GetStartTimeNs()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_START_TIME_NANO.size(), startTsNs.size()); + spanEventContentCache[i][3] = std::move(startTsNs); + auto endTsNs = std::to_string(e.GetEndTimeNs()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_END_TIME_NANO.size(), endTsNs.size()); + spanEventContentCache[i][4] = std::move(endTsNs); + auto durationNs = std::to_string(e.GetEndTimeNs() - e.GetStartTimeNs()); + contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_DURATION.size(), durationNs.size()); + spanEventContentCache[i][5] = std::move(durationNs); + logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]); + } break; default: break; @@ -164,6 +213,58 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri } break; case PipelineEvent::Type::SPAN: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& spanEvent = group.mEvents[i].Cast(); + + serializer.StartToAddLog(logSZ[i]); + // set trace_id span_id span_kind status etc + serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_ID, spanEvent.GetTraceId()); + serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_ID, spanEvent.GetSpanId()); + serializer.AddLogContent(DEFAULT_TRACE_TAG_PARENT_ID, spanEvent.GetParentSpanId()); + // span_name + serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_NAME, spanEvent.GetName()); + // span_kind + serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_KIND, spanEvent.GetKindString()); + // status_code + serializer.AddLogContent(DEFAULT_TRACE_TAG_STATUS_CODE, spanEvent.GetStatusString()); + + //// TODO @qianlu.kk enterprise + // ip + // auto ipView = spanEvent.GetTag("ip"); + // if (ipView.size()) { + // serializer.AddLogContent(DEFAULT_TRACE_TAG_IP, ipView.to_string()); + // } + // pid + // auto appIdItr = group.mTags.mInner.find("pid"); + // if (appIdItr != group.mTags.mInner.end()) { + // serializer.AddLogContent(DEFAULT_TRACE_TAG_APP_ID, appIdItr->second.to_string()); + // } + + + // set tags and scope tags + // Json::Value jsonVal; + // for (auto it = spanEvent.TagsBegin(); it != spanEvent.TagsEnd(); ++it) { + // jsonVal[it->first.to_string()] = it->second.to_string(); + // } + // for (auto it = spanEvent.ScopeTagsBegin(); it != spanEvent.ScopeTagsEnd(); ++it) { + // jsonVal[it->first.to_string()] = it->second.to_string(); + // } + // Json::StreamWriterBuilder writer; + // std::string attrString = Json::writeString(writer, jsonVal); + serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][0]); + + serializer.AddLogTime(spanEvent.GetTimestamp()); + serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][1]); + serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][2]); + + // start_time + serializer.AddLogContent(DEFAULT_TRACE_TAG_START_TIME_NANO, spanEventContentCache[i][3]); + // end_time + serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][4]); + // duration + serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][5]); + + } break; default: break; diff --git a/core/unittest/serializer/SLSSerializerUnittest.cpp b/core/unittest/serializer/SLSSerializerUnittest.cpp index 837e48ff8b..1fa9d26463 100644 --- a/core/unittest/serializer/SLSSerializerUnittest.cpp +++ b/core/unittest/serializer/SLSSerializerUnittest.cpp @@ -40,6 +40,7 @@ class SLSSerializerUnittest : public ::testing::Test { BatchedEvents CreateBatchedLogEvents(bool enableNanosecond, bool emptyContent); BatchedEvents CreateBatchedMetricEvents(bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag); + BatchedEvents CreateBatchedSpanEvents(); static unique_ptr sFlusher; @@ -99,6 +100,19 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedLogEvents(false, true), res, errorMsg)); } } + { + // span + string res, errorMsg; + LOG_INFO(sLogger, ("begin", "span test")); + auto events = CreateBatchedSpanEvents(); + APSARA_TEST_EQUAL(events.mEvents.size(), 1); + APSARA_TEST_TRUE(events.mEvents[0]->GetType() == PipelineEvent::Type::SPAN); + APSARA_TEST_TRUE(serializer.DoSerialize(std::move(events), res, errorMsg)); + LOG_INFO(sLogger, ("res", res)); + sls_logs::LogGroup logGroup; + APSARA_TEST_TRUE(logGroup.ParseFromString(res)); + APSARA_TEST_EQUAL(1, logGroup.logs_size()); + } { // metric { @@ -212,9 +226,6 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, true, false), res, errorMsg)); } } - { - // span - } { // log group exceed size limit INT32_FLAG(max_send_log_group_size) = 0; @@ -315,6 +326,44 @@ BatchedEvents SLSSerializerUnittest::CreateBatchedMetricEvents(bool enableNanose return batch; } +BatchedEvents SLSSerializerUnittest::CreateBatchedSpanEvents() { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "aaa"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "bbb"); + auto now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto seconds = std::chrono::duration_cast(duration).count(); + auto nano = std::chrono::duration_cast(duration).count(); + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + SpanEvent* spanEvent = group.AddSpanEvent(); + spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); + spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); + spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/1")); + spanEvent->SetTag(std::string("rpcType"), std::string("25")); + spanEvent->SetTag(std::string("callType"), std::string("http-client")); + spanEvent->SetTag(std::string("statusCode"), std::string("200")); + spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); + spanEvent->SetName("/oneagent/qianlu/local/1"); + spanEvent->SetKind(SpanEvent::Kind::Client); + spanEvent->SetSpanId("span-1-2-3-4-5"); + spanEvent->SetTraceId("trace-1-2-3-4-5"); + spanEvent->SetStartTimeNs(nano - 5e9); + spanEvent->SetEndTimeNs(nano); + spanEvent->SetTimestamp(seconds); + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + UNIT_TEST_CASE(SLSSerializerUnittest, TestSerializeEventGroup) UNIT_TEST_CASE(SLSSerializerUnittest, TestSerializeEventGroupList) From 4de542cc32377ef419fec7acaaff14c4de6aa90f Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Thu, 7 Nov 2024 11:40:25 +0800 Subject: [PATCH 2/5] support send to arms backend Signed-off-by: qianlu.kk --- core/pipeline/serializer/SLSSerializer.cpp | 49 +++++----- core/plugin/flusher/sls/FlusherSLS.cpp | 103 ++++++++++++++------ core/plugin/flusher/sls/FlusherSLS.h | 1 + core/sdk/Client.cpp | 78 +++++++++++++-- core/sdk/Client.h | 25 ++++- core/unittest/pipeline/PipelineUnittest.cpp | 70 +++++++++++++ 6 files changed, 260 insertions(+), 66 deletions(-) diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index 5d1663a88a..90c241e4d3 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -54,6 +54,16 @@ bool Serializer>::DoSerialize(vector logSZ(group.mEvents.size()); vector> metricEventContentCache(group.mEvents.size()); - vector> spanEventContentCache(group.mEvents.size()); + vector> spanEventContentCache(group.mEvents.size()); size_t logGroupSZ = 0; switch (eventType) { case PipelineEvent::Type::LOG:{ @@ -136,25 +146,25 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri Json::StreamWriterBuilder writer; std::string attrString = Json::writeString(writer, jsonVal); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size()); - spanEventContentCache[i][0] = std::move(attrString); + spanEventContentCache[i][ATTR_KEY_IDX] = std::move(attrString); auto linkString = e.SerializeLinksToString(); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size()); - spanEventContentCache[i][1] = std::move(linkString); + spanEventContentCache[i][LINK_KEY_IDX] = std::move(linkString); auto eventString = e.SerializeEventsToString(); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_EVENTS.size(), eventString.size()); - spanEventContentCache[i][2] = std::move(eventString); + spanEventContentCache[i][EVENT_KEY_IDX] = std::move(eventString); // time related auto startTsNs = std::to_string(e.GetStartTimeNs()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_START_TIME_NANO.size(), startTsNs.size()); - spanEventContentCache[i][3] = std::move(startTsNs); + spanEventContentCache[i][START_TS_KEY_IDX] = std::move(startTsNs); auto endTsNs = std::to_string(e.GetEndTimeNs()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_END_TIME_NANO.size(), endTsNs.size()); - spanEventContentCache[i][4] = std::move(endTsNs); + spanEventContentCache[i][END_TS_KEY_IDX] = std::move(endTsNs); auto durationNs = std::to_string(e.GetEndTimeNs() - e.GetStartTimeNs()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_DURATION.size(), durationNs.size()); - spanEventContentCache[i][5] = std::move(durationNs); + spanEventContentCache[i][DURATION_KEY_IDX] = std::move(durationNs); logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]); } break; @@ -239,31 +249,18 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri // if (appIdItr != group.mTags.mInner.end()) { // serializer.AddLogContent(DEFAULT_TRACE_TAG_APP_ID, appIdItr->second.to_string()); // } - - - // set tags and scope tags - // Json::Value jsonVal; - // for (auto it = spanEvent.TagsBegin(); it != spanEvent.TagsEnd(); ++it) { - // jsonVal[it->first.to_string()] = it->second.to_string(); - // } - // for (auto it = spanEvent.ScopeTagsBegin(); it != spanEvent.ScopeTagsEnd(); ++it) { - // jsonVal[it->first.to_string()] = it->second.to_string(); - // } - // Json::StreamWriterBuilder writer; - // std::string attrString = Json::writeString(writer, jsonVal); - serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][0]); + serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][ATTR_KEY_IDX]); serializer.AddLogTime(spanEvent.GetTimestamp()); - serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][1]); - serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][2]); + serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][LINK_KEY_IDX]); + serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][EVENT_KEY_IDX]); // start_time - serializer.AddLogContent(DEFAULT_TRACE_TAG_START_TIME_NANO, spanEventContentCache[i][3]); + serializer.AddLogContent(DEFAULT_TRACE_TAG_START_TIME_NANO, spanEventContentCache[i][START_TS_KEY_IDX]); // end_time - serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][4]); + serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][END_TS_KEY_IDX]); // duration - serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][5]); - + serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][DURATION_KEY_IDX]); } break; default: diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 9f5b4cd08b..93c16f92bd 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -318,18 +318,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetRegion()); } - // Logstore - if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { - PARAM_ERROR_RETURN(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } - #ifdef __ENTERPRISE__ if (EnterpriseConfigProvider::GetInstance()->IsDataServerPrivateCloud()) { mRegion = STRING_FLAG(default_region_name); @@ -407,19 +395,75 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); - } else if (telemetryType == "metrics") { - mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS - : sls_logs::SLS_TELEMETRY_TYPE_LOGS; - } else if (!telemetryType.empty() && telemetryType != "logs") { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - "string param TelemetryType is not valid", - "logs", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); + } + + if (telemetryType == "arms") { + // Parse Match segment + const char* key = "Match"; + const Json::Value* itr = config.find(key, key + strlen(key)); + if (!itr) { + // Error + LOG_WARNING(sLogger, ("invalid config!", "telemetry arms need add match tags!")); + return false; + } + // Type + string type; + // Key + std::string tagKey; + // Value + std::string tagValue; + const std::set supportDataTypes = { + "trace", + "metric", + "agent_info", + }; + if (!itr->isObject() || !GetMandatoryStringParam(*itr, "Type", type, errorMsg) || type != "tag" || + !GetMandatoryStringParam(*itr, "Key", tagKey, errorMsg) || + !GetMandatoryStringParam(*itr, "Value", tagValue, errorMsg) || + tagKey != "data_type" || !supportDataTypes.count(tagValue)) { + // error + LOG_WARNING(sLogger, ("invalid config!", "telemetry arms need add match tags!")("type",type)("key", tagKey)("value", tagValue)); + return false; + } + if (tagValue == "trace") { + mSubpath = "/apm/trace/arms/v1/trace_log"; + mLogstore = "__arms_default_trace__"; + LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); + } else if (tagValue == "metric") { + mSubpath = "/apm/metric/arms/v1/metric_log"; + mLogstore = "__arms_default_metric__"; + LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); + } else if (tagValue == "agent_info") { + mSubpath = "/apm/metadata/arms/v1/meta_log"; + mLogstore = "__arms_default_agentinfo__"; + LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); + } + } else { + // Logstore + if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { + PARAM_ERROR_RETURN(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } + if (telemetryType == "metrics") { + mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS + : sls_logs::SLS_TELEMETRY_TYPE_LOGS; + } else if (!telemetryType.empty() && telemetryType != "logs") { + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + "string param TelemetryType is not valid", + "logs", + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } } // Batch @@ -625,7 +669,7 @@ unique_ptr FlusherSLS::BuildRequest(SenderQueueItem* item) cons ConvertCompressType(GetCompressType()), data->mData, data->mRawSize, - item); + item, "", 0, false, mSubpath); } else { auto& exactlyOnceCpt = data->mExactlyOnceCheckpoint; int64_t hashKeySeqID = exactlyOnceCpt ? exactlyOnceCpt->data.sequence_id() : sdk::kInvalidHashKeySeqID; @@ -636,20 +680,20 @@ unique_ptr FlusherSLS::BuildRequest(SenderQueueItem* item) cons data->mRawSize, item, data->mShardHashKey, - hashKeySeqID); + hashKeySeqID, false, mSubpath); } } } else { if (data->mShardHashKey.empty()) return sendClient->CreatePostLogStoreLogPackageListRequest( - mProject, data->mLogstore, ConvertCompressType(GetCompressType()), data->mData, item); + mProject, data->mLogstore, ConvertCompressType(GetCompressType()), data->mData, item, "", mSubpath); else return sendClient->CreatePostLogStoreLogPackageListRequest(mProject, data->mLogstore, ConvertCompressType(GetCompressType()), data->mData, item, - data->mShardHashKey); + data->mShardHashKey, mSubpath); } } @@ -657,6 +701,7 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) if (mSendDoneCnt) { mSendDoneCnt->Add(1); } + LOG_INFO(sLogger, ("statusCode", response.GetStatusCode()) ("body", *response.GetBody())); SLSResponse slsResponse; if (AppConfig::GetInstance()->IsResponseVerificationEnabled() && !IsSLSResponse(response)) { slsResponse.mStatusCode = 0; diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index 9fb043d9cf..6c75ac2506 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -75,6 +75,7 @@ class FlusherSLS : public HttpFlusher { std::string mProject; std::string mLogstore; + std::string mSubpath; std::string mRegion; std::string mEndpoint; std::string mAliuid; diff --git a/core/sdk/Client.cpp b/core/sdk/Client.cpp index bc1cc16465..c7f197a672 100644 --- a/core/sdk/Client.cpp +++ b/core/sdk/Client.cpp @@ -221,7 +221,8 @@ namespace sdk { const std::string& compressedLogGroup, uint32_t rawSize, const std::string& hashKey, - bool isTimeSeries) { + bool isTimeSeries, + const std::string& subpath) { map httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; if (!mKeyProvider.empty()) { @@ -231,6 +232,8 @@ namespace sdk { httpHeader[X_LOG_COMPRESSTYPE] = Client::GetCompressTypeString(compressType); if (isTimeSeries) { return SynPostMetricStoreLogs(project, logstore, compressedLogGroup, httpHeader); + } else if (subpath.size()) { + return SynPostARMSBackendLogs(project,subpath, compressedLogGroup, httpHeader); } else { return SynPostLogStoreLogs(project, logstore, compressedLogGroup, httpHeader, hashKey); } @@ -240,7 +243,8 @@ namespace sdk { const std::string& logstore, sls_logs::SlsCompressType compressType, const std::string& packageListData, - const std::string& hashKey) { + const std::string& hashKey, + const std::string& subpath) { map httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; if (!mKeyProvider.empty()) { @@ -249,7 +253,12 @@ namespace sdk { httpHeader[X_LOG_MODE] = LOG_MODE_BATCH_GROUP; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(packageListData.size()); httpHeader[X_LOG_COMPRESSTYPE] = Client::GetCompressTypeString(compressType); - return SynPostLogStoreLogs(project, logstore, packageListData, httpHeader, hashKey); + if (subpath.size()) { + return SynPostARMSBackendLogs(project,subpath, packageListData, httpHeader); + } else { + return SynPostLogStoreLogs(project, logstore, packageListData, httpHeader, hashKey); + } + } unique_ptr Client::CreatePostLogStoreLogsRequest(const std::string& project, @@ -260,7 +269,8 @@ namespace sdk { SenderQueueItem* item, const std::string& hashKey, int64_t hashKeySeqID, - bool isTimeSeries) { + bool isTimeSeries, + const std::string& subpath) { map httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; if (!mKeyProvider.empty()) { @@ -271,6 +281,10 @@ namespace sdk { if (isTimeSeries) { return CreateAsynPostMetricStoreLogsRequest( project, logstore, compressedLogGroup, httpHeader,item); + } else if (subpath.size()){ + return CreateAsynPostARMSBackendRequest( + project, subpath, compressedLogGroup, httpHeader, item + ); } else { return CreateAsynPostLogStoreLogsRequest( project, logstore, compressedLogGroup, httpHeader, hashKey, hashKeySeqID, item); @@ -283,7 +297,8 @@ namespace sdk { sls_logs::SlsCompressType compressType, const std::string& packageListData, SenderQueueItem* item, - const std::string& hashKey) { + const std::string& hashKey, + const std::string& subpath) { map httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; if (!mKeyProvider.empty()) { @@ -292,8 +307,12 @@ namespace sdk { httpHeader[X_LOG_MODE] = LOG_MODE_BATCH_GROUP; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(packageListData.size()); httpHeader[X_LOG_COMPRESSTYPE] = Client::GetCompressTypeString(compressType); - return CreateAsynPostLogStoreLogsRequest( - project, logstore, packageListData, httpHeader, hashKey, kInvalidHashKeySeqID, item); + if (subpath.size()) { + return CreateAsynPostARMSBackendRequest(project, subpath, packageListData, httpHeader, item); + } else { + return CreateAsynPostLogStoreLogsRequest( + project, logstore, packageListData, httpHeader, hashKey, kInvalidHashKeySeqID, item); + } } void Client::SendRequest(const std::string& project, @@ -380,6 +399,34 @@ namespace sdk { HTTP_POST, mUsingHTTPS, host, mPort, operation, queryString, httpHeader, body, item); } + std::unique_ptr + Client::CreateAsynPostARMSBackendRequest(const std::string& project, + const std::string& subpath, + const std::string& body, + std::map& httpHeader, + SenderQueueItem* item) { + LOG_INFO(sLogger, ("entering, subpath", subpath) ("project", project)); + string operation = subpath; + httpHeader[CONTENT_MD5] = CalcMD5(body); + + map parameterList; + string host = GetHost(project); + SetCommonHeader(httpHeader, (int32_t)(body.length()), project); + string signature = GetUrlSignature(HTTP_POST, operation, httpHeader, parameterList, body, GetAccessKey()); + httpHeader[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + GetAccessKeyId() + ':' + signature; + string queryString; + GetQueryString(parameterList, queryString); + LOG_INFO(sLogger, + ("host", host) ("operation", operation) ("mUsingHTTPS", mUsingHTTPS) + ("Authorization", httpHeader[AUTHORIZATION]) + ("X_LOG_MODE", httpHeader[X_LOG_MODE]) + ("X_LOG_COMPRESSTYPE", httpHeader[X_LOG_COMPRESSTYPE]) + ("X_LOG_BODYRAWSIZE", httpHeader[X_LOG_BODYRAWSIZE])); + + return make_unique( + HTTP_POST, mUsingHTTPS, host, mPort, operation, queryString, httpHeader, body, item); + } + PostLogStoreLogsResponse Client::PingSLSServer(const std::string& project, const std::string& logstore, std::string* realIpPtr) { sls_logs::LogGroup logGroup; @@ -442,6 +489,23 @@ namespace sdk { return ret; } + PostLogStoreLogsResponse Client::SynPostARMSBackendLogs(const std::string& project, + const std::string& subpath, + const std::string& body, + std::map& httpHeader, + std::string* realIpPtr) { + string operation = subpath; + httpHeader[CONTENT_MD5] = CalcMD5(body); + map parameterList; + HttpMessage httpResponse; + SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse, realIpPtr); + PostLogStoreLogsResponse ret; + ret.bodyBytes = (int32_t)body.size(); + ret.statusCode = httpResponse.statusCode; + ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; + return ret; + } + PostLogStoreLogsResponse Client::PostLogUsingWebTracking(const std::string& project, const std::string& logstore, sls_logs::SlsCompressType compressType, diff --git a/core/sdk/Client.h b/core/sdk/Client.h index 54a6136c07..2fae2a05d0 100644 --- a/core/sdk/Client.h +++ b/core/sdk/Client.h @@ -101,7 +101,8 @@ namespace sdk { const std::string& compressedLogGroup, uint32_t rawSize, const std::string& hashKey = "", - bool isTimeSeries = false); + bool isTimeSeries = false, + const std::string& subpath = ""); PostLogStoreLogsResponse PostMetricStoreLogs(const std::string& project, const std::string& logstore, @@ -122,7 +123,8 @@ namespace sdk { const std::string& logstore, sls_logs::SlsCompressType compressType, const std::string& packageListData, - const std::string& hashKey = ""); + const std::string& hashKey = "", + const std::string& subpath = ""); /** Async Put data to LOG service. Unsuccessful opertaion will cause an LOGException. * @param project The project name * @param logstore The logstore name @@ -139,7 +141,8 @@ namespace sdk { SenderQueueItem* item, const std::string& hashKey = "", int64_t hashKeySeqID = kInvalidHashKeySeqID, - bool isTimeSeries = false); + bool isTimeSeries = false, + const std::string& subpath = ""); /** Async Put metrics data to SLS metricstore. Unsuccessful opertaion will cause an LOGException. * @param project The project name * @param logstore The logstore name @@ -170,7 +173,8 @@ namespace sdk { sls_logs::SlsCompressType compressType, const std::string& packageListData, SenderQueueItem* item, - const std::string& hashKey = ""); + const std::string& hashKey = "", + const std::string& subpath = ""); PostLogStoreLogsResponse PostLogUsingWebTracking(const std::string& project, const std::string& logstore, @@ -209,6 +213,13 @@ namespace sdk { std::map& httpHeader, SenderQueueItem* item); + std::unique_ptr + CreateAsynPostARMSBackendRequest(const std::string& project, + const std::string& subpath, + const std::string& body, + std::map& httpHeader, + SenderQueueItem* item); + // PingSLSServer sends a trivial data packet to SLS for some inner purposes. PostLogStoreLogsResponse PingSLSServer(const std::string& project, const std::string& logstore, std::string* realIpPtr = NULL); @@ -226,6 +237,12 @@ namespace sdk { std::map& httpHeader, std::string* realIpPtr = NULL); + PostLogStoreLogsResponse SynPostARMSBackendLogs(const std::string& project, + const std::string& subpath, + const std::string& body, + std::map& httpHeader, + std::string* realIpPtr = NULL); + void SetCommonHeader(std::map& httpHeader, int32_t contentLength, const std::string& project = ""); diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 1cf92420d5..a6e8b4e5bf 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -51,6 +51,7 @@ class PipelineUnittest : public ::testing::Test { void TestFlushBatch() const; void TestInProcessingCount() const; void TestWaitAllItemsInProcessFinished() const; + void TestMultiFlusherAndRouter() const; protected: static void SetUpTestCase() { @@ -2291,6 +2292,74 @@ void PipelineUnittest::OnInitVariousTopology() const { APSARA_TEST_FALSE(config->Parse()); } +void PipelineUnittest::TestMultiFlusherAndRouter() const { + unique_ptr configJson; + string configStr, errorMsg; + unique_ptr config; + unique_ptr pipeline; + // new pipeline + configStr = R"( + { + "global": { + "ProcessPriority": 1 + }, + "inputs": [ + { + "Type": "input_file", + "FilePaths": [ + "/home/test.log" + ] + } + ], + "flushers": [ + { + "Type": "flusher_sls", + "TelemetryType": "arms", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "trace" + } + }, + { + "Type": "flusher_sls", + "TelemetryType": "arms", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "metric" + } + }, + { + "Type": "flusher_sls", + "TelemetryType": "arms", + "Project": "test_project", + "Region": "test_region", + "Endpoint": "test_endpoint", + "Match": { + "Type": "tag", + "Key": "data_type", + "Value": "agent_info" + } + } + ] + } + )"; + configJson.reset(new Json::Value()); + APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); + config.reset(new PipelineConfig(configName, std::move(configJson))); + APSARA_TEST_TRUE(config->Parse()); + pipeline.reset(new Pipeline()); + APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); + +} + void PipelineUnittest::TestProcessQueue() const { unique_ptr configJson; string configStr, errorMsg; @@ -2930,6 +2999,7 @@ UNIT_TEST_CASE(PipelineUnittest, TestSend) UNIT_TEST_CASE(PipelineUnittest, TestFlushBatch) UNIT_TEST_CASE(PipelineUnittest, TestInProcessingCount) UNIT_TEST_CASE(PipelineUnittest, TestWaitAllItemsInProcessFinished) +UNIT_TEST_CASE(PipelineUnittest, TestMultiFlusherAndRouter) } // namespace logtail From 54d1f809901f4778fb2edcf779c244ae23be010a Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Tue, 12 Nov 2024 19:19:10 +0800 Subject: [PATCH 3/5] support disk buffer send data by using subpath --- core/plugin/flusher/sls/DiskBufferWriter.cpp | 9 +++++---- core/plugin/flusher/sls/FlusherSLS.cpp | 2 +- core/protobuf/sls/logtail_buffer_meta.proto | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 6be224f86a..213f5548c7 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -663,6 +663,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { bufferMeta.set_shardhashkey(data->mShardHashKey); bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType())); bufferMeta.set_telemetrytype(flusher->mTelemetryType); + bufferMeta.set_subpath(flusher->mSubpath); string encodedInfo; bufferMeta.SerializeToString(&encodedInfo); @@ -753,23 +754,23 @@ SendResult DiskBufferWriter::SendToNetSync(sdk::Client* sendClient, bufferMeta.compresstype(), logData, bufferMeta.rawsize(), - bufferMeta.shardhashkey()); + bufferMeta.shardhashkey(), false, bufferMeta.has_subpath() ? bufferMeta.subpath(): ""); else sendClient->PostLogStoreLogs(bufferMeta.project(), bufferMeta.logstore(), bufferMeta.compresstype(), logData, - bufferMeta.rawsize()); + bufferMeta.rawsize(), "", false, bufferMeta.has_subpath() ? bufferMeta.subpath(): ""); } else { if (bufferMeta.has_shardhashkey() && !bufferMeta.shardhashkey().empty()) sendClient->PostLogStoreLogPackageList(bufferMeta.project(), bufferMeta.logstore(), bufferMeta.compresstype(), logData, - bufferMeta.shardhashkey()); + bufferMeta.shardhashkey(), bufferMeta.has_subpath() ? bufferMeta.subpath(): ""); else sendClient->PostLogStoreLogPackageList( - bufferMeta.project(), bufferMeta.logstore(), bufferMeta.compresstype(), logData); + bufferMeta.project(), bufferMeta.logstore(), bufferMeta.compresstype(), logData, "", bufferMeta.has_subpath() ? bufferMeta.subpath(): ""); } return SEND_OK; } catch (sdk::LOGException& ex) { diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 93c16f92bd..59de4fd420 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -434,7 +434,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mLogstore = "__arms_default_metric__"; LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); } else if (tagValue == "agent_info") { - mSubpath = "/apm/metadata/arms/v1/meta_log"; + mSubpath = "/apm/metadata/arms/v1/meta_log/AgentInfo"; mLogstore = "__arms_default_agentinfo__"; LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); } diff --git a/core/protobuf/sls/logtail_buffer_meta.proto b/core/protobuf/sls/logtail_buffer_meta.proto index dc2639e997..5135850e8d 100644 --- a/core/protobuf/sls/logtail_buffer_meta.proto +++ b/core/protobuf/sls/logtail_buffer_meta.proto @@ -28,4 +28,5 @@ message LogtailBufferMeta optional string shardhashkey = 7; optional SlsCompressType compresstype = 8; optional SlsTelemetryType telemetrytype = 9; + optional string subpath = 10; } From 44c1bd67fe147e3c9436b7f082ba4890426ade41 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Wed, 13 Nov 2024 16:43:44 +0800 Subject: [PATCH 4/5] change subpath for metadata --- core/plugin/flusher/sls/FlusherSLS.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 59de4fd420..27ec2e1636 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -434,7 +434,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mLogstore = "__arms_default_metric__"; LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); } else if (tagValue == "agent_info") { - mSubpath = "/apm/metadata/arms/v1/meta_log/AgentInfo"; + mSubpath = "/apm/meta/arms/v1/meta_log/AgentInfo"; mLogstore = "__arms_default_agentinfo__"; LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); } From 8013bacaac0a23e9de76270b057ea9c3c0fe5339 Mon Sep 17 00:00:00 2001 From: "qianlu.kk" Date: Wed, 20 Nov 2024 15:59:44 +0800 Subject: [PATCH 5/5] change telemetry type --- core/plugin/flusher/sls/FlusherSLS.cpp | 90 ++++++++------------- core/unittest/pipeline/PipelineUnittest.cpp | 6 +- 2 files changed, 38 insertions(+), 58 deletions(-) diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 27ec2e1636..3a2cd80508 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -397,48 +397,19 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetRegion()); } - if (telemetryType == "arms") { - // Parse Match segment - const char* key = "Match"; - const Json::Value* itr = config.find(key, key + strlen(key)); - if (!itr) { - // Error - LOG_WARNING(sLogger, ("invalid config!", "telemetry arms need add match tags!")); - return false; - } - // Type - string type; - // Key - std::string tagKey; - // Value - std::string tagValue; - const std::set supportDataTypes = { - "trace", - "metric", - "agent_info", - }; - if (!itr->isObject() || !GetMandatoryStringParam(*itr, "Type", type, errorMsg) || type != "tag" || - !GetMandatoryStringParam(*itr, "Key", tagKey, errorMsg) || - !GetMandatoryStringParam(*itr, "Value", tagValue, errorMsg) || - tagKey != "data_type" || !supportDataTypes.count(tagValue)) { - // error - LOG_WARNING(sLogger, ("invalid config!", "telemetry arms need add match tags!")("type",type)("key", tagKey)("value", tagValue)); - return false; - } - if (tagValue == "trace") { - mSubpath = "/apm/trace/arms/v1/trace_log"; - mLogstore = "__arms_default_trace__"; - LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); - } else if (tagValue == "metric") { - mSubpath = "/apm/metric/arms/v1/metric_log"; - mLogstore = "__arms_default_metric__"; - LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); - } else if (tagValue == "agent_info") { - mSubpath = "/apm/meta/arms/v1/meta_log/AgentInfo"; - mLogstore = "__arms_default_agentinfo__"; - LOG_WARNING(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); - } - } else { + if (telemetryType == "arms_agentinfo") { + mSubpath = "/apm/meta/arms/v1/meta_log/AgentInfo"; + mLogstore = "__arms_default_agentinfo__"; + LOG_DEBUG(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); + } else if (telemetryType == "arms_metrics") { + mSubpath = "/apm/metric/arms/v1/metric_log"; + mLogstore = "__arms_default_metric__"; + LOG_DEBUG(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); + } else if (telemetryType == "arms_traces") { + mSubpath = "/apm/trace/arms/v1/trace_log"; + mLogstore = "__arms_default_trace__"; + LOG_DEBUG(sLogger, ("successfully set subpath", mSubpath) ("logstore", mLogstore)); + } else if (telemetryType == "metrics") { // Logstore if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), @@ -450,20 +421,29 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetLogstoreName(), mContext->GetRegion()); } - if (telemetryType == "metrics") { - mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS + mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS : sls_logs::SLS_TELEMETRY_TYPE_LOGS; - } else if (!telemetryType.empty() && telemetryType != "logs") { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - "string param TelemetryType is not valid", - "logs", - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); + } else if ((!telemetryType.empty() && telemetryType != "logs")) { + // Logstore + if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { + PARAM_ERROR_RETURN(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); } + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + "string param TelemetryType is not valid", + "logs", + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); } // Batch @@ -701,7 +681,6 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) if (mSendDoneCnt) { mSendDoneCnt->Add(1); } - LOG_INFO(sLogger, ("statusCode", response.GetStatusCode()) ("body", *response.GetBody())); SLSResponse slsResponse; if (AppConfig::GetInstance()->IsResponseVerificationEnabled() && !IsSLSResponse(response)) { slsResponse.mStatusCode = 0; @@ -722,6 +701,7 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) } auto data = static_cast(item); + LOG_INFO(sLogger, ("statusCode", response.GetStatusCode()) ("body", *response.GetBody()) ("logstore", data->mLogstore)); string configName = HasContext() ? GetContext().GetConfigName() : ""; bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore); int32_t curTime = time(NULL); diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index a6e8b4e5bf..43448d7985 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -2314,7 +2314,7 @@ void PipelineUnittest::TestMultiFlusherAndRouter() const { "flushers": [ { "Type": "flusher_sls", - "TelemetryType": "arms", + "TelemetryType": "arms_traces", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", @@ -2326,7 +2326,7 @@ void PipelineUnittest::TestMultiFlusherAndRouter() const { }, { "Type": "flusher_sls", - "TelemetryType": "arms", + "TelemetryType": "arms_metrics", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", @@ -2338,7 +2338,7 @@ void PipelineUnittest::TestMultiFlusherAndRouter() const { }, { "Type": "flusher_sls", - "TelemetryType": "arms", + "TelemetryType": "arms_agentinfo", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint",