Skip to content

Commit

Permalink
[INLONG-10851][SDK] Support multiple protocols for DataProxy C++ SDK (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Aug 22, 2024
1 parent fbde3da commit 2bb4c0e
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void TcpClient::UpdateMetric() {
stat.Update(it.second);
it.second.ResetStat();
}
LOG_INFO(stat.ToString() << CLIENT_INFO);
LOG_INFO(stat.GetSendMetricInfo() << CLIENT_INFO);
}

void TcpClient::HeartBeat(bool only_heart_heat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ void SdkConfig::defaultInit() {

// manager parameters
manager_url_ = constants::kManagerURL;
enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster;
manager_cluster_url_ = constants::kManagerClusterURL;
manager_update_interval_ = constants::kManagerUpdateInterval;
manager_url_timeout_ = constants::kManagerTimeout;
max_proxy_num_ = constants::kMaxProxyNum;
Expand Down Expand Up @@ -329,22 +327,7 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) {
} else {
manager_url_ = constants::kManagerURL;
}
// manager cluster url
if (doc.HasMember("manager_cluster_url") &&
doc["manager_cluster_url"].IsString()) {
const rapidjson::Value &obj = doc["manager_cluster_url"];
manager_cluster_url_ = obj.GetString();
} else {
manager_cluster_url_ = constants::kManagerClusterURL;
}
// enable manager from cluster
if (doc.HasMember("enable_manager_url_from_cluster") &&
doc["enable_manager_url_from_cluster"].IsBool()) {
const rapidjson::Value &obj = doc["enable_manager_url_from_cluster"];
enable_manager_url_from_cluster_ = obj.GetBool();
} else {
enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster;
}

// manager update interval
if (doc.HasMember("manager_update_interval") &&
doc["manager_update_interval"].IsInt() &&
Expand Down Expand Up @@ -522,6 +505,13 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) {
} else {
max_instance_ = constants::kMaxInstance;
}

if (doc.HasMember("extend_report") && doc["extend_report"].IsBool()) {
const rapidjson::Value &obj = doc["extend_report"];
extend_report_ = obj.GetBool();
} else {
extend_report_ = constants::kExtendReport;
}
}

bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string &localhost) {
Expand Down Expand Up @@ -589,11 +579,6 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("log_level: " << log_level_);
LOG_INFO("log_path: " << log_path_.c_str());
LOG_INFO("manager_url: " << manager_url_.c_str());
LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str());
LOG_INFO(
"enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_
? "true"
: "false");
LOG_INFO("manager_update_interval: minutes" << manager_update_interval_);
LOG_INFO("manager_url_timeout: " << manager_url_timeout_);
LOG_INFO("max_tcp_num: " << max_proxy_num_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <stdint.h>
#include <string>
#include <vector>
#include <atomic>
#include "../utils/capi_constant.h"

namespace inlong {
class SdkConfig {
Expand All @@ -40,7 +42,7 @@ class SdkConfig {
void InitAuthParm(const rapidjson::Value &doc);
void OthersParam(const rapidjson::Value &doc);
bool GetLocalIPV4Address(std::string& err_info, std::string& localhost);
SdkConfig() { defaultInit(); };
SdkConfig():extend_report_(false) { defaultInit(); };

public:
// cache parameter
Expand Down Expand Up @@ -80,8 +82,6 @@ class SdkConfig {

// manager parameters
std::string manager_url_;
bool enable_manager_url_from_cluster_;
std::string manager_cluster_url_;
uint32_t manager_update_interval_; // Automatic update interval, minutes
uint32_t manager_url_timeout_; // URL parsing timeout, seconds
uint64_t max_proxy_num_;
Expand Down Expand Up @@ -114,6 +114,7 @@ class SdkConfig {
uint32_t buf_size_;

volatile bool parsed_ = false;
bool extend_report_;

void defaultInit();
static SdkConfig *getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ RecvGroup::RecvGroup(const std::string &group_key, std::shared_ptr<SendManager>
last_pack_time_ = Utils::getCurrentMsTime();
max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;
local_ip_ = SdkConfig::getInstance()->local_ip_;
group_id_key_ = SdkConfig::getInstance()->extend_report_ ? "bid=" : "groupId=";
stream_id_key_ = SdkConfig::getInstance()->extend_report_ ? "&tid=" : "&streamId=";
LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_ << ",max_recv_size:" << max_recv_size_);
}

Expand Down Expand Up @@ -224,8 +226,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t &
streamId_num_ == 0) {
groupId_num = 0;
streamId_num = 0;
groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;
groupId_streamId_char = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_;
char_groupId_flag = 0x4;
} else {
groupId_num = groupId_num_;
Expand All @@ -245,7 +246,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t &
"&node1ip=" + SdkConfig::getInstance()->local_ip_ +
"&rtime1=" + std::to_string(Utils::getCurrentMsTime());
} else {
attr = "groupId=" + msgs[0]->inlong_group_id_ +
attr = group_id_key_ + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;
}
*(uint16_t *)bodyBegin = htons(attr.size());
Expand Down Expand Up @@ -296,8 +297,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t &

// attr
std::string attr;
attr = "groupId=" + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;
attr = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_;

attr += "&dt=" + std::to_string(data_time_);
attr += "&mid=" + std::to_string(uniq_id_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class RecvGroup {
std::unordered_map<std::string, std::queue<SdkMsgPtr>> dispatch_queue_;
std::queue<SendBufferPtrT> fail_queue_;

std::string group_id_key_;
std::string stream_id_key_;

void DoDispatchMsg();
bool IsZipAndOperate(std::string& res, uint32_t real_cur_len);
inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void MetricManager::InitEnvironment() {
environment_.setVersion(constants::kVersion);
environment_.setPid(getpid());
environment_.setIp(SdkConfig::getInstance()->local_ip_);
environment_.SetExtendReport(SdkConfig::getInstance()->extend_report_);
}
void MetricManager::Run() {
prctl(PR_SET_NAME, "metric-manager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/

#include "proxy_manager.h"

#include "api_code.h"
#include <fstream>

#include "../config/ini_help.h"
#include "../utils/capi_constant.h"
#include "../utils/logger.h"
#include "../utils/utils.h"
#include "api_code.h"
#include <fstream>
#include <rapidjson/document.h>
#include "../utils/parse_json.h"

namespace inlong {
const uint64_t MINUTE = 60000;
Expand Down Expand Up @@ -110,104 +112,6 @@ void ProxyManager::DoUpdate() {
LOG_INFO("finish ProxyManager DoUpdate.");
}

int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
const std::string &meta_data,
ProxyInfoVec &proxy_info_vec) {
rapidjson::Document doc;
if (doc.Parse(meta_data.c_str()).HasParseError()) {
LOG_ERROR("failed to parse meta_data, error" << doc.GetParseError() << ":"
<< doc.GetErrorOffset());
return SdkCode::kErrorParseJson;
}

if (!(doc.HasMember("success") && doc["success"].IsBool() &&
doc["success"].GetBool())) {
LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, success: not "
"exist or false"
<< inlong_group_id.c_str());
return SdkCode::kErrorParseJson;
}
// check data valid
if (!doc.HasMember("data") || doc["data"].IsNull()) {
LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, data: not exist "
"or null"
<< inlong_group_id.c_str());
return SdkCode::kErrorParseJson;
}

// check nodelist valid
const rapidjson::Value &clusterInfo = doc["data"];
if (!clusterInfo.HasMember("nodeList") || clusterInfo["nodeList"].IsNull()) {
LOG_ERROR("invalid nodeList of inlong_group_id:%s, not exist or null"
<< inlong_group_id.c_str());
return SdkCode::kErrorParseJson;
}

// check nodeList isn't empty
const rapidjson::Value &nodeList = clusterInfo["nodeList"];
if (nodeList.GetArray().Size() == 0) {
LOG_ERROR("empty nodeList of inlong_group_id:%s"
<< inlong_group_id.c_str());
return SdkCode::kErrorParseJson;
}
// check clusterId
if (!clusterInfo.HasMember("clusterId") ||
!clusterInfo["clusterId"].IsInt() ||
clusterInfo["clusterId"].GetInt() < 0) {
LOG_ERROR("clusterId of inlong_group_id:%s is not found or not a integer"
<< inlong_group_id.c_str());
return SdkCode::kErrorParseJson;
}
groupid_2_cluster_id_update_map_[inlong_group_id] =
clusterInfo["clusterId"].GetInt();

// check load
int32_t load = 0;
if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() &&
!clusterInfo["load"].IsNull()) {
const rapidjson::Value &obj = clusterInfo["load"];
load = obj.GetInt();
} else {
load = 0;
}

// proxy list
for (auto &proxy : nodeList.GetArray()) {
std::string ip;
std::string id;
int32_t port;
if (proxy.HasMember("ip") && !proxy["ip"].IsNull())
ip = proxy["ip"].GetString();
else {
LOG_ERROR("this ip info is null");
continue;
}
if (proxy.HasMember("port") && !proxy["port"].IsNull()) {
if (proxy["port"].IsString())
port = std::stoi(proxy["port"].GetString());
else if (proxy["port"].IsInt())
port = proxy["port"].GetInt();
}

else {
LOG_ERROR("this ip info is null or negative");
continue;
}
if (proxy.HasMember("id") && !proxy["id"].IsNull()) {
if (proxy["id"].IsString())
id = proxy["id"].GetString();
else if (proxy["id"].IsInt())
id = proxy["id"].GetInt();
} else {
LOG_WARN("there is no id info of inlong_group_id");
continue;
}
proxy_info_vec.emplace_back(id, ip, port, load);
}

return SdkCode::kSuccess;
}

int32_t ProxyManager::GetProxy(const std::string &key,
ProxyInfoVec &proxy_info_vec) {
if (constants::IsolationLevel::kLevelOne ==
Expand Down Expand Up @@ -407,16 +311,15 @@ void ProxyManager::UpdateProxy(
LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first);
continue;
}
std::string url;
if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
url = SdkConfig::getInstance()->manager_cluster_url_;
else {
url =
SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first;
std::string url = SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first;
if (SdkConfig::getInstance()->extend_report_) {
url = SdkConfig::getInstance()->manager_url_ + "?bid=" + groupid2cluster.first + "&net_tag=all&ip=" +
SdkConfig::getInstance()->local_ip_;
}

std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
"&version=" + constants::kVersion +
"&protocolType=" + constants::kProtocolType;
"&version=" + constants::kVersion +
"&protocolType=" + constants::kProtocolType;
LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
<< "proxy cfg url " << url.c_str()
<< "post_data:" << post_data.c_str());
Expand All @@ -441,7 +344,7 @@ void ProxyManager::UpdateProxy(
if (groupid_2_proxy_map_.find(groupid2cluster.first) !=
groupid_2_proxy_map_.end()) {
LOG_WARN("failed to request from manager, use previous "
<< groupid2cluster.first);
<< groupid2cluster.first);
continue;
}
if (!SdkConfig::getInstance()->enable_local_cache_) {
Expand All @@ -456,10 +359,14 @@ void ProxyManager::UpdateProxy(
}

ProxyInfoVec proxyInfoVec;
ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
if (SdkConfig::getInstance()->extend_report_) {
ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data, groupid_2_cluster_id_update_map_, proxyInfoVec);
} else {
ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data, proxyInfoVec, groupid_2_cluster_id_update_map_);
}

if (ret != SdkCode::kSuccess) {
LOG_ERROR("failed to parse groupid:%s json proxy list "
<< groupid2cluster.first.c_str());
LOG_ERROR("Failed to parse json: " << meta_data);
continue;
}
if (!proxyInfoVec.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>

namespace inlong {
using GroupId2ClusterIdMap = std::unordered_map<std::string, int32_t>;
class ProxyManager {
private:
uint32_t timeout_;
Expand All @@ -53,7 +54,7 @@ class ProxyManager {
uint64_t last_update_time_;

int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
ProxyInfoVec &proxy_info_vec);
ProxyInfoVec &proxy_info_vec,GroupId2ClusterIdMap &group_id_2_cluster_id);

public:
ProxyManager(){};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ SendGroupPtr SendManager::DoGetSendGroup(const std::string &send_group_key) {
unique_read_lock<read_write_mutex> rdlck(send_group_map_rwmutex_);
auto send_group_map = send_group_map_.find(send_group_key);
if (send_group_map == send_group_map_.end()) {
LOG_ERROR("fail to get send group, group_id:%s" << send_group_key);
LOG_ERROR("Fail to get send group, group key:" << send_group_key);
return nullptr;
}
if (send_group_map->second.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Environment {
std::string version_;
std::string ip_;
uint64_t pid_;
bool extend_report_;
const std::string &getType() const { return type_; }
void setType(const std::string &type) { type_ = type; }
std::string getVersion() { return version_; }
Expand All @@ -35,12 +36,14 @@ class Environment {
void setIp(const std::string &ip) { ip_ = ip; }
uint64_t getPid() const { return pid_; }
void setPid(uint64_t pid) { pid_ = pid; }
void SetExtendReport(bool extend_report) {extend_report_ = extend_report;}

std::string ToString() const {
std::stringstream metric;
metric << "local ip[" << ip_ << "] ";
metric << "version [" << version_ << "] ";
metric << "pid [" << pid_ << "] ";
metric << "version[" << version_ << "] ";
metric << "pid[" << pid_ << "] ";
metric << "extend report[" << extend_report_ << "]";
return metric.str();
}
};
Expand Down
Loading

0 comments on commit 2bb4c0e

Please sign in to comment.