Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flash-664] Enable DDL for engine DeltaTree #526

Merged
merged 36 commits into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
154b298
Add fullstack test for engine DeltaTree
JaySon-Huang Mar 13, 2020
34145db
DDL support for DMFile
JaySon-Huang Mar 12, 2020
ff67696
Fix readPackFromCache/Disk
JaySon-Huang Mar 11, 2020
5868260
Enable ddl test for DT
JaySon-Huang Mar 12, 2020
ea49bbc
fix some tests
JaySon-Huang Mar 14, 2020
79fb615
Fix bug: add some data after ddl
JaySon-Huang Mar 14, 2020
3773a2d
Fix bug: null for DMFile
JaySon-Huang Mar 14, 2020
da6f076
Fix CI error
JaySon-Huang Mar 15, 2020
5937bd1
Cleanup
JaySon-Huang Mar 16, 2020
560c538
Port txn_schema
JaySon-Huang Mar 16, 2020
fcb3901
Fix inject test
JaySon-Huang Mar 16, 2020
ea4b335
Add some comments for inject test
JaySon-Huang Mar 17, 2020
3b382bc
Add a inject test for atomic rename table, but not fixed. Ignore for now
JaySon-Huang Mar 17, 2020
1880cad
Desc do not show column version, tag
JaySon-Huang Mar 17, 2020
d89829e
Cleanup
JaySon-Huang Mar 17, 2020
7470c2f
shutdown should cancel all background tasks
JaySon-Huang Mar 17, 2020
716ae4a
Add more message if read/write to StorageDeltaMerge throw exception
JaySon-Huang Mar 17, 2020
cd8d4cd
Refine some comments
JaySon-Huang Mar 17, 2020
2bbe884
Fix some different behavior between DeltaTree and TxnMergeTree
JaySon-Huang Mar 17, 2020
f5efb90
Merge remote-tracking branch 'upstream/master' into FLASH-664
JaySon-Huang Mar 17, 2020
f2d3374
Fix fail test for dt
JaySon-Huang Mar 17, 2020
4a90ccd
fix
JaySon-Huang Mar 17, 2020
8607d4b
Fix default value for enum and string
JaySon-Huang Mar 18, 2020
359f100
address comments
JaySon-Huang Mar 18, 2020
df9db59
address comments
JaySon-Huang Mar 18, 2020
422363c
Fix synchronized between drop table and remove region
JaySon-Huang Mar 18, 2020
02a2746
Fix default value for Decimal128/256
JaySon-Huang Mar 19, 2020
40644c3
Fix alter_default_value.test
JaySon-Huang Mar 19, 2020
da44396
fault-inject wait more time for schema sync
JaySon-Huang Mar 19, 2020
32860ff
Use fullname for table in fullstack-test
JaySon-Huang Mar 19, 2020
6c7a1dd
Merge remote-tracking branch 'upstream/master' into FLASH-664
JaySon-Huang Mar 19, 2020
fa4cc36
Fix removeRegion && address comments
JaySon-Huang Mar 19, 2020
dd73776
Get default value field from TiDB::ColumnInfo.defaultValueToField ;Ad…
JaySon-Huang Mar 20, 2020
3ee04b2
Merge branch 'master' into FLASH-664
JaySon-Huang Mar 20, 2020
36fdfef
Remove desc since it is different in local and ci environment
JaySon-Huang Mar 20, 2020
97ecc66
Merge branch 'master' into FLASH-664
JaySon-Huang Mar 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dbms/src/Common/FailPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ extern const int FAIL_POINT_ERROR;

#define FAIL_POINT_REGISTER(name) static constexpr char name[] = #name "";

#define FAIL_POINT_ENABLE(trigger, name) else if (trigger == name) fiu_enable(name, 1, nullptr, FIU_ONETIME);
#define FAIL_POINT_ENABLE(trigger, name) else if (trigger == name) { fiu_enable(name, 1, nullptr, FIU_ONETIME); }

FAIL_POINT_REGISTER(exception_between_drop_meta_and_data)
FAIL_POINT_REGISTER(exception_between_alter_data_and_meta)
FAIL_POINT_REGISTER(exception_drop_table_during_remove_meta)
FAIL_POINT_REGISTER(exception_between_rename_table_data_and_metadata);

#define FAIL_POINT_TRIGGER_EXCEPTION(fail_point) \
fiu_do_on(fail_point, throw Exception("Fail point " #fail_point " is triggered.", ErrorCodes::FAIL_POINT_ERROR);)
Expand All @@ -33,7 +34,8 @@ class FailPointHelper
FAIL_POINT_ENABLE(fail_point_name, exception_between_alter_data_and_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_drop_meta_and_data)
FAIL_POINT_ENABLE(fail_point_name, exception_drop_table_during_remove_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_rename_table_data_and_metadata)
else throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}
};
} // namespace DB
} // namespace DB
18 changes: 6 additions & 12 deletions dbms/src/Core/ColumnWithTypeAndName.cpp
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
#include <Core/ColumnsWithTypeAndName.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>


