Skip to content

Commit

Permalink
Fix duplicated ID DTFile that cause inconsistent query result (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 10, 2021
1 parent 20238d3 commit 3e0e679
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 99 deletions.
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
*\#
.tramp_history

# vscode clangd cache
.cache

# JSON Compilation Database Format Specification
# https://clang.llvm.org/docs/JSONCompilationDatabase.html
compile_commands.json

# git patch reject report
*.rej

# vim cache files
*.swp

Expand Down
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ if (ENABLE_TESTS)
${ClickHouse_SOURCE_DIR}/dbms/src/Server/StorageConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/UserConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/RaftConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp
)
target_include_directories(gtests_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries(gtests_dbms gtest_main dbms clickhouse_functions)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(random_slow_page_storage_remove_expired_snapshots) \
M(random_slow_page_storage_list_all_live_files) \
M(force_set_safepoint_when_decode_block) \
M(force_set_page_data_compact_batch)
M(force_set_page_data_compact_batch) \
M(force_set_dtfile_exist_when_acquire_id)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
checkSegmentUpdate(dm_context, segment, ThreadType::Write);
}


void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings & db_settings, const HandleRange & delete_range)
{
LOG_INFO(log, "Write into " << db_name << "." << table_name << " delete range " << delete_range.toDebugString());
Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,17 @@ StableValueSpacePtr createNewStable(DMContext & context,
PageId stable_id,
WriteBatches & wbs)
{
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto delegator = context.path_pool.getStableDiskDelegator();
auto store_path = delegator.choosePath();

PageId dmfile_id = context.storage_pool.newDataPageId();
auto dmfile = writeIntoNewDMFile(context, schema_snap, input_stream, dmfile_id, store_path);
PageId dtfile_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto dmfile = writeIntoNewDMFile(context, schema_snap, input_stream, dtfile_id, store_path);
auto stable = std::make_shared<StableValueSpace>(stable_id);
stable->setFiles({dmfile});
stable->saveMeta(wbs.meta);
wbs.data.putExternal(dmfile_id, 0);
delegate.addDTFile(dmfile_id, dmfile->getBytesOnDisk(), store_path);

wbs.data.putExternal(dtfile_id, 0);
delegator.addDTFile(dtfile_id, dmfile->getBytesOnDisk(), store_path);
return stable;
}

Expand Down Expand Up @@ -771,15 +771,15 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
DMFiles my_stable_files;
DMFiles other_stable_files;

auto delegate = dm_context.path_pool.getStableDiskDelegator();
auto delegator = dm_context.path_pool.getStableDiskDelegator();
for (auto & dmfile : segment_snap->stable->getDMFiles())
{
auto ori_ref_id = dmfile->refId();
auto file_id = segment_snap->delta->storage_snap->data_reader.getNormalPageId(ori_ref_id);
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);

auto my_dmfile_id = storage_pool.newDataPageId();
auto other_dmfile_id = storage_pool.newDataPageId();
auto my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);

wbs.data.putRefPage(my_dmfile_id, file_id);
wbs.data.putRefPage(other_dmfile_id, file_id);
Expand Down
50 changes: 44 additions & 6 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <Common/FailPoint.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
#include <Storages/DeltaMerge/StoragePool.h>
Expand All @@ -6,6 +8,10 @@

namespace DB
{
namespace FailPoints
{
extern const char force_set_dtfile_exist_when_acquire_id[];
} // namespace FailPoints
namespace DM
{
enum class StorageType
Expand Down Expand Up @@ -66,7 +72,8 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const
global_ctx.getTiFlashMetrics()),
max_log_page_id(0),
max_data_page_id(0),
max_meta_page_id(0)
max_meta_page_id(0),
global_context(global_ctx)
{
}

Expand All @@ -88,6 +95,37 @@ void StoragePool::drop()
log_storage.drop();
}

PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
{
// In case that there is a DTFile created on disk but TiFlash crashes without persisting the ID.
// After TiFlash process restored, the ID will be inserted into the stable delegator, but we may
// get a duplicated ID from the `storage_pool.data`. (tics#2756)
PageId dtfile_id;
do
{
dtfile_id = ++max_data_page_id;

auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false);
fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, {
static size_t fail_point_called = 0;
if (existed_path.empty() && fail_point_called % 10 == 0)
{
existed_path = "<mock for existed path>";
}
fail_point_called++;
});
if (likely(existed_path.empty()))
{
break;
}
// else there is a DTFile with that id, continue to acquire a new ID.
LOG_WARNING(&Poco::Logger::get(who),
"The DTFile is already exists, continute to acquire another ID. [path=" + existed_path
+ "] [id=" + DB::toString(dtfile_id) + "]");
} while (true);
return dtfile_id;
}

bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_period)
{
{
Expand All @@ -100,22 +138,22 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio
last_try_gc_time = now;
}

bool ok = false;
bool done_anything = false;

// FIXME: The global_context.settings is mutable, we need a way to reload thses settings.
// auto config = extractConfig(settings, StorageType::Meta);
// meta_storage.reloadSettings(config);
ok |= meta_storage.gc();
done_anything |= meta_storage.gc();

// config = extractConfig(settings, StorageType::Data);
// data_storage.reloadSettings(config);
ok |= data_storage.gc();
done_anything |= data_storage.gc();

// config = extractConfig(settings, StorageType::Log);
// log_storage.reloadSettings(config);
ok |= log_storage.gc();
done_anything |= log_storage.gc();

return ok;
return done_anything;
}

} // namespace DM
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace DB
struct Settings;
class Context;
class StoragePathPool;
class StableDiskDelegator;

namespace DM
{
Expand All @@ -33,9 +34,10 @@ class StoragePool : private boost::noncopyable
PageId maxMetaPageId() { return max_meta_page_id; }

PageId newLogPageId() { return ++max_log_page_id; }
PageId newDataPageId() { return ++max_data_page_id; }
PageId newMetaPageId() { return ++max_meta_page_id; }

PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);

PageStorage & log() { return log_storage; }
PageStorage & data() { return data_storage; }
PageStorage & meta() { return meta_storage; }
Expand All @@ -57,6 +59,8 @@ class StoragePool : private boost::noncopyable
std::atomic<Timepoint> last_try_gc_time = Clock::now();

std::mutex mutex;

const Context & global_context;
};

struct StorageSnapshot
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ inline ::testing::AssertionResult HandleRangeCompare(const char * lhs_exp
{
if (lhs == rhs)
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
}
#define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2)
#define EXPECT_RANGE_EQ(val1, val2) EXPECT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2)
Expand Down
20 changes: 13 additions & 7 deletions dbms/src/Storages/PathPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,25 +339,29 @@ String StableDiskDelegator::choosePath() const
return genericChoosePath(pool.main_path_infos, pool.global_capacity, path_generator, pool.log, log_msg);
}

String StableDiskDelegator::getDTFilePath(UInt64 file_id) const
String StableDiskDelegator::getDTFilePath(UInt64 file_id, bool throw_on_not_exist) const
{
std::lock_guard<std::mutex> lock{pool.mutex};
auto iter = pool.dt_file_path_map.find(file_id);
if (likely(iter != pool.dt_file_path_map.end()))
return pool.main_path_infos[iter->second].path + "/" + StoragePathPool::STABLE_FOLDER_NAME;
throw Exception("Can not find path for DMFile [id=" + toString(file_id) + "]");
if (likely(throw_on_not_exist))
throw Exception("Can not find path for DMFile [id=" + toString(file_id) + "]");
return "";
}

void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::string_view path)
{
path.remove_suffix(1 + strlen(StoragePathPool::STABLE_FOLDER_NAME)); // remove '/stable' added in listPathsForStable/getDTFilePath
std::lock_guard<std::mutex> lock{pool.mutex};
if (auto iter = pool.dt_file_path_map.find(file_id); iter != pool.dt_file_path_map.end())
if (auto iter = pool.dt_file_path_map.find(file_id); unlikely(iter != pool.dt_file_path_map.end()))
{
auto & path_info = pool.main_path_infos[iter->second];
pool.dt_file_path_map.erase(iter);
path_info.file_size_map.erase(file_id);
const auto & path_info = pool.main_path_infos[iter->second];
throw DB::TiFlashException("Try to add a DTFile with duplicated id. [id=" + DB::toString(file_id) + "] [path=" + String(path)
+ "] [existed_path=" + path_info.path + "]",
Errors::DeltaTree::Internal);
}

UInt32 index = UINT32_MAX;
for (size_t i = 0; i < pool.main_path_infos.size(); i++)
{
Expand All @@ -368,7 +372,9 @@ void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::strin
}
}
if (unlikely(index == UINT32_MAX))
throw Exception("Unrecognized path " + String(path));
throw DB::TiFlashException(
"Try to add a DTFile to an unrecognized path. [id=" + DB::toString(file_id) + "] [path=" + String(path) + "]",
Errors::DeltaTree::Internal);
pool.dt_file_path_map.emplace(file_id, index);
pool.main_path_infos[index].file_size_map.emplace(file_id, file_size);
// update global used size
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/PathPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class StableDiskDelegator : private boost::noncopyable

