Skip to content

Commit

Permalink
Fixes #66.
Browse files Browse the repository at this point in the history
1. Introduce `hz` flag to control the frequency of periodically running tasks.
2. Update helio dependency.
  • Loading branch information
romange committed Jun 1, 2022
1 parent 33af405 commit d5e0a8a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion helio
17 changes: 13 additions & 4 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@ extern "C" {
using namespace std;

ABSL_FLAG(string, backing_prefix, "", "");

ABSL_FLAG(uint32_t, hz, 1000,
"Base frequency at which the server updates its expiry clock "
"and performs other background tasks. Warning: not advised to decrease in production, "
"because it can affect expiry precision for PSETEX etc.");

ABSL_DECLARE_FLAG(bool, cache_mode);

namespace dfly {

using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;
using absl::GetFlag;

namespace {

Expand All @@ -48,16 +55,18 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), absl::GetFlag(FLAGS_cache_mode), this) {
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
queue_.Run();
});

if (update_db_time) {
constexpr uint32_t kClockCycleMs = 1;
uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
if (clock_cycle_ms == 0)
clock_cycle_ms = 1;

periodic_task_ = pb->AddPeriodic(kClockCycleMs, [this] { Heartbeat(); });
periodic_task_ = pb->AddPeriodic(clock_cycle_ms, [this] { Heartbeat(); });
}

tmp_str1 = sdsempty();
Expand Down Expand Up @@ -92,7 +101,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CompactObj::InitThreadLocal(shard_->memory_resource());
SmallString::InitThreadLocal(data_heap);

string backing_prefix = absl::GetFlag(FLAGS_backing_prefix);
string backing_prefix = GetFlag(FLAGS_backing_prefix);
if (!backing_prefix.empty()) {
string fn =
absl::StrCat(backing_prefix, "-", absl::Dec(pb->GetIndex(), absl::kZeroPad4), ".ssd");
Expand Down
14 changes: 9 additions & 5 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ extern "C" {
#include "server/replica.h"
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/version.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "server/version.h"
#include "strings/human_readable.h"
#include "util/accept_server.h"
#include "util/uring/uring_file.h"
Expand All @@ -48,6 +48,7 @@ ABSL_FLAG(string, requirepass, "", "password for AUTH authentication");

ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
ABSL_DECLARE_FLAG(uint32_t, hz);

extern "C" mi_stats_t _mi_stats_main;

Expand All @@ -56,10 +57,10 @@ namespace dfly {
using namespace util;
namespace fibers = ::boost::fibers;
namespace fs = std::filesystem;
using absl::GetFlag;
using absl::StrCat;
using facade::MCReplyBuilder;
using strings::HumanReadableNumBytes;
using absl::GetFlag;

namespace {

Expand Down Expand Up @@ -170,7 +171,10 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
used_mem_peak.store(sum, memory_order_relaxed);
};

task_10ms_ = pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(10, cache_cb); });
uint32_t cache_hz = max(GetFlag(FLAGS_hz) / 10, 1u);
uint32_t period_ms = max(1u, 1000 / cache_hz);
stats_caching_task_ =
pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });

fs::path data_folder = fs::current_path();
const auto& dir = GetFlag(FLAGS_dir);
Expand All @@ -197,8 +201,8 @@ void ServerFamily::Shutdown() {
load_fiber_.join();

pb_task_->Await([this] {
pb_task_->CancelPeriodic(task_10ms_);
task_10ms_ = 0;
pb_task_->CancelPeriodic(stats_caching_task_);
stats_caching_task_ = 0;

unique_lock lk(replicaof_mu_);
if (replica_) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class ServerFamily {

boost::fibers::fiber load_fiber_;

uint32_t task_10ms_ = 0;
uint32_t stats_caching_task_ = 0;
Service& service_;

util::AcceptServer* acceptor_ = nullptr;
Expand Down

0 comments on commit d5e0a8a

Please sign in to comment.