namespace DB
{

ColumnWithTypeAndName ColumnWithTypeAndName::cloneEmpty() const
{
ColumnWithTypeAndName res;

res.name = name;
res.type = type;
res.column_id = column_id;
if (column)
res.column = column->cloneEmpty();

ColumnWithTypeAndName res{(column != nullptr ? column->cloneEmpty() : nullptr), type, name, column_id, default_value};
return res;
}


bool ColumnWithTypeAndName::operator==(const ColumnWithTypeAndName & other) const
{
// TODO should we check column_id here?
return name == other.name
&& ((!type && !other.type) || (type && other.type && type->equals(*other.type)))
return name == other.name && ((!type && !other.type) || (type && other.type && type->equals(*other.type)))
&& ((!column && !other.column) || (column && other.column && column->getName() == other.column->getName()));
}

Expand All @@ -43,6 +35,8 @@ void ColumnWithTypeAndName::dumpStructure(WriteBuffer & out) const
out << ' ' << column->dumpStructure();
else
out << " nullptr";

out << " " << column_id;
}

String ColumnWithTypeAndName::dumpStructure() const
Expand All @@ -52,4 +46,4 @@ String ColumnWithTypeAndName::dumpStructure() const
return out.str();
}

}
} // namespace DB
7 changes: 7 additions & 0 deletions dbms/src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ void DatabaseOrdinary::removeTable(

try
{
// If tiflash crash before remove metadata, next time it restart, will
// full apply schema from TiDB. And the old table's metadata and data
// will be removed.
FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta);
Poco::File(table_metadata_path).remove();
}
Expand Down Expand Up @@ -410,13 +413,17 @@ void DatabaseOrdinary::renameTable(
throw Exception{e};
}

// TODO: Atomic rename table is not fixed.
FAIL_POINT_TRIGGER_EXCEPTION(exception_between_rename_table_data_and_metadata);

ASTPtr ast = getQueryFromMetadata(detail::getTableMetadataPath(metadata_path, table_name));
if (!ast)
throw Exception("There is no metadata file for table " + table_name, ErrorCodes::FILE_DOESNT_EXIST);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.table = to_table_name;

/// NOTE Non-atomic.
// Create new metadata and remove old metadata.
to_database_concrete->createTable(context, to_table_name, table, ast);
removeTable(context, table_name);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
else if (engine_type == "buggy")
table_info.engine_type = TiDB::StorageEngine::DEBUGGING_MEMORY;
else
throw Exception("Unknown engine type : " + engine_type + ", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Unknown engine type : " + engine_type + ", must be 'tmt' or 'dt'", ErrorCodes::BAD_ARGUMENTS);

auto table = std::make_shared<Table>(database_name, table_name, std::move(table_info));
tables_by_id.emplace(table->table_info.id, table);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ extern const int LOGICAL_ERROR;
void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3 && args.size() != 4 && args.size() != 5)
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name], [, engine-type(tmt|dm|buggy)]", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name], [, engine-type(tmt|dt|buggy)]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct MockTiDBTable
// Inject mocked TiDB table.
// Usage:
// ./storages-client.sh "DBGInvoke mock_tidb_table(database_name, table_name, 'col1 type1, col2 type2, ...'[, engine])"
// engine: [tmt, dm, buggy], tmt by default
// engine: [tmt, dt, buggy], tmt by default
static void dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Inject mocked TiDB table.
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,16 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
query_info.mvcc_query_info->concurrent = 0.0;
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
try
{
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
}
catch (DB::Exception & e)
{
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ")");
throw;
}

if (pipeline.streams.empty())
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterDescribeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
Block sample_block = getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();

OrderedNameSet filtered_names = MutableSupport::instance().hiddenColumns(table->getName());
const OrderedNameSet filtered_names = MutableSupport::instance().hiddenColumns(table->getName());

