Skip to content

Commit

Permalink
slow down select half open instance
Browse files Browse the repository at this point in the history
  • Loading branch information
lambdaliu committed Oct 5, 2021
1 parent 5f960ea commit 695b48d
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 25 deletions.
2 changes: 1 addition & 1 deletion polaris/engine/circuit_breaker_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void CircuitBreakerExecutor::TimingCircuitBreak(CircuitBreakerExecutor* executor
all_service_contexts.clear();
// 设置定时任务
executor->reactor_.AddTimingTask(
new TimingFuncTask<CircuitBreakerExecutor>(TimingCircuitBreak, executor, 500));
new TimingFuncTask<CircuitBreakerExecutor>(TimingCircuitBreak, executor, 1000));
}

} // namespace polaris
14 changes: 14 additions & 0 deletions polaris/model/model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ void Service::SetCircuitBreakerData(const CircuitBreakerData& circuit_breaker_da
impl_->have_half_open_data_ = true;
} else {
impl_->have_half_open_data_ = false;
impl_->try_half_open_count_ = 20;
}
}

Expand All @@ -1020,6 +1021,19 @@ ReturnCode Service::TryChooseHalfOpenInstance(std::set<Instance*>& instances, In
if (!impl_->have_half_open_data_ || instances.empty()) {
return kReturnInstanceNotFound;
}
// 控制释放半开节点的评论,有半开节点以后每20个请求
// 且距离上次释放半开节点超过2s就释放1个半开请求
if (++impl_->try_half_open_count_ < 20) {
return kReturnInstanceNotFound; // 距离上次释放不足20个正常请求
}
uint64_t last_half_open_time = impl_->last_half_open_time_.Load();
uint64_t current_time = Time::GetCurrentTimeMs();
if (current_time < last_half_open_time + 2000 ||
!impl_->last_half_open_time_.Cas(last_half_open_time, current_time)) {
return kReturnInstanceNotFound; // 距离上一次释放半开间隔不足2s
}
impl_->try_half_open_count_ = 0;

std::set<Instance*>::iterator split_it = instances.begin();
std::advance(split_it, rand() % instances.size());
std::set<Instance*>::iterator instance_it;
Expand Down
2 changes: 2 additions & 0 deletions polaris/model/model_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ class ServiceImpl {

// 半开优先分配数据
sync::Mutex half_open_lock_;
sync::Atomic<uint64_t> last_half_open_time_;
sync::Atomic<int> try_half_open_count_;
bool have_half_open_data_;
std::map<std::string, int> half_open_data_; // 存储半开分配数据

Expand Down
2 changes: 1 addition & 1 deletion polaris/plugin/circuit_breaker/circuit_breaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ static const char kChainEnableKey[] = "enable";
static const bool kChainEnableDefault = true;

static const char kChainCheckPeriodKey[] = "checkPeriod";
static const uint64_t kChainCheckPeriodDefault = 500;
static const uint64_t kChainCheckPeriodDefault = 1000;

// 用于传入是否开启探测插件
static const char kDetectorDisableKey[] = "detectorDisable";
Expand Down
2 changes: 1 addition & 1 deletion polaris/plugin/circuit_breaker/error_count.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ ReturnCode ErrorCountCircuitBreaker::TimingCircuitBreak(
error_count_status.error_count = 0;
error_count_status.last_update_time = current_time;
}
} else if (error_count_status.last_update_time + 20 * sleep_window_ <= current_time) {
} else if (error_count_status.last_access_time + 100 * sleep_window_ <= current_time) {
// 兜底:如果访问量一定时间达不到要求,则重新熔断
if (instances_status->TranslateStatus(it->first, kCircuitBreakerHalfOpen,
kCircuitBreakerOpen)) {
Expand Down
3 changes: 0 additions & 3 deletions polaris/plugin/circuit_breaker/error_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@

namespace polaris {

class Config;
class Context;

struct ErrorCountStatus {
CircuitBreakerStatus status;
int error_count;
Expand Down
2 changes: 1 addition & 1 deletion polaris/plugin/circuit_breaker/error_rate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ ReturnCode ErrorRateCircuitBreaker::TimingCircuitBreak(
error_rate_status.ClearBuckets(metric_num_buckets_);
}
} else if (err_req > request_count_after_half_open_ - success_count_after_half_open_ ||
error_rate_status.last_update_time + 20 * sleep_window_ <= current_time) {
error_rate_status.last_access_time + 100 * sleep_window_ <= current_time) {
// 达到重新熔断条件
if (instances_status->TranslateStatus(it->first, kCircuitBreakerHalfOpen,
kCircuitBreakerOpen)) {
Expand Down
3 changes: 0 additions & 3 deletions polaris/plugin/circuit_breaker/error_rate.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@

namespace polaris {

class Config;
class Context;

struct ErrorRateBucket {
ErrorRateBucket() : total_count(0), error_count(0), bucket_time(0) {}
int total_count;
Expand Down
53 changes: 38 additions & 15 deletions test/model/model_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "mock/fake_server_response.h"
#include "model/model_impl.h"
#include "polaris/plugin.h"
#include "test_utils.h"
#include "utils/string_utils.h"

namespace polaris {
Expand All @@ -29,9 +30,10 @@ class ModelTest : public ::testing::Test {
void SetUp() {
service_key_.namespace_ = "test_namespace";
service_key_.name_ = "test_service";
TestUtils::SetUpFakeTime();
}

void TearDown() {}
void TearDown() { TestUtils::TearDownFakeTime(); }

protected:
ServiceKey service_key_;
Expand All @@ -52,26 +54,39 @@ TEST_F(ModelTest, TryChooseHalfOpenInstance) {
service->SetCircuitBreakerData(circuit_breaker_data);

Instance *instance = NULL;
ASSERT_EQ(service->TryChooseHalfOpenInstance(service_instances, instance), kReturnOk);
ASSERT_TRUE(instance != NULL);
ASSERT_EQ(instance->GetId(), "instance_0");
instance = NULL;
ASSERT_EQ(service->TryChooseHalfOpenInstance(service_instances, instance),
kReturnInstanceNotFound);
ASSERT_TRUE(instance == NULL);
for (int i = 1; i <= 60; i++) {
instance = NULL;
ReturnCode ret = service->TryChooseHalfOpenInstance(service_instances, instance);
if (i == 20) {
ASSERT_EQ(ret, kReturnOk) << i;
ASSERT_TRUE(instance != NULL);
ASSERT_EQ(instance->GetId(), "instance_0");
} else {
ASSERT_EQ(ret, kReturnInstanceNotFound);
ASSERT_TRUE(instance == NULL);
}
TestUtils::FakeNowIncrement(1500);
}

circuit_breaker_data.version = 2;
circuit_breaker_data.half_open_instances["instance_0"] = 1;
circuit_breaker_data.half_open_instances["instance_x"] = 2;
circuit_breaker_data.half_open_instances["instance_1"] = 5;
service->SetCircuitBreakerData(circuit_breaker_data);
for (int i = 0; i < 5; i++) {
for (int i = 1; i <= 100; i++) {
instance = NULL;
ASSERT_EQ(service->TryChooseHalfOpenInstance(service_instances, instance), kReturnOk);
ASSERT_TRUE(instance != NULL);
ASSERT_EQ(instance->GetId(), "instance_1");
TestUtils::FakeNowIncrement(1500);
ReturnCode ret = service->TryChooseHalfOpenInstance(service_instances, instance);
if (i % 20 == 0) {
ASSERT_EQ(ret, kReturnOk) << i;
ASSERT_TRUE(instance != NULL);
ASSERT_EQ(instance->GetId(), "instance_1");
} else {
ASSERT_EQ(ret, kReturnInstanceNotFound);
}
}
instance = NULL;
TestUtils::FakeNowIncrement(10000);
ASSERT_EQ(service->TryChooseHalfOpenInstance(service_instances, instance),
kReturnInstanceNotFound);
ASSERT_TRUE(instance == NULL);
Expand All @@ -89,6 +104,8 @@ TEST_F(ModelTest, TryChooseHalfOpenInstanceRand) {
Instance *instance;
CircuitBreakerData circuit_breaker_data;
circuit_breaker_data.version = 1;
service->SetCircuitBreakerData(circuit_breaker_data);
circuit_breaker_data.version = 2;
for (int i = 0; i < 10; i++) {
std::string instance_id = "instance_" + StringUtils::TypeToStr<int>(i);
instance = new Instance(instance_id, "host", 8000, 100);
Expand All @@ -98,9 +115,15 @@ TEST_F(ModelTest, TryChooseHalfOpenInstanceRand) {
service->SetCircuitBreakerData(circuit_breaker_data);

std::set<Instance *> select_instance;
for (int i = 0; i < 10; i++) {
ASSERT_EQ(service->TryChooseHalfOpenInstance(instances, instance), kReturnOk);
select_instance.insert(instance);
for (int i = 0; i < 200; i++) {
TestUtils::FakeNowIncrement(50);
ReturnCode ret_code = service->TryChooseHalfOpenInstance(instances, instance);
if (i % 40 == 0) {
ASSERT_EQ(ret_code, kReturnOk) << i;
select_instance.insert(instance);
} else {
ASSERT_EQ(ret_code, kReturnInstanceNotFound) << i;
}
}
ASSERT_GT(select_instance.size(), 1);

Expand Down

0 comments on commit 695b48d

Please sign in to comment.