String choosePath() const;

String getDTFilePath(UInt64 file_id) const;
// Get the path of the DTFile with file_id.
// If throw_on_not_exist is false, return empty string when the path is not exists.
String getDTFilePath(UInt64 file_id, bool throw_on_not_exist = true) const;

void addDTFile(UInt64 file_id, size_t file_size, std::string_view path);

Expand Down
71 changes: 71 additions & 0 deletions dbms/src/TestUtils/TiFlashTestBasic.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#include <Encryption/MockKeyManager.h>
#include <Server/RaftConfigParser.h>
#include <Storages/Transaction/TMTContext.h>
#include <TestUtils/TiFlashTestBasic.h>

namespace DB::tests
{
std::unique_ptr<Context> TiFlashTestEnv::global_context = nullptr;

void TiFlashTestEnv::initializeGlobalContext()
{
// set itself as global context
global_context = std::make_unique<DB::Context>(DB::Context::createGlobal());
global_context->setGlobalContext(*global_context);
global_context->setApplicationType(DB::Context::ApplicationType::SERVER);

global_context->initializeTiFlashMetrics();
KeyManagerPtr key_manager = std::make_shared<MockKeyManager>(false);
global_context->initializeFileProvider(key_manager, false);

// Theses global variables should be initialized by the following order
// 1. capacity
// 2. path pool
// 3. TMTContext

Strings testdata_path = {getTemporaryPath()};
global_context->initializePathCapacityMetric(0, testdata_path, {}, {}, {});

auto paths = getPathPool(testdata_path);
global_context->setPathPool(
paths.first, paths.second, Strings{}, true, global_context->getPathCapacity(), global_context->getFileProvider());
TiFlashRaftConfig raft_config;

raft_config.ignore_databases = {"default", "system"};
raft_config.engine = TiDB::StorageEngine::TMT;
raft_config.disable_bg_flush = false;
global_context->createTMTContext(raft_config, pingcap::ClusterConfig());

global_context->getTMTContext().restore();
}

Context TiFlashTestEnv::getContext(const DB::Settings & settings, Strings testdata_path)
{
Context context = *global_context;
context.setGlobalContext(*global_context);
// Load `testdata_path` as path if it is set.
const String root_path = testdata_path.empty() ? getTemporaryPath() : testdata_path[0];
if (testdata_path.empty())
testdata_path.push_back(root_path);
context.setPath(root_path);
auto paths = getPathPool(testdata_path);
context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider());
context.getSettingsRef() = settings;
return context;
}

void TiFlashTestEnv::shutdown()
{
global_context->getTMTContext().setTerminated();
global_context->shutdown();
global_context.reset();
}

::testing::AssertionResult DataTypeCompare(const char * lhs_expr, const char * rhs_expr, const DataTypePtr & lhs, const DataTypePtr & rhs)
{
if (lhs->equals(*rhs))
return ::testing::AssertionSuccess();
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs->getName(), rhs->getName(), false);
}

} // namespace DB::tests
10 changes: 2 additions & 8 deletions dbms/src/TestUtils/TiFlashTestBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,11 @@ namespace tests
}

/// helper functions for comparing DataType
inline ::testing::AssertionResult DataTypeCompare( //
::testing::AssertionResult DataTypeCompare( //
const char * lhs_expr,
const char * rhs_expr,
const DataTypePtr & lhs,
const DataTypePtr & rhs)
{
if (lhs->equals(*rhs))
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs->getName(), rhs->getName(), false);
}
const DataTypePtr & rhs);
#define ASSERT_DATATYPE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::tests::DataTypeCompare, val1, val2)
#define EXPECT_DATATYPE_EQ(val1, val2) EXPECT_PRED_FORMAT2(::DB::tests::DataTypeCompare, val1, val2)

Expand Down
Loading

0 comments on commit 3e0e679

Please sign in to comment.