Skip to content

Commit

Permalink
Fix duplicated ID DTFile that cause inconsistent query result (#2770) (
Browse files Browse the repository at this point in the history
…#2775) (#3014)

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
windtalker and ti-chi-bot authored Sep 6, 2021
1 parent d573f71 commit 348cea5
Show file tree
Hide file tree
Showing 17 changed files with 198 additions and 127 deletions.
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
*\#
.tramp_history

# vscode clangd cache
.cache

# JSON Compilation Database Format Specification
# https://clang.llvm.org/docs/JSONCompilationDatabase.html
compile_commands.json

# git patch reject report
*.rej

# vim cache files
*.swp

Expand Down
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ if (ENABLE_TESTS)
${ClickHouse_SOURCE_DIR}/dbms/src/Server/StorageConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/UserConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/RaftConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/AggregateFunctions/AggregateFunctionSum.cpp
)
target_include_directories(gtests_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(random_slow_page_storage_remove_expired_snapshots) \
M(random_slow_page_storage_list_all_live_files) \
M(force_set_safepoint_when_decode_block) \
M(force_set_page_data_compact_batch)
M(force_set_page_data_compact_batch) \
M(force_set_dtfile_exist_when_acquire_id)


#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ DeltaPacks DeltaValueSpace::checkHeadAndCloneTail(DMContext & context,
}
else if (auto f = pack->tryToFile(); f)
{
auto new_ref_id = context.storage_pool.newDataPageId();
auto file_id = f->getFile()->fileId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path);