for (const auto & column : columns)
{
Expand Down
39 changes: 19 additions & 20 deletions dbms/src/Interpreters/InterpreterDropQuery.cpp
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
#include <Poco/File.h>
#include <Common/FailPoint.h>

#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Poco/File.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>


namespace DB
{

namespace ErrorCodes
{
extern const int TABLE_WAS_NOT_DROPPED;
extern const int DATABASE_NOT_EMPTY;
extern const int UNKNOWN_DATABASE;
extern const int READONLY;
extern const int FAIL_POINT_ERROR;
}
extern const int TABLE_WAS_NOT_DROPPED;
extern const int DATABASE_NOT_EMPTY;
extern const int UNKNOWN_DATABASE;
extern const int READONLY;
extern const int FAIL_POINT_ERROR;
} // namespace ErrorCodes


InterpreterDropQuery::InterpreterDropQuery(const ASTPtr & query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) {}
Expand Down Expand Up @@ -57,8 +56,8 @@ BlockIO InterpreterDropQuery::execute()
{
if (drop.database.empty() && !drop.temporary)
{
LOG_WARNING((&Logger::get("InterpreterDropQuery")),
"It is recommended to use `DROP TEMPORARY TABLE` to delete temporary tables");
LOG_WARNING(
(&Logger::get("InterpreterDropQuery")), "It is recommended to use `DROP TEMPORARY TABLE` to delete temporary tables");
}
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
Expand Down Expand Up @@ -119,7 +118,8 @@ BlockIO InterpreterDropQuery::execute()
if (!drop.detach)
{
if (!table.first->checkTableCanBeDropped())
throw Exception("Table " + database_name + "." + table.first->getTableName() + " couldn't be dropped due to failed pre-drop check",
throw Exception(
"Table " + database_name + "." + table.first->getTableName() + " couldn't be dropped due to failed pre-drop check",
ErrorCodes::TABLE_WAS_NOT_DROPPED);
}

Expand Down Expand Up @@ -147,20 +147,19 @@ BlockIO InterpreterDropQuery::execute()

table.first->is_dropped = true;

// drop is complete, then clean tmt context;
if (auto storage = std::static_pointer_cast<StorageMergeTree>(table.first))
storage->removeFromTMTContext();

String database_data_path = database->getDataPath();

/// If it is not virtual database like Dictionary then drop remaining data dir
const String database_data_path = database->getDataPath();
if (!database_data_path.empty())
{
String table_data_path = database_data_path + "/" + escapeForFileName(current_table_name);

if (Poco::File(table_data_path).exists())
Poco::File(table_data_path).remove(true);
}

// drop is complete, then clean tmt context
if (auto storage = std::dynamic_pointer_cast<IManageableStorage>(table.first); storage)
storage->removeFromTMTContext();
}
}

Expand Down Expand Up @@ -214,4 +213,4 @@ void InterpreterDropQuery::checkAccess(const ASTDropQuery & drop)
throw Exception("Cannot drop table in readonly mode", ErrorCodes::READONLY);
}

}
} // namespace DB
17 changes: 10 additions & 7 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
#include <Poco/StringTokenizer.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
#include <Storages/Transaction/KVStore.h>
Expand Down Expand Up @@ -447,22 +448,24 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

/// "tmt" engine ONLY support disable_bg_flush = false.
/// "dm" engine by default disable_bg_flush = true.
/// "dt" engine ONLY support disable_bg_flush = true.

String disable_bg_flush_conf = "raft.disable_bg_flush";
if (engine == ::TiDB::StorageEngine::TMT)
{
if (config().has(disable_bg_flush_conf) && config().getBool(disable_bg_flush_conf))
throw Exception(
"Illegal arguments: disable background flush while using engine TxnMergeTree.", ErrorCodes::INVALID_CONFIG_PARAMETER);
throw Exception("Illegal arguments: disable background flush while using engine " + MutableSupport::txn_storage_name,
ErrorCodes::INVALID_CONFIG_PARAMETER);
disable_bg_flush = false;
}
else if (engine == ::TiDB::StorageEngine::DT)
{
if (config().has(disable_bg_flush_conf))
disable_bg_flush = config().getBool(disable_bg_flush_conf);
else
disable_bg_flush = true;
/// If background flush is enabled, read will not triggle schema sync.
/// Which means that we may get the wrong result with outdated schema.
if (config().has(disable_bg_flush_conf) && !config().getBool(disable_bg_flush_conf))
throw Exception("Illegal arguments: enable background flush while using engine " + MutableSupport::delta_tree_storage_name,
ErrorCodes::INVALID_CONFIG_PARAMETER);
disable_bg_flush = true;
}
}

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/CompactDelta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ bool DeltaValueSpace::compact(DMContext & context)
if (unlikely(pack->isDeleteRange()))
throw Exception("Unexpectedly selected a delete range to compact", ErrorCodes::LOGICAL_ERROR);

// We ensure schema of all packs are the same
Block block = pack->isCached() ? readPackFromCache(pack) : readPackFromDisk(pack, reader);
size_t block_rows = block.rows();
for (size_t i = 0; i < schema.columns(); ++i)
{
compact_columns[i]->insertRangeFrom(*block.getByPosition(i).column, 0, block_rows);
}

wbs.removed_log.delPage(pack->data_page);
}
Expand Down
Loading