Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] fix: fix metric may be zero #2730

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 309 additions & 0 deletions thirdparties/brpc/brpc.patch
Original file line number Diff line number Diff line change
Expand Up @@ -3256,3 +3256,312 @@ index fc8bdf2c..020d5316 100644
#ifdef _MSC_VER
#define BAIDU_THREAD_LOCAL __declspec(thread)
#else
diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp
index fd499911..9ff91bf1 100644
--- a/src/bvar/detail/sampler.cpp
+++ b/src/bvar/detail/sampler.cpp
@@ -43,6 +43,10 @@ struct CombineSampler {
}
};

+// True iff pthread_atfork was called. The callback to atfork works for child
+// of child as well, no need to register in the child again.
+static bool registered_atfork = false;
+
// Call take_sample() of all scheduled samplers.
// This can be done with regular timer thread, but it's way too slow(global
// contention + log(N) heap manipulations). We need it to be super fast so that
@@ -88,7 +92,10 @@ private:
LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
} else {
_created = true;
- pthread_atfork(NULL, NULL, child_callback_atfork);
+ if (!registered_atfork) {
+ registered_atfork = true;
+ pthread_atfork(NULL, NULL, child_callback_atfork);
+ }
}
}
diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp
index 7866e76a..551b7167 100644
--- a/src/bvar/detail/sampler.cpp
+++ b/src/bvar/detail/sampler.cpp
@@ -57,13 +57,11 @@ struct CombineSampler {
// deletion is taken place in the thread as well.
class SamplerCollector : public bvar::Reducer<Sampler*, CombineSampler> {
public:
- SamplerCollector() : _created(false), _stop(false), _cumulated_time_us(0) {
- int rc = pthread_create(&_tid, NULL, sampling_thread, this);
- if (rc != 0) {
- LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
- } else {
- _created = true;
- }
+ SamplerCollector()
+ : _created(false)
+ , _stop(false)
+ , _cumulated_time_us(0) {
+ create_sampling_thread();
}
~SamplerCollector() {
if (_created) {
@@ -73,33 +71,75 @@ public:
}
}

- static double get_cumulated_time(void* arg) {
- return ((SamplerCollector*)arg)->_cumulated_time_us / 1000.0 / 1000.0;
- }
-
private:
+ // Support for fork:
+ // * The singleton can be null before forking, the child callback will not
+ // be registered.
+ // * If the singleton is not null before forking, the child callback will
+ // be registered and the sampling thread will be re-created.
+ // * A forked program can be forked again.
+
+ static void child_callback_atfork() {
+ butil::get_leaky_singleton<SamplerCollector>()->after_forked_as_child();
+ }
+
+ void create_sampling_thread() {
+ const int rc = pthread_create(&_tid, NULL, sampling_thread, this);
+ if (rc != 0) {
+ LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
+ } else {
+ _created = true;
+ pthread_atfork(NULL, NULL, child_callback_atfork);
+ }
+ }
+
+ void after_forked_as_child() {
+ _created = false;
+ create_sampling_thread();
+ }
+
void run();
-
+
static void* sampling_thread(void* arg) {
- ((SamplerCollector*)arg)->run();
+ static_cast<SamplerCollector*>(arg)->run();
return NULL;
}

+ static double get_cumulated_time(void* arg) {
+ return static_cast<SamplerCollector*>(arg)->_cumulated_time_us / 1000.0 / 1000.0;
+ }
+
private:
bool _created;
bool _stop;
+ pid_t _created_pid;
int64_t _cumulated_time_us;
pthread_t _tid;
};

+PassiveStatus<double>* g_cumulated_time_bvar = NULL;
+bvar::PerSecond<bvar::PassiveStatus<double> >* g_sampling_thread_usage_bvar = NULL;
+
void SamplerCollector::run() {
- butil::LinkNode<Sampler> root;
- int consecutive_nosleep = 0;
#ifndef UNIT_TEST
- PassiveStatus<double> cumulated_time(get_cumulated_time, this);
- bvar::PerSecond<bvar::PassiveStatus<double> > usage(
- "bvar_sampler_collector_usage", &cumulated_time, 10);
+ // NOTE:
+ // * Following vars can't be created on thread's stack since this thread
+ // may be adandoned at any time after forking.
+ // * They can't created inside the constructor of SamplerCollector as well,
+ // which results in deadlock.
+ if (g_cumulated_time_bvar == NULL) {
+ g_cumulated_time_bvar =
+ new PassiveStatus<double>(get_cumulated_time, this);
+ }
+ if (g_sampling_thread_usage_bvar == NULL) {
+ g_sampling_thread_usage_bvar =
+ new bvar::PerSecond<bvar::PassiveStatus<double> >(
+ "bvar_sampler_collector_usage", g_cumulated_time_bvar, 10);
+ }
#endif
+
+ butil::LinkNode<Sampler> root;
+ int consecutive_nosleep = 0;
while (!_stop) {
int64_t abstime = butil::gettimeofday_us();
Sampler* s = this->reset();
diff --git a/docs/cn/http_client.md b/docs/cn/http_client.md
index 66d3292c..ebc55af6 100644
--- a/docs/cn/http_client.md
+++ b/docs/cn/http_client.md
@@ -114,7 +114,7 @@ URL的一般形式如下图:

若用户没有填且URL中包含host,比如http://www.foo.com/path,则http request中会包含"Host: www.foo.com"。

-若用户没有填且URL不包含host,比如"/index.html?name=value",但如果Channel初始化的地址包含域名,则框架会以域名作为Host,比如"http://www.foo.com",该http server将会看到"Host: www.foo.com"。如果地址是"http://www.foo.com:8989",则该http server将会看到"Host: www.foo.com:8989"。
+若用户没有填且URL不包含host,比如"/index.html?name=value",但如果Channel初始化的地址scheme为http(s)且包含域名,则框架会以域名作为Host,比如"http://www.foo.com",该http server将会看到"Host: www.foo.com"。如果地址是"http://www.foo.com:8989",则该http server将会看到"Host: www.foo.com:8989"。

若用户没有填且URL不包含host,比如"/index.html?name=value",如果Channel初始化的地址也不包含域名,则框架会以目标server的ip和port为Host,地址为10.46.188.39:8989的http server将会看到"Host: 10.46.188.39:8989"。

diff --git a/docs/en/http_client.md b/docs/en/http_client.md
index a6e0e34a..f83478d5 100644
--- a/docs/en/http_client.md
+++ b/docs/en/http_client.md
@@ -115,7 +115,7 @@ If user already sets `Host` header(case insensitive), framework makes no change.

If user does not set `Host` header and the URL has host, for example http://www.foo.com/path, the http request contains "Host: www.foo.com".

-If user does not set host header and the URL does not have host as well, for example "/index.html?name=value", but if the address initialized by the channel contains domain name. framework sets `Host` header with domain name of the target server. if this address is "http://www.foo.com", this http server should see `Host: www.foo.com`, if this address is "http://www.foo.com:8989", this http server should be see `Host: www.foo.com:8989`.
+If user does not set host header and the URL does not have host either, for example "/index.html?name=value", but if the channel is initlized by a http(s) address with valid domain name. framework sets `Host` header with domain name of the target server. if this address is "http://www.foo.com", this http server should see `Host: www.foo.com`, if this address is "http://www.foo.com:8989", this http server should be see `Host: www.foo.com:8989`.

If user does not set host header and the URL does not have host as well, for example "/index.html?name=value", and the address initialized by the channel doesn't contain domain name. framework sets `Host` header with IP and port of the target server. A http server at 10.46.188.39:8989 should see `Host: 10.46.188.39:8989`.

diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index 2de1de10..a94c09b5 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -322,13 +322,12 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
if (InitChannelOptions(options) != 0) {
return -1;
}
- std::string scheme;
int* port_out = raw_port == -1 ? &raw_port: NULL;
- ParseURL(raw_server_address, &scheme, &_service_name, port_out);
+ ParseURL(raw_server_address, &_scheme, &_service_name, port_out);
if (raw_port != -1) {
_service_name.append(":").append(std::to_string(raw_port));
}
- if (_options.protocol == brpc::PROTOCOL_HTTP && scheme == "https") {
+ if (_options.protocol == brpc::PROTOCOL_HTTP && _scheme == "https") {
if (_options.mutable_ssl_options()->sni_name.empty()) {
_options.mutable_ssl_options()->sni_name = _service_name;
}
@@ -363,13 +362,12 @@ int Channel::Init(const char* ns_url,
if (InitChannelOptions(options) != 0) {
return -1;
}
- std::string scheme;
int raw_port = -1;
- ParseURL(ns_url, &scheme, &_service_name, &raw_port);
+ ParseURL(ns_url, &_scheme, &_service_name, &raw_port);
if (raw_port != -1) {
_service_name.append(":").append(std::to_string(raw_port));
}
- if (_options.protocol == brpc::PROTOCOL_HTTP && scheme == "https") {
+ if (_options.protocol == brpc::PROTOCOL_HTTP && _scheme == "https") {
if (_options.mutable_ssl_options()->sni_name.empty()) {
_options.mutable_ssl_options()->sni_name = _service_name;
}
@@ -429,7 +427,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
CHECK(cntl->protocol_param().empty());
cntl->protocol_param() = _options.protocol.param();
}
- if (_options.protocol == brpc::PROTOCOL_HTTP) {
+ if (_options.protocol == brpc::PROTOCOL_HTTP && (_scheme == "https" || _scheme == "http")) {
URI& uri = cntl->http_request().uri();
if (uri.host().empty() && !_service_name.empty()) {
uri.SetHostAndPort(_service_name);
diff --git a/src/brpc/channel.h b/src/brpc/channel.h
index 6600fcf2..8e888f45 100644
--- a/src/brpc/channel.h
+++ b/src/brpc/channel.h
@@ -221,6 +221,7 @@ protected:
int raw_port = -1);

std::string _service_name;
+ std::string _scheme;
butil::EndPoint _server_address;
SocketId _server_id;
Protocol::SerializeRequest _serialize_request;
diff --git a/src/bvar/latency_recorder.cpp b/src/bvar/latency_recorder.cpp
index ed914224..c6532958 100644
--- a/src/bvar/latency_recorder.cpp
+++ b/src/bvar/latency_recorder.cpp
@@ -89,14 +89,23 @@ int CDF::describe_series(
return 0;
}

+// Return random int value with expectation = `dval'
+static int64_t double_to_random_int(double dval) {
+ int64_t ival = static_cast<int64_t>(dval);
+ if (dval > ival + butil::fast_rand_double()) {
+ ival += 1;
+ }
+ return ival;
+}
+
static int64_t get_window_recorder_qps(void* arg) {
detail::Sample<Stat> s;
- static_cast<RecorderWindow*>(arg)->get_span(1, &s);
+ static_cast<RecorderWindow*>(arg)->get_span(&s);
// Use floating point to avoid overflow.
if (s.time_us <= 0) {
return 0;
}
- return static_cast<int64_t>(round(s.data.num * 1000000.0 / s.time_us));
+ return double_to_random_int(s.data.num * 1000000.0 / s.time_us);
}

static int64_t get_recorder_count(void* arg) {
@@ -176,7 +185,7 @@ int64_t LatencyRecorder::qps(time_t window_size) const {
if (s.time_us <= 0) {
return 0;
}
- return static_cast<int64_t>(round(s.data.num * 1000000.0 / s.time_us));
+ return detail::double_to_random_int(s.data.num * 1000000.0 / s.time_us);
}

int LatencyRecorder::expose(const butil::StringPiece& prefix1,
diff --git a/test/bvar_recorder_unittest.cpp b/test/bvar_recorder_unittest.cpp
index e1606b66..99f1ff1b 100644
--- a/test/bvar_recorder_unittest.cpp
+++ b/test/bvar_recorder_unittest.cpp
@@ -206,4 +206,46 @@ TEST(RecorderTest, perf) {
<< "ns per sample with " << ARRAY_SIZE(threads)
<< " threads";
}
+
+TEST(RecorderTest, latency_recorder_qps_accuracy) {
+ bvar::LatencyRecorder lr1(2); // set windows size to 2s
+ bvar::LatencyRecorder lr2(2);
+ bvar::LatencyRecorder lr3(2);
+ bvar::LatencyRecorder lr4(2);
+ usleep(2000000); // wait sampler to sample 2 times
+
+ auto write = [](bvar::LatencyRecorder& lr, int times) {
+ for (int i = 0; i < times; ++i) {
+ lr << 1;
+ }
+ };
+ write(lr1, 100);
+ write(lr2, 101);
+ write(lr3, 3);
+ write(lr4, 1);
+ usleep(1000000); // wait sampler to sample 1 time
+
+ auto read = [](bvar::LatencyRecorder& lr, double exp_qps, int window_size = 0) {
+ int64_t qps_sum = 0;
+ int64_t exp_qps_int = (int64_t)exp_qps;
+ for (int i = 0; i < 1000; ++i) {
+ int64_t qps = window_size ? lr.qps(window_size): lr.qps();
+ EXPECT_GE(qps, exp_qps_int - 1);
+ EXPECT_LE(qps, exp_qps_int + 1);
+ qps_sum += qps;
+ }
+ double err = fabs(qps_sum / 1000.0 - exp_qps);
+ return err;
+ };
+ ASSERT_GT(0.1, read(lr1, 100/2.0));
+ ASSERT_GT(0.1, read(lr2, 101/2.0));
+ ASSERT_GT(0.1, read(lr3, 3/2.0));
+ ASSERT_GT(0.1, read(lr4, 1/2.0));
+
+ ASSERT_GT(0.1, read(lr1, 100/3.0, 3));
+ ASSERT_GT(0.1, read(lr2, 101/3.0, 3));
+ ASSERT_GT(0.1, read(lr3, 3/3.0, 3));
+ ASSERT_GT(0.1, read(lr4, 1/3.0, 3));
+}
+
} // namespace