auto new_pack = f->cloneWith(context, new_file, target_range);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ void DeltaMergeStore::writeRegionSnapshot(const DMContextPtr & dm_context,

EventRecorder write_block_recorder(ProfileEvents::DMDeleteRange, ProfileEvents::DMDeleteRangeNS);

auto delegate = dm_context->path_pool.getStableDiskDelegator();
auto delegator = dm_context->path_pool.getStableDiskDelegator();
auto file_provider = dm_context->db_context.getFileProvider();

size_t rows = 0;
Expand All @@ -577,7 +577,7 @@ void DeltaMergeStore::writeRegionSnapshot(const DMContextPtr & dm_context,
DMFiles files;
for (auto file_id : file_ids)
{
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);

auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
files.push_back(file);
Expand Down Expand Up @@ -650,7 +650,7 @@ void DeltaMergeStore::writeRegionSnapshot(const DMContextPtr & dm_context,
else
{
auto & file_parent_path = file->parentPath();
auto ref_id = storage_pool.newDataPageId();
auto ref_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);

auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(*dm_context, ref_file, segment_range);
Expand Down
20 changes: 9 additions & 11 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,16 @@ StableValueSpacePtr createNewStable(DMContext & context,
WriteBatches & wbs,
bool need_rate_limit)
{
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto delegator = context.path_pool.getStableDiskDelegator();
auto store_path = delegator.choosePath();

PageId dmfile_id = context.storage_pool.newDataPageId();
PageId dmfile_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto dmfile = writeIntoNewDMFile(context, schema_snap, input_stream, dmfile_id, store_path, need_rate_limit);
auto stable = std::make_shared<StableValueSpace>(stable_id);
stable->setFiles({dmfile}, RowKeyRange::newAll(context.is_common_handle, context.rowkey_column_size));
stable->saveMeta(wbs.meta);
wbs.data.putExternal(dmfile_id, 0);
delegate.addDTFile(dmfile_id, dmfile->getBytesOnDisk(), store_path);
delegator.addDTFile(dmfile_id, dmfile->getBytesOnDisk(), store_path);

return stable;
}
Expand Down Expand Up @@ -215,16 +215,14 @@ SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id)

switch (version)
{
case SegmentFormat::V1:
{
case SegmentFormat::V1: {
HandleRange range;
readIntBinary(range.start, buf);
readIntBinary(range.end, buf);
rowkey_range = RowKeyRange::fromHandleRange(range);
break;
}
case SegmentFormat::V2:
{
case SegmentFormat::V2: {
rowkey_range = RowKeyRange::deserialize(buf);
break;
}
Expand Down Expand Up @@ -855,8 +853,8 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
auto file_id = dmfile->fileId();
auto file_parent_path = delegate.getDTFilePath(file_id);

auto my_dmfile_id = storage_pool.newDataPageId();
auto other_dmfile_id = storage_pool.newDataPageId();
auto my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

wbs.data.putRefPage(my_dmfile_id, file_id);
wbs.data.putRefPage(other_dmfile_id, file_id);
Expand Down Expand Up @@ -1472,7 +1470,7 @@ bool Segment::placeDelete(const DMContext & dm_context,
{
RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
auto place_handle_range = skippable_place ? RowKeyRange::startFrom(first_rowkey, is_common_handle, rowkey_column_size)
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);

auto compacted_index = update_delta_tree.getCompactedEntries();

Expand Down
50 changes: 44 additions & 6 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <Common/FailPoint.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
#include <Storages/DeltaMerge/StoragePool.h>
Expand All @@ -6,6 +8,10 @@

namespace DB
{
namespace FailPoints
{
extern const char force_set_dtfile_exist_when_acquire_id[];
} // namespace FailPoints
namespace DM
{
enum class StorageType
Expand Down Expand Up @@ -66,7 +72,8 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const
global_ctx.getTiFlashMetrics()),
max_log_page_id(0),
max_data_page_id(0),
max_meta_page_id(0)
max_meta_page_id(0),
global_context(global_ctx)
{
}

Expand All @@ -88,6 +95,37 @@ void StoragePool::drop()
log_storage.drop();
}

/// Allocate a data page id that is safe to use as the id of a new DTFile.
///
/// Background (tics#2756): a DTFile can be created on disk while TiFlash crashes
/// before the id is persisted. After restart that file is registered in the stable
/// delegator, yet the id counter of this pool may hand out the very same id again.
/// To avoid the collision, every candidate id is checked against `delegator` and
/// skipped when a DTFile path is already known for it.
///
/// @param delegator  Queried via `getDTFilePath(id, /*throw_on_not_exist=*/false)`;
///                   an empty result is treated as "no DTFile registered under this id".
/// @param who        Name of the Poco logger used for the warning emitted when an id
///                   has to be skipped (callers in this change pass `__PRETTY_FUNCTION__`).
/// @return The first freshly incremented data page id for which no DTFile path was found.
PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
{
    for (;;)
    {
        // Each attempt consumes a new id from the data page id counter.
        PageId candidate_id = ++max_data_page_id;

        auto occupied_path = delegator.getDTFilePath(candidate_id, /*throw_on_not_exist=*/false);

        // Test hook: while the fail point is enabled, pretend a free id is occupied
        // whenever the evaluation counter is a multiple of 10, so that the retry
        // path below gets exercised.
        fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, {
            static size_t fail_point_called = 0;
            if (occupied_path.empty() && fail_point_called % 10 == 0)
            {
                occupied_path = "<mock for existed path>";
            }
            fail_point_called++;
        });

        // Common case: nothing is registered under this id, so it can be handed out.
        if (likely(occupied_path.empty()))
            return candidate_id;

        // A DTFile already owns this id; report it and try the next one.
        LOG_WARNING(&Poco::Logger::get(who),
                    "The DTFile is already exists, continute to acquire another ID. [path=" + occupied_path
                        + "] [id=" + DB::toString(candidate_id) + "]");
    }
}

bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_period)
{
{
Expand All @@ -100,22 +138,22 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio
last_try_gc_time = now;
}

bool ok = false;
bool done_anything = false;

// FIXME: The global_context.settings is mutable, we need a way to reload these settings.
// auto config = extractConfig(settings, StorageType::Meta);
// meta_storage.reloadSettings(config);
ok |= meta_storage.gc();
done_anything |= meta_storage.gc();

// config = extractConfig(settings, StorageType::Data);
// data_storage.reloadSettings(config);
ok |= data_storage.gc();
done_anything |= data_storage.gc();

// config = extractConfig(settings, StorageType::Log);
// log_storage.reloadSettings(config);
ok |= log_storage.gc();
done_anything |= log_storage.gc();

return ok;
return done_anything;
}

} // namespace DM
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace DB
struct Settings;
class Context;
class StoragePathPool;
class StableDiskDelegator;

namespace DM
{
Expand All @@ -33,9 +34,10 @@ class StoragePool : private boost::noncopyable
PageId maxMetaPageId() { return max_meta_page_id; }

PageId newLogPageId() { return ++max_log_page_id; }
PageId newDataPageId() { return ++max_data_page_id; }
PageId newMetaPageId() { return ++max_meta_page_id; }

PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);

PageStorage & log() { return log_storage; }
PageStorage & data() { return data_storage; }
PageStorage & meta() { return meta_storage; }
Expand All @@ -57,6 +59,8 @@ class StoragePool : private boost::noncopyable
std::atomic<Timepoint> last_try_gc_time = Clock::now();

std::mutex mutex;

const Context & global_context;
};

struct StorageSnapshot
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ inline ::testing::AssertionResult HandleRangeCompare(const char * lhs_exp
{
if (lhs == rhs)
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
}
/// helper functions for comparing RowKeyRange
inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_expr,
Expand All @@ -41,8 +40,7 @@ inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_exp
{
if (lhs == rhs)
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
}
#define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2)
#define ASSERT_ROWKEY_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::RowKeyRangeCompare, val1, val2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,14 @@ class DeltaMergeStore_RWTest : public ::testing::Test, public testing::WithParam

std::pair<RowKeyRange, std::vector<PageId>> genDMFile(DMContext & context, const Block & block)
{
auto file_id = context.storage_pool.newDataPageId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto input_stream = std::make_shared<OneBlockInputStream>(block);
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto store_path = delegator.choosePath();
auto dmfile = writeIntoNewDMFile(
context, std::make_shared<ColumnDefines>(store->getTableColumns()), input_stream, file_id, store_path, false);

delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);
delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);

auto & pk_column = block.getByPosition(0).column;
auto min_pk = pk_column->getInt(0);
Expand Down
29 changes: 14 additions & 15 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/Segment.h>
#include <gtest/gtest.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <gtest/gtest.h>

#include <ctime>
#include <memory>
Expand Down Expand Up @@ -1042,14 +1042,15 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterface<S

std::pair<RowKeyRange, std::vector<PageId>> genDMFile(DMContext & context, const Block & block)
{
auto file_id = context.storage_pool.newDataPageId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto input_stream = std::make_shared<OneBlockInputStream>(block);
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto store_path = delegator.choosePath();

auto dmfile
= writeIntoNewDMFile(context, std::make_shared<ColumnDefines>(*tableColumns()), input_stream, file_id, store_path, false);

delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);
delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);

auto & pk_column = block.getByPosition(0).column;
auto min_pk = pk_column->getInt(0);
Expand Down Expand Up @@ -1077,16 +1078,14 @@ try
case Segment_test_Mode::V2_BlockOnly:
segment->write(dmContext(), std::move(block));
break;
case Segment_test_Mode::V2_FileOnly:
{
auto delegate = dmContext().path_pool.getStableDiskDelegator();
auto file_provider = dmContext().db_context.getFileProvider();
auto [range, file_ids] = genDMFile(dmContext(), block);
auto file_id = file_ids[0];
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(dmContext(), file, range);
delegate.addDTFile(file_id, file->getBytesOnDisk(), file_parent_path);
case Segment_test_Mode::V2_FileOnly: {
auto delegate = dmContext().path_pool.getStableDiskDelegator();
auto file_provider = dmContext().db_context.getFileProvider();
auto [range, file_ids] = genDMFile(dmContext(), block);
auto file_id = file_ids[0];
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(dmContext(), file, range);
WriteBatches wbs(*storage_pool);
wbs.data.putExternal(file_id, 0);
wbs.writeLogAndData();
Expand Down
Loading

0 comments on commit 348cea5

Please sign in to comment.