From 2ce9529f1069185f82b3018e5b96152fcd02b601 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Thu, 2 Jun 2022 13:13:44 +0800 Subject: [PATCH] Fix potential data inconsistency under heavy ddl operation (#5044) * fix decoding error under heavy ddl operation * small fix --- dbms/src/Common/FailPoint.cpp | 3 ++- dbms/src/Debug/dbgFuncSchema.cpp | 18 +++++++++++++++++- dbms/src/Storages/IManageableStorage.h | 7 +++++-- dbms/src/Storages/StorageDeltaMerge.cpp | 13 ++++++++----- dbms/src/Storages/StorageDeltaMerge.h | 9 +++++++-- .../DecodingStorageSchemaSnapshot.h | 7 ++++--- .../Storages/Transaction/PartitionStreams.cpp | 12 ++++++------ .../Transaction/tests/RowCodecTestUtils.h | 4 ++-- dbms/src/TiDB/Schema/SchemaBuilder.cpp | 13 ++++++++++++- dbms/src/TiDB/Schema/TiDBSchemaSyncer.h | 9 +++++++++ .../ddl/alter_column_when_pk_is_handle.test | 18 +++++++++++++++++- 11 files changed, 89 insertions(+), 24 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 921dd5bf748..c6c3caa44ad 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -65,7 +65,8 @@ std::unordered_map> FailPointHelper::f M(exception_when_read_from_log) \ M(exception_mpp_hash_build) \ M(exception_before_drop_segment) \ - M(exception_after_drop_segment) + M(exception_after_drop_segment) \ + M(exception_between_schema_change_in_the_same_diff) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index 8b73ddc23a3..c388015dc10 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -34,6 +34,7 @@ namespace DB { namespace ErrorCodes { +extern const int FAIL_POINT_ERROR; extern const int UNKNOWN_TABLE; } // namespace ErrorCodes @@ -62,7 +63,22 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer { TMTContext & tmt = context.getTMTContext(); auto schema_syncer = tmt.getSchemaSyncer(); - schema_syncer->syncSchemas(context); + try + { + schema_syncer->syncSchemas(context); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::FAIL_POINT_ERROR) + { + output(e.message()); + return; + } + else + { + throw; + } + } output("schemas refreshed"); } diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index e41d092ca87..ebf84c592e4 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -157,12 +157,15 @@ class IManageableStorage : public IStorage /// when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr, /// and `releaseDecodingBlock` need to be called when the block is free /// when `need_block` is false, it will just return an nullptr - virtual std::pair getSchemaSnapshotAndBlockForDecoding(bool /* need_block */) + /// This method must be called under the protection of table structure lock + virtual std::pair getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & /* table_structure_lock */, bool /* need_block */) { throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); }; - virtual void releaseDecodingBlock(Int64 /* schema_version */, BlockUPtr /* block */) + /// The `block_decoding_schema_version` is just an internal version for `DecodingStorageSchemaSnapshot`, + /// And it has no relation with the table schema version. + virtual void releaseDecodingBlock(Int64 /* block_decoding_schema_version */, BlockUPtr /* block */) { throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index fc73e28e23a..67d32c73a05 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -901,14 +901,16 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t delete_rows) LOG_FMT_ERROR(log, "Rows after delete range not match, expected: {}, got: {}", (total_rows - delete_rows), after_delete_rows); } -std::pair StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(bool need_block) +std::pair StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool need_block) { + (void)table_structure_lock; std::lock_guard lock{decode_schema_mutex}; - if (!decoding_schema_snapshot || decoding_schema_snapshot->schema_version < tidb_table_info.schema_version) + if (!decoding_schema_snapshot || decoding_schema_changed) { auto & store = getAndMaybeInitStore(); - decoding_schema_snapshot = std::make_shared(store->getStoreColumns(), tidb_table_info, store->getHandle()); + decoding_schema_snapshot = std::make_shared(store->getStoreColumns(), tidb_table_info, store->getHandle(), decoding_schema_version++); cache_blocks.clear(); + decoding_schema_changed = false; } if (need_block) @@ -930,10 +932,10 @@ std::pair StorageDeltaMerg } } -void StorageDeltaMerge::releaseDecodingBlock(Int64 schema_version, BlockUPtr block_ptr) +void StorageDeltaMerge::releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block_ptr) { std::lock_guard lock{decode_schema_mutex}; - if (!decoding_schema_snapshot || schema_version < decoding_schema_snapshot->schema_version) + if (!decoding_schema_snapshot || block_decoding_schema_version < decoding_schema_snapshot->decoding_schema_version) return; if (cache_blocks.size() >= max_cached_blocks_num) return; @@ -1113,6 +1115,7 @@ try updateTableColumnInfo(); } } + decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); ColumnDefines store_columns = getStoreColumnDefines(); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index e304c713b7b..79ee225d237 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -151,9 +151,9 @@ class StorageDeltaMerge size_t getRowKeyColumnSize() const override { return rowkey_column_size; } - std::pair getSchemaSnapshotAndBlockForDecoding(bool /* need_block */) override; + std::pair getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool /* need_block */) override; - void releaseDecodingBlock(Int64 schema_version, BlockUPtr block) override; + void releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block) override; bool initStoreIfDataDirExist() override; @@ -238,6 +238,11 @@ class StorageDeltaMerge mutable std::mutex decode_schema_mutex; DecodingStorageSchemaSnapshotPtr decoding_schema_snapshot; + // The following two members must be used under the protection of table structure lock + bool decoding_schema_changed = false; + // internal version for `decoding_schema_snapshot` + Int64 decoding_schema_version = 1; + // avoid creating block every time when decoding row std::vector cache_blocks; // avoid creating too many cached blocks(the typical num should be less and equal than raft apply thread) diff --git a/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h b/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h index 6cedbe3f0c0..c636d9e60ab 100644 --- a/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h +++ b/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h @@ -67,13 +67,14 @@ struct DecodingStorageSchemaSnapshot bool pk_is_handle; bool is_common_handle; TMTPKType pk_type = TMTPKType::UNSPECIFIED; - Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION; + // an internal increasing version for `DecodingStorageSchemaSnapshot`, has no relation with the table schema version + Int64 decoding_schema_version; - DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_) + DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_, Int64 decoding_schema_version_) : column_defines{std::move(column_defines_)} , pk_is_handle{table_info_.pk_is_handle} , is_common_handle{table_info_.is_common_handle} - , schema_version{table_info_.schema_version} + , decoding_schema_version{decoding_schema_version_} { std::unordered_map column_lut; for (size_t i = 0; i < table_info_.columns.size(); i++) diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index ada792c80f7..4b2ca6c07a8 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -114,14 +114,14 @@ static void writeRegionDataToStorage( /// Read region data as block. Stopwatch watch; - Int64 block_schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION; + Int64 block_decoding_schema_version = -1; BlockUPtr block_ptr = nullptr; if (need_decode) { LOG_FMT_TRACE(log, "{} begin to decode table {}, region {}", FUNCTION_NAME, table_id, region->id()); DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot; - std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(true); - block_schema_version = decoding_schema_snapshot->schema_version; + std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(lock, true); + block_decoding_schema_version = decoding_schema_snapshot->decoding_schema_version; auto reader = RegionBlockReader(decoding_schema_snapshot); if (!reader.read(*block_ptr, data_list_read, force_decode)) @@ -153,7 +153,7 @@ static void writeRegionDataToStorage( write_part_cost = watch.elapsedMilliseconds(); GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write).Observe(write_part_cost / 1000.0); if (need_decode) - storage->releaseDecodingBlock(block_schema_version, std::move(block_ptr)); + storage->releaseDecodingBlock(block_decoding_schema_version, std::move(block_ptr)); LOG_FMT_TRACE(log, "{}: table {}, region {}, cost [region decode {}, write part {}] ms", FUNCTION_NAME, table_id, region->id(), region_decode_cost, write_part_cost); return true; @@ -455,7 +455,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio } DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot; - std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false); + std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(lock, false); res_block = createBlockSortByColumnID(decoding_schema_snapshot); auto reader = RegionBlockReader(decoding_schema_snapshot); if (!reader.read(res_block, *data_list_read, force_decode)) @@ -508,7 +508,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) auto table_lock = storage->lockStructureForShare(getThreadName()); dm_storage = std::dynamic_pointer_cast(storage); // only dt storage engine support `getSchemaSnapshotAndBlockForDecoding`, other engine will throw exception - std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false); + std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false); std::tie(std::ignore, drop_lock) = std::move(table_lock).release(); return true; }; diff --git a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h index c28ea531afe..20b395a9952 100644 --- a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h +++ b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h @@ -285,11 +285,11 @@ inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(co if (handle_id != EXTRA_HANDLE_COLUMN_ID) { auto iter = std::find_if(store_columns.begin(), store_columns.end(), [&](const ColumnDefine & cd) { return cd.id == handle_id; }); - return std::make_shared(std::make_shared(store_columns), table_info, *iter); + return std::make_shared(std::make_shared(store_columns), table_info, *iter, /* decoding_schema_version_ */ 1); } else { - return std::make_shared(std::make_shared(store_columns), table_info, store_columns[0]); + return std::make_shared(std::make_shared(store_columns), table_info, store_columns[0], /* decoding_schema_version_ */ 1); } } diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.cpp b/dbms/src/TiDB/Schema/SchemaBuilder.cpp index 64d118eec3e..99e540e6c95 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.cpp +++ b/dbms/src/TiDB/Schema/SchemaBuilder.cpp @@ -61,6 +61,7 @@ extern const char exception_before_step_2_rename_in_exchange_partition[]; extern const char exception_after_step_2_in_exchange_partition[]; extern const char exception_before_step_3_rename_in_exchange_partition[]; extern const char exception_after_step_3_in_exchange_partition[]; +extern const char exception_between_schema_change_in_the_same_diff[]; } // namespace FailPoints bool isReservedDatabase(Context & context, const String & database_name) @@ -336,6 +337,7 @@ void SchemaBuilder::applyAlterPhysicalTable(DBInfoPtr db_inf FmtBuffer fmt_buf; fmt_buf.fmtAppend("Detected schema changes: {}: ", name_mapper.debugCanonicalName(*db_info, *table_info)); for (const auto & schema_change : schema_changes) + { for (const auto & command : schema_change.first) { if (command.type == AlterCommand::ADD_COLUMN) @@ -347,6 +349,7 @@ void SchemaBuilder::applyAlterPhysicalTable(DBInfoPtr db_inf else if (command.type == AlterCommand::RENAME_COLUMN) fmt_buf.fmtAppend("RENAME COLUMN from {} to {}, ", command.column_name, command.new_column_name); } + } return fmt_buf.toString(); }; LOG_DEBUG(log, log_str()); @@ -355,8 +358,16 @@ void SchemaBuilder::applyAlterPhysicalTable(DBInfoPtr db_inf // Using original table info with updated columns instead of using new_table_info directly, // so that other changes (RENAME commands) won't be saved. // Also, updating schema_version as altering column is structural. - for (const auto & schema_change : schema_changes) + for (size_t i = 0; i < schema_changes.size(); i++) { + if (i > 0) + { + /// If there are multiple schema change in the same diff, + /// the table schema version will be set to the latest schema version after the first schema change is applied. + /// Throw exception in the middle of the schema change to mock the case that there is a race between data decoding and applying different schema change. + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_schema_change_in_the_same_diff); + } + const auto & schema_change = schema_changes[i]; /// Update column infos by applying schema change in this step. schema_change.second(orig_table_info); /// Update schema version aggressively for the sake of correctness. diff --git a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h index b682abf1af4..8aab2b3302e 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h +++ b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h @@ -28,6 +28,11 @@ namespace DB { +namespace ErrorCodes +{ +extern const int FAIL_POINT_ERROR; +}; + template struct TiDBSchemaSyncer : public SchemaSyncer { @@ -177,6 +182,10 @@ struct TiDBSchemaSyncer : public SchemaSyncer } catch (Exception & e) { + if (e.code() == ErrorCodes::FAIL_POINT_ERROR) + { + throw; + } GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString()); return false; diff --git a/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test b/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test index ca92828e6cf..df0aa13823a 100644 --- a/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test +++ b/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test @@ -37,9 +37,25 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t | 1 | world | 0.00 | 2 | NULL | +---+-------+------+------+------+ -# Need to apply a lossy type change to reorganize data. issue#3714 +=> DBGInvoke __enable_schema_sync_service('false') + +>> DBGInvoke __enable_fail_point(exception_between_schema_change_in_the_same_diff) + +# stop decoding data +>> DBGInvoke __enable_fail_point(pause_before_apply_raft_cmd) + +# Need to apply a lossy type change to reorganize data. issue#3714 mysql> alter table test.t modify c decimal(6,3) +# refresh schema and hit the `exception_between_schema_change_in_the_same_diff` failpoint +>> DBGInvoke __refresh_schemas() + +>> DBGInvoke __disable_fail_point(exception_between_schema_change_in_the_same_diff) + +>> DBGInvoke __disable_fail_point(pause_before_apply_raft_cmd) + +=> DBGInvoke __enable_schema_sync_service('true') + mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t +---+-------+-------+------+------+ | a | b | c | d | e |