Skip to content

Commit

Permalink
Integrate some functions in KVStore and rename them to what they actu…
Browse files Browse the repository at this point in the history
…ally do (#7402)

ref #7256
  • Loading branch information
CalvinNeo authored Apr 28, 2023
1 parent 90784b1 commit c154ad2
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 42 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void dbgFuncTryFlushRegion(Context & context, const ASTs & args, DBGInvoker::Pri
auto region_id = static_cast<RegionID>(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value));

TMTContext & tmt = context.getTMTContext();
tmt.getRegionTable().tryFlushRegion(region_id);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region_id);

output(fmt::format("region_table try flush region {}", region_id));
}
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region,
// engine may delete data unsafely.
auto region_lock = region_manager.genRegionTaskLock(old_region->id());
old_region->setStateApplying();
tmt.getRegionTable().tryFlushRegion(old_region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(old_region, false);
tryFlushRegionCacheInStorage(tmt, *old_region, log);
persistRegion(*old_region, region_lock, "save previous region before apply");
persistRegion(*old_region, &region_lock, "save previous region before apply");
}
}

Expand Down Expand Up @@ -209,7 +209,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
{
try
{
auto tmp = region_table.tryFlushRegion(new_region_wrap, false);
auto tmp = region_table.tryWriteBlockByRegionAndFlush(new_region_wrap, false);
{
std::lock_guard lock(bg_gc_region_data_mutex);
bg_gc_region_data.push_back(std::move(tmp));
Expand Down Expand Up @@ -261,7 +261,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
manage_lock.index.add(new_region);
}

persistRegion(*new_region, region_lock, "save current region after apply");
persistRegion(*new_region, &region_lock, "save current region after apply");

tmt.getRegionTable().shrinkRegionRange(*new_region);
}
Expand Down Expand Up @@ -506,7 +506,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec
return;
try
{
tmt.getRegionTable().tryFlushRegion(region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false);
tryFlushRegionCacheInStorage(tmt, *region, log);
}
catch (Exception & e)
Expand All @@ -533,7 +533,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec
}
else
{
persistRegion(*region, region_task_lock, __FUNCTION__);
persistRegion(*region, &region_task_lock, __FUNCTION__);
return EngineStoreApplyRes::Persist;
}
}
Expand Down
32 changes: 19 additions & 13 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,12 @@ bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
return true;
}

void KVStore::tryPersist(RegionID region_id)
void KVStore::tryPersistRegion(RegionID region_id)
{
auto region = getRegion(region_id);
if (region)
{
LOG_INFO(log, "Try to persist {}", region->toString(false));
RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString());
region_persister->persist(*region);
LOG_INFO(log, "After persisted {}, cache {} bytes", region->toString(false), region->dataSize());
persistRegion(*region, std::nullopt, "");
}
}

Expand Down Expand Up @@ -333,12 +330,21 @@ void KVStore::setRegionCompactLogConfig(UInt64 sec, UInt64 rows, UInt64 bytes)
bytes);
}

void KVStore::persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller)
void KVStore::persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, const char * caller)
{
LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller);
RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString());
region_persister->persist(region, region_task_lock);
LOG_DEBUG(log, "Persist {} done", region.toString(false));
if (region_task_lock.has_value())
{
LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller);
region_persister->persist(region, *region_task_lock.value());
LOG_DEBUG(log, "Persist {} done", region.toString(false));
}
else
{
LOG_INFO(log, "Try to persist {}", region.toString(false));
region_persister->persist(region);
LOG_INFO(log, "After persisted {}, cache {} bytes", region.toString(false), region.dataSize());
}
}

bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt)
Expand Down Expand Up @@ -422,7 +428,7 @@ bool KVStore::forceFlushRegionDataImpl(Region & curr_region, bool try_until_succ
}
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed))
{
persistRegion(curr_region, region_task_lock, "tryFlushRegionData");
persistRegion(curr_region, &region_task_lock, "tryFlushRegionData");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds());
Expand Down Expand Up @@ -474,7 +480,7 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
|| cmd_type == raft_cmdpb::AdminCmdType::BatchSwitchWitness)
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str());
persistRegion(curr_region, &region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str());
return EngineStoreApplyRes::Persist;
}
return EngineStoreApplyRes::None;
Expand Down Expand Up @@ -539,7 +545,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ
const auto try_to_flush_region = [&tmt](const RegionPtr & region) {
try
{
tmt.getRegionTable().tryFlushRegion(region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false);
}
catch (...)
{
Expand All @@ -549,7 +555,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ

const auto persist_and_sync = [&](const Region & region) {
tryFlushRegionCacheInStorage(tmt, region, log);
persistRegion(region, region_task_lock, "admin raft cmd");
persistRegion(region, &region_task_lock, "admin raft cmd");
};

const auto handle_batch_split = [&](Regions & split_regions) {
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class KVStore final : private boost::noncopyable

void gcRegionPersistedCache(Seconds gc_persist_period = Seconds(60 * 5));

void tryPersist(RegionID region_id);
void tryPersistRegion(RegionID region_id);

static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, const LoggerPtr & log, bool try_until_succeed = true);

Expand Down Expand Up @@ -246,7 +246,7 @@ class KVStore final : private boost::noncopyable
bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term);
bool forceFlushRegionDataImpl(Region & curr_region, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term);

void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller);
void persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, const char * caller);
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);

Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ bool RegionTable::shouldFlush(const InternalRegion & region) const
return false;
}

RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & region, bool try_persist) const
RegionDataReadInfoList RegionTable::writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const
{
auto & tmt = context->getTMTContext();

Expand All @@ -140,7 +140,7 @@ RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & regio
if (try_persist)
{
KVStore::tryFlushRegionCacheInStorage(tmt, *region, log);
tmt.getKVStore()->tryPersist(region->id());
tmt.getKVStore()->tryPersistRegion(region->id());
}
}

Expand Down Expand Up @@ -302,7 +302,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const
}
}

RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_persist)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist)
{
auto region = context->getTMTContext().getKVStore()->getRegion(region_id);
if (!region)
Expand All @@ -311,10 +311,10 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_
return {};
}

return tryFlushRegion(region, try_persist);
return tryWriteBlockByRegionAndFlush(region, try_persist);
}

RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist)
{
RegionID region_id = region->id();

Expand Down Expand Up @@ -349,7 +349,7 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & re
RegionDataReadInfoList data_list_to_remove;
try
{
data_list_to_remove = flushRegion(region, try_persist);
data_list_to_remove = writeBlockByRegionAndFlush(region, try_persist);
}
catch (const Exception & e)
{
Expand Down Expand Up @@ -414,7 +414,7 @@ bool RegionTable::tryFlushRegions()
{
if (RegionID region_to_flush = pickRegionToFlush(); region_to_flush != InvalidRegionID)
{
tryFlushRegion(region_to_flush, true);
tryWriteBlockByRegionAndFlush(region_to_flush, true);
return true;
}

Expand Down
16 changes: 12 additions & 4 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,15 @@ class RegionTable : private boost::noncopyable

void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &);

// Find all regions with data, call writeBlockByRegionAndFlush with try_persist = true.
// This function is only for debug.
bool tryFlushRegions();
RegionDataReadInfoList tryFlushRegion(RegionID region_id, bool try_persist = false);
RegionDataReadInfoList tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist);

// Protects writeBlockByRegionAndFlush and ensures it's executed by only one thread at the smae time.
// Only one thread can do this at the same time.
// The original name for this function is tryFlushRegion.
RegionDataReadInfoList tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist = false);
RegionDataReadInfoList tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist);

void handleInternalRegionsByTable(KeyspaceID keyspace_id, TableID table_id, std::function<void(const InternalRegions &)> && callback) const;
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(KeyspaceID keyspace_id, TableID table_id) const;
Expand Down Expand Up @@ -191,7 +197,6 @@ class RegionTable : private boost::noncopyable
/// extend range for possible InternalRegion or add one.
void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys);


void updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts);

// unit: ms. If safe_ts diff is larger than 2min, we think the data synchronization progress is far behind the leader.
Expand All @@ -211,7 +216,10 @@ class RegionTable : private boost::noncopyable
InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & doGetInternalRegion(KeyspaceTableID ks_tb_id, RegionID region_id);

RegionDataReadInfoList flushRegion(const RegionPtrWithBlock & region, bool try_persist) const;
// Try write the committed kvs into cache of columnar DeltaMergeStore.
// Flush the cache if try_persist is set to true.
// The original name for this method is flushRegion.
RegionDataReadInfoList writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const;
bool shouldFlush(const InternalRegion & region) const;
RegionID pickRegionToFlush();

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TEST_F(RegionKVStoreTest, NewProxy)
}
}
{
kvs.tryPersist(1);
kvs.tryPersistRegion(1);
kvs.gcRegionPersistedCache(Seconds{0});
}
{
Expand Down Expand Up @@ -796,7 +796,7 @@ TEST_F(RegionKVStoreTest, KVStore)
}
}
{
kvs.tryPersist(1);
kvs.tryPersistRegion(1);
kvs.gcRegionPersistedCache(Seconds{0});
}
{
Expand Down Expand Up @@ -1329,9 +1329,9 @@ TEST_F(RegionKVStoreTest, KVStoreRestore)
lock.index.add(region);
}
}
kvs.tryPersist(1);
kvs.tryPersist(2);
kvs.tryPersist(3);
kvs.tryPersistRegion(1);
kvs.tryPersistRegion(2);
kvs.tryPersistRegion(3);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1);
ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
const KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -70,7 +70,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -103,7 +103,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -135,7 +135,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
MockRaftStoreProxy::FailCond cond;
Expand Down

0 comments on commit c154ad2

Please sign in to comment.