Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicated ID DTFile that cause inconsistent query result (#2770) #2775

Merged
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
*\#
.tramp_history

# vscode clangd cache
.cache

# JSON Compilation Database Format Specification
# https://clang.llvm.org/docs/JSONCompilationDatabase.html
compile_commands.json

# git patch reject report
*.rej

# vim cache files
*.swp

Expand Down
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ if (ENABLE_TESTS)
${ClickHouse_SOURCE_DIR}/dbms/src/Server/StorageConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/UserConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/RaftConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/AggregateFunctions/AggregateFunctionSum.cpp
)
target_include_directories(gtests_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
M(minimum_block_size_for_cross_join)
#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
M(minimum_block_size_for_cross_join) \
M(random_exception_after_dt_write_done) \
M(random_slow_page_storage_write) \
M(random_exception_after_page_storage_sequence_acquired) \
M(random_slow_page_storage_remove_expired_snapshots) \
M(random_slow_page_storage_list_all_live_files) \
M(force_set_safepoint_when_decode_block) \
M(force_set_page_data_compact_batch) \
M(force_set_dtfile_exist_when_acquire_id)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ DeltaPacks DeltaValueSpace::checkHeadAndCloneTail(DMContext & context,
}
else if (auto f = pack->tryToFile(); f)
{
auto new_ref_id = context.storage_pool.newDataPageId();
auto file_id = f->getFile()->fileId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path);

auto new_pack = f->cloneWith(context, new_file, target_range);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ void DeltaMergeStore::writeRegionSnapshot(const DMContextPtr & dm_context,

EventRecorder write_block_recorder(ProfileEvents::DMDeleteRange, ProfileEvents::DMDeleteRangeNS);

auto delegate = dm_context->path_pool.getStableDiskDelegator();
auto delegator = dm_context->path_pool.getStableDiskDelegator();
auto file_provider = dm_context->db_context.getFileProvider();

size_t rows = 0;
Expand All @@ -567,7 +567,7 @@ void DeltaMergeStore::writeRegionSnapshot(const DMContextPtr & dm_context,
DMFiles files;
for (auto file_id : file_ids)
{
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);

auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
files.push_back(file);
Expand Down Expand Up @@ -640,7 +640,7 @@ void DeltaMergeStore::writeRegionSnapshot(const DMContextPtr & dm_context,
else
{
auto & file_parent_path = file->parentPath();
auto ref_id = storage_pool.newDataPageId();
auto ref_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);

auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(*dm_context, ref_file, segment_range);
Expand Down
20 changes: 9 additions & 11 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,16 @@ StableValueSpacePtr createNewStable(DMContext & context,
WriteBatches & wbs,
bool need_rate_limit)
{
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto delegator = context.path_pool.getStableDiskDelegator();
auto store_path = delegator.choosePath();

PageId dmfile_id = context.storage_pool.newDataPageId();
PageId dmfile_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto dmfile = writeIntoNewDMFile(context, schema_snap, input_stream, dmfile_id, store_path, need_rate_limit);
auto stable = std::make_shared<StableValueSpace>(stable_id);
stable->setFiles({dmfile}, RowKeyRange::newAll(context.is_common_handle, context.rowkey_column_size));
stable->saveMeta(wbs.meta);
wbs.data.putExternal(dmfile_id, 0);
delegate.addDTFile(dmfile_id, dmfile->getBytesOnDisk(), store_path);
delegator.addDTFile(dmfile_id, dmfile->getBytesOnDisk(), store_path);

return stable;
}
Expand Down Expand Up @@ -215,16 +215,14 @@ SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id)

switch (version)
{
case SegmentFormat::V1:
{
case SegmentFormat::V1: {
HandleRange range;
readIntBinary(range.start, buf);
readIntBinary(range.end, buf);
rowkey_range = RowKeyRange::fromHandleRange(range);
break;
}
case SegmentFormat::V2:
{
case SegmentFormat::V2: {
rowkey_range = RowKeyRange::deserialize(buf);
break;
}
Expand Down Expand Up @@ -855,8 +853,8 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
auto file_id = dmfile->fileId();
auto file_parent_path = delegate.getDTFilePath(file_id);

auto my_dmfile_id = storage_pool.newDataPageId();
auto other_dmfile_id = storage_pool.newDataPageId();
auto my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

wbs.data.putRefPage(my_dmfile_id, file_id);
wbs.data.putRefPage(other_dmfile_id, file_id);
Expand Down Expand Up @@ -1472,7 +1470,7 @@ bool Segment::placeDelete(const DMContext & dm_context,
{
RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
auto place_handle_range = skippable_place ? RowKeyRange::startFrom(first_rowkey, is_common_handle, rowkey_column_size)
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);

auto compacted_index = update_delta_tree.getCompactedEntries();

Expand Down
50 changes: 44 additions & 6 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <Common/FailPoint.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
#include <Storages/DeltaMerge/StoragePool.h>
Expand All @@ -6,6 +8,10 @@

namespace DB
{
namespace FailPoints
{
extern const char force_set_dtfile_exist_when_acquire_id[];
} // namespace FailPoints
namespace DM
{
enum class StorageType
Expand Down Expand Up @@ -66,7 +72,8 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const
global_ctx.getTiFlashMetrics()),
max_log_page_id(0),
max_data_page_id(0),
max_meta_page_id(0)
max_meta_page_id(0),
global_context(global_ctx)
{
}

Expand All @@ -88,6 +95,37 @@ void StoragePool::drop()
log_storage.drop();
}

PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
{
    // A DTFile may have been created on disk right before TiFlash crashed, i.e. before its ID
    // was persisted. After the process is restored, that ID is inserted into the stable
    // delegator, yet the data page ID counter may still hand out the very same ID. (tics#2756)
    // So keep acquiring IDs until the delegator reports no DTFile path for the candidate.
    //
    // `who` is only used as the logger name for the warning emitted on a collision.
    for (;;)
    {
        const PageId candidate_id = ++max_data_page_id;

        // An empty path means the delegator knows no DTFile with this ID.
        auto occupied_path = delegator.getDTFilePath(candidate_id, /*throw_on_not_exist=*/false);
        fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, {
            // Failpoint only: on every 10th invocation, pretend a free ID is already taken
            // so that the retry path below gets exercised.
            static size_t injected_calls = 0;
            const bool should_mock = occupied_path.empty() && injected_calls % 10 == 0;
            if (should_mock)
            {
                occupied_path = "<mock for existed path>";
            }
            ++injected_calls;
        });

        if (likely(occupied_path.empty()))
        {
            return candidate_id;
        }

        // A DTFile already owns this ID; report it and try the next one.
        LOG_WARNING(&Poco::Logger::get(who),
                    "The DTFile is already exists, continute to acquire another ID. [path=" + occupied_path
                        + "] [id=" + DB::toString(candidate_id) + "]");
    }
}

bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_period)
{
{
Expand All @@ -100,22 +138,22 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio
last_try_gc_time = now;
}

bool ok = false;
bool done_anything = false;

// FIXME: The global_context.settings is mutable, we need a way to reload these settings.
// auto config = extractConfig(settings, StorageType::Meta);
// meta_storage.reloadSettings(config);
ok |= meta_storage.gc();
done_anything |= meta_storage.gc();

// config = extractConfig(settings, StorageType::Data);
// data_storage.reloadSettings(config);
ok |= data_storage.gc();
done_anything |= data_storage.gc();

// config = extractConfig(settings, StorageType::Log);
// log_storage.reloadSettings(config);
ok |= log_storage.gc();
done_anything |= log_storage.gc();

return ok;
return done_anything;
}

} // namespace DM
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace DB
struct Settings;
class Context;
class StoragePathPool;
class StableDiskDelegator;

namespace DM
{
Expand All @@ -33,9 +34,10 @@ class StoragePool : private boost::noncopyable
PageId maxMetaPageId() { return max_meta_page_id; }

PageId newLogPageId() { return ++max_log_page_id; }
PageId newDataPageId() { return ++max_data_page_id; }
PageId newMetaPageId() { return ++max_meta_page_id; }

PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);

PageStorage & log() { return log_storage; }
PageStorage & data() { return data_storage; }
PageStorage & meta() { return meta_storage; }
Expand All @@ -57,6 +59,8 @@ class StoragePool : private boost::noncopyable
std::atomic<Timepoint> last_try_gc_time = Clock::now();

std::mutex mutex;

const Context & global_context;
};

struct StorageSnapshot
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ inline ::testing::AssertionResult HandleRangeCompare(const char * lhs_exp
{
if (lhs == rhs)
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
}
/// helper functions for comparing RowKeyRange
inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_expr,
Expand All @@ -41,8 +40,7 @@ inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_exp
{
if (lhs == rhs)
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
}
#define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2)
#define ASSERT_ROWKEY_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::RowKeyRangeCompare, val1, val2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,14 @@ class DeltaMergeStore_RWTest : public ::testing::Test, public testing::WithParam

std::pair<RowKeyRange, std::vector<PageId>> genDMFile(DMContext & context, const Block & block)
{
auto file_id = context.storage_pool.newDataPageId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto input_stream = std::make_shared<OneBlockInputStream>(block);
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto store_path = delegator.choosePath();
auto dmfile = writeIntoNewDMFile(
context, std::make_shared<ColumnDefines>(store->getTableColumns()), input_stream, file_id, store_path, false);

delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);
delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);

auto & pk_column = block.getByPosition(0).column;
auto min_pk = pk_column->getInt(0);
Expand Down
29 changes: 14 additions & 15 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/Segment.h>
#include <gtest/gtest.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <gtest/gtest.h>

#include <ctime>
#include <memory>
Expand Down Expand Up @@ -1042,14 +1042,15 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterface<S

std::pair<RowKeyRange, std::vector<PageId>> genDMFile(DMContext & context, const Block & block)
{
auto file_id = context.storage_pool.newDataPageId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto input_stream = std::make_shared<OneBlockInputStream>(block);
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto store_path = delegator.choosePath();

auto dmfile
= writeIntoNewDMFile(context, std::make_shared<ColumnDefines>(*tableColumns()), input_stream, file_id, store_path, false);

delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);
delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);

auto & pk_column = block.getByPosition(0).column;
auto min_pk = pk_column->getInt(0);
Expand Down Expand Up @@ -1077,16 +1078,14 @@ try
case Segment_test_Mode::V2_BlockOnly:
segment->write(dmContext(), std::move(block));
break;
case Segment_test_Mode::V2_FileOnly:
{
auto delegate = dmContext().path_pool.getStableDiskDelegator();
auto file_provider = dmContext().db_context.getFileProvider();
auto [range, file_ids] = genDMFile(dmContext(), block);
auto file_id = file_ids[0];
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(dmContext(), file, range);
delegate.addDTFile(file_id, file->getBytesOnDisk(), file_parent_path);
case Segment_test_Mode::V2_FileOnly: {
auto delegate = dmContext().path_pool.getStableDiskDelegator();
auto file_provider = dmContext().db_context.getFileProvider();
auto [range, file_ids] = genDMFile(dmContext(), block);
auto file_id = file_ids[0];
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(dmContext(), file, range);
WriteBatches wbs(*storage_pool);
wbs.data.putExternal(file_id, 0);
wbs.writeLogAndData();
Expand Down
Loading