Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate some functions in KVStore and rename them to what they actually do #7402

Merged
merged 9 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void dbgFuncTryFlushRegion(Context & context, const ASTs & args, DBGInvoker::Pri
auto region_id = static_cast<RegionID>(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value));

TMTContext & tmt = context.getTMTContext();
tmt.getRegionTable().tryFlushRegion(region_id);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region_id);

output(fmt::format("region_table try flush region {}", region_id));
}
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region,
// engine may delete data unsafely.
auto region_lock = region_manager.genRegionTaskLock(old_region->id());
old_region->setStateApplying();
tmt.getRegionTable().tryFlushRegion(old_region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(old_region, false);
tryFlushRegionCacheInStorage(tmt, *old_region, log);
persistRegion(*old_region, region_lock, "save previous region before apply");
persistRegion(*old_region, &region_lock, "save previous region before apply");
}
}

Expand Down Expand Up @@ -209,7 +209,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
{
try
{
auto tmp = region_table.tryFlushRegion(new_region_wrap, false);
auto tmp = region_table.tryWriteBlockByRegionAndFlush(new_region_wrap, false);
{
std::lock_guard lock(bg_gc_region_data_mutex);
bg_gc_region_data.push_back(std::move(tmp));
Expand Down Expand Up @@ -261,7 +261,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
manage_lock.index.add(new_region);
}

persistRegion(*new_region, region_lock, "save current region after apply");
persistRegion(*new_region, &region_lock, "save current region after apply");

tmt.getRegionTable().shrinkRegionRange(*new_region);
}
Expand Down Expand Up @@ -506,7 +506,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec
return;
try
{
tmt.getRegionTable().tryFlushRegion(region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false);
tryFlushRegionCacheInStorage(tmt, *region, log);
}
catch (Exception & e)
Expand All @@ -533,7 +533,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec
}
else
{
persistRegion(*region, region_task_lock, __FUNCTION__);
persistRegion(*region, &region_task_lock, __FUNCTION__);
return EngineStoreApplyRes::Persist;
}
}
Expand Down
32 changes: 19 additions & 13 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,12 @@ bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
return true;
}

void KVStore::tryPersist(RegionID region_id)
void KVStore::tryPersistRegion(RegionID region_id)
{
auto region = getRegion(region_id);
if (region)
{
LOG_INFO(log, "Try to persist {}", region->toString(false));
RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString());
region_persister->persist(*region);
LOG_INFO(log, "After persisted {}, cache {} bytes", region->toString(false), region->dataSize());
persistRegion(*region, std::nullopt, "");
}
}

Expand Down Expand Up @@ -333,12 +330,21 @@ void KVStore::setRegionCompactLogConfig(UInt64 sec, UInt64 rows, UInt64 bytes)
bytes);
}

void KVStore::persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller)
void KVStore::persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, const char * caller)
{
LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller);
RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString());
region_persister->persist(region, region_task_lock);
LOG_DEBUG(log, "Persist {} done", region.toString(false));
if (region_task_lock.has_value())
{
LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller);
region_persister->persist(region, *region_task_lock.value());
LOG_DEBUG(log, "Persist {} done", region.toString(false));
}
else
{
LOG_INFO(log, "Try to persist {}", region.toString(false));
region_persister->persist(region);
LOG_INFO(log, "After persisted {}, cache {} bytes", region.toString(false), region.dataSize());
}
}

bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt)
Expand Down Expand Up @@ -422,7 +428,7 @@ bool KVStore::forceFlushRegionDataImpl(Region & curr_region, bool try_until_succ
}
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed))
{
persistRegion(curr_region, region_task_lock, "tryFlushRegionData");
persistRegion(curr_region, &region_task_lock, "tryFlushRegionData");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds());
Expand Down Expand Up @@ -474,7 +480,7 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
|| cmd_type == raft_cmdpb::AdminCmdType::BatchSwitchWitness)
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str());
persistRegion(curr_region, &region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str());
return EngineStoreApplyRes::Persist;
}
return EngineStoreApplyRes::None;
Expand Down Expand Up @@ -539,7 +545,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ
const auto try_to_flush_region = [&tmt](const RegionPtr & region) {
try
{
tmt.getRegionTable().tryFlushRegion(region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false);
}
catch (...)
{
Expand All @@ -549,7 +555,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ

const auto persist_and_sync = [&](const Region & region) {
tryFlushRegionCacheInStorage(tmt, region, log);
persistRegion(region, region_task_lock, "admin raft cmd");
persistRegion(region, &region_task_lock, "admin raft cmd");
};

const auto handle_batch_split = [&](Regions & split_regions) {
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class KVStore final : private boost::noncopyable

void gcRegionPersistedCache(Seconds gc_persist_period = Seconds(60 * 5));

void tryPersist(RegionID region_id);
void tryPersistRegion(RegionID region_id);

static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, const LoggerPtr & log, bool try_until_succeed = true);

Expand Down Expand Up @@ -246,7 +246,7 @@ class KVStore final : private boost::noncopyable
bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term);
bool forceFlushRegionDataImpl(Region & curr_region, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term);

void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller);
void persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, const char * caller);
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);

Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ bool RegionTable::shouldFlush(const InternalRegion & region) const
return false;
}

RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & region, bool try_persist) const
RegionDataReadInfoList RegionTable::writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const
{
auto & tmt = context->getTMTContext();

Expand All @@ -140,7 +140,7 @@ RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & regio
if (try_persist)
{
KVStore::tryFlushRegionCacheInStorage(tmt, *region, log);
tmt.getKVStore()->tryPersist(region->id());
tmt.getKVStore()->tryPersistRegion(region->id());
}
}

Expand Down Expand Up @@ -302,7 +302,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const
}
}

RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_persist)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist)
{
auto region = context->getTMTContext().getKVStore()->getRegion(region_id);
if (!region)
Expand All @@ -311,10 +311,10 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_
return {};
}

return tryFlushRegion(region, try_persist);
return tryWriteBlockByRegionAndFlush(region, try_persist);
}

RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist)
{
RegionID region_id = region->id();

Expand Down Expand Up @@ -349,7 +349,7 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & re
RegionDataReadInfoList data_list_to_remove;
try
{
data_list_to_remove = flushRegion(region, try_persist);
data_list_to_remove = writeBlockByRegionAndFlush(region, try_persist);
}
catch (const Exception & e)
{
Expand Down Expand Up @@ -414,7 +414,7 @@ bool RegionTable::tryFlushRegions()
{
if (RegionID region_to_flush = pickRegionToFlush(); region_to_flush != InvalidRegionID)
{
tryFlushRegion(region_to_flush, true);
tryWriteBlockByRegionAndFlush(region_to_flush, true);
return true;
}

Expand Down
16 changes: 12 additions & 4 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,15 @@ class RegionTable : private boost::noncopyable

void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &);

// Find all regions with data, call writeBlockByRegionAndFlush with try_persist = true.
// This function is only for debug.
bool tryFlushRegions();
RegionDataReadInfoList tryFlushRegion(RegionID region_id, bool try_persist = false);
RegionDataReadInfoList tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist);

// Protects writeBlockByRegionAndFlush and ensures it's executed by only one thread at the smae time.
// Only one thread can do this at the same time.
// The original name for this function is tryFlushRegion.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// The original name for this function is tryFlushRegion.

RegionDataReadInfoList tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist = false);
RegionDataReadInfoList tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist);

void handleInternalRegionsByTable(KeyspaceID keyspace_id, TableID table_id, std::function<void(const InternalRegions &)> && callback) const;
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(KeyspaceID keyspace_id, TableID table_id) const;
Expand Down Expand Up @@ -191,7 +197,6 @@ class RegionTable : private boost::noncopyable
/// extend range for possible InternalRegion or add one.
void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys);


void updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts);

// unit: ms. If safe_ts diff is larger than 2min, we think the data synchronization progress is far behind the leader.
Expand All @@ -211,7 +216,10 @@ class RegionTable : private boost::noncopyable
InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & doGetInternalRegion(KeyspaceTableID ks_tb_id, RegionID region_id);

RegionDataReadInfoList flushRegion(const RegionPtrWithBlock & region, bool try_persist) const;
// Try write the committed kvs into cache of columnar DeltaMergeStore.
// Flush the cache if try_persist is set to true.
// The original name for this method is flushRegion.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// The original name for this method is flushRegion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to keep this message... it doesn't help

RegionDataReadInfoList writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const;
bool shouldFlush(const InternalRegion & region) const;
RegionID pickRegionToFlush();

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TEST_F(RegionKVStoreTest, NewProxy)
}
}
{
kvs.tryPersist(1);
kvs.tryPersistRegion(1);
kvs.gcRegionPersistedCache(Seconds{0});
}
{
Expand Down Expand Up @@ -796,7 +796,7 @@ TEST_F(RegionKVStoreTest, KVStore)
}
}
{
kvs.tryPersist(1);
kvs.tryPersistRegion(1);
kvs.gcRegionPersistedCache(Seconds{0});
}
{
Expand Down Expand Up @@ -1329,9 +1329,9 @@ TEST_F(RegionKVStoreTest, KVStoreRestore)
lock.index.add(region);
}
}
kvs.tryPersist(1);
kvs.tryPersist(2);
kvs.tryPersist(3);
kvs.tryPersistRegion(1);
kvs.tryPersistRegion(2);
kvs.tryPersistRegion(3);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1);
ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
const KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -70,7 +70,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -103,7 +103,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -135,7 +135,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
MockRaftStoreProxy::FailCond cond;
Expand Down