diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index f65a18b8fd0..34d788bf788 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -116,7 +116,7 @@ void dbgFuncTryFlushRegion(Context & context, const ASTs & args, DBGInvoker::Pri auto region_id = static_cast(safeGet(typeid_cast(*args[0]).value)); TMTContext & tmt = context.getTMTContext(); - tmt.getRegionTable().tryFlushRegion(region_id); + tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region_id); output(fmt::format("region_table try flush region {}", region_id)); } diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 5ab1cc87479..08d7fd5fe0d 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -84,9 +84,9 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region, // engine may delete data unsafely. auto region_lock = region_manager.genRegionTaskLock(old_region->id()); old_region->setStateApplying(); - tmt.getRegionTable().tryFlushRegion(old_region, false); + tmt.getRegionTable().tryWriteBlockByRegionAndFlush(old_region, false); tryFlushRegionCacheInStorage(tmt, *old_region, log); - persistRegion(*old_region, region_lock, "save previous region before apply"); + persistRegion(*old_region, ®ion_lock, "save previous region before apply"); } } @@ -209,7 +209,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re { try { - auto tmp = region_table.tryFlushRegion(new_region_wrap, false); + auto tmp = region_table.tryWriteBlockByRegionAndFlush(new_region_wrap, false); { std::lock_guard lock(bg_gc_region_data_mutex); bg_gc_region_data.push_back(std::move(tmp)); @@ -261,7 +261,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re manage_lock.index.add(new_region); } - persistRegion(*new_region, region_lock, "save current region after apply"); + persistRegion(*new_region, ®ion_lock, "save current region after apply"); tmt.getRegionTable().shrinkRegionRange(*new_region); } @@ -506,7 +506,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec return; try { - tmt.getRegionTable().tryFlushRegion(region, false); + tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false); tryFlushRegionCacheInStorage(tmt, *region, log); } catch (Exception & e) @@ -533,7 +533,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec } else { - persistRegion(*region, region_task_lock, __FUNCTION__); + persistRegion(*region, ®ion_task_lock, __FUNCTION__); return EngineStoreApplyRes::Persist; } } diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 51ec448d7d7..0be67d40e7c 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -172,15 +172,12 @@ bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi return true; } -void KVStore::tryPersist(RegionID region_id) +void KVStore::tryPersistRegion(RegionID region_id) { auto region = getRegion(region_id); if (region) { - LOG_INFO(log, "Try to persist {}", region->toString(false)); - RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString()); - region_persister->persist(*region); - LOG_INFO(log, "After persisted {}, cache {} bytes", region->toString(false), region->dataSize()); + persistRegion(*region, std::nullopt, ""); } } @@ -333,12 +330,21 @@ void KVStore::setRegionCompactLogConfig(UInt64 sec, UInt64 rows, UInt64 bytes) bytes); } -void KVStore::persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller) +void KVStore::persistRegion(const Region & region, std::optional region_task_lock, const char * caller) { - LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller); RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString()); - region_persister->persist(region, region_task_lock); - LOG_DEBUG(log, "Persist {} done", region.toString(false)); + if (region_task_lock.has_value()) + { + LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller); + region_persister->persist(region, *region_task_lock.value()); + LOG_DEBUG(log, "Persist {} done", region.toString(false)); + } + else + { + LOG_INFO(log, "Try to persist {}", region.toString(false)); + region_persister->persist(region); + LOG_INFO(log, "After persisted {}, cache {} bytes", region.toString(false), region.dataSize()); + } } bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt) @@ -422,7 +428,7 @@ bool KVStore::forceFlushRegionDataImpl(Region & curr_region, bool try_until_succ } if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed)) { - persistRegion(curr_region, region_task_lock, "tryFlushRegionData"); + persistRegion(curr_region, ®ion_task_lock, "tryFlushRegionData"); curr_region.markCompactLog(); curr_region.cleanApproxMemCacheInfo(); GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds()); @@ -474,7 +480,7 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( || cmd_type == raft_cmdpb::AdminCmdType::BatchSwitchWitness) { tryFlushRegionCacheInStorage(tmt, curr_region, log); - persistRegion(curr_region, region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str()); + persistRegion(curr_region, ®ion_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str()); return EngineStoreApplyRes::Persist; } return EngineStoreApplyRes::None; @@ -539,7 +545,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ const auto try_to_flush_region = [&tmt](const RegionPtr & region) { try { - tmt.getRegionTable().tryFlushRegion(region, false); + tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false); } catch (...) { @@ -549,7 +555,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ const auto persist_and_sync = [&](const Region & region) { tryFlushRegionCacheInStorage(tmt, region, log); - persistRegion(region, region_task_lock, "admin raft cmd"); + persistRegion(region, ®ion_task_lock, "admin raft cmd"); }; const auto handle_batch_split = [&](Regions & split_regions) { diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 9c8a18f960c..0ece457eace 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -93,7 +93,7 @@ class KVStore final : private boost::noncopyable void gcRegionPersistedCache(Seconds gc_persist_period = Seconds(60 * 5)); - void tryPersist(RegionID region_id); + void tryPersistRegion(RegionID region_id); static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, const LoggerPtr & log, bool try_until_succeed = true); @@ -246,7 +246,7 @@ class KVStore final : private boost::noncopyable bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term); bool forceFlushRegionDataImpl(Region & curr_region, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term); - void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller); + void persistRegion(const Region & region, std::optional region_task_lock, const char * caller); void releaseReadIndexWorkers(); void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index bd490015479..c85d95ebe81 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -120,7 +120,7 @@ bool RegionTable::shouldFlush(const InternalRegion & region) const return false; } -RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & region, bool try_persist) const +RegionDataReadInfoList RegionTable::writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const { auto & tmt = context->getTMTContext(); @@ -140,7 +140,7 @@ RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & regio if (try_persist) { KVStore::tryFlushRegionCacheInStorage(tmt, *region, log); - tmt.getKVStore()->tryPersist(region->id()); + tmt.getKVStore()->tryPersistRegion(region->id()); } } @@ -302,7 +302,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const } } -RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_persist) +RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist) { auto region = context->getTMTContext().getKVStore()->getRegion(region_id); if (!region) @@ -311,10 +311,10 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_ return {}; } - return tryFlushRegion(region, try_persist); + return tryWriteBlockByRegionAndFlush(region, try_persist); } -RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist) +RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) { RegionID region_id = region->id(); @@ -349,7 +349,7 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & re RegionDataReadInfoList data_list_to_remove; try { - data_list_to_remove = flushRegion(region, try_persist); + data_list_to_remove = writeBlockByRegionAndFlush(region, try_persist); } catch (const Exception & e) { @@ -414,7 +414,7 @@ bool RegionTable::tryFlushRegions() { if (RegionID region_to_flush = pickRegionToFlush(); region_to_flush != InvalidRegionID) { - tryFlushRegion(region_to_flush, true); + tryWriteBlockByRegionAndFlush(region_to_flush, true); return true; } diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 6936eb648da..176b28f8937 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -159,9 +159,15 @@ class RegionTable : private boost::noncopyable void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &); + // Find all regions with data, call writeBlockByRegionAndFlush with try_persist = true. + // This function is only for debug. bool tryFlushRegions(); - RegionDataReadInfoList tryFlushRegion(RegionID region_id, bool try_persist = false); - RegionDataReadInfoList tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist); + + // Protects writeBlockByRegionAndFlush and ensures it's executed by only one thread at the smae time. + // Only one thread can do this at the same time. + // The original name for this function is tryFlushRegion. + RegionDataReadInfoList tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist = false); + RegionDataReadInfoList tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist); void handleInternalRegionsByTable(KeyspaceID keyspace_id, TableID table_id, std::function && callback) const; std::vector> getRegionsByTable(KeyspaceID keyspace_id, TableID table_id) const; @@ -191,7 +197,6 @@ class RegionTable : private boost::noncopyable /// extend range for possible InternalRegion or add one. void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys); - void updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts); // unit: ms. If safe_ts diff is larger than 2min, we think the data synchronization progress is far behind the leader. @@ -211,7 +216,10 @@ class RegionTable : private boost::noncopyable InternalRegion & insertRegion(Table & table, const Region & region); InternalRegion & doGetInternalRegion(KeyspaceTableID ks_tb_id, RegionID region_id); - RegionDataReadInfoList flushRegion(const RegionPtrWithBlock & region, bool try_persist) const; + // Try write the committed kvs into cache of columnar DeltaMergeStore. + // Flush the cache if try_persist is set to true. + // The original name for this method is flushRegion. + RegionDataReadInfoList writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const; bool shouldFlush(const InternalRegion & region) const; RegionID pickRegionToFlush(); diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index b27445a572f..4f91ef788bf 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -35,7 +35,7 @@ TEST_F(RegionKVStoreTest, NewProxy) } } { - kvs.tryPersist(1); + kvs.tryPersistRegion(1); kvs.gcRegionPersistedCache(Seconds{0}); } { @@ -796,7 +796,7 @@ TEST_F(RegionKVStoreTest, KVStore) } } { - kvs.tryPersist(1); + kvs.tryPersistRegion(1); kvs.gcRegionPersistedCache(Seconds{0}); } { @@ -1329,9 +1329,9 @@ TEST_F(RegionKVStoreTest, KVStoreRestore) lock.index.add(region); } } - kvs.tryPersist(1); - kvs.tryPersist(2); - kvs.tryPersist(3); + kvs.tryPersistRegion(1); + kvs.tryPersistRegion(2); + kvs.tryPersistRegion(3); } { KVStore & kvs = reloadKVSFromDisk(); diff --git a/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp index e0fc4979d43..9b8676609be 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp @@ -40,7 +40,7 @@ try proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); - kvs.tryPersist(region_id); + kvs.tryPersistRegion(region_id); } { const KVStore & kvs = reloadKVSFromDisk(); @@ -70,7 +70,7 @@ try proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); ASSERT_EQ(kvr1->appliedIndex(), applied_index); - kvs.tryPersist(region_id); + kvs.tryPersistRegion(region_id); } { KVStore & kvs = reloadKVSFromDisk(); @@ -103,7 +103,7 @@ try proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); ASSERT_EQ(kvr1->appliedIndex(), applied_index); - kvs.tryPersist(region_id); + kvs.tryPersistRegion(region_id); } { KVStore & kvs = reloadKVSFromDisk(); @@ -135,7 +135,7 @@ try proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); - kvs.tryPersist(region_id); + kvs.tryPersistRegion(region_id); } { MockRaftStoreProxy::FailCond cond;