Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport bvar fork fixes #2960

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ git_repository(
"//:thirdparties/brpc/brpc.patch",
"//:thirdparties/brpc/fix-gcc11.patch",
"//:thirdparties/brpc/0001-bvar-warning-on-conflict-bvar-name.patch",
"//:thirdparties/brpc/0002-Support-fork-without-exec.patch",
"//:thirdparties/brpc/0003-Add-docs-on-fork-w-o-exec.patch",
"//:thirdparties/brpc/0004-not-register-pthread_atfork-in-child-process.patch",
"//:thirdparties/brpc/0005-Fix-LatencyRecorder-qps-not-accurate.patch",
"//:thirdparties/brpc/0006-fix-1973-1863.patch",
],
patch_args = ["-p1"],
)
Expand Down
1 change: 0 additions & 1 deletion curvefs/src/metaserver/copyset/concurrent_apply_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
* Author: Xinlong-Chen
*/


#include "curvefs/src/metaserver/copyset/concurrent_apply_queue.h"

#include <algorithm>
Expand Down
120 changes: 120 additions & 0 deletions thirdparties/brpc/0002-Support-fork-without-exec.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
From 2d88cfa7b52ed6553f1884a669a2b9bc17aff656 Mon Sep 17 00:00:00 2001
From: jamesge <[email protected]>
Date: Fri, 15 Nov 2019 01:06:03 -0800
Subject: [PATCH 1/5] Support fork without exec

---
src/bvar/detail/sampler.cpp | 72 ++++++++++++++++++++++++++++---------
1 file changed, 56 insertions(+), 16 deletions(-)

diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp
index 1de80970..23cfbd8b 100644
--- a/src/bvar/detail/sampler.cpp
+++ b/src/bvar/detail/sampler.cpp
@@ -54,13 +54,11 @@ struct CombineSampler {
// deletion is taken place in the thread as well.
class SamplerCollector : public bvar::Reducer<Sampler*, CombineSampler> {
public:
- SamplerCollector() : _created(false), _stop(false), _cumulated_time_us(0) {
- int rc = pthread_create(&_tid, NULL, sampling_thread, this);
- if (rc != 0) {
- LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
- } else {
- _created = true;
- }
+ SamplerCollector()
+ : _created(false)
+ , _stop(false)
+ , _cumulated_time_us(0) {
+ create_sampling_thread();
}
~SamplerCollector() {
if (_created) {
@@ -70,33 +68,75 @@ public:
}
}

- static double get_cumulated_time(void* arg) {
- return ((SamplerCollector*)arg)->_cumulated_time_us / 1000.0 / 1000.0;
+private:
+ // Support for fork:
+ // * The singleton can be null before forking, the child callback will not
+ // be registered.
+ // * If the singleton is not null before forking, the child callback will
+ // be registered and the sampling thread will be re-created.
+ // * A forked program can be forked again.
+
+ static void child_callback_atfork() {
+ butil::get_leaky_singleton<SamplerCollector>()->after_forked_as_child();
+ }
+
+ void create_sampling_thread() {
+ const int rc = pthread_create(&_tid, NULL, sampling_thread, this);
+ if (rc != 0) {
+ LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
+ } else {
+ _created = true;
+ pthread_atfork(NULL, NULL, child_callback_atfork);
+ }
+ }
+
+ void after_forked_as_child() {
+ _created = false;
+ create_sampling_thread();
}

-private:
void run();

static void* sampling_thread(void* arg) {
- ((SamplerCollector*)arg)->run();
+ static_cast<SamplerCollector*>(arg)->run();
return NULL;
}

+ static double get_cumulated_time(void* arg) {
+ return static_cast<SamplerCollector*>(arg)->_cumulated_time_us / 1000.0 / 1000.0;
+ }
+
private:
bool _created;
bool _stop;
+ pid_t _created_pid;
int64_t _cumulated_time_us;
pthread_t _tid;
};

+PassiveStatus<double>* g_cumulated_time_bvar = NULL;
+bvar::PerSecond<bvar::PassiveStatus<double> >* g_sampling_thread_usage_bvar = NULL;
+
void SamplerCollector::run() {
- butil::LinkNode<Sampler> root;
- int consecutive_nosleep = 0;
#ifndef UNIT_TEST
- // PassiveStatus<double> cumulated_time(get_cumulated_time, this);
- // bvar::PerSecond<bvar::PassiveStatus<double> > usage(
- // "bvar_sampler_collector_usage", &cumulated_time, 10);
+ // NOTE:
+ // * Following vars can't be created on thread's stack since this thread
+ // may be abandoned at any time after forking.
+ // * They can't be created inside the constructor of SamplerCollector as well,
+ // which results in deadlock.
+ if (g_cumulated_time_bvar == NULL) {
+ g_cumulated_time_bvar =
+ new PassiveStatus<double>(get_cumulated_time, this);
+ }
+ if (g_sampling_thread_usage_bvar == NULL) {
+ g_sampling_thread_usage_bvar =
+ new bvar::PerSecond<bvar::PassiveStatus<double> >(
+ "bvar_sampler_collector_usage", g_cumulated_time_bvar, 10);
+ }
#endif
+
+ butil::LinkNode<Sampler> root;
+ int consecutive_nosleep = 0;
while (!_stop) {
int64_t abstime = butil::gettimeofday_us();
Sampler* s = this->reset();
--
2.37.2

57 changes: 57 additions & 0 deletions thirdparties/brpc/0003-Add-docs-on-fork-w-o-exec.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
From 0a200eebce9abad390342d880bb446099bcfd1c3 Mon Sep 17 00:00:00 2001
From: Ge Jun <[email protected]>
Date: Fri, 15 Nov 2019 10:18:06 +0000
Subject: [PATCH 2/5] Add docs on fork w/o exec

---
docs/cn/server.md | 11 +++++++++++
docs/en/server.md | 11 +++++++++++
2 files changed, 22 insertions(+)

diff --git a/docs/cn/server.md b/docs/cn/server.md
index a2262c6d..9f519871 100644
--- a/docs/cn/server.md
+++ b/docs/cn/server.md
@@ -341,6 +341,17 @@ server端会自动尝试其支持的协议,无需用户指定。`cntl->protoco

如果你有更多的协议需求,可以联系我们。

+# fork without exec
+一般来说,[fork](https://linux.die.net/man/3/fork)出的子进程应尽快调用[exec](https://linux.die.net/man/3/exec)以重置所有状态,中间只应调用满足async-signal-safe的函数。这么使用fork的brpc程序在之前的版本也不会有问题。
+
+但在一些场景中,用户想直接运行fork出的子进程,而不调用exec。由于fork只复制其调用者的线程,其余线程便随之消失了。对应到brpc中,bvar会依赖一个sampling_thread采样各种信息,在fork后便消失了,现象是很多bvar归零。
+
+最新版本的brpc会在fork后重建这个线程(如有必要),从而使bvar在fork后能正常工作,再次fork也可以。已知问题是fork后cpu profiler不正常。然而,这并不意味着用户可随意地fork,不管是brpc还是上层应用都会大量地创建线程,它们在fork后不会被重建,因为:
+* 大部分fork会紧接exec,浪费了重建
+* 给代码编写带来很多的麻烦和复杂度
+
+brpc的策略是按需创建这类线程,同时fork without exec必须发生在所有可能创建这些线程的代码前。具体地说,至少**发生在初始化所有Server/Channel/应用代码前**,越早越好,不遵守这个约定的fork会导致程序不正常。另外,不支持fork without exec的lib相当普遍,最好还是避免这种用法。
+
# 设置

## 版本
diff --git a/docs/en/server.md b/docs/en/server.md
index d604c1fe..f28fd96f 100644
--- a/docs/en/server.md
+++ b/docs/en/server.md
@@ -344,6 +344,17 @@ Server detects supported protocols automatically, without assignment from users.

If you need more protocols, contact us.

+# fork without exec
+In general, a [forked](https://linux.die.net/man/3/fork) subprocess should call [exec](https://linux.die.net/man/3/exec) ASAP, and only async-signal-safe functions should be called before that. brpc programs using fork like this should work correctly even in previous versions.
+
+But in some scenarios, users continue running the subprocess without calling exec. Since fork only copies the calling thread, all other threads disappear in the child. In the case of brpc, bvar depends on a sampling_thread to sample various information; that thread disappears after fork, which causes many bvars to be zeros.
+
+The latest brpc re-creates the thread after fork (when necessary) to make bvar work correctly, and the forked program can be forked again. A known problem is that the cpu profiler does not work after fork. However, users still can't call fork at any time, since brpc and its applications create threads extensively, and those threads are not re-created after fork because:
+* most forks are followed by exec, which would waste the re-creation
+* re-creating them would bring too much trouble and complexity to the code
+
+brpc's strategy is to create these threads on demand, and fork without exec must happen before all code that may create such threads. Specifically, **fork without exec should happen before initializing all Servers/Channels/Applications**, the earlier the better. A fork that does not obey this rule causes the program to malfunction. BTW, fork without exec is best avoided because many libraries do not support it.
+
# Settings

## Version
--
2.37.2

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
From 7077d2c4d71bfa9699b738252fe06a468a4eca34 Mon Sep 17 00:00:00 2001
From: jamesge <[email protected]>
Date: Wed, 18 Mar 2020 17:34:09 +0800
Subject: [PATCH 3/5] not register pthread_atfork in child process

---
src/bvar/detail/sampler.cpp | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp
index 23cfbd8b..06d5cdbd 100644
--- a/src/bvar/detail/sampler.cpp
+++ b/src/bvar/detail/sampler.cpp
@@ -41,6 +41,10 @@ struct CombineSampler {
}
};

+// True iff pthread_atfork was called. The callback to atfork works for child
+// of child as well, no need to register in the child again.
+static bool registered_atfork = false;
+
// Call take_sample() of all scheduled samplers.
// This can be done with regular timer thread, but it's way too slow(global
// contention + log(N) heap manipulations). We need it to be super fast so that
@@ -86,7 +90,10 @@ private:
LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
} else {
_created = true;
- pthread_atfork(NULL, NULL, child_callback_atfork);
+ if (!registered_atfork) {
+ registered_atfork = true;
+ pthread_atfork(NULL, NULL, child_callback_atfork);
+ }
}
}

--
2.37.2

103 changes: 103 additions & 0 deletions thirdparties/brpc/0005-Fix-LatencyRecorder-qps-not-accurate.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
From 1e866ba4f2ea633a15e0bcaebe0f84f7b0fcc1c2 Mon Sep 17 00:00:00 2001
From: wangweibing <[email protected]>
Date: Wed, 2 Mar 2022 04:26:51 +0000
Subject: [PATCH 4/5] Fix LatencyRecorder qps not accurate

---
src/bvar/latency_recorder.cpp | 15 +++++++++---
test/bvar_recorder_unittest.cpp | 42 +++++++++++++++++++++++++++++++++
2 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/src/bvar/latency_recorder.cpp b/src/bvar/latency_recorder.cpp
index 7a27e170..fe8c776e 100644
--- a/src/bvar/latency_recorder.cpp
+++ b/src/bvar/latency_recorder.cpp
@@ -87,14 +87,23 @@ int CDF::describe_series(
return 0;
}

+// Return random int value with expectation = `dval'
+static int64_t double_to_random_int(double dval) {
+ int64_t ival = static_cast<int64_t>(dval);
+ if (dval > ival + butil::fast_rand_double()) {
+ ival += 1;
+ }
+ return ival;
+}
+
static int64_t get_window_recorder_qps(void* arg) {
detail::Sample<Stat> s;
- static_cast<RecorderWindow*>(arg)->get_span(1, &s);
+ static_cast<RecorderWindow*>(arg)->get_span(&s);
// Use floating point to avoid overflow.
if (s.time_us <= 0) {
return 0;
}
- return static_cast<int64_t>(round(s.data.num * 1000000.0 / s.time_us));
+ return double_to_random_int(s.data.num * 1000000.0 / s.time_us);
}

static int64_t get_recorder_count(void* arg) {
@@ -174,7 +183,7 @@ int64_t LatencyRecorder::qps(time_t window_size) const {
if (s.time_us <= 0) {
return 0;
}
- return static_cast<int64_t>(round(s.data.num * 1000000.0 / s.time_us));
+ return detail::double_to_random_int(s.data.num * 1000000.0 / s.time_us);
}

int LatencyRecorder::expose(const butil::StringPiece& prefix1,
diff --git a/test/bvar_recorder_unittest.cpp b/test/bvar_recorder_unittest.cpp
index 412ec36c..823e88b2 100644
--- a/test/bvar_recorder_unittest.cpp
+++ b/test/bvar_recorder_unittest.cpp
@@ -192,4 +192,46 @@ TEST(RecorderTest, perf) {
<< "ns per sample with " << ARRAY_SIZE(threads)
<< " threads";
}
+
+TEST(RecorderTest, latency_recorder_qps_accuracy) {
+ bvar::LatencyRecorder lr1(2); // set windows size to 2s
+ bvar::LatencyRecorder lr2(2);
+ bvar::LatencyRecorder lr3(2);
+ bvar::LatencyRecorder lr4(2);
+ usleep(2000000); // wait sampler to sample 2 times
+
+ auto write = [](bvar::LatencyRecorder& lr, int times) {
+ for (int i = 0; i < times; ++i) {
+ lr << 1;
+ }
+ };
+ write(lr1, 100);
+ write(lr2, 101);
+ write(lr3, 3);
+ write(lr4, 1);
+ usleep(1000000); // wait sampler to sample 1 time
+
+ auto read = [](bvar::LatencyRecorder& lr, double exp_qps, int window_size = 0) {
+ int64_t qps_sum = 0;
+ int64_t exp_qps_int = (int64_t)exp_qps;
+ for (int i = 0; i < 1000; ++i) {
+ int64_t qps = window_size ? lr.qps(window_size): lr.qps();
+ EXPECT_GE(qps, exp_qps_int - 1);
+ EXPECT_LE(qps, exp_qps_int + 1);
+ qps_sum += qps;
+ }
+ double err = fabs(qps_sum / 1000.0 - exp_qps);
+ return err;
+ };
+ ASSERT_GT(0.1, read(lr1, 100/2.0));
+ ASSERT_GT(0.1, read(lr2, 101/2.0));
+ ASSERT_GT(0.1, read(lr3, 3/2.0));
+ ASSERT_GT(0.1, read(lr4, 1/2.0));
+
+ ASSERT_GT(0.1, read(lr1, 100/3.0, 3));
+ ASSERT_GT(0.1, read(lr2, 101/3.0, 3));
+ ASSERT_GT(0.1, read(lr3, 3/3.0, 3));
+ ASSERT_GT(0.1, read(lr4, 1/3.0, 3));
+}
+
} // namespace
--
2.37.2

37 changes: 37 additions & 0 deletions thirdparties/brpc/0006-fix-1973-1863.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
From aa9987efe7ce1d13969f03194d8894060a6d58b2 Mon Sep 17 00:00:00 2001
From: HU <[email protected]>
Date: Mon, 1 Aug 2022 10:15:44 +0800
Subject: [PATCH 5/5] fix #1973 (#1863)

Co-authored-by: XiguoHu <[email protected]>
---
src/bvar/detail/sampler.cpp | 5 +++++
1 file changed, 5 insertions(+)

diff --git a/src/bvar/detail/sampler.cpp b/src/bvar/detail/sampler.cpp
index 06d5cdbd..7baf20b7 100644
--- a/src/bvar/detail/sampler.cpp
+++ b/src/bvar/detail/sampler.cpp
@@ -15,6 +15,7 @@
// Author: Ge,Jun ([email protected])
// Date: Tue Jul 28 18:14:40 CST 2015

+#include <gflags/gflags.h>
#include "butil/time.h"
#include "butil/memory/singleton_on_pthread_once.h"
#include "bvar/reducer.h"
@@ -124,7 +125,11 @@ private:
PassiveStatus<double>* g_cumulated_time_bvar = NULL;
bvar::PerSecond<bvar::PassiveStatus<double> >* g_sampling_thread_usage_bvar = NULL;

+DEFINE_int32(bvar_sampler_thread_start_delay_us, 10000, "bvar sampler thread start delay us");
+
void SamplerCollector::run() {
+ ::usleep(FLAGS_bvar_sampler_thread_start_delay_us);
+
#ifndef UNIT_TEST
// NOTE:
// * Following vars can't be created on thread's stack since this thread
--
2.37.2