Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix potential data lost of alter_partition_by (release-7.5) #8350

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 84 additions & 61 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,26 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateTable(DatabaseID database_id,
table_id_map.emplaceTableID(table_id, database_id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", database_id, table_id);

if (table_info->isLogicalPartitionTable())
// non partition table, done
if (!table_info->isLogicalPartitionTable())
{
// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreatePhysicalTable(new_db_info, table_info);
return;
}

for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}
// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreateStorageInstance(new_db_info, table_info);

// Register the partition_id -> logical_table_id mapping
for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}
}

Expand Down Expand Up @@ -252,9 +256,13 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
}
else
{
/// The new non-partitioned table will have a new id
applyDropTable(diff.schema_id, diff.old_table_id);
// Create the new table.
// If the new table is a partition table, this will also overwrite
// the partition id mapping to the new logical table
applyCreateTable(diff.schema_id, diff.table_id);
// Drop the old table. if the previous partitions of the old table are
// not mapping to the old logical table now, they will not be removed.
applyDropTable(diff.schema_id, diff.old_table_id);
}
break;
}
Expand Down Expand Up @@ -410,70 +418,70 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(
const TableInfoPtr & table_info,
const ManageableStoragePtr & storage)
{
const auto & orig_table_info = storage->getTableInfo();
if (!orig_table_info.isLogicalPartitionTable())
const auto & local_table_info = storage->getTableInfo();
// ALTER TABLE t PARTITION BY ... may turn a non-partition table into partition table
// with some partition ids in `partition.adding_definitions`/`partition.definitions`
// and `partition.dropping_definitions`. We need to create those partitions.
if (!local_table_info.isLogicalPartitionTable())
{
LOG_ERROR(
LOG_INFO(
log,
"old table in TiFlash not partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, orig_table_info),
"Altering non-partition table to be a partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, local_table_info),
db_info->id,
orig_table_info.id);
return;
local_table_info.id);
}

const auto & orig_defs = orig_table_info.partition.definitions;
const auto & local_defs = local_table_info.partition.definitions;
const auto & new_defs = table_info->partition.definitions;

std::unordered_set<TableID> orig_part_id_set, new_part_id_set;
std::vector<String> orig_part_ids, new_part_ids;
std::for_each(orig_defs.begin(), orig_defs.end(), [&orig_part_id_set, &orig_part_ids](const auto & def) {
orig_part_id_set.emplace(def.id);
orig_part_ids.emplace_back(std::to_string(def.id));
std::unordered_set<TableID> local_part_id_set, new_part_id_set;
std::for_each(local_defs.begin(), local_defs.end(), [&local_part_id_set](const auto & def) {
local_part_id_set.emplace(def.id);
});
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set, &new_part_ids](const auto & def) {
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set](const auto & def) {
new_part_id_set.emplace(def.id);
new_part_ids.emplace_back(std::to_string(def.id));
});

auto orig_part_ids_str = boost::algorithm::join(orig_part_ids, ", ");
auto new_part_ids_str = boost::algorithm::join(new_part_ids, ", ");

LOG_INFO(
log,
"Applying partition changes {} with database_id={}, table_id={}, old: {}, new: {}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id,
orig_part_ids_str,
new_part_ids_str);
local_part_id_set,
new_part_id_set);
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved

if (orig_part_id_set == new_part_id_set)
if (local_part_id_set == new_part_id_set)
{
LOG_INFO(
log,
"No partition changes {} with database_id={}, table_id={}",
"No partition changes, paritions_size={} {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
new_part_id_set.size(),
db_info->id,
table_info->id);
return;
}

auto updated_table_info = orig_table_info;
// Copy the local table info and update fileds on the copy
auto updated_table_info = local_table_info;
updated_table_info.is_partition_table = true;
updated_table_info.belonging_table_id = table_info->belonging_table_id;
updated_table_info.partition = table_info->partition;

/// Apply changes to physical tables.
for (const auto & orig_def : orig_defs)
for (const auto & local_def : local_defs)
{
if (new_part_id_set.count(orig_def.id) == 0)
if (!new_part_id_set.contains(local_def.id))
{
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), local_def.id);
}
}

for (const auto & new_def : new_defs)
{
if (orig_part_id_set.count(new_def.id) == 0)
if (!local_part_id_set.contains(new_def.id))
{
table_id_map.emplacePartitionTableID(new_def.id, updated_table_info.id);
}
Expand Down Expand Up @@ -733,7 +741,7 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr & db_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_db).Increment();
LOG_INFO(log, "Creating database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} begin, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);

auto statement = createDatabaseStmt(context, *db_info, name_mapper);

Expand All @@ -749,7 +757,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr
databases.emplace(db_info->id, db_info);
}

LOG_INFO(log, "Created database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} end, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -786,11 +794,11 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_db).Increment();
LOG_INFO(log, "Tombstoning database {}", db_name);
LOG_INFO(log, "Tombstone database begin, db_name={}", db_name);
auto db = context.tryGetDatabase(db_name);
if (db == nullptr)
{
LOG_INFO(log, "Database {} does not exists", db_name);
LOG_INFO(log, "Database does not exist, db_name={}", db_name);
return;
}

Expand All @@ -807,7 +815,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
auto tombstone = tmt_context.getPDClient()->getTS();
db->alterTombstone(context, tombstone);

LOG_INFO(log, "Tombstoned database {}", db_name);
LOG_INFO(log, "Tombstone database end, db_name={}", db_name);
}

std::tuple<NamesAndTypes, Strings> parseColumnsFromTableInfo(const TiDB::TableInfo & table_info)
Expand Down Expand Up @@ -888,14 +896,14 @@ String createTableStmt(
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
const TiDB::DBInfoPtr & db_info,
const TableInfoPtr & table_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_table).Increment();
LOG_INFO(
log,
"Creating table {} with database_id={}, table_id={}",
"Create table {} begin, database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
Expand Down Expand Up @@ -978,7 +986,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
interpreter.execute();
LOG_INFO(
log,
"Created table {}, database_id={} table_id={}",
"Creat table {} end, database_id={} table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
Expand All @@ -992,13 +1000,13 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
auto storage = tmt_context.getStorages().get(keyspace_id, table_id);
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_table).Increment();
LOG_INFO(
log,
"Tombstoning table {}.{}, table_id={}",
"Tombstone table {}.{} begin, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1020,7 +1028,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
storage->updateTombstone(alter_lock, commands, db_name, storage->getTableInfo(), name_mapper, context);
LOG_INFO(
log,
"Tombstoned table {}.{}, table_id={}",
"Tombstone table {}.{} end, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1033,14 +1041,29 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
auto * storage = tmt_context.getStorages().get(keyspace_id, table_id).get();
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
const auto & table_info = storage->getTableInfo();
if (table_info.isLogicalPartitionTable())
{
for (const auto & part_def : table_info.partition.definitions)
{
if (TableID latest_logical_table_id = table_id_map.findTableIDInPartitionMap(part_def.id);
latest_logical_table_id == -1 || latest_logical_table_id != table_info.id)
{
// The partition is managed by another logical table now (caused by `alter table X partition by ...`),
// skip dropping this partition when dropping the old logical table
LOG_INFO(
log,
"The partition is not managed by current logical table, skip, partition_table_id={} "
"new_logical_table_id={} current_logical_table_id={}",
part_def.id,
latest_logical_table_id,
table_info.id);
continue;
}

applyDropPhysicalTable(name_mapper.mapDatabaseName(database_id, keyspace_id), part_def.id);
}
}
Expand All @@ -1055,7 +1078,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
{
LOG_INFO(log, "Syncing all schemas.");
LOG_INFO(log, "Sync all schemas begin");

/// Create all databases.
std::vector<DBInfoPtr> all_schemas = getter.listDBs();
Expand Down Expand Up @@ -1116,7 +1139,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
table_id_map.emplaceTableID(table->id, db->id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", db->id, table->id);

applyCreatePhysicalTable(db, table);
applyCreateStorageInstance(db, table);
if (table->isLogicalPartitionTable())
{
for (const auto & part_def : table->partition.definitions)
Expand Down Expand Up @@ -1173,7 +1196,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
}
}

LOG_INFO(log, "Loaded all schemas.");
LOG_INFO(log, "Sync all schemas end");
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -1230,7 +1253,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
return;
}

applyCreatePhysicalTable(db_info, table_info);
applyCreateStorageInstance(db_info, table_info);
}
else
{
Expand All @@ -1255,7 +1278,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
{
LOG_INFO(log, "Dropping all schemas.");
LOG_INFO(log, "Drop all schemas begin");

auto & tmt_context = context.getTMTContext();

Expand Down Expand Up @@ -1290,7 +1313,7 @@ void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
LOG_DEBUG(log, "DB {} dropped during drop all schemas", db.first);
}

LOG_INFO(log, "Dropped all schemas.");
LOG_INFO(log, "Drop all schemas end");
}

// product env
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ struct SchemaBuilder

void applyCreateSchema(const TiDB::DBInfoPtr & db_info);

void applyCreatePhysicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);
void applyCreateStorageInstance(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);

void applyDropTable(DatabaseID database_id, TableID table_id);

Expand All @@ -207,7 +207,6 @@ struct SchemaBuilder
void applyDropPhysicalTable(const String & db_name, TableID table_id);

void applyPartitionDiff(DatabaseID database_id, TableID table_id);

void applyPartitionDiff(
const TiDB::DBInfoPtr & db_info,
const TiDB::TableInfoPtr & table_info,
Expand Down
Loading