Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: MetricEvent support UntypedMultiValues #1908

Merged
merged 9 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions core/models/MetricEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ MetricEvent::MetricEvent(PipelineEventGroup* ptr) : PipelineEvent(Type::METRIC,
}

unique_ptr<PipelineEvent> MetricEvent::Copy() const {
return make_unique<MetricEvent>(*this);
unique_ptr<MetricEvent> newPtr = make_unique<MetricEvent>(*this);
if (newPtr->Is<UntypedMultiDoubleValues>()) {
newPtr->MutableValue<UntypedMultiDoubleValues>()->ResetPipelineEvent(newPtr.get());
}
return newPtr;
}

void MetricEvent::Reset() {
Expand Down Expand Up @@ -88,7 +92,21 @@ Json::Value MetricEvent::ToJson(bool enableEventMeta) const {
root["timestampNanosecond"] = static_cast<int32_t>(GetTimestampNanosecond().value());
}
root["name"] = mName.to_string();
root["value"] = MetricValueToJson(mValue);
root["value"] = Json::Value();
visit(
[&](auto&& arg) {
using T = decay_t<decltype(arg)>;
if constexpr (is_same_v<T, UntypedSingleValue>) {
root["value"]["type"] = "untyped_single_value";
root["value"]["detail"] = get<UntypedSingleValue>(mValue).ToJson();
} else if constexpr (is_same_v<T, UntypedMultiDoubleValues>) {
root["value"]["type"] = "untyped_multi_double_values";
root["value"]["detail"] = get<UntypedMultiDoubleValues>(mValue).ToJson();
} else if constexpr (is_same_v<T, monostate>) {
root["value"]["type"] = "unknown";
}
},
mValue);
if (!mTags.mInner.empty()) {
Json::Value& tags = root["tags"];
for (const auto& tag : mTags.mInner) {
Expand All @@ -106,7 +124,15 @@ bool MetricEvent::FromJson(const Json::Value& root) {
}
SetName(root["name"].asString());
const Json::Value& value = root["value"];
SetValue(JsonToMetricValue(value["type"].asString(), value["detail"]));
if (value["type"].asString() == "untyped_single_value") {
UntypedSingleValue v;
v.FromJson(value["detail"]);
SetValue(v);
} else if (value["type"].asString() == "untyped_multi_double_values") {
UntypedMultiDoubleValues v(this);
v.FromJson(value["detail"]);
SetValue(v);
}
if (root.isMember("tags")) {
Json::Value tags = root["tags"];
for (const auto& key : tags.getMemberNames()) {
Expand Down
13 changes: 13 additions & 0 deletions core/models/MetricEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class MetricEvent : public PipelineEvent {
return std::get_if<T>(&mValue);
}

template <typename T>
constexpr std::add_pointer_t<T> MutableValue() noexcept {
return std::get_if<T>(&mValue);
}

template <typename T>
void SetValue(const T& value) {
mValue = value;
Expand All @@ -58,6 +63,14 @@ class MetricEvent : public PipelineEvent {
mValue = T{std::forward<Args>(args)...};
}

void SetValue(const std::map<StringView, double>& multiDoubleValues) {
mValue = UntypedMultiDoubleValues{multiDoubleValues, this};
}

void SetValue(const UntypedMultiDoubleValues& multiDoubleValues) {
mValue = UntypedMultiDoubleValues{multiDoubleValues.mValues, this};
}

StringView GetTag(StringView key) const;
bool HasTag(StringView key) const;
void SetTag(StringView key, StringView val);
Expand Down
85 changes: 66 additions & 19 deletions core/models/MetricValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,62 @@ using namespace std;

namespace logtail {

bool UntypedMultiDoubleValues::GetValue(StringView key, double& val) const {
if (mValues.find(key) != mValues.end()) {
val = mValues.at(key);
return true;
}
return false;
}

bool UntypedMultiDoubleValues::HasValue(StringView key) const {
return mValues.find(key) != mValues.end();
}

void UntypedMultiDoubleValues::SetValue(const std::string& key, double val) {
if (mMetricEventPtr) {
SetValueNoCopy(mMetricEventPtr->GetSourceBuffer()->CopyString(key), val);
}
}

void UntypedMultiDoubleValues::SetValue(StringView key, double val) {
if (mMetricEventPtr) {
SetValueNoCopy(mMetricEventPtr->GetSourceBuffer()->CopyString(key), val);
}
}

void UntypedMultiDoubleValues::SetValueNoCopy(const StringBuffer& key, double val) {
SetValueNoCopy(StringView(key.data, key.size), val);
}

void UntypedMultiDoubleValues::SetValueNoCopy(StringView key, double val) {
mValues[key] = val;
}

void UntypedMultiDoubleValues::DelValue(StringView key) {
mValues.erase(key);
}

std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValusBegin() const {
return mValues.begin();
}

std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValusEnd() const {
return mValues.end();
}

size_t UntypedMultiDoubleValues::ValusSize() const {
return mValues.size();
}

size_t UntypedMultiDoubleValues::DataSize() const {
size_t totalSize = sizeof(UntypedMultiDoubleValues);
for (const auto& pair : mValues) {
totalSize += pair.first.size() + sizeof(pair.second);
}
return totalSize;
}

size_t DataSize(const MetricValue& value) {
return visit(
[](auto&& arg) {
Expand All @@ -42,29 +98,20 @@ void UntypedSingleValue::FromJson(const Json::Value& value) {
mValue = value.asFloat();
}

Json::Value MetricValueToJson(const MetricValue& value) {
Json::Value UntypedMultiDoubleValues::ToJson() const {
Json::Value res;
visit(
[&](auto&& arg) {
using T = decay_t<decltype(arg)>;
if constexpr (is_same_v<T, UntypedSingleValue>) {
res["type"] = "untyped_single_value";
res["detail"] = get<UntypedSingleValue>(value).ToJson();
} else if constexpr (is_same_v<T, monostate>) {
res["type"] = "unknown";
}
},
value);
for (auto metric : mValues) {
res[metric.first.to_string()] = metric.second;
}
return res;
}

MetricValue JsonToMetricValue(const string& type, const Json::Value& detail) {
if (type == "untyped_single_value") {
UntypedSingleValue v;
v.FromJson(detail);
return v;
} else {
return MetricValue();
void UntypedMultiDoubleValues::FromJson(const Json::Value& value) {
mValues.clear();
for (Json::Value::const_iterator itr = value.begin(); itr != value.end(); ++itr) {
if (itr->asDouble()) {
SetValue(itr.key().asString(), itr->asDouble());
}
}
}
#endif
Expand Down
37 changes: 33 additions & 4 deletions core/models/MetricValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <map>
#include <variant>

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -24,6 +25,10 @@
#include <string>
#endif

#include "common/memory/SourceBuffer.h"
#include "models/PipelineEvent.h"
#include "models/StringView.h"

namespace logtail {

struct UntypedSingleValue {
Expand All @@ -37,13 +42,37 @@ struct UntypedSingleValue {
#endif
};

using MetricValue = std::variant<std::monostate, UntypedSingleValue>;
struct UntypedMultiDoubleValues {
std::map<StringView, double> mValues;
PipelineEvent* mMetricEventPtr;
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved

size_t DataSize(const MetricValue& value);
UntypedMultiDoubleValues(PipelineEvent* ptr) : mMetricEventPtr(ptr) {}
UntypedMultiDoubleValues(std::map<StringView, double> values, PipelineEvent* ptr)
: mValues(values), mMetricEventPtr(ptr) {}

bool GetValue(StringView key, double& val) const;
bool HasValue(StringView key) const;
void SetValue(const std::string& key, double val);
void SetValue(StringView key, double val);
void SetValueNoCopy(const StringBuffer& key, double val);
void SetValueNoCopy(StringView key, double val);
void DelValue(StringView key);

std::map<StringView, double>::const_iterator ValusBegin() const;
std::map<StringView, double>::const_iterator ValusEnd() const;
size_t ValusSize() const;

size_t DataSize() const;
void ResetPipelineEvent(PipelineEvent* ptr) { mMetricEventPtr = ptr; }

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value MetricValueToJson(const MetricValue& value);
MetricValue JsonToMetricValue(const std::string& type, const Json::Value& detail);
Json::Value ToJson() const;
void FromJson(const Json::Value& value);
#endif
};

using MetricValue = std::variant<std::monostate, UntypedSingleValue, UntypedMultiDoubleValues>;

size_t DataSize(const MetricValue& value);

} // namespace logtail
4 changes: 2 additions & 2 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ void FlusherRunner::Dispatch(SenderQueueItem* item) {
if (!BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()
&& item->mFlusher->Name() == "flusher_sls") {
DiskBufferWriter::GetInstance()->PushToDiskBuffer(item, 3);
SenderQueueManager::GetInstance()->RemoveItem(item->mFlusher->GetQueueKey(), item);
SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
} else {
PushToHttpSink(item);
}
break;
default:
SenderQueueManager::GetInstance()->RemoveItem(item->mFlusher->GetQueueKey(), item);
SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
break;
}
}
Expand Down
Loading
Loading