Skip to content

Commit

Permalink
Fix potential data inconsistency under heavy ddl operation (#5046)
Browse files Browse the repository at this point in the history
ref #5032
  • Loading branch information
lidezhu authored Jun 2, 2022
1 parent fff77b4 commit a547a53
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 25 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_when_read_from_log) \
M(exception_mpp_hash_build) \
M(exception_before_drop_segment) \
M(exception_after_drop_segment)
M(exception_after_drop_segment) \
M(exception_between_schema_change_in_the_same_diff)

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
Expand Down
18 changes: 17 additions & 1 deletion dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int FAIL_POINT_ERROR;
extern const int UNKNOWN_TABLE;
} // namespace ErrorCodes

Expand Down Expand Up @@ -62,7 +63,22 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer
{
TMTContext & tmt = context.getTMTContext();
auto schema_syncer = tmt.getSchemaSyncer();
schema_syncer->syncSchemas(context);
try
{
schema_syncer->syncSchemas(context);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
{
output(e.message());
return;
}
else
{
throw;
}
}

output("schemas refreshed");
}
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,15 @@ class IManageableStorage : public IStorage
/// when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr,
/// and `releaseDecodingBlock` need to be called when the block is free
/// when `need_block` is false, it will just return an nullptr
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(bool /* need_block */)
/// This method must be called under the protection of table structure lock
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & /* table_structure_lock */, bool /* need_block */)
{
throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
};

virtual void releaseDecodingBlock(Int64 /* schema_version */, BlockUPtr /* block */)
/// The `block_decoding_schema_version` is just an internal version for `DecodingStorageSchemaSnapshot`,
/// And it has no relation with the table schema version.
virtual void releaseDecodingBlock(Int64 /* block_decoding_schema_version */, BlockUPtr /* block */)
{
throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,16 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t delete_rows)
LOG_FMT_ERROR(log, "Rows after delete range not match, expected: {}, got: {}", (total_rows - delete_rows), after_delete_rows);
}

std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(bool need_block)
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool need_block)
{
(void)table_structure_lock;
std::lock_guard lock{decode_schema_mutex};
if (!decoding_schema_snapshot || decoding_schema_snapshot->schema_version < tidb_table_info.schema_version)
if (!decoding_schema_snapshot || decoding_schema_changed)
{
auto & store = getAndMaybeInitStore();
decoding_schema_snapshot = std::make_shared<DecodingStorageSchemaSnapshot>(store->getStoreColumns(), tidb_table_info, store->getHandle());
decoding_schema_snapshot = std::make_shared<DecodingStorageSchemaSnapshot>(store->getStoreColumns(), tidb_table_info, store->getHandle(), decoding_schema_version++);
cache_blocks.clear();
decoding_schema_changed = false;
}

if (need_block)
Expand All @@ -930,10 +932,10 @@ std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerg
}
}

void StorageDeltaMerge::releaseDecodingBlock(Int64 schema_version, BlockUPtr block_ptr)
void StorageDeltaMerge::releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block_ptr)
{
std::lock_guard lock{decode_schema_mutex};
if (!decoding_schema_snapshot || schema_version < decoding_schema_snapshot->schema_version)
if (!decoding_schema_snapshot || block_decoding_schema_version < decoding_schema_snapshot->decoding_schema_version)
return;
if (cache_blocks.size() >= max_cached_blocks_num)
return;
Expand Down Expand Up @@ -1113,6 +1115,7 @@ try
updateTableColumnInfo();
}
}
decoding_schema_changed = true;

SortDescription pk_desc = getPrimarySortDescription();
ColumnDefines store_columns = getStoreColumnDefines();
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ class StorageDeltaMerge

size_t getRowKeyColumnSize() const override { return rowkey_column_size; }

std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(bool /* need_block */) override;
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool /* need_block */) override;

void releaseDecodingBlock(Int64 schema_version, BlockUPtr block) override;
void releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block) override;

bool initStoreIfDataDirExist() override;

Expand Down Expand Up @@ -238,6 +238,11 @@ class StorageDeltaMerge

mutable std::mutex decode_schema_mutex;
DecodingStorageSchemaSnapshotPtr decoding_schema_snapshot;
// The following two members must be used under the protection of table structure lock
bool decoding_schema_changed = false;
// internal version for `decoding_schema_snapshot`
Int64 decoding_schema_version = 1;

// avoid creating block every time when decoding row
std::vector<BlockUPtr> cache_blocks;
// avoid creating too many cached blocks(the typical num should be less and equal than raft apply thread)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ struct DecodingStorageSchemaSnapshot
bool pk_is_handle;
bool is_common_handle;
TMTPKType pk_type = TMTPKType::UNSPECIFIED;
Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
// an internal increasing version for `DecodingStorageSchemaSnapshot`, has no relation with the table schema version
Int64 decoding_schema_version;

DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_)
DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_, Int64 decoding_schema_version_)
: column_defines{std::move(column_defines_)}
, pk_is_handle{table_info_.pk_is_handle}
, is_common_handle{table_info_.is_common_handle}
, schema_version{table_info_.schema_version}
, decoding_schema_version{decoding_schema_version_}
{
std::unordered_map<ColumnID, size_t> column_lut;
for (size_t i = 0; i < table_info_.columns.size(); i++)
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ static void writeRegionDataToStorage(
/// Read region data as block.
Stopwatch watch;

Int64 block_schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
Int64 block_decoding_schema_version = -1;
BlockUPtr block_ptr = nullptr;
if (need_decode)
{
LOG_FMT_TRACE(log, "{} begin to decode table {}, region {}", FUNCTION_NAME, table_id, region->id());
DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(true);
block_schema_version = decoding_schema_snapshot->schema_version;
std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(lock, true);
block_decoding_schema_version = decoding_schema_snapshot->decoding_schema_version;

auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(*block_ptr, data_list_read, force_decode))
Expand Down Expand Up @@ -153,7 +153,7 @@ static void writeRegionDataToStorage(
write_part_cost = watch.elapsedMilliseconds();
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write).Observe(write_part_cost / 1000.0);
if (need_decode)
storage->releaseDecodingBlock(block_schema_version, std::move(block_ptr));
storage->releaseDecodingBlock(block_decoding_schema_version, std::move(block_ptr));

LOG_FMT_TRACE(log, "{}: table {}, region {}, cost [region decode {}, write part {}] ms", FUNCTION_NAME, table_id, region->id(), region_decode_cost, write_part_cost);
return true;
Expand Down Expand Up @@ -509,7 +509,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
}

DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(lock, false);
res_block = createBlockSortByColumnID(decoding_schema_snapshot);
auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(res_block, *data_list_read, force_decode))
Expand Down Expand Up @@ -562,7 +562,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
auto table_lock = storage->lockStructureForShare(getThreadName());
dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
// only dt storage engine support `getSchemaSnapshotAndBlockForDecoding`, other engine will throw exception
std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
std::tie(std::ignore, drop_lock) = std::move(table_lock).release();
return true;
};
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ extern const char exception_before_step_2_rename_in_exchange_partition[];
extern const char exception_after_step_2_in_exchange_partition[];
extern const char exception_before_step_3_rename_in_exchange_partition[];
extern const char exception_after_step_3_in_exchange_partition[];
extern const char exception_between_schema_change_in_the_same_diff[];
} // namespace FailPoints

bool isReservedDatabase(Context & context, const String & database_name)
Expand Down Expand Up @@ -336,6 +337,7 @@ void SchemaBuilder<Getter, NameMapper>::applyAlterPhysicalTable(DBInfoPtr db_inf
FmtBuffer fmt_buf;
fmt_buf.fmtAppend("Detected schema changes: {}: ", name_mapper.debugCanonicalName(*db_info, *table_info));
for (const auto & schema_change : schema_changes)
{
for (const auto & command : schema_change.first)
{
if (command.type == AlterCommand::ADD_COLUMN)
Expand All @@ -347,6 +349,7 @@ void SchemaBuilder<Getter, NameMapper>::applyAlterPhysicalTable(DBInfoPtr db_inf
else if (command.type == AlterCommand::RENAME_COLUMN)
fmt_buf.fmtAppend("RENAME COLUMN from {} to {}, ", command.column_name, command.new_column_name);
}
}
return fmt_buf.toString();
};
LOG_DEBUG(log, log_str());
Expand All @@ -355,8 +358,16 @@ void SchemaBuilder<Getter, NameMapper>::applyAlterPhysicalTable(DBInfoPtr db_inf
// Using original table info with updated columns instead of using new_table_info directly,
// so that other changes (RENAME commands) won't be saved.
// Also, updating schema_version as altering column is structural.
for (const auto & schema_change : schema_changes)
for (size_t i = 0; i < schema_changes.size(); i++)
{
if (i > 0)
{
/// If there are multiple schema change in the same diff,
/// the table schema version will be set to the latest schema version after the first schema change is applied.
/// Throw exception in the middle of the schema change to mock the case that there is a race between data decoding and applying different schema change.
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_schema_change_in_the_same_diff);
}
const auto & schema_change = schema_changes[i];
/// Update column infos by applying schema change in this step.
schema_change.second(orig_table_info);
/// Update schema version aggressively for the sake of correctness.
Expand Down Expand Up @@ -1078,7 +1089,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(DBInfoPtr db_in
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, stmt.data(), stmt.data() + stmt.size(), "from syncSchema " + table_info->name, 0);

ASTCreateQuery * ast_create_query = typeid_cast<ASTCreateQuery *>(ast.get());
auto * ast_create_query = typeid_cast<ASTCreateQuery *>(ast.get());
ast_create_query->attach = true;
ast_create_query->if_not_exists = true;
ast_create_query->database = name_mapper.mapDatabaseName(*db_info);
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/Transaction/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

namespace DB
{
namespace ErrorCodes
{
extern const int FAIL_POINT_ERROR;
};

template <bool mock_getter>
struct TiDBSchemaSyncer : public SchemaSyncer
{
Expand Down Expand Up @@ -177,6 +182,10 @@ struct TiDBSchemaSyncer : public SchemaSyncer
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
{
throw;
}
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,11 @@ inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(co
if (handle_id != EXTRA_HANDLE_COLUMN_ID)
{
auto iter = std::find_if(store_columns.begin(), store_columns.end(), [&](const ColumnDefine & cd) { return cd.id == handle_id; });
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, *iter);
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, *iter, /* decoding_schema_version_ */ 1);
}
else
{
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, store_columns[0]);
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, store_columns[0], /* decoding_schema_version_ */ 1);
}
}

Expand Down
18 changes: 17 additions & 1 deletion tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,25 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t
| 1 | world | 0.00 | 2 | NULL |
+---+-------+------+------+------+

# Need to apply a lossy type change to reorganize data. issue#3714
=> DBGInvoke __enable_schema_sync_service('false')

>> DBGInvoke __enable_fail_point(exception_between_schema_change_in_the_same_diff)

# stop decoding data
>> DBGInvoke __enable_fail_point(pause_before_apply_raft_cmd)

# Need to apply a lossy type change to reorganize data. issue#3714
mysql> alter table test.t modify c decimal(6,3)

# refresh schema and hit the `exception_between_schema_change_in_the_same_diff` failpoint
>> DBGInvoke __refresh_schemas()

>> DBGInvoke __disable_fail_point(exception_between_schema_change_in_the_same_diff)

>> DBGInvoke __disable_fail_point(pause_before_apply_raft_cmd)

=> DBGInvoke __enable_schema_sync_service('true')

mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t
+---+-------+-------+------+------+
| a | b | c | d | e |
Expand Down

0 comments on commit a547a53

Please sign in to comment.