Skip to content

Commit

Permalink
Fix potential data inconsistency under heavy ddl operation (#5044) (#…
Browse files Browse the repository at this point in the history
…5050)

close #5032
  • Loading branch information
ti-chi-bot authored Jun 2, 2022
1 parent aa14a01 commit 2a0339c
Show file tree
Hide file tree
Showing 14 changed files with 126 additions and 57 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)
M(exception_mpp_hash_build) \
M(exception_between_schema_change_in_the_same_diff)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
22 changes: 19 additions & 3 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int FAIL_POINT_ERROR;
extern const int UNKNOWN_TABLE;
} // namespace ErrorCodes

Expand Down Expand Up @@ -44,7 +45,22 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer
{
TMTContext & tmt = context.getTMTContext();
auto schema_syncer = tmt.getSchemaSyncer();
schema_syncer->syncSchemas(context);
try
{
schema_syncer->syncSchemas(context);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
{
output(e.message());
return;
}
else
{
throw;
}
}

output("schemas refreshed");
}
Expand All @@ -56,7 +72,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer
{
auto & service = context.getSchemaSyncService();
Timestamp gc_safe_point = 0;
if (args.size() == 0)
if (args.empty())
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
else
gc_safe_point = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[0]).value);
Expand All @@ -76,7 +92,7 @@ void dbgFuncResetSchemas(Context & context, const ASTs &, DBGInvoker::Printer ou

void dbgFuncIsTombstone(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 1 || args.size() > 2)
if (args.empty() || args.size() > 2)
throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,18 @@ class IManageableStorage : public IStorage

virtual size_t getRowKeyColumnSize() const { return 1; }

// when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr,
// and `releaseDecodingBlock` need to be called when the block is free
// when `need_block` is false, it will just return an nullptr
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(bool /* need_block */)
/// when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr,
/// and `releaseDecodingBlock` need to be called when the block is free
/// when `need_block` is false, it will just return an nullptr
/// This method must be called under the protection of table structure lock
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & /* table_structure_lock */, bool /* need_block */)
{
throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
};

virtual void releaseDecodingBlock(Int64 /* schema_version */, BlockUPtr /* block */)
/// The `block_decoding_schema_version` is just an internal version for `DecodingStorageSchemaSnapshot`,
/// And it has no relation with the table schema version.
virtual void releaseDecodingBlock(Int64 /* block_decoding_schema_version */, BlockUPtr /* block */)
{
throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
Expand Down
21 changes: 11 additions & 10 deletions dbms/src/Storages/PrimaryKeyNotMatchException.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,29 @@

namespace DB
{

String fixCreateStatementWithPriKeyNotMatchException( //
Context & context, const String old_definition, const String & table_metadata_path, const PrimaryKeyNotMatchException & ex,
Context & context,
const String & old_definition,
const String & table_metadata_path,
const PrimaryKeyNotMatchException & ex,
Poco::Logger * log)
{
LOG_WARNING(
log, "Try to fix statement in " + table_metadata_path + ", primary key [" + ex.pri_key + "] -> [" + ex.actual_pri_key + "]");
log,
"Try to fix statement in " + table_metadata_path + ", primary key [" + ex.pri_key + "] -> [" + ex.actual_pri_key + "]");
// Try to fix the create statement.
ParserCreateQuery parser;
ASTPtr ast
= parseQuery(parser, old_definition.data(), old_definition.data() + old_definition.size(), "in file " + table_metadata_path, 0);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
auto args = ast_create_query.storage->engine->arguments;
if (args->children.size() >= 1)
if (!args->children.empty())
{
ASTPtr pk_ast = std::make_shared<ASTExpressionList>();
pk_ast->children.emplace_back(std::make_shared<ASTIdentifier>(ex.actual_pri_key));
args->children[0] = pk_ast;
}
const String statement = getTableDefinitionFromCreateQuery(ast);
String statement = getTableDefinitionFromCreateQuery(ast);
const String table_metadata_tmp_path = table_metadata_path + ".tmp";

{
Expand All @@ -47,9 +50,8 @@ String fixCreateStatementWithPriKeyNotMatchException( //
EncryptionPath encryption_path
= use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, "");
{
bool create_new_encryption_info = !use_target_encrypt_info && statement.size();
WriteBufferFromFileProvider out(context.getFileProvider(), table_metadata_tmp_path, encryption_path, create_new_encryption_info,
nullptr, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
bool create_new_encryption_info = !use_target_encrypt_info && !statement.empty();
WriteBufferFromFileProvider out(context.getFileProvider(), table_metadata_tmp_path, encryption_path, create_new_encryption_info, nullptr, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (context.getSettingsRef().fsync_metadata)
Expand All @@ -60,8 +62,7 @@ String fixCreateStatementWithPriKeyNotMatchException( //
try
{
/// rename atomically replaces the old file with the new one.
context.getFileProvider()->renameFile(table_metadata_tmp_path, encryption_path, table_metadata_path,
EncryptionPath(table_metadata_path, ""), !use_target_encrypt_info);
context.getFileProvider()->renameFile(table_metadata_tmp_path, encryption_path, table_metadata_path, EncryptionPath(table_metadata_path, ""), !use_target_encrypt_info);
}
catch (...)
{
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/PrimaryKeyNotMatchException.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@ class Logger;

namespace DB
{

class Context;

struct PrimaryKeyNotMatchException
struct PrimaryKeyNotMatchException : public std::exception
{
// The primary key name in definition
const String pri_key;
// The actual primary key name in TiDB::TableInfo
const String actual_pri_key;
PrimaryKeyNotMatchException(const String & pri_key_, const String & actual_pri_key_)
: pri_key(pri_key_), actual_pri_key(actual_pri_key_)
: pri_key(pri_key_)
, actual_pri_key(actual_pri_key_)
{}
};

// This function will replace the primary key and update statement in `table_metadata_path`. The correct statement will be return.
String fixCreateStatementWithPriKeyNotMatchException(Context & context, const String old_definition, const String & table_metadata_path,
const PrimaryKeyNotMatchException & ex, Poco::Logger * log);
String fixCreateStatementWithPriKeyNotMatchException(Context & context, const String & old_definition, const String & table_metadata_path, const PrimaryKeyNotMatchException & ex, Poco::Logger * log);

} // namespace DB
32 changes: 20 additions & 12 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ class DMBlockOutputStream : public IBlockOutputStream

BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settings & settings)
{
auto & insert_query = typeid_cast<const ASTInsertQuery &>(*query);
const auto & insert_query = typeid_cast<const ASTInsertQuery &>(*query);
auto decorator = [&](const Block & block) { //
return this->buildInsertBlock(insert_query.is_import, insert_query.is_delete, block);
};
Expand Down Expand Up @@ -563,7 +563,7 @@ BlockInputStreams StorageDeltaMerge::read(
// failed to parsed.
ColumnDefines columns_to_read;
auto header = store->getHeader();
for (auto & n : column_names)
for (const auto & n : column_names)
{
ColumnDefine col_define;
if (n == EXTRA_HANDLE_COLUMN_NAME)
Expand Down Expand Up @@ -798,7 +798,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM

DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows)
{
auto start_index = rand() % (total_rows - delete_rows + 1);
auto start_index = rand() % (total_rows - delete_rows + 1); // NOLINT(cert-msc50-cpp)

DM::RowKeyRange range = DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize());
{
Expand Down Expand Up @@ -849,14 +849,16 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t delete_rows)
LOG_FMT_ERROR(log, "Rows after delete range not match, expected: {}, got: {}", (total_rows - delete_rows), after_delete_rows);
}

std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(bool need_block)
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool need_block)
{
(void)table_structure_lock;
std::lock_guard lock{decode_schema_mutex};
if (!decoding_schema_snapshot || decoding_schema_snapshot->schema_version < tidb_table_info.schema_version)
if (!decoding_schema_snapshot || decoding_schema_changed)
{
auto & store = getAndMaybeInitStore();
decoding_schema_snapshot = std::make_shared<DecodingStorageSchemaSnapshot>(store->getStoreColumns(), tidb_table_info, store->getHandle());
decoding_schema_snapshot = std::make_shared<DecodingStorageSchemaSnapshot>(store->getStoreColumns(), tidb_table_info, store->getHandle(), decoding_schema_version++);
cache_blocks.clear();
decoding_schema_changed = false;
}

if (need_block)
Expand All @@ -878,10 +880,10 @@ std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerg
}
}

void StorageDeltaMerge::releaseDecodingBlock(Int64 schema_version, BlockUPtr block_ptr)
void StorageDeltaMerge::releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block_ptr)
{
std::lock_guard lock{decode_schema_mutex};
if (!decoding_schema_snapshot || schema_version < decoding_schema_snapshot->schema_version)
if (!decoding_schema_snapshot || block_decoding_schema_version < decoding_schema_snapshot->decoding_schema_version)
return;
if (cache_blocks.size() >= max_cached_blocks_num)
return;
Expand Down Expand Up @@ -931,12 +933,12 @@ static void updateDeltaMergeTableCreateStatement(
const SortDescription & pk_names,
const ColumnsDescription & columns,
const OrderedNameSet & hidden_columns,
const OptionTableInfoConstRef table_info,
OptionTableInfoConstRef table_info,
Timestamp tombstone,
const Context & context);

inline OptionTableInfoConstRef getTableInfoForCreateStatement(
const OptionTableInfoConstRef table_info_from_tidb,
OptionTableInfoConstRef table_info_from_tidb,
TiDB::TableInfo & table_info_from_store,
const ColumnDefines & store_table_columns,
const OrderedNameSet & hidden_columns)
Expand Down Expand Up @@ -1060,6 +1062,7 @@ try
updateTableColumnInfo();
}
}
decoding_schema_changed = true;

SortDescription pk_desc = getPrimarySortDescription();
ColumnDefines store_columns = getStoreColumnDefines();
Expand Down Expand Up @@ -1416,7 +1419,7 @@ void StorageDeltaMerge::startup()
tmt.getStorages().put(std::static_pointer_cast<StorageDeltaMerge>(shared_from_this()));
}

void StorageDeltaMerge::shutdown()
void StorageDeltaMerge::shutdownImpl()
{
bool v = false;
if (!shutdown_called.compare_exchange_strong(v, true))
Expand All @@ -1427,6 +1430,11 @@ void StorageDeltaMerge::shutdown()
}
}

void StorageDeltaMerge::shutdown()
{
shutdownImpl();
}

void StorageDeltaMerge::removeFromTMTContext()
{
// remove this table from TMTContext
Expand All @@ -1437,7 +1445,7 @@ void StorageDeltaMerge::removeFromTMTContext()

StorageDeltaMerge::~StorageDeltaMerge()
{
shutdown();
shutdownImpl();
}

DataTypePtr StorageDeltaMerge::getPKTypeImpl() const
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class StorageDeltaMerge
// Apply AlterCommands synced from TiDB should use `alterFromTiDB` instead of `alter(...)`
void alterFromTiDB(
const TableLockHolder &,
const AlterCommands & commands,
const AlterCommands & params,
const String & database_name,
const TiDB::TableInfo & table_info,
const SchemaNameMapper & name_mapper,
Expand Down Expand Up @@ -123,9 +123,9 @@ class StorageDeltaMerge

size_t getRowKeyColumnSize() const override { return rowkey_column_size; }

std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(bool /* need_block */) override;
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool /* need_block */) override;

void releaseDecodingBlock(Int64 schema_version, BlockUPtr block) override;
void releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block) override;

bool initStoreIfDataDirExist() override;

Expand Down Expand Up @@ -168,6 +168,7 @@ class StorageDeltaMerge
void updateTableColumnInfo();
DM::ColumnDefines getStoreColumnDefines() const;
bool dataDirExist();
void shutdownImpl();

#ifndef DBMS_PUBLIC_GTEST
private:
Expand Down Expand Up @@ -206,6 +207,11 @@ class StorageDeltaMerge

mutable std::mutex decode_schema_mutex;
DecodingStorageSchemaSnapshotPtr decoding_schema_snapshot;
// The following two members must be used under the protection of table structure lock
bool decoding_schema_changed = false;
// internal version for `decoding_schema_snapshot`
Int64 decoding_schema_version = 1;

// avoid creating block every time when decoding row
std::vector<BlockUPtr> cache_blocks;
// avoid creating too many cached blocks(the typical num should be less and equal than raft apply thread)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ struct DecodingStorageSchemaSnapshot
bool pk_is_handle;
bool is_common_handle;
TMTPKType pk_type = TMTPKType::UNSPECIFIED;
Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
// an internal increasing version for `DecodingStorageSchemaSnapshot`, has no relation with the table schema version
Int64 decoding_schema_version;

DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_)
DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_, Int64 decoding_schema_version_)
: column_defines{std::move(column_defines_)}
, pk_is_handle{table_info_.pk_is_handle}
, is_common_handle{table_info_.is_common_handle}
, schema_version{table_info_.schema_version}
, decoding_schema_version{decoding_schema_version_}
{
std::unordered_map<ColumnID, size_t> column_lut;
for (size_t i = 0; i < table_info_.columns.size(); i++)
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ static void writeRegionDataToStorage(
/// Read region data as block.
Stopwatch watch;

Int64 block_schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
Int64 block_decoding_schema_version = -1;
BlockUPtr block_ptr = nullptr;
if (need_decode)
{
LOG_FMT_TRACE(log, "{} begin to decode table {}, region {}", FUNCTION_NAME, table_id, region->id());
DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(true);
block_schema_version = decoding_schema_snapshot->schema_version;
std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(lock, true);
block_decoding_schema_version = decoding_schema_snapshot->decoding_schema_version;

auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(*block_ptr, data_list_read, force_decode))
Expand Down Expand Up @@ -139,7 +139,7 @@ static void writeRegionDataToStorage(
write_part_cost = watch.elapsedMilliseconds();
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write).Observe(write_part_cost / 1000.0);
if (need_decode)
storage->releaseDecodingBlock(block_schema_version, std::move(block_ptr));
storage->releaseDecodingBlock(block_decoding_schema_version, std::move(block_ptr));

LOG_TRACE(log,
FUNCTION_NAME << ": table " << table_id << ", region " << region->id() << ", cost [region decode " << region_decode_cost
Expand Down Expand Up @@ -496,7 +496,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
}

DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(lock, false);
res_block = createBlockSortByColumnID(decoding_schema_snapshot);
auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(res_block, *data_list_read, force_decode))
Expand Down Expand Up @@ -548,7 +548,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
auto table_lock = storage->lockStructureForShare(getThreadName());
dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
// only dt storage engine support `getSchemaSnapshotAndBlockForDecoding`, other engine will throw exception
std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
std::tie(std::ignore, drop_lock) = std::move(table_lock).release();
return true;
};
Expand Down
Loading

0 comments on commit 2a0339c

Please sign in to comment.