diff --git a/dbms/src/Columns/ColumnNullable.cpp b/dbms/src/Columns/ColumnNullable.cpp index 557a57b3c6d..8393bfd306e 100644 --- a/dbms/src/Columns/ColumnNullable.cpp +++ b/dbms/src/Columns/ColumnNullable.cpp @@ -426,7 +426,7 @@ void ColumnNullable::applyNullMap(const ColumnNullable & other) void ColumnNullable::checkConsistency() const { if (null_map->size() != getNestedColumn().size()) - throw Exception("Logical error: Sizes of nested column and null map of Nullable column are not equal", + throw Exception("Logical error: Sizes of nested column and null map of Nullable column are not equal: null size is : " + std::to_string(null_map->size()) + " column size is : "+ std::to_string(getNestedColumn().size()), ErrorCodes::SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT); } diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 4dff87d486a..d4c2f5fd6c4 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -385,12 +385,11 @@ void MockTiDB::modifyColumnInTable(const String & database_name, const String & ColumnInfo column_info = getColumnInfoFromColumn(column, 0, Field()); if (it->hasUnsignedFlag() != column_info.hasUnsignedFlag()) throw Exception("Modify column " + column.name + " UNSIGNED flag is not allowed", ErrorCodes::LOGICAL_ERROR); - if (it->hasNotNullFlag() != column_info.hasNotNullFlag()) - throw Exception("Modify column " + column.name + " NOT NULL flag is not allowed", ErrorCodes::LOGICAL_ERROR); - if (it->tp == column_info.tp) + if (it->tp == column_info.tp && it->hasNotNullFlag() == column_info.hasNotNullFlag()) throw Exception("Column " + column.name + " type not changed", ErrorCodes::LOGICAL_ERROR); it->tp = column_info.tp; + it->flag = column_info.flag; version++; SchemaDiff diff; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 89a7cdd726f..024ad34a79a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -948,12 +948,12 @@ void MergeTreeData::checkAlter(const AlterCommands & commands) ExpressionActionsPtr unused_expression; NameToNameMap unused_map; bool unused_bool; - - createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(), unused_expression, unused_map, unused_bool); + DataPart::Checksums checksums; + createConvertExpression(nullptr, checksums, getColumns().getAllPhysical(), new_columns.getAllPhysical(), unused_expression, unused_map, unused_bool); } -void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns, - ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const +void MergeTreeData::createConvertExpression(const DataPartPtr & part, DataPart::Checksums & checksums, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns, + ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) { out_expression = nullptr; out_rename_map = {}; @@ -1006,8 +1006,32 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name if (!new_type->equals(*old_type) && (!part || part->hasColumnFiles(column.name))) { - // TODO: Asserting TXN table never needs data conversion might be arbitary. - if (isMetadataOnlyConversion(old_type, new_type) || merging_params.mode == MergingParams::Txn) + if (merging_params.mode == MergingParams::Txn) + { + // Any type conversion for TMT is ignored, except adding nullable property. + // And for adding null map ONLY, i.e. not touching the data file, we do null map writing here in place. + if (part && !old_type->isNullable() && new_type->isNullable()) + { + auto null_map_name = column.name + "_null"; + auto null_map_type = std::make_shared(); + Block b; + b.insert({ null_map_type->createColumnConstWithDefaultValue(part->rows_count)->convertToFullColumnIfConst(), null_map_type, null_map_name}); + auto compression_settings = this->context.chooseCompressionSettings( + part->bytes_on_disk, + static_cast(part->bytes_on_disk) / this->getTotalActiveSizeInBytes()); + MergedColumnOnlyOutputStream out(*this, b, part->getFullPath(), true /* sync */, compression_settings, true /* skip_offsets */); + out.write(b); + auto add_checksums = out.writeSuffixAndGetChecksums(); + checksums.files[column.name + ".null.bin"] = add_checksums.files[null_map_name + ".bin"]; + checksums.files[column.name + ".null.mrk"] = add_checksums.files[null_map_name + ".mrk"]; + out_rename_map[null_map_name + ".bin"] = column.name + ".null.bin"; + out_rename_map[null_map_name + ".mrk"] = column.name + ".null.mrk"; + } + out_force_update_metadata = true; + continue; + } + + if (isMetadataOnlyConversion(old_type, new_type)) { out_force_update_metadata = true; continue; @@ -1110,7 +1134,8 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( ExpressionActionsPtr expression; AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks changes to the part. bool force_update_metadata; - createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map, force_update_metadata); + DataPart::Checksums new_checksums = part->checksums; + createConvertExpression(part, new_checksums, part->columns, new_columns, expression, transaction->rename_map, force_update_metadata); size_t num_files_to_modify = transaction->rename_map.size(); size_t num_files_to_remove = 0; @@ -1260,12 +1285,11 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( } /// Update the checksums. - DataPart::Checksums new_checksums = part->checksums; for (auto it : transaction->rename_map) { if (it.second.empty()) new_checksums.files.erase(it.first); - else + else if (add_checksums.files.find(it.first) != add_checksums.files.end()) new_checksums.files[it.second] = add_checksums.files[it.first]; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index e10c6aee706..c863f46bd6c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -651,8 +651,8 @@ class MergeTreeData : public ITableDeclaration /// for transformation-free changing of Enum values list). /// Files to be deleted are mapped to an empty string in out_rename_map. /// If part == nullptr, just checks that all type conversions are possible. - void createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns, - ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const; + void createConvertExpression(const DataPartPtr & part, DataPart::Checksums &, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns, + ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata); /// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked. void calculateColumnSizesImpl(); diff --git a/dbms/src/Storages/Transaction/Codec.cpp b/dbms/src/Storages/Transaction/Codec.cpp index f6db728f6ba..7465a04b6b5 100644 --- a/dbms/src/Storages/Transaction/Codec.cpp +++ b/dbms/src/Storages/Transaction/Codec.cpp @@ -494,6 +494,11 @@ inline T getFieldValue(const Field & field) void EncodeDatum(const Field & field, TiDB::CodecFlag flag, std::stringstream & ss) { + if (field.isNull()) + { + ss << UInt8(TiDB::CodecFlagNil); + return; + } ss << UInt8(flag); switch (flag) { diff --git a/dbms/src/Storages/Transaction/Datum.cpp b/dbms/src/Storages/Transaction/Datum.cpp index 8dc4b357c67..3e62648dc6a 100644 --- a/dbms/src/Storages/Transaction/Datum.cpp +++ b/dbms/src/Storages/Transaction/Datum.cpp @@ -151,6 +151,8 @@ DatumFlat::DatumFlat(const DB::Field & field, TP tp) : DatumBase(field, tp) } } +bool DatumFlat::invalidNull(const ColumnInfo & column_info) { return column_info.hasNotNullFlag() && orig.isNull(); } + bool DatumFlat::overflow(const ColumnInfo & column_info) { switch (tp) diff --git a/dbms/src/Storages/Transaction/Datum.h b/dbms/src/Storages/Transaction/Datum.h index 79ee200d5e8..4c20ffb3d9f 100644 --- a/dbms/src/Storages/Transaction/Datum.h +++ b/dbms/src/Storages/Transaction/Datum.h @@ -32,6 +32,9 @@ class DatumFlat : public DatumBase public: DatumFlat(const DB::Field & field, TP tp); + /// Checks if it's null value with a not null type for schema mismatch detection. + bool invalidNull(const ColumnInfo & column_info); + /// Checks overflow for schema mismatch detection. bool overflow(const ColumnInfo & column_info); }; diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index 828c9ce158f..73bb273adae 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -389,6 +389,21 @@ std::tuple readRegionBlock(const TableInfo & table_info, return std::make_tuple(Block(), false); } + if (datum.invalidNull(column_info)) + { + // Null value with non-null type detected, fatal if force_decode is true, + // as schema being newer and with invalid null shouldn't happen. + // Otherwise return false to outer, outer should sync schema and try again. + if (force_decode) + { + const auto & data_type = ColumnDataInfoMap::getNameAndTypePair(col_info).type; + throw Exception("Detected invalid null when decoding data " + std::to_string(unflattened.get()) + + " of column " + column_info.name + " with type " + data_type->getName(), + ErrorCodes::LOGICAL_ERROR); + } + + return std::make_tuple(Block(), false); + } auto & mut_col = ColumnDataInfoMap::getMutableColumnPtr(col_info); mut_col->insert(unflattened); } diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.cpp b/dbms/src/Storages/Transaction/SchemaBuilder.cpp index b0d27b7e624..e3d332ecabf 100644 --- a/dbms/src/Storages/Transaction/SchemaBuilder.cpp +++ b/dbms/src/Storages/Transaction/SchemaBuilder.cpp @@ -49,7 +49,6 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TableInfo & table_i { AlterCommands alter_commands; - /// Detect new columns. // TODO: Detect rename columns. /// Detect dropped columns. @@ -75,6 +74,7 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TableInfo & table_i alter_commands.emplace_back(std::move(command)); } + /// Detect new columns. for (const auto & column_info : table_info.columns) { const auto & orig_column_info = std::find_if(orig_table_info.columns.begin(), @@ -105,7 +105,8 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TableInfo & table_i if (column_info_.id == orig_column_info.id && column_info_.name != orig_column_info.name) LOG_ERROR(log, "detect column " << orig_column_info.name << " rename to " << column_info_.name); - return column_info_.id == orig_column_info.id && column_info_.tp != orig_column_info.tp; + return column_info_.id == orig_column_info.id + && (column_info_.tp != orig_column_info.tp || column_info_.hasNotNullFlag() != orig_column_info.hasNotNullFlag()); }); AlterCommand command; diff --git a/tests/mutable-test/txn_schema/alter_for_nullable.test b/tests/mutable-test/txn_schema/alter_for_nullable.test new file mode 100644 index 00000000000..f604489544e --- /dev/null +++ b/tests/mutable-test/txn_schema/alter_for_nullable.test @@ -0,0 +1,68 @@ + +# Preparation. +=> DBGInvoke __enable_schema_sync_service('false') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test +=> DBGInvoke __refresh_schemas() + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Sync add column by reading. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int8, col_3 Int32') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test', 1, 3) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test', 2, 4) +=> DBGInvoke __try_flush_region(4) + +# test add nullable flag and change type at the same time. +=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_2 Nullable(Int32)') +# test trigger by background worker. +=> DBGInvoke __refresh_schemas() +=> select col_2 from default.test +┌─col_2─┐ +│ 1 │ +│ 2 │ +└───────┘ + +# test only add nullable. +=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_3 Nullable(Int32)') + +=> DBGInvoke __put_region(5, 100, 150, default, test) +=> DBGInvoke __raft_insert_row(default, test, 5, 100, 'test', 1, NULL) +=> DBGInvoke __raft_insert_row(default, test, 5, 101, 'test', 2, NULL) +# test trigger by flush worker. +=> DBGInvoke __try_flush_region(5) + +=> select col_3 from default.test +┌─col_2─┐ +│ 3 │ +│ 4 │ +│ \N │ +│ \N │ +└───────┘ + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +# Test convert nullable type to not-null type. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)') +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test', 1) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test', 2) +=> DBGInvoke __try_flush_region(4) +=> select col_2 from default.test +┌─col_2─┐ +│ 1 │ +│ 2 │ +└───────┘ +=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_2 Int16') +=> DBGInvoke __refresh_schemas() +=> select col_2 from default.test +┌─col_2─┐ +│ 1 │ +│ 2 │ +└───────┘ + +=> DBGInvoke __refresh_schemas()