Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add row & bytes threshold config for CompactLog (#1997) #2005

Merged
merged 2 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/BackgroundService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ BackgroundService::BackgroundService(TMTContext & tmt_)
RegionTable & region_table = tmt.getRegionTable();
region_table.checkTableOptimize();
}
tmt.getKVStore()->gcRegionCache();
tmt.getKVStore()->gcRegionPersistedCache();
return false;
},
false);
Expand Down
56 changes: 39 additions & 17 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void KVStore::tryPersist(const RegionID region_id)
}
}

void KVStore::gcRegionCache(Seconds gc_persist_period)
void KVStore::gcRegionPersistedCache(Seconds gc_persist_period)
{
{
decltype(bg_gc_region_data) tmp;
Expand Down Expand Up @@ -254,7 +254,16 @@ void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt)
removeRegion(region_id, /* remove_data */ true, tmt.getRegionTable(), task_lock, region_manager.genRegionTaskLock(region_id));
}

void KVStore::setRegionCompactLogPeriod(UInt64 sec) { REGION_COMPACT_LOG_PERIOD = sec; }
/// Update the thresholds that decide whether a CompactLog admin command
/// triggers flushing and persisting a region.
///
/// \param sec    value stored into REGION_COMPACT_LOG_PERIOD.
/// \param rows   value stored into REGION_COMPACT_LOG_MIN_ROWS.
/// \param bytes  value stored into REGION_COMPACT_LOG_MIN_BYTES.
///
/// NOTE: REGION_COMPACT_LOG_PERIOD is used as the divisor of `std::rand() % ...`
/// when handling CompactLog, so `sec` must be non-zero; this function does not
/// clamp it itself.
void KVStore::setRegionCompactLogConfig(UInt64 sec, UInt64 rows, UInt64 bytes)
{
    // Publish the new thresholds; each one is an independent atomic.
    REGION_COMPACT_LOG_PERIOD.store(sec);
    REGION_COMPACT_LOG_MIN_ROWS.store(rows);
    REGION_COMPACT_LOG_MIN_BYTES.store(bytes);

    // Record the effective configuration so a reload is visible in the log.
    LOG_INFO(log, __FUNCTION__ << ": threshold config: " << "period " << sec << ", rows " << rows << ", bytes " << bytes);
}

void KVStore::persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller)
{
Expand All @@ -267,7 +276,6 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
raft_cmdpb::AdminCmdType cmd_type, UInt64 curr_region_id, UInt64 index, UInt64 term, TMTContext & tmt)
{
auto region_task_lock = region_manager.genRegionTaskLock(curr_region_id);
bool sync_log = true;
const RegionPtr curr_region_ptr = getRegion(curr_region_id);
if (curr_region_ptr == nullptr)
{
Expand All @@ -282,29 +290,43 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(

curr_region.handleWriteRaftCmd({}, index, term, tmt);

if (cmd_type != raft_cmdpb::AdminCmdType::CompactLog)
{
sync_log = false;
}
else
{
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % REGION_COMPACT_LOG_PERIOD.load(std::memory_order_relaxed);
if (curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now())
const auto check_sync_log = [&]() {
if (cmd_type != raft_cmdpb::AdminCmdType::CompactLog)
{
sync_log = false;
LOG_DEBUG(log, curr_region.toString(false) << " ignore compact log cmd");
// ignore ComputeHash, VerifyHash or other useless cmd.
return false;
}
else
{
curr_region.markCompactLog();
auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo();

LOG_DEBUG(log, curr_region.toString(false) << " approx mem cache info: rows " << rows << ", bytes " << size_bytes);

if (rows >= REGION_COMPACT_LOG_MIN_ROWS.load(std::memory_order_relaxed)
|| size_bytes >= REGION_COMPACT_LOG_MIN_BYTES.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and persist mem data.
return true;
}
else
{
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % REGION_COMPACT_LOG_PERIOD.load(std::memory_order_relaxed);
if (curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now())
return false;
else
return true;
}
}
}
};

if (sync_log)
if (check_sync_log())
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
}
return EngineStoreApplyRes::None;
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class KVStore final : private boost::noncopyable

void traverseRegions(std::function<void(RegionID, const RegionPtr &)> && callback) const;

void gcRegionCache(Seconds gc_persist_period = REGION_CACHE_GC_PERIOD);
void gcRegionPersistedCache(Seconds gc_persist_period = REGION_CACHE_GC_PERIOD);

void tryPersist(const RegionID region_id);

Expand All @@ -90,7 +90,7 @@ class KVStore final : private boost::noncopyable
void handlePreApplySnapshot(const RegionPtrWrap &, TMTContext & tmt);

void handleDestroy(UInt64 region_id, TMTContext & tmt);
void setRegionCompactLogPeriod(UInt64);
void setRegionCompactLogConfig(UInt64, UInt64, UInt64);
EngineStoreApplyRes handleIngestSST(UInt64 region_id, const SSTViewVec, UInt64 index, UInt64 term, TMTContext & tmt);
RegionPtr genRegionPtr(metapb::Region && region, UInt64 peer_id, UInt64 index, UInt64 term);
const TiFlashRaftProxyHelper * getProxyHelper() const { return proxy_helper; }
Expand Down Expand Up @@ -159,6 +159,8 @@ class KVStore final : private boost::noncopyable
Logger * log;

std::atomic<UInt64> REGION_COMPACT_LOG_PERIOD;
std::atomic<UInt64> REGION_COMPACT_LOG_MIN_ROWS;
std::atomic<UInt64> REGION_COMPACT_LOG_MIN_BYTES;

mutable std::mutex bg_gc_region_data_mutex;
std::list<RegionDataReadInfoList> bg_gc_region_data;
Expand Down
21 changes: 18 additions & 3 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,23 +680,27 @@ EngineStoreApplyRes Region::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt6
};

const auto handle_write_cmd_func = [&]() {
auto need_handle_write_cf = false;
size_t cmd_write_cf_cnt = 0, cache_written_size = 0;
auto ori_cache_size = dataSize();
for (UInt64 i = 0; i < cmds.len; ++i)
{
if (cmds.cmd_cf[i] == ColumnFamilyType::Write)
need_handle_write_cf = true;
cmd_write_cf_cnt++;
else
handle_by_index_func(i);
}

if (need_handle_write_cf)
if (cmd_write_cf_cnt)
{
for (UInt64 i = 0; i < cmds.len; ++i)
{
if (cmds.cmd_cf[i] == ColumnFamilyType::Write)
handle_by_index_func(i);
}
}
cache_written_size = dataSize() - ori_cache_size;
approx_mem_cache_rows += cmd_write_cf_cnt;
approx_mem_cache_bytes += cache_written_size;
};

{
Expand Down Expand Up @@ -821,4 +825,15 @@ UInt64 RegionRaftCommandDelegate::appliedIndex() { return meta.makeRaftCommandDe
metapb::Region Region::getMetaRegion() const { return meta.getMetaRegion(); }
raft_serverpb::MergeState Region::getMergeState() const { return meta.getMergeState(); }

/// Return the approximate counters of data accumulated in the mem cache
/// as a pair `{rows, bytes}`.
///
/// Both counters are read with relaxed ordering and independently of each
/// other, so the pair is not an atomic snapshot of the two values.
std::pair<size_t, size_t> Region::getApproxMemCacheInfo() const
{
    const size_t cached_rows = approx_mem_cache_rows.load(std::memory_order_relaxed);
    const size_t cached_bytes = approx_mem_cache_bytes.load(std::memory_order_relaxed);
    return {cached_rows, cached_bytes};
}

/// Reset both approximate mem-cache counters (rows and bytes) to zero.
///
/// Declared `const` because the counters are `mutable` atomics.
void Region::cleanApproxMemCacheInfo() const
{
    approx_mem_cache_rows.store(0);
    approx_mem_cache_bytes.store(0);
}

} // namespace DB
9 changes: 7 additions & 2 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ class Region : public std::enable_shared_from_this<Region>

UInt64 getSnapshotEventFlag() const { return snapshot_event_flag; }

/// get approx rows, bytes info about mem cache.
std::pair<size_t, size_t> getApproxMemCacheInfo() const;
void cleanApproxMemCacheInfo() const;

private:
Region() = delete;
friend class RegionRaftCommandDelegate;
Expand All @@ -201,14 +205,15 @@ class Region : public std::enable_shared_from_this<Region>

RegionMeta meta;

mutable std::atomic<Timepoint> last_compact_log_time = Timepoint::min();

Logger * log;

const TableID mapped_table_id;

std::atomic<UInt64> snapshot_event_flag{1};
const TiFlashRaftProxyHelper * proxy_helper{nullptr};
mutable std::atomic<Timepoint> last_compact_log_time = Timepoint::min();
mutable std::atomic<size_t> approx_mem_cache_rows{0};
mutable std::atomic<size_t> approx_mem_cache_bytes{0};
};

class RegionRaftCommandDelegate : public Region, private boost::noncopyable
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,16 @@ void TMTContext::reloadConfig(const Poco::Util::AbstractConfiguration & config)
{
static constexpr const char * TABLE_OVERLAP_THRESHOLD = "flash.overlap_threshold";
static constexpr const char * COMPACT_LOG_MIN_PERIOD = "flash.compact_log_min_period";
static constexpr const char * COMPACT_LOG_MIN_ROWS = "flash.compact_log_min_rows";
static constexpr const char * COMPACT_LOG_MIN_BYTES = "flash.compact_log_min_bytes";
static constexpr const char * REPLICA_READ_MAX_THREAD = "flash.replica_read_max_thread";


getRegionTable().setTableCheckerThreshold(config.getDouble(TABLE_OVERLAP_THRESHOLD, 0.6));
getKVStore()->setRegionCompactLogPeriod(std::max(config.getUInt64(COMPACT_LOG_MIN_PERIOD, 120), 1));
// default config about compact-log: period 120s, rows 40k, bytes 32MB.
getKVStore()->setRegionCompactLogConfig(std::max(config.getUInt64(COMPACT_LOG_MIN_PERIOD, 120), 1),
std::max(config.getUInt64(COMPACT_LOG_MIN_ROWS, 40 * 1024), 1),
std::max(config.getUInt64(COMPACT_LOG_MIN_BYTES, 32 * 1024 * 1024), 1));
replica_read_max_thread = std::max(config.getUInt64(REPLICA_READ_MAX_THREAD, 1), 1);
}

Expand Down