From 9218dd8755b078173b6768725e18f83075ecf93c Mon Sep 17 00:00:00 2001 From: QiangYu Date: Sun, 3 Sep 2023 22:36:43 +0800 Subject: [PATCH] [WIP] fix: fix metric may be zero we can cherry-pick some commits, which supports fork new thread without exec, to avoid metric may be zero Signed-off-by: QiangYu --- thirdparties/brpc/brpc.patch | 309 +++++++++++++++++++++++++++++++++++ 1 file changed, 309 insertions(+) diff --git a/thirdparties/brpc/brpc.patch b/thirdparties/brpc/brpc.patch index 0799d8b281..f105f9597b 100644 --- a/thirdparties/brpc/brpc.patch +++ b/thirdparties/brpc/brpc.patch @@ -3256,3 +3256,312 @@ index fc8bdf2c..020d5316 100644 #ifdef _MSC_VER #define BAIDU_THREAD_LOCAL __declspec(thread) #else +diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp +index fd499911..9ff91bf1 100644 +--- a/src/bvar/detail/sampler.cpp ++++ b/src/bvar/detail/sampler.cpp +@@ -43,6 +43,10 @@ struct CombineSampler { + } + }; + ++// True iff pthread_atfork was called. The callback to atfork works for child ++// of child as well, no need to register in the child again. ++static bool registered_atfork = false; ++ + // Call take_sample() of all scheduled samplers. + // This can be done with regular timer thread, but it's way too slow(global + // contention + log(N) heap manipulations). We need it to be super fast so that +@@ -88,7 +92,10 @@ private: + LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc); + } else { + _created = true; +- pthread_atfork(NULL, NULL, child_callback_atfork); ++ if (!registered_atfork) { ++ registered_atfork = true; ++ pthread_atfork(NULL, NULL, child_callback_atfork); ++ } + } + } +diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp +index 7866e76a..551b7167 100644 +--- a/src/bvar/detail/sampler.cpp ++++ b/src/bvar/detail/sampler.cpp +@@ -57,13 +57,11 @@ struct CombineSampler { + // deletion is taken place in the thread as well. 
+ class SamplerCollector : public bvar::Reducer { + public: +- SamplerCollector() : _created(false), _stop(false), _cumulated_time_us(0) { +- int rc = pthread_create(&_tid, NULL, sampling_thread, this); +- if (rc != 0) { +- LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc); +- } else { +- _created = true; +- } ++ SamplerCollector() ++ : _created(false) ++ , _stop(false) ++ , _cumulated_time_us(0) { ++ create_sampling_thread(); + } + ~SamplerCollector() { + if (_created) { +@@ -73,33 +71,75 @@ public: + } + } + +- static double get_cumulated_time(void* arg) { +- return ((SamplerCollector*)arg)->_cumulated_time_us / 1000.0 / 1000.0; +- } +- + private: ++ // Support for fork: ++ // * The singleton can be null before forking, the child callback will not ++ // be registered. ++ // * If the singleton is not null before forking, the child callback will ++ // be registered and the sampling thread will be re-created. ++ // * A forked program can be forked again. ++ ++ static void child_callback_atfork() { ++ butil::get_leaky_singleton()->after_forked_as_child(); ++ } ++ ++ void create_sampling_thread() { ++ const int rc = pthread_create(&_tid, NULL, sampling_thread, this); ++ if (rc != 0) { ++ LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc); ++ } else { ++ _created = true; ++ pthread_atfork(NULL, NULL, child_callback_atfork); ++ } ++ } ++ ++ void after_forked_as_child() { ++ _created = false; ++ create_sampling_thread(); ++ } ++ + void run(); +- ++ + static void* sampling_thread(void* arg) { +- ((SamplerCollector*)arg)->run(); ++ static_cast(arg)->run(); + return NULL; + } + ++ static double get_cumulated_time(void* arg) { ++ return static_cast(arg)->_cumulated_time_us / 1000.0 / 1000.0; ++ } ++ + private: + bool _created; + bool _stop; ++ pid_t _created_pid; + int64_t _cumulated_time_us; + pthread_t _tid; + }; + ++PassiveStatus* g_cumulated_time_bvar = NULL; ++bvar::PerSecond >* g_sampling_thread_usage_bvar = NULL; ++ + void 
SamplerCollector::run() { +- butil::LinkNode root; +- int consecutive_nosleep = 0; + #ifndef UNIT_TEST +- PassiveStatus cumulated_time(get_cumulated_time, this); +- bvar::PerSecond > usage( +- "bvar_sampler_collector_usage", &cumulated_time, 10); ++ // NOTE: ++ // * Following vars can't be created on thread's stack since this thread ++ // may be abandoned at any time after forking. ++ // * They can't be created inside the constructor of SamplerCollector as well, ++ // which results in deadlock. ++ if (g_cumulated_time_bvar == NULL) { ++ g_cumulated_time_bvar = ++ new PassiveStatus(get_cumulated_time, this); ++ } ++ if (g_sampling_thread_usage_bvar == NULL) { ++ g_sampling_thread_usage_bvar = ++ new bvar::PerSecond >( ++ "bvar_sampler_collector_usage", g_cumulated_time_bvar, 10); ++ } + #endif ++ ++ butil::LinkNode root; ++ int consecutive_nosleep = 0; + while (!_stop) { + int64_t abstime = butil::gettimeofday_us(); + Sampler* s = this->reset(); +diff --git a/docs/cn/http_client.md b/docs/cn/http_client.md +index 66d3292c..ebc55af6 100644 +--- a/docs/cn/http_client.md ++++ b/docs/cn/http_client.md +@@ -114,7 +114,7 @@ URL的一般形式如下图: + + 若用户没有填且URL中包含host,比如http://www.foo.com/path,则http request中会包含"Host: www.foo.com"。 + +-若用户没有填且URL不包含host,比如"/index.html?name=value",但如果Channel初始化的地址包含域名,则框架会以域名作为Host,比如"http://www.foo.com",该http server将会看到"Host: www.foo.com"。如果地址是"http://www.foo.com:8989",则该http server将会看到"Host: www.foo.com:8989"。 ++若用户没有填且URL不包含host,比如"/index.html?name=value",但如果Channel初始化的地址scheme为http(s)且包含域名,则框架会以域名作为Host,比如"http://www.foo.com",该http server将会看到"Host: www.foo.com"。如果地址是"http://www.foo.com:8989",则该http server将会看到"Host: www.foo.com:8989"。 + + 若用户没有填且URL不包含host,比如"/index.html?name=value",如果Channel初始化的地址也不包含域名,则框架会以目标server的ip和port为Host,地址为10.46.188.39:8989的http server将会看到"Host: 10.46.188.39:8989"。 + +diff --git a/docs/en/http_client.md b/docs/en/http_client.md +index a6e0e34a..f83478d5 100644 +--- a/docs/en/http_client.md ++++ b/docs/en/http_client.md +@@ 
-115,7 +115,7 @@ If user already sets `Host` header(case insensitive), framework makes no change. + + If user does not set `Host` header and the URL has host, for example http://www.foo.com/path, the http request contains "Host: www.foo.com". + +-If user does not set host header and the URL does not have host as well, for example "/index.html?name=value", but if the address initialized by the channel contains domain name. framework sets `Host` header with domain name of the target server. if this address is "http://www.foo.com", this http server should see `Host: www.foo.com`, if this address is "http://www.foo.com:8989", this http server should be see `Host: www.foo.com:8989`. ++If user does not set host header and the URL does not have host either, for example "/index.html?name=value", but the channel is initialized by an http(s) address with a valid domain name, framework sets `Host` header with domain name of the target server. If this address is "http://www.foo.com", this http server should see `Host: www.foo.com`; if this address is "http://www.foo.com:8989", this http server should see `Host: www.foo.com:8989`. + + If user does not set host header and the URL does not have host as well, for example "/index.html?name=value", and the address initialized by the channel doesn't contain domain name. framework sets `Host` header with IP and port of the target server. A http server at 10.46.188.39:8989 should see `Host: 10.46.188.39:8989`. + +diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp +index 2de1de10..a94c09b5 100644 +--- a/src/brpc/channel.cpp ++++ b/src/brpc/channel.cpp +@@ -322,13 +322,12 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, + if (InitChannelOptions(options) != 0) { + return -1; + } +- std::string scheme; + int* port_out = raw_port == -1 ? 
&raw_port: NULL; +- ParseURL(raw_server_address, &scheme, &_service_name, port_out); ++ ParseURL(raw_server_address, &_scheme, &_service_name, port_out); + if (raw_port != -1) { + _service_name.append(":").append(std::to_string(raw_port)); + } +- if (_options.protocol == brpc::PROTOCOL_HTTP && scheme == "https") { ++ if (_options.protocol == brpc::PROTOCOL_HTTP && _scheme == "https") { + if (_options.mutable_ssl_options()->sni_name.empty()) { + _options.mutable_ssl_options()->sni_name = _service_name; + } +@@ -363,13 +362,12 @@ int Channel::Init(const char* ns_url, + if (InitChannelOptions(options) != 0) { + return -1; + } +- std::string scheme; + int raw_port = -1; +- ParseURL(ns_url, &scheme, &_service_name, &raw_port); ++ ParseURL(ns_url, &_scheme, &_service_name, &raw_port); + if (raw_port != -1) { + _service_name.append(":").append(std::to_string(raw_port)); + } +- if (_options.protocol == brpc::PROTOCOL_HTTP && scheme == "https") { ++ if (_options.protocol == brpc::PROTOCOL_HTTP && _scheme == "https") { + if (_options.mutable_ssl_options()->sni_name.empty()) { + _options.mutable_ssl_options()->sni_name = _service_name; + } +@@ -429,7 +427,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, + CHECK(cntl->protocol_param().empty()); + cntl->protocol_param() = _options.protocol.param(); + } +- if (_options.protocol == brpc::PROTOCOL_HTTP) { ++ if (_options.protocol == brpc::PROTOCOL_HTTP && (_scheme == "https" || _scheme == "http")) { + URI& uri = cntl->http_request().uri(); + if (uri.host().empty() && !_service_name.empty()) { + uri.SetHostAndPort(_service_name); +diff --git a/src/brpc/channel.h b/src/brpc/channel.h +index 6600fcf2..8e888f45 100644 +--- a/src/brpc/channel.h ++++ b/src/brpc/channel.h +@@ -221,6 +221,7 @@ protected: + int raw_port = -1); + + std::string _service_name; ++ std::string _scheme; + butil::EndPoint _server_address; + SocketId _server_id; + Protocol::SerializeRequest _serialize_request; +diff --git 
a/src/bvar/latency_recorder.cpp b/src/bvar/latency_recorder.cpp +index ed914224..c6532958 100644 +--- a/src/bvar/latency_recorder.cpp ++++ b/src/bvar/latency_recorder.cpp +@@ -89,14 +89,23 @@ int CDF::describe_series( + return 0; + } + ++// Return random int value with expectation = `dval' ++static int64_t double_to_random_int(double dval) { ++ int64_t ival = static_cast(dval); ++ if (dval > ival + butil::fast_rand_double()) { ++ ival += 1; ++ } ++ return ival; ++} ++ + static int64_t get_window_recorder_qps(void* arg) { + detail::Sample s; +- static_cast(arg)->get_span(1, &s); ++ static_cast(arg)->get_span(&s); + // Use floating point to avoid overflow. + if (s.time_us <= 0) { + return 0; + } +- return static_cast(round(s.data.num * 1000000.0 / s.time_us)); ++ return double_to_random_int(s.data.num * 1000000.0 / s.time_us); + } + + static int64_t get_recorder_count(void* arg) { +@@ -176,7 +185,7 @@ int64_t LatencyRecorder::qps(time_t window_size) const { + if (s.time_us <= 0) { + return 0; + } +- return static_cast(round(s.data.num * 1000000.0 / s.time_us)); ++ return detail::double_to_random_int(s.data.num * 1000000.0 / s.time_us); + } + + int LatencyRecorder::expose(const butil::StringPiece& prefix1, +diff --git a/test/bvar_recorder_unittest.cpp b/test/bvar_recorder_unittest.cpp +index e1606b66..99f1ff1b 100644 +--- a/test/bvar_recorder_unittest.cpp ++++ b/test/bvar_recorder_unittest.cpp +@@ -206,4 +206,46 @@ TEST(RecorderTest, perf) { + << "ns per sample with " << ARRAY_SIZE(threads) + << " threads"; + } ++ ++TEST(RecorderTest, latency_recorder_qps_accuracy) { ++ bvar::LatencyRecorder lr1(2); // set windows size to 2s ++ bvar::LatencyRecorder lr2(2); ++ bvar::LatencyRecorder lr3(2); ++ bvar::LatencyRecorder lr4(2); ++ usleep(2000000); // wait sampler to sample 2 times ++ ++ auto write = [](bvar::LatencyRecorder& lr, int times) { ++ for (int i = 0; i < times; ++i) { ++ lr << 1; ++ } ++ }; ++ write(lr1, 100); ++ write(lr2, 101); ++ write(lr3, 3); ++ write(lr4, 
1); ++ usleep(1000000); // wait sampler to sample 1 time ++ ++ auto read = [](bvar::LatencyRecorder& lr, double exp_qps, int window_size = 0) { ++ int64_t qps_sum = 0; ++ int64_t exp_qps_int = (int64_t)exp_qps; ++ for (int i = 0; i < 1000; ++i) { ++ int64_t qps = window_size ? lr.qps(window_size): lr.qps(); ++ EXPECT_GE(qps, exp_qps_int - 1); ++ EXPECT_LE(qps, exp_qps_int + 1); ++ qps_sum += qps; ++ } ++ double err = fabs(qps_sum / 1000.0 - exp_qps); ++ return err; ++ }; ++ ASSERT_GT(0.1, read(lr1, 100/2.0)); ++ ASSERT_GT(0.1, read(lr2, 101/2.0)); ++ ASSERT_GT(0.1, read(lr3, 3/2.0)); ++ ASSERT_GT(0.1, read(lr4, 1/2.0)); ++ ++ ASSERT_GT(0.1, read(lr1, 100/3.0, 3)); ++ ASSERT_GT(0.1, read(lr2, 101/3.0, 3)); ++ ASSERT_GT(0.1, read(lr3, 3/3.0, 3)); ++ ASSERT_GT(0.1, read(lr4, 1/3.0, 3)); ++} ++ + } // namespace