Skip to content

Commit

Permalink
Feat: MetricEvent support UntypedMultiValues (#1908)
Browse files Browse the repository at this point in the history
* add UntypedMultiValues

* polish

* polish

* polish

* polish

* polish

* fix bug

* polish
  • Loading branch information
Takuka0311 authored Nov 25, 2024
1 parent 802d034 commit 2275def
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 69 deletions.
32 changes: 29 additions & 3 deletions core/models/MetricEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ MetricEvent::MetricEvent(PipelineEventGroup* ptr) : PipelineEvent(Type::METRIC,
}

unique_ptr<PipelineEvent> MetricEvent::Copy() const {
return make_unique<MetricEvent>(*this);
unique_ptr<MetricEvent> newPtr = make_unique<MetricEvent>(*this);
if (newPtr->Is<UntypedMultiDoubleValues>()) {
newPtr->MutableValue<UntypedMultiDoubleValues>()->ResetPipelineEvent(newPtr.get());
}
return newPtr;
}

void MetricEvent::Reset() {
Expand Down Expand Up @@ -88,7 +92,21 @@ Json::Value MetricEvent::ToJson(bool enableEventMeta) const {
root["timestampNanosecond"] = static_cast<int32_t>(GetTimestampNanosecond().value());
}
root["name"] = mName.to_string();
root["value"] = MetricValueToJson(mValue);
root["value"] = Json::Value();
visit(
[&](auto&& arg) {
using T = decay_t<decltype(arg)>;
if constexpr (is_same_v<T, UntypedSingleValue>) {
root["value"]["type"] = "untyped_single_value";
root["value"]["detail"] = get<UntypedSingleValue>(mValue).ToJson();
} else if constexpr (is_same_v<T, UntypedMultiDoubleValues>) {
root["value"]["type"] = "untyped_multi_double_values";
root["value"]["detail"] = get<UntypedMultiDoubleValues>(mValue).ToJson();
} else if constexpr (is_same_v<T, monostate>) {
root["value"]["type"] = "unknown";
}
},
mValue);
if (!mTags.mInner.empty()) {
Json::Value& tags = root["tags"];
for (const auto& tag : mTags.mInner) {
Expand All @@ -106,7 +124,15 @@ bool MetricEvent::FromJson(const Json::Value& root) {
}
SetName(root["name"].asString());
const Json::Value& value = root["value"];
SetValue(JsonToMetricValue(value["type"].asString(), value["detail"]));
if (value["type"].asString() == "untyped_single_value") {
UntypedSingleValue v;
v.FromJson(value["detail"]);
SetValue(v);
} else if (value["type"].asString() == "untyped_multi_double_values") {
UntypedMultiDoubleValues v(this);
v.FromJson(value["detail"]);
SetValue(v);
}
if (root.isMember("tags")) {
Json::Value tags = root["tags"];
for (const auto& key : tags.getMemberNames()) {
Expand Down
13 changes: 13 additions & 0 deletions core/models/MetricEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class MetricEvent : public PipelineEvent {
return std::get_if<T>(&mValue);
}

template <typename T>
constexpr std::add_pointer_t<T> MutableValue() noexcept {
return std::get_if<T>(&mValue);
}

template <typename T>
void SetValue(const T& value) {
mValue = value;
Expand All @@ -58,6 +63,14 @@ class MetricEvent : public PipelineEvent {
mValue = T{std::forward<Args>(args)...};
}

void SetValue(const std::map<StringView, double>& multiDoubleValues) {
mValue = UntypedMultiDoubleValues{multiDoubleValues, this};
}

void SetValue(const UntypedMultiDoubleValues& multiDoubleValues) {
mValue = UntypedMultiDoubleValues{multiDoubleValues.mValues, this};
}

StringView GetTag(StringView key) const;
bool HasTag(StringView key) const;
void SetTag(StringView key, StringView val);
Expand Down
85 changes: 66 additions & 19 deletions core/models/MetricValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,62 @@ using namespace std;

namespace logtail {

bool UntypedMultiDoubleValues::GetValue(StringView key, double& val) const {
if (mValues.find(key) != mValues.end()) {
val = mValues.at(key);
return true;
}
return false;
}

bool UntypedMultiDoubleValues::HasValue(StringView key) const {
return mValues.find(key) != mValues.end();
}

void UntypedMultiDoubleValues::SetValue(const std::string& key, double val) {
if (mMetricEventPtr) {
SetValueNoCopy(mMetricEventPtr->GetSourceBuffer()->CopyString(key), val);
}
}

void UntypedMultiDoubleValues::SetValue(StringView key, double val) {
if (mMetricEventPtr) {
SetValueNoCopy(mMetricEventPtr->GetSourceBuffer()->CopyString(key), val);
}
}

void UntypedMultiDoubleValues::SetValueNoCopy(const StringBuffer& key, double val) {
SetValueNoCopy(StringView(key.data, key.size), val);
}

void UntypedMultiDoubleValues::SetValueNoCopy(StringView key, double val) {
mValues[key] = val;
}

void UntypedMultiDoubleValues::DelValue(StringView key) {
mValues.erase(key);
}

std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValusBegin() const {
return mValues.begin();
}

std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValusEnd() const {
return mValues.end();
}

size_t UntypedMultiDoubleValues::ValusSize() const {
return mValues.size();
}

size_t UntypedMultiDoubleValues::DataSize() const {
size_t totalSize = sizeof(UntypedMultiDoubleValues);
for (const auto& pair : mValues) {
totalSize += pair.first.size() + sizeof(pair.second);
}
return totalSize;
}

size_t DataSize(const MetricValue& value) {
return visit(
[](auto&& arg) {
Expand All @@ -42,29 +98,20 @@ void UntypedSingleValue::FromJson(const Json::Value& value) {
mValue = value.asFloat();
}

Json::Value MetricValueToJson(const MetricValue& value) {
Json::Value UntypedMultiDoubleValues::ToJson() const {
Json::Value res;
visit(
[&](auto&& arg) {
using T = decay_t<decltype(arg)>;
if constexpr (is_same_v<T, UntypedSingleValue>) {
res["type"] = "untyped_single_value";
res["detail"] = get<UntypedSingleValue>(value).ToJson();
} else if constexpr (is_same_v<T, monostate>) {
res["type"] = "unknown";
}
},
value);
for (auto metric : mValues) {
res[metric.first.to_string()] = metric.second;
}
return res;
}

MetricValue JsonToMetricValue(const string& type, const Json::Value& detail) {
if (type == "untyped_single_value") {
UntypedSingleValue v;
v.FromJson(detail);
return v;
} else {
return MetricValue();
void UntypedMultiDoubleValues::FromJson(const Json::Value& value) {
mValues.clear();
for (Json::Value::const_iterator itr = value.begin(); itr != value.end(); ++itr) {
if (itr->asDouble()) {
SetValue(itr.key().asString(), itr->asDouble());
}
}
}
#endif
Expand Down
37 changes: 33 additions & 4 deletions core/models/MetricValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <map>
#include <variant>

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -24,6 +25,10 @@
#include <string>
#endif

#include "common/memory/SourceBuffer.h"
#include "models/PipelineEvent.h"
#include "models/StringView.h"

namespace logtail {

struct UntypedSingleValue {
Expand All @@ -37,13 +42,37 @@ struct UntypedSingleValue {
#endif
};

using MetricValue = std::variant<std::monostate, UntypedSingleValue>;
struct UntypedMultiDoubleValues {
std::map<StringView, double> mValues;
PipelineEvent* mMetricEventPtr;

size_t DataSize(const MetricValue& value);
UntypedMultiDoubleValues(PipelineEvent* ptr) : mMetricEventPtr(ptr) {}
UntypedMultiDoubleValues(std::map<StringView, double> values, PipelineEvent* ptr)
: mValues(values), mMetricEventPtr(ptr) {}

bool GetValue(StringView key, double& val) const;
bool HasValue(StringView key) const;
void SetValue(const std::string& key, double val);
void SetValue(StringView key, double val);
void SetValueNoCopy(const StringBuffer& key, double val);
void SetValueNoCopy(StringView key, double val);
void DelValue(StringView key);

std::map<StringView, double>::const_iterator ValusBegin() const;
std::map<StringView, double>::const_iterator ValusEnd() const;
size_t ValusSize() const;

size_t DataSize() const;
void ResetPipelineEvent(PipelineEvent* ptr) { mMetricEventPtr = ptr; }

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value MetricValueToJson(const MetricValue& value);
MetricValue JsonToMetricValue(const std::string& type, const Json::Value& detail);
Json::Value ToJson() const;
void FromJson(const Json::Value& value);
#endif
};

using MetricValue = std::variant<std::monostate, UntypedSingleValue, UntypedMultiDoubleValues>;

size_t DataSize(const MetricValue& value);

} // namespace logtail
4 changes: 2 additions & 2 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ void FlusherRunner::Dispatch(SenderQueueItem* item) {
if (!BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()
&& item->mFlusher->Name() == "flusher_sls") {
DiskBufferWriter::GetInstance()->PushToDiskBuffer(item, 3);
SenderQueueManager::GetInstance()->RemoveItem(item->mFlusher->GetQueueKey(), item);
SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
} else {
PushToHttpSink(item);
}
break;
default:
SenderQueueManager::GetInstance()->RemoveItem(item->mFlusher->GetQueueKey(), item);
SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
break;
}
}
Expand Down
Loading

0 comments on commit 2275def

Please sign in to comment.