Skip to content

Commit

Permalink
Add fullstack test for engine DeltaTree (#524)
Browse files Browse the repository at this point in the history
## Add fullstack test for engine DeltaTree.
* Refine `tests/docker/run.sh` and split `tests/docker/docker-compose.yaml` into `tests/docker/{gtest/mock-test/cluster/tiflash-dt/tiflash-tmt}.yaml`

`fullstack/ddl`,`fullstack-test/fault-inject` will be enabled in #526 

## Others
* Add column `tidb_table_id` in `system.tables`
* Add some debugging info

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang authored Mar 17, 2020
1 parent aeb98b4 commit 8fd8dc5
Show file tree
Hide file tree
Showing 20 changed files with 290 additions and 170 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Interpreters/InterpreterManageQuery.h>
#include <Parsers/ASTManageQuery.h>
#include <Storages/IStorage.h>

#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>

namespace DB
Expand All @@ -14,7 +14,7 @@ BlockIO InterpreterManageQuery::execute()

StoragePtr table = context.getTable(ast.database, ast.table);
IManageableStorage * manageable_storage;
if (table->getName() == "DeltaMerge")
if (table->getName() == MutableSupport::delta_tree_storage_name)
{
manageable_storage = &dynamic_cast<StorageDeltaMerge &>(*table);
}
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Server/ClusterManagerService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@ constexpr long MILLISECOND = 1000;
constexpr long INIT_DELAY = 5;

void ClusterManagerService::run(const std::string & bin_path, const std::vector<std::string> & args)
try
{
auto proc = ShellCommand::executeDirect(bin_path, args);
proc->wait();
}
catch (DB::Exception & e)
{
std::stringstream ss;
ss << bin_path;
for (const auto & arg : args)
{
ss << " " << arg;
}
e.addMessage("(while running `" + ss.str() + "`)");
}

ClusterManagerService::ClusterManagerService(DB::Context & context_, const std::string & config_path)
: context(context_), timer(), log(&Logger::get("ClusterManagerService"))
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/MutableSupport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace DB

const String MutableSupport::mmt_storage_name = "MutableMergeTree";
const String MutableSupport::txn_storage_name = "TxnMergeTree";
const String MutableSupport::delta_tree_storage_name = "DeltaMerge";

const String MutableSupport::tidb_pk_column_name = "_tidb_rowid";
const String MutableSupport::version_column_name = "_INTERNAL_VERSION";
Expand All @@ -15,4 +16,4 @@ const DataTypePtr MutableSupport::tidb_pk_column_type = DataTypeFactory::instanc
const DataTypePtr MutableSupport::version_column_type = DataTypeFactory::instance().get("UInt64");
const DataTypePtr MutableSupport::delmark_column_type = DataTypeFactory::instance().get("UInt8");

}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/MutableSupport.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MutableSupport : public ext::singleton<MutableSupport>

static const String mmt_storage_name;
static const String txn_storage_name;
static const String delta_tree_storage_name;

static const String tidb_pk_column_name;
static const String version_column_name;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/FilterParser/FilterParser.h>
#include <Storages/StorageDeltaMergeHelpers.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/StorageDeltaMergeHelpers.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
Expand Down Expand Up @@ -838,6 +839,8 @@ void StorageDeltaMerge::alterImpl(const AlterCommands & commands,
setColumns(std::move(new_columns));
}

String StorageDeltaMerge::getName() const { return MutableSupport::delta_tree_storage_name; }

void StorageDeltaMerge::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
const String new_path = new_path_to_db + "/" + new_table_name;
Expand Down Expand Up @@ -915,8 +918,8 @@ void updateDeltaMergeTableCreateStatement( //
else if (args.children.size() == 2)
args.children.back() = literal;
else
throw Exception(
"Wrong arguments num:" + DB::toString(args.children.size()) + " in table: " + table_name + " with engine=DeltaMerge",
throw Exception("Wrong arguments num:" + DB::toString(args.children.size()) + " in table: " + table_name
+ " with engine=" + MutableSupport::delta_tree_storage_name,
ErrorCodes::BAD_ARGUMENTS);
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class StorageDeltaMerge : public ext::shared_ptr_helper<StorageDeltaMerge>, publ

bool supportsModification() const override { return true; }

String getName() const override { return "DeltaMerge"; }
String getName() const override;
String getTableName() const override { return table_name; }

void drop() override;
Expand Down
62 changes: 36 additions & 26 deletions dbms/src/Storages/System/StorageSystemTables.cpp
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Common/typeid_cast.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/VirtualColumnUtils.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypesNumber.h>

#include <Storages/IManageableStorage.h>
#include <Storages/MutableSupport.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>
#include <Storages/VirtualColumnUtils.h>

namespace DB
{

namespace ErrorCodes
{
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
}

/// Some virtual columns routines
Expand Down Expand Up @@ -52,7 +55,8 @@ NameAndTypePair tryGetColumn(const ColumnsWithTypeAndName & columns, const Strin
struct VirtualColumnsProcessor
{
explicit VirtualColumnsProcessor(const ColumnsWithTypeAndName & all_virtual_columns_)
: all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0) {}
: all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0)
{}

/// Separates real and virtual column names, returns real ones
Names process(const Names & column_names, const std::vector<bool *> & virtual_columns_exists_flag = {})
Expand Down Expand Up @@ -108,28 +112,23 @@ struct VirtualColumnsProcessor
std::vector<UInt8> virtual_columns_mask;
};

}
} // namespace


StorageSystemTables::StorageSystemTables(const std::string & name_)
: name(name_)
StorageSystemTables::StorageSystemTables(const std::string & name_) : name(name_)
{
setColumns(ColumnsDescription(
{
setColumns(ColumnsDescription({
{"database", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"tidb_table_id", std::make_shared<DataTypeInt64>()},
{"is_temporary", std::make_shared<DataTypeUInt8>()},
{"data_path", std::make_shared<DataTypeString>()},
{"metadata_path", std::make_shared<DataTypeString>()},
}));

virtual_columns =
{
{std::make_shared<DataTypeDateTime>(), "metadata_modification_time"},
{std::make_shared<DataTypeString>(), "create_table_query"},
{std::make_shared<DataTypeString>(), "engine_full"}
};
virtual_columns = {{std::make_shared<DataTypeDateTime>(), "metadata_modification_time"},
{std::make_shared<DataTypeString>(), "create_table_query"}, {std::make_shared<DataTypeString>(), "engine_full"}};
}


Expand All @@ -139,14 +138,13 @@ static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & cont
for (const auto & db : context.getDatabases())
column->insert(db.first);

Block block { ColumnWithTypeAndName( std::move(column), std::make_shared<DataTypeString>(), "database" ) };
Block block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database")};
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
return block.getByPosition(0).column;
}


BlockInputStreams StorageSystemTables::read(
const Names & column_names,
BlockInputStreams StorageSystemTables::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
Expand All @@ -161,7 +159,8 @@ BlockInputStreams StorageSystemTables::read(
bool has_engine_full = false;

VirtualColumnsProcessor virtual_columns_processor(virtual_columns);
real_column_names = virtual_columns_processor.process(column_names, {&has_metadata_modification_time, &has_create_table_query, &has_engine_full});
real_column_names
= virtual_columns_processor.process(column_names, {&has_metadata_modification_time, &has_create_table_query, &has_engine_full});
check(real_column_names);

Block res_block = getSampleBlock();
Expand Down Expand Up @@ -190,7 +189,18 @@ BlockInputStreams StorageSystemTables::read(
size_t j = 0;
res_columns[j++]->insert(database_name);
res_columns[j++]->insert(table_name);
res_columns[j++]->insert(iterator->table()->getName());
const String engine_name = iterator->table()->getName();
res_columns[j++]->insert(engine_name);
TableID table_id = -1;
if (engine_name == MutableSupport::txn_storage_name || engine_name == MutableSupport::delta_tree_storage_name)
{
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(iterator->table());
if (managed_storage)
{
table_id = managed_storage->getTableInfo().id;
}
}
res_columns[j++]->insert(Int64(table_id));
res_columns[j++]->insert(UInt64(0));
res_columns[j++]->insert(iterator->table()->getDataPath());
res_columns[j++]->insert(database->getTableMetadataPath(table_name));
Expand Down Expand Up @@ -268,4 +278,4 @@ NameAndTypePair StorageSystemTables::getColumn(const String & column_name) const
return !virtual_column.name.empty() ? virtual_column : ITableDeclaration::getColumn(column_name);
}

}
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ void SchemaBuilder<Getter>::applyAlterPartition(TiDB::DBInfoPtr db_info, TableID
auto storage = tmt_context.getStorages().get(table_id).get();
if (storage == nullptr)
{
throw Exception("miss table in Flash " + table_info->name, ErrorCodes::DDL_ERROR);
throw Exception("miss table in Flash `" + table_info->db_name + "`.`" + table_info->name + "`, id: " + DB::toString(table_id),
ErrorCodes::DDL_ERROR);
}
const String & db_name = storage->getDatabaseName();
const auto & orig_table_info = storage->getTableInfo();
Expand Down Expand Up @@ -878,7 +879,7 @@ void SchemaBuilder<Getter>::syncAllSchema()
std::vector<TableInfoPtr> tables = getter.listTables(db->id);
for (const auto & table : tables)
{
LOG_DEBUG(log, "collect table: " << table->name << " with id "<< table->id);
LOG_DEBUG(log, "collect table: " << table->name << " with id " << table->id);
all_tables.emplace_back(table, db);
if (table->isLogicalPartitionTable())
{
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/Transaction/TMTStorages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ ManageableStoragePtr ManagedStorages::get(TableID table_id) const
{
std::lock_guard lock(mutex);

auto it = storages.find(table_id);
if (it == storages.end())
return nullptr;
return it->second;
if (auto it = storages.find(table_id); it != storages.end())
return it->second;
return nullptr;
}

std::unordered_map<TableID, ManageableStoragePtr> ManagedStorages::getAllStorage() const
Expand All @@ -36,7 +35,7 @@ ManageableStoragePtr ManagedStorages::getByName(const std::string & db, const st
std::lock_guard lock(mutex);

auto it = std::find_if(storages.begin(), storages.end(), [&](const std::pair<TableID, ManageableStoragePtr> & pair) {
auto &storage = pair.second;
auto & storage = pair.second;
return storage->getDatabaseName() == db && storage->getTableName() == table;
});
if (it == storages.end())
Expand Down
56 changes: 56 additions & 0 deletions tests/docker/cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
version: '2.3'

services:
pd0:
image: hub.pingcap.net/qa/pd:${BRANCH:-master}
ports:
- "2379:2379"
volumes:
- ./config/pd.toml:/pd.toml:ro
- ./data/pd0:/data
- ./log/pd0:/log
command:
- --name=pd0
- --client-urls=http://0.0.0.0:2379
- --peer-urls=http://0.0.0.0:2380
- --advertise-client-urls=http://pd0:2379
- --advertise-peer-urls=http://pd0:2380
- --initial-cluster=pd0=http://pd0:2380
- --config=/pd.toml
- --data-dir=/data
- --log-file=/log/pd.log
restart: on-failure
tikv0:
image: hub.pingcap.net/qa/tikv:${BRANCH:-master}
ports:
- "20160:20160"
volumes:
- ./config/tikv.toml:/tikv.toml:ro
- ./data/tikv0:/data
- ./log/tikv0:/log
command:
- --addr=0.0.0.0:20160
- --advertise-addr=tikv0:20160
- --pd=pd0:2379
- --config=/tikv.toml
- --data-dir=/data
- --log-file=/log/tikv.log
depends_on:
- "pd0"
restart: on-failure
tidb0:
image: hub.pingcap.net/qa/tidb:${BRANCH:-master}
ports:
- "4000:4000"
- "10080:10080"
volumes:
- ./config/tidb.toml:/tidb.toml:ro
- ./log/tidb0:/log
command:
- --store=tikv
- --path=pd0:2379
- --config=/tidb.toml
- --log-file=/log/tidb.log
depends_on:
- "tikv0"
restart: on-failure
3 changes: 1 addition & 2 deletions tests/docker/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ interserver_http_port = 9009
[application]
runAsDaemon = true
[raft]
# specify which storage engine we use. tmt or dm
# specify which storage engine we use. tmt or dt
storage_engine = "tmt"
disable_bg_flush = false
39 changes: 39 additions & 0 deletions tests/docker/config/tiflash_dt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
tmp_path = "/tmp/tiflash/data/tmp"
display_name = "TiFlash"
default_profile = "default"
users_config = "users.toml"
# specify paths used for store data, multiple path should be seperated by comma
path = "/tmp/tiflash/data/db"
mark_cache_size = 5368709120
minmax_index_cache_size = 5368709120
listen_host = "0.0.0.0"
tcp_port = 9000
http_port = 8123
interserver_http_port = 9009
[flash]
tidb_status_addr = "tidb0:10080"
service_addr = "tiflash0:3930"
[flash.flash_cluster]
refresh_interval = 20
update_rule_interval = 5
master_ttl = 60
cluster_manager_path = "/tiflash/flash_cluster_manager"
[flash.proxy]
addr = "0.0.0.0:20170"
advertise-addr = "tiflash0:20170"
data-dir = "/data"
config = "/tikv.toml"
log-file = "/log/tikv.log"
[logger]
count = 10
errorlog = "/tmp/tiflash/log/error.log"
size = "1000M"
log = "/tmp/tiflash/log/server.log"
level = "trace"
[application]
runAsDaemon = true

[raft]
pd_addr = "pd0:2379"
# specify which storage engine we use. tmt or dt
storage_engine = "dt"
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,5 @@ interserver_http_port = 9009

[raft]
pd_addr = "pd0:2379"
# specify which storage engine we use. tmt or dm
# specify which storage engine we use. tmt or dt
storage_engine = "tmt"
disable_bg_flush = false
Loading

0 comments on commit 8fd8dc5

Please sign in to comment.