Skip to content

Commit

Permalink
Fix duplicated ID DTFile that cause inconsistent query result (pingca…
Browse files Browse the repository at this point in the history
…p#2770) (pingcap#2772)

* Add guard for adding DTFiles with duplicated id
* Get rid of generating DTFile with duplicated ID
Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang authored Aug 25, 2021
1 parent 2e3b82e commit 576ddf8
Show file tree
Hide file tree
Showing 13 changed files with 286 additions and 236 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
# vscode clangd cache
.cache

# JSON Compilation Database Format Specification
# https://clang.llvm.org/docs/JSONCompilationDatabase.html
compile_commands.json

# git patch reject report
*.rej

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(random_slow_page_storage_remove_expired_snapshots) \
M(random_slow_page_storage_list_all_live_files) \
M(force_set_safepoint_when_decode_block) \
M(force_set_page_data_compact_batch)
M(force_set_page_data_compact_batch) \
M(force_set_dtfile_exist_when_acquire_id)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ DeltaPacks DeltaValueSpace::checkHeadAndCloneTail(DMContext & context,
}
else if (auto f = pack->tryToFile(); f)
{
auto new_ref_id = context.storage_pool.newDataPageId();
auto file_id = f->getFile()->fileId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path);

auto new_pack = f->cloneWith(context, new_file, target_range);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()

auto delegator = path_pool.getStableDiskDelegator();
auto parent_path = delegator.choosePath();
auto new_id = storage_pool.newDataPageId();
auto new_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
return {parent_path, new_id};
}

Expand Down Expand Up @@ -679,7 +679,7 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
/// Generate DMFile instance with a new ref_id pointed to the file_id.
auto file_id = file->fileId();
auto & file_parent_path = file->parentPath();
auto ref_id = storage_pool.newDataPageId();
auto ref_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(*dm_context, ref_file, segment_range);
Expand Down
333 changes: 167 additions & 166 deletions dbms/src/Storages/DeltaMerge/Segment.cpp

Large diffs are not rendered by default.

96 changes: 63 additions & 33 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
@@ -1,44 +1,50 @@
#include <Common/FailPoint.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/Page/ConfigSettings.h>
#include <Storages/PathPool.h>
#include <fmt/format.h>

namespace DB
{
namespace FailPoints
{
extern const char force_set_dtfile_exist_when_acquire_id[];
} // namespace FailPoints
namespace DM
{
enum class StorageType
{
Log = 1,
Log = 1,
Data = 2,
Meta = 3,
};

PageStorage::Config extractConfig(const Settings & settings, StorageType subtype)
{
#define SET_CONFIG(NAME) \
config.num_write_slots = settings.dt_storage_pool_##NAME##_write_slots; \
config.gc_min_files = settings.dt_storage_pool_##NAME##_gc_min_file_num; \
config.gc_min_bytes = settings.dt_storage_pool_##NAME##_gc_min_bytes; \
config.num_write_slots = settings.dt_storage_pool_##NAME##_write_slots; \
config.gc_min_files = settings.dt_storage_pool_##NAME##_gc_min_file_num; \
config.gc_min_bytes = settings.dt_storage_pool_##NAME##_gc_min_bytes; \
config.gc_min_legacy_num = settings.dt_storage_pool_##NAME##_gc_min_legacy_num; \
config.gc_max_valid_rate = settings.dt_storage_pool_##NAME##_gc_max_valid_rate;

PageStorage::Config config = getConfigFromSettings(settings);

switch (subtype)
{
case StorageType::Log:
SET_CONFIG(log);
break;
case StorageType::Data:
SET_CONFIG(data);
break;
case StorageType::Meta:
SET_CONFIG(meta);
break;
default:
throw Exception("Unknown subtype in extractConfig: " + DB::toString(static_cast<Int32>(subtype)));
case StorageType::Log:
SET_CONFIG(log);
break;
case StorageType::Data:
SET_CONFIG(data);
break;
case StorageType::Meta:
SET_CONFIG(meta);
break;
default:
throw Exception("Unknown subtype in extractConfig: " + DB::toString(static_cast<Int32>(subtype)));
}
#undef SET_CONFIG

Expand All @@ -47,37 +53,31 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype

StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings)
: // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible
log_storage(name + ".log",
path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
global_ctx.getTiFlashMetrics()),
log_storage(
name + ".log", path_pool.getPSDiskDelegatorMulti("log"), extractConfig(settings, StorageType::Log), global_ctx.getFileProvider()),
// The iops in data_storage is low, only use the first disk for storing data
data_storage(name + ".data",
path_pool.getPSDiskDelegatorSingle("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
global_ctx.getTiFlashMetrics()),
path_pool.getPSDiskDelegatorSingle("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider()),
// The iops in meta_storage is relatively high, use multi-disks if possible
meta_storage(name + ".meta",
path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
global_ctx.getTiFlashMetrics()),
path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider()),
max_log_page_id(0),
max_data_page_id(0),
max_meta_page_id(0),
global_context(global_ctx)
{
}
{}

void StoragePool::restore()
{
log_storage.restore();
data_storage.restore();
meta_storage.restore();

max_log_page_id = log_storage.getMaxId();
max_log_page_id = log_storage.getMaxId();
max_data_page_id = data_storage.getMaxId();
max_meta_page_id = meta_storage.getMaxId();
}
Expand All @@ -89,6 +89,36 @@ void StoragePool::drop()
log_storage.drop();
}

PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
{
// In case that there is a DTFile created on disk but TiFlash crashes without persisting the ID.
// After TiFlash process restored, the ID will be inserted into the stable delegator, but we may
// get a duplicated ID from the `storage_pool.data`. (tics#2756)
PageId dtfile_id;
do
{
dtfile_id = ++max_data_page_id;

auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false);
fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, {
static size_t fail_point_called = 0;
if (existed_path.empty() && fail_point_called % 10 == 0)
{
existed_path = "<mock for existed path>";
}
fail_point_called++;
});
if (likely(existed_path.empty()))
{
break;
}
// else there is a DTFile with that id, continue to acquire a new ID.
LOG_WARNING(&Poco::Logger::get(who),
fmt::format("The DTFile is already exists, continute to acquire another ID. [path={}] [id={}]", existed_path, dtfile_id));
} while (true);
return dtfile_id;
}

bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_period)
{
{
Expand All @@ -102,8 +132,8 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio
}

bool done_anything = false;
auto write_limiter = global_context.getWriteLimiter();
auto read_limiter = global_context.getReadLimiter();
auto write_limiter = global_context.getWriteLimiter();
auto read_limiter = global_context.getReadLimiter();
// FIXME: The global_context.settings is mutable, we need a way to reload thses settings.
// auto config = extractConfig(settings, StorageType::Meta);
// meta_storage.reloadSettings(config);
Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace DB
struct Settings;
class Context;
class StoragePathPool;
class StableDiskDelegator;

namespace DM
{
Expand All @@ -19,10 +20,10 @@ static const std::chrono::seconds DELTA_MERGE_GC_PERIOD(60);
class StoragePool : private boost::noncopyable
{
public:
using Clock = std::chrono::system_clock;
using Clock = std::chrono::system_clock;
using Timepoint = Clock::time_point;
using Duration = Clock::duration;
using Seconds = std::chrono::seconds;
using Duration = Clock::duration;
using Seconds = std::chrono::seconds;

StoragePool(const String & name, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings);

Expand All @@ -33,9 +34,10 @@ class StoragePool : private boost::noncopyable
PageId maxMetaPageId() { return max_meta_page_id; }

PageId newLogPageId() { return ++max_log_page_id; }
PageId newDataPageId() { return ++max_data_page_id; }
PageId newMetaPageId() { return ++max_meta_page_id; }

PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);

PageStorage & log() { return log_storage; }
PageStorage & data() { return data_storage; }
PageStorage & meta() { return meta_storage; }
Expand All @@ -58,7 +60,7 @@ class StoragePool : private boost::noncopyable

std::mutex mutex;

const Context& global_context;
const Context & global_context;
};

struct StorageSnapshot : private boost::noncopyable
Expand All @@ -67,8 +69,7 @@ struct StorageSnapshot : private boost::noncopyable
: log_reader(storage.log(), snapshot_read ? storage.log().getSnapshot() : nullptr, read_limiter),
data_reader(storage.data(), snapshot_read ? storage.data().getSnapshot() : nullptr, read_limiter),
meta_reader(storage.meta(), snapshot_read ? storage.meta().getSnapshot() : nullptr, read_limiter)
{
}
{}

PageReader log_reader;
PageReader data_reader;
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ inline ::testing::AssertionResult HandleRangeCompare(const char * lhs_exp
{
if (lhs == rhs)
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
}
/// helper functions for comparing HandleRange
inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_expr,
Expand All @@ -43,8 +42,7 @@ inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_exp
{
if (lhs == rhs)
return ::testing::AssertionSuccess();
else
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false);
}
#define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2)
#define ASSERT_ROWKEY_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::RowKeyRangeCompare, val1, val2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,7 @@ try
/* max_version= */ tso1,
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
Expand All @@ -1345,7 +1345,7 @@ try
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, 32UL) << "Data [32, 128) before ingest should be erased, should only get [0, 32)";
EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) before ingest should be erased, should only get [0, 32)";
}

{
Expand All @@ -1359,15 +1359,15 @@ try
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
ASSERT_EQ(ins.size(), 1);
BlockInputStreamPtr in = ins[0];

size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
num_rows_read += block.rows();
in->readSuffix();
EXPECT_EQ(num_rows_read, 32UL) << "The rows number after ingest is not match";
EXPECT_EQ(num_rows_read, 32) << "The rows number after ingest is not match";
}
}
CATCH
Expand Down
15 changes: 7 additions & 8 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,22 +1033,22 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterface<S

std::pair<RowKeyRange, std::vector<PageId>> genDMFile(DMContext & context, const Block & block)
{
auto file_id = context.storage_pool.newDataPageId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto input_stream = std::make_shared<OneBlockInputStream>(block);
auto delegate = context.path_pool.getStableDiskDelegator();
auto store_path = delegate.choosePath();
auto store_path = delegator.choosePath();

DMFileBlockOutputStream::Flags flags;
flags.setSingleFile(DMTestEnv::getPseudoRandomNumber() % 2);

auto dmfile
= writeIntoNewDMFile(context, std::make_shared<ColumnDefines>(*tableColumns()), input_stream, file_id, store_path, flags);

delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);
delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path);

auto & pk_column = block.getByPosition(0).column;
auto min_pk = pk_column->getInt(0);
auto max_pk = pk_column->getInt(block.rows() - 1);
auto & pk_column = block.getByPosition(0).column;
auto min_pk = pk_column->getInt(0);
auto max_pk = pk_column->getInt(block.rows() - 1);
HandleRange range(min_pk, max_pk + 1);

return {RowKeyRange::fromHandleRange(range), {file_id}};
Expand Down Expand Up @@ -1080,7 +1080,6 @@ try
auto file_parent_path = delegate.getDTFilePath(file_id);
auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(dmContext(), file, range);
delegate.addDTFile(file_id, file->getBytesOnDisk(), file_parent_path);
WriteBatches wbs(*storage_pool);
wbs.data.putExternal(file_id, 0);
wbs.writeLogAndData();
Expand Down
Loading

0 comments on commit 576ddf8

Please sign in to comment.