Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix potential data lost of alter_partition_by #8337

Merged
merged 12 commits into from
Nov 10, 2023
145 changes: 84 additions & 61 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,26 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateTable(DatabaseID database_id,
table_id_map.emplaceTableID(table_id, database_id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", database_id, table_id);

if (table_info->isLogicalPartitionTable())
// non partition table, done
if (!table_info->isLogicalPartitionTable())
{
// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreatePhysicalTable(new_db_info, table_info);
return;
}

for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}
// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreateStorageInstance(new_db_info, table_info);

// Register the partition_id -> logical_table_id mapping
for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}
}

Expand Down Expand Up @@ -252,9 +256,13 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
}
else
{
/// The new non-partitioned table will have a new id
applyDropTable(diff.schema_id, diff.old_table_id);
// Create the new table.
// If the new table is a partition table, this will also overwrite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to seperate the logical for ActionAlterTablePartitioning and ActionRemovePartitioning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ActionRemovePartitioning, it will

  • Receive a schema diff, and add the new non-partition table as a table_id in partition.adding_definitions. -- TiFlash should add the new table id to mapping and handle the apply snapshot
  • Then receive a schema diff to make the new non-partition table as a normal table and remove the old partition-table

So it is the same logic as ActionAlterTablePartitioning in tiflash

// the partition id mapping to the new logical table
applyCreateTable(diff.schema_id, diff.table_id);
// Drop the old table. if the previous partitions of the old table are
// not mapping to the old logical table now, they will not be removed.
applyDropTable(diff.schema_id, diff.old_table_id);
}
break;
}
Expand Down Expand Up @@ -410,70 +418,70 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(
const TableInfoPtr & table_info,
const ManageableStoragePtr & storage)
{
const auto & orig_table_info = storage->getTableInfo();
if (!orig_table_info.isLogicalPartitionTable())
const auto & local_table_info = storage->getTableInfo();
// ALTER TABLE t PARTITION BY ... may turn a non-partition table into partition table
// with some partition ids in `partition.adding_definitions`/`partition.definitions`
// and `partition.dropping_definitions`. We need to create those partitions.
if (!local_table_info.isLogicalPartitionTable())
{
LOG_ERROR(
LOG_INFO(
log,
"old table in TiFlash not partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, orig_table_info),
"Altering non-partition table to be a partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, local_table_info),
db_info->id,
orig_table_info.id);
return;
local_table_info.id);
}
Comment on lines +425 to 433
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When fetching the last table schema from TiKV, TiFlash may see a non-partition table turned into a table with "partition" field like this.
We need to create the physical table with the id in partition.adding_definitions/partition.definitions/partition.dropping_definitions

"partition": {
        "type": 0,
        "expr": "",
        "columns": null,
        "enable": true,
        "definitions": [{
            "id": 164,
            "name": {
                "O": "pFullTable",
                "L": "pfulltable"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null,
            "comment": "Intermediate partition during ALTER TABLE ... PARTITION BY ..."
        }],
        "adding_definitions": [{
            "id": 168,
            "name": {
                "O": "p0",
                "L": "p0"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null
        }, {
            "id": 169,
            "name": {
                "O": "p1",
                "L": "p1"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null
        }, {
            "id": 170,
            "name": {
                "O": "p2",
                "L": "p2"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null
        }],
        "dropping_definitions": [{
            "id": 164,
            "name": {
                "O": "pFullTable",
                "L": "pfulltable"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null,
            "comment": "Intermediate partition during ALTER TABLE ... PARTITION BY ..."
        }],
        "NewPartitionIDs": null,
        "states": null,
        "num": 1,
        "ddl_state": 3,
        "new_table_id": 171,
        "ddl_type": 2,
        "ddl_expr": "`a`",
        "ddl_columns": null
    },


const auto & orig_defs = orig_table_info.partition.definitions;
const auto & local_defs = local_table_info.partition.definitions;
const auto & new_defs = table_info->partition.definitions;

std::unordered_set<TableID> orig_part_id_set, new_part_id_set;
std::vector<String> orig_part_ids, new_part_ids;
std::for_each(orig_defs.begin(), orig_defs.end(), [&orig_part_id_set, &orig_part_ids](const auto & def) {
orig_part_id_set.emplace(def.id);
orig_part_ids.emplace_back(std::to_string(def.id));
std::unordered_set<TableID> local_part_id_set, new_part_id_set;
std::for_each(local_defs.begin(), local_defs.end(), [&local_part_id_set](const auto & def) {
local_part_id_set.emplace(def.id);
});
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set, &new_part_ids](const auto & def) {
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set](const auto & def) {
new_part_id_set.emplace(def.id);
new_part_ids.emplace_back(std::to_string(def.id));
});

auto orig_part_ids_str = boost::algorithm::join(orig_part_ids, ", ");
auto new_part_ids_str = boost::algorithm::join(new_part_ids, ", ");

LOG_INFO(
log,
"Applying partition changes {} with database_id={}, table_id={}, old: {}, new: {}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id,
orig_part_ids_str,
new_part_ids_str);
local_part_id_set,
new_part_id_set);

if (orig_part_id_set == new_part_id_set)
if (local_part_id_set == new_part_id_set)
{
LOG_INFO(
log,
"No partition changes {} with database_id={}, table_id={}",
"No partition changes, paritions_size={} {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
new_part_id_set.size(),
db_info->id,
table_info->id);
return;
}

auto updated_table_info = orig_table_info;
// Copy the local table info and update fileds on the copy
auto updated_table_info = local_table_info;
updated_table_info.is_partition_table = true;
updated_table_info.belonging_table_id = table_info->belonging_table_id;
updated_table_info.partition = table_info->partition;

/// Apply changes to physical tables.
for (const auto & orig_def : orig_defs)
for (const auto & local_def : local_defs)
{
if (new_part_id_set.count(orig_def.id) == 0)
if (!new_part_id_set.contains(local_def.id))
{
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), local_def.id);
}
}

for (const auto & new_def : new_defs)
{
if (orig_part_id_set.count(new_def.id) == 0)
if (!local_part_id_set.contains(new_def.id))
{
table_id_map.emplacePartitionTableID(new_def.id, updated_table_info.id);
}
Expand Down Expand Up @@ -733,7 +741,7 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr & db_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_db).Increment();
LOG_INFO(log, "Creating database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} begin, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);

auto statement = createDatabaseStmt(context, *db_info, name_mapper);

Expand All @@ -749,7 +757,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr
databases.emplace(db_info->id, db_info);
}

LOG_INFO(log, "Created database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} end, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -786,11 +794,11 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_db).Increment();
LOG_INFO(log, "Tombstoning database {}", db_name);
LOG_INFO(log, "Tombstone database begin, db_name={}", db_name);
auto db = context.tryGetDatabase(db_name);
if (db == nullptr)
{
LOG_INFO(log, "Database {} does not exists", db_name);
LOG_INFO(log, "Database does not exist, db_name={}", db_name);
return;
}

Expand All @@ -807,7 +815,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
auto tombstone = tmt_context.getPDClient()->getTS();
db->alterTombstone(context, tombstone);

LOG_INFO(log, "Tombstoned database {}", db_name);
LOG_INFO(log, "Tombstone database end, db_name={}", db_name);
}

std::tuple<NamesAndTypes, Strings> parseColumnsFromTableInfo(const TiDB::TableInfo & table_info)
Expand Down Expand Up @@ -888,14 +896,14 @@ String createTableStmt(
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
const TiDB::DBInfoPtr & db_info,
const TableInfoPtr & table_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_table).Increment();
LOG_INFO(
log,
"Creating table {} with database_id={}, table_id={}",
"Create table {} begin, database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
Expand Down Expand Up @@ -978,7 +986,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
interpreter.execute();
LOG_INFO(
log,
"Created table {}, database_id={} table_id={}",
"Creat table {} end, database_id={} table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
Expand All @@ -992,13 +1000,13 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
auto storage = tmt_context.getStorages().get(keyspace_id, table_id);
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_table).Increment();
LOG_INFO(
log,
"Tombstoning table {}.{}, table_id={}",
"Tombstone table {}.{} begin, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1020,7 +1028,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
storage->updateTombstone(alter_lock, commands, db_name, storage->getTableInfo(), name_mapper, context);
LOG_INFO(
log,
"Tombstoned table {}.{}, table_id={}",
"Tombstone table {}.{} end, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1033,14 +1041,29 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
auto * storage = tmt_context.getStorages().get(keyspace_id, table_id).get();
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
const auto & table_info = storage->getTableInfo();
if (table_info.isLogicalPartitionTable())
{
for (const auto & part_def : table_info.partition.definitions)
{
if (TableID latest_logical_table_id = table_id_map.findTableIDInPartitionMap(part_def.id);
latest_logical_table_id == -1 || latest_logical_table_id != table_info.id)
{
// The partition is managed by another logical table now (caused by `alter table X partition by ...`),
// skip dropping this partition when dropping the old logical table
LOG_INFO(
log,
"The partition is not managed by current logical table, skip, partition_table_id={} "
"new_logical_table_id={} current_logical_table_id={}",
part_def.id,
latest_logical_table_id,
table_info.id);
continue;
}

applyDropPhysicalTable(name_mapper.mapDatabaseName(database_id, keyspace_id), part_def.id);
}
}
Expand All @@ -1055,7 +1078,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
{
LOG_INFO(log, "Syncing all schemas.");
LOG_INFO(log, "Sync all schemas begin");

/// Create all databases.
std::vector<DBInfoPtr> all_schemas = getter.listDBs();
Expand Down Expand Up @@ -1116,7 +1139,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
table_id_map.emplaceTableID(table->id, db->id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", db->id, table->id);

applyCreatePhysicalTable(db, table);
applyCreateStorageInstance(db, table);
if (table->isLogicalPartitionTable())
{
for (const auto & part_def : table->partition.definitions)
Expand Down Expand Up @@ -1173,7 +1196,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
}
}

LOG_INFO(log, "Loaded all schemas.");
LOG_INFO(log, "Sync all schemas end");
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -1230,7 +1253,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
return;
}

applyCreatePhysicalTable(db_info, table_info);
applyCreateStorageInstance(db_info, table_info);
}
else
{
Expand All @@ -1255,7 +1278,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
{
LOG_INFO(log, "Dropping all schemas.");
LOG_INFO(log, "Drop all schemas begin");

auto & tmt_context = context.getTMTContext();

Expand Down Expand Up @@ -1290,7 +1313,7 @@ void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
LOG_DEBUG(log, "DB {} dropped during drop all schemas", db.first);
}

LOG_INFO(log, "Dropped all schemas.");
LOG_INFO(log, "Drop all schemas end");
}

// product env
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ struct SchemaBuilder

void applyCreateSchema(const TiDB::DBInfoPtr & db_info);

void applyCreatePhysicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);
void applyCreateStorageInstance(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);

void applyDropTable(DatabaseID database_id, TableID table_id);

Expand All @@ -207,7 +207,6 @@ struct SchemaBuilder
void applyDropPhysicalTable(const String & db_name, TableID table_id);

void applyPartitionDiff(DatabaseID database_id, TableID table_id);

void applyPartitionDiff(
const TiDB::DBInfoPtr & db_info,
const TiDB::TableInfoPtr & table_info,
Expand Down
Loading