Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fullstack test for engine DeltaTree (#524) #527

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Interpreters/InterpreterManageQuery.h>
#include <Parsers/ASTManageQuery.h>
#include <Storages/IStorage.h>

#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>

namespace DB
Expand All @@ -14,7 +14,7 @@ BlockIO InterpreterManageQuery::execute()

StoragePtr table = context.getTable(ast.database, ast.table);
IManageableStorage * manageable_storage;
if (table->getName() == "DeltaMerge")
if (table->getName() == MutableSupport::delta_tree_storage_name)
{
manageable_storage = &dynamic_cast<StorageDeltaMerge &>(*table);
}
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Server/ClusterManagerService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@ constexpr long MILLISECOND = 1000;
constexpr long INIT_DELAY = 5;

void ClusterManagerService::run(const std::string & bin_path, const std::vector<std::string> & args)
try
{
auto proc = ShellCommand::executeDirect(bin_path, args);
proc->wait();
}
catch (DB::Exception & e)
{
std::stringstream ss;
ss << bin_path;
for (const auto & arg : args)
{
ss << " " << arg;
}
e.addMessage("(while running `" + ss.str() + "`)");
}

ClusterManagerService::ClusterManagerService(DB::Context & context_, const std::string & config_path)
: context(context_), timer(), log(&Logger::get("ClusterManagerService"))
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/MutableSupport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace DB

const String MutableSupport::mmt_storage_name = "MutableMergeTree";
const String MutableSupport::txn_storage_name = "TxnMergeTree";
const String MutableSupport::delta_tree_storage_name = "DeltaMerge";

const String MutableSupport::tidb_pk_column_name = "_tidb_rowid";
const String MutableSupport::version_column_name = "_INTERNAL_VERSION";
Expand All @@ -15,4 +16,4 @@ const DataTypePtr MutableSupport::tidb_pk_column_type = DataTypeFactory::instanc
const DataTypePtr MutableSupport::version_column_type = DataTypeFactory::instance().get("UInt64");
const DataTypePtr MutableSupport::delmark_column_type = DataTypeFactory::instance().get("UInt8");

}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/MutableSupport.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MutableSupport : public ext::singleton<MutableSupport>

static const String mmt_storage_name;
static const String txn_storage_name;
static const String delta_tree_storage_name;

static const String tidb_pk_column_name;
static const String version_column_name;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/FilterParser/FilterParser.h>
#include <Storages/StorageDeltaMergeHelpers.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/StorageDeltaMergeHelpers.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
Expand Down Expand Up @@ -838,6 +839,8 @@ void StorageDeltaMerge::alterImpl(const AlterCommands & commands,
setColumns(std::move(new_columns));
}

String StorageDeltaMerge::getName() const { return MutableSupport::delta_tree_storage_name; }

void StorageDeltaMerge::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
const String new_path = new_path_to_db + "/" + new_table_name;
Expand Down Expand Up @@ -915,8 +918,8 @@ void updateDeltaMergeTableCreateStatement( //
else if (args.children.size() == 2)
args.children.back() = literal;
else
throw Exception(
"Wrong arguments num:" + DB::toString(args.children.size()) + " in table: " + table_name + " with engine=DeltaMerge",
throw Exception("Wrong arguments num:" + DB::toString(args.children.size()) + " in table: " + table_name
+ " with engine=" + MutableSupport::delta_tree_storage_name,
ErrorCodes::BAD_ARGUMENTS);
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class StorageDeltaMerge : public ext::shared_ptr_helper<StorageDeltaMerge>, publ

bool supportsModification() const override { return true; }

String getName() const override { return "DeltaMerge"; }
String getName() const override;
String getTableName() const override { return table_name; }

void drop() override;
Expand Down
62 changes: 36 additions & 26 deletions dbms/src/Storages/System/StorageSystemTables.cpp
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Common/typeid_cast.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/VirtualColumnUtils.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypesNumber.h>

#include <Storages/IManageableStorage.h>
#include <Storages/MutableSupport.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>
#include <Storages/VirtualColumnUtils.h>

namespace DB
{

namespace ErrorCodes
{
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
}

/// Some virtual columns routines
Expand Down Expand Up @@ -52,7 +55,8 @@ NameAndTypePair tryGetColumn(const ColumnsWithTypeAndName & columns, const Strin
struct VirtualColumnsProcessor
{
explicit VirtualColumnsProcessor(const ColumnsWithTypeAndName & all_virtual_columns_)
: all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0) {}
: all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0)
{}

/// Separates real and virtual column names, returns real ones
Names process(const Names & column_names, const std::vector<bool *> & virtual_columns_exists_flag = {})
Expand Down Expand Up @@ -108,28 +112,23 @@ struct VirtualColumnsProcessor
std::vector<UInt8> virtual_columns_mask;
};

}
} // namespace


StorageSystemTables::StorageSystemTables(const std::string & name_)
: name(name_)
StorageSystemTables::StorageSystemTables(const std::string & name_) : name(name_)
{
setColumns(ColumnsDescription(
{
setColumns(ColumnsDescription({
{"database", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"tidb_table_id", std::make_shared<DataTypeInt64>()},
{"is_temporary", std::make_shared<DataTypeUInt8>()},
{"data_path", std::make_shared<DataTypeString>()},
{"metadata_path", std::make_shared<DataTypeString>()},
}));

virtual_columns =
{
{std::make_shared<DataTypeDateTime>(), "metadata_modification_time"},
{std::make_shared<DataTypeString>(), "create_table_query"},
{std::make_shared<DataTypeString>(), "engine_full"}
};
virtual_columns = {{std::make_shared<DataTypeDateTime>(), "metadata_modification_time"},
{std::make_shared<DataTypeString>(), "create_table_query"}, {std::make_shared<DataTypeString>(), "engine_full"}};
}


Expand All @@ -139,14 +138,13 @@ static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & cont
for (const auto & db : context.getDatabases())
column->insert(db.first);

Block block { ColumnWithTypeAndName( std::move(column), std::make_shared<DataTypeString>(), "database" ) };
Block block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database")};
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
return block.getByPosition(0).column;
}


BlockInputStreams StorageSystemTables::read(
const Names & column_names,
BlockInputStreams StorageSystemTables::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
Expand All @@ -161,7 +159,8 @@ BlockInputStreams StorageSystemTables::read(
bool has_engine_full = false;

VirtualColumnsProcessor virtual_columns_processor(virtual_columns);
real_column_names = virtual_columns_processor.process(column_names, {&has_metadata_modification_time, &has_create_table_query, &has_engine_full});
real_column_names
= virtual_columns_processor.process(column_names, {&has_metadata_modification_time, &has_create_table_query, &has_engine_full});
check(real_column_names);

Block res_block = getSampleBlock();
Expand Down Expand Up @@ -190,7 +189,18 @@ BlockInputStreams StorageSystemTables::read(
size_t j = 0;
res_columns[j++]->insert(database_name);
res_columns[j++]->insert(table_name);
res_columns[j++]->insert(iterator->table()->getName());
const String engine_name = iterator->table()->getName();
res_columns[j++]->insert(engine_name);
TableID table_id = -1;
if (engine_name == MutableSupport::txn_storage_name || engine_name == MutableSupport::delta_tree_storage_name)
{
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(iterator->table());
if (managed_storage)
{
table_id = managed_storage->getTableInfo().id;
}
}
res_columns[j++]->insert(Int64(table_id));
res_columns[j++]->insert(UInt64(0));
res_columns[j++]->insert(iterator->table()->getDataPath());
res_columns[j++]->insert(database->getTableMetadataPath(table_name));
Expand Down Expand Up @@ -268,4 +278,4 @@ NameAndTypePair StorageSystemTables::getColumn(const String & column_name) const
return !virtual_column.name.empty() ? virtual_column : ITableDeclaration::getColumn(column_name);
}

}
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ void SchemaBuilder<Getter>::applyAlterPartition(TiDB::DBInfoPtr db_info, TableID
auto storage = tmt_context.getStorages().get(table_id).get();
if (storage == nullptr)
{
throw Exception("miss table in Flash " + table_info->name, ErrorCodes::DDL_ERROR);
throw Exception("miss table in Flash `" + table_info->db_name + "`.`" + table_info->name + "`, id: " + DB::toString(table_id),
ErrorCodes::DDL_ERROR);
}
const String & db_name = storage->getDatabaseName();
const auto & orig_table_info = storage->getTableInfo();
Expand Down Expand Up @@ -878,7 +879,7 @@ void SchemaBuilder<Getter>::syncAllSchema()
std::vector<TableInfoPtr> tables = getter.listTables(db->id);
for (const auto & table : tables)
{
LOG_DEBUG(log, "collect table: " << table->name << " with id "<< table->id);
LOG_DEBUG(log, "collect table: " << table->name << " with id " << table->id);
all_tables.emplace_back(table, db);
if (table->isLogicalPartitionTable())
{
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/Transaction/TMTStorages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ ManageableStoragePtr ManagedStorages::get(TableID table_id) const
{
std::lock_guard lock(mutex);

auto it = storages.find(table_id);
if (it == storages.end())
return nullptr;
return it->second;
if (auto it = storages.find(table_id); it != storages.end())
return it->second;
return nullptr;
}

std::unordered_map<TableID, ManageableStoragePtr> ManagedStorages::getAllStorage() const
Expand All @@ -36,7 +35,7 @@ ManageableStoragePtr ManagedStorages::getByName(const std::string & db, const st
std::lock_guard lock(mutex);

auto it = std::find_if(storages.begin(), storages.end(), [&](const std::pair<TableID, ManageableStoragePtr> & pair) {
auto &storage = pair.second;
auto & storage = pair.second;
return storage->getDatabaseName() == db && storage->getTableName() == table;
});
if (it == storages.end())
Expand Down
56 changes: 56 additions & 0 deletions tests/docker/cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
version: '2.3'

services:
pd0:
image: hub.pingcap.net/qa/pd:${BRANCH:-master}
ports:
- "2379:2379"
volumes:
- ./config/pd.toml:/pd.toml:ro
- ./data/pd0:/data
- ./log/pd0:/log
command:
- --name=pd0
- --client-urls=http://0.0.0.0:2379
- --peer-urls=http://0.0.0.0:2380
- --advertise-client-urls=http://pd0:2379
- --advertise-peer-urls=http://pd0:2380
- --initial-cluster=pd0=http://pd0:2380
- --config=/pd.toml
- --data-dir=/data
- --log-file=/log/pd.log
restart: on-failure
tikv0:
image: hub.pingcap.net/qa/tikv:${BRANCH:-master}
ports:
- "20160:20160"
volumes:
- ./config/tikv.toml:/tikv.toml:ro
- ./data/tikv0:/data
- ./log/tikv0:/log
command:
- --addr=0.0.0.0:20160
- --advertise-addr=tikv0:20160
- --pd=pd0:2379
- --config=/tikv.toml
- --data-dir=/data
- --log-file=/log/tikv.log
depends_on:
- "pd0"
restart: on-failure
tidb0:
image: hub.pingcap.net/qa/tidb:${BRANCH:-master}
ports:
- "4000:4000"
- "10080:10080"
volumes:
- ./config/tidb.toml:/tidb.toml:ro
- ./log/tidb0:/log
command:
- --store=tikv
- --path=pd0:2379
- --config=/tidb.toml
- --log-file=/log/tidb.log
depends_on:
- "tikv0"
restart: on-failure
3 changes: 1 addition & 2 deletions tests/docker/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ interserver_http_port = 9009
[application]
runAsDaemon = true
[raft]
# specify which storage engine we use. tmt or dm
# specify which storage engine we use. tmt or dt
storage_engine = "tmt"
disable_bg_flush = false
39 changes: 39 additions & 0 deletions tests/docker/config/tiflash_dt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
tmp_path = "/tmp/tiflash/data/tmp"
display_name = "TiFlash"
default_profile = "default"
users_config = "users.toml"
# specify paths used for store data, multiple path should be seperated by comma
path = "/tmp/tiflash/data/db"
mark_cache_size = 5368709120
minmax_index_cache_size = 5368709120
listen_host = "0.0.0.0"
tcp_port = 9000
http_port = 8123
interserver_http_port = 9009
[flash]
tidb_status_addr = "tidb0:10080"
service_addr = "tiflash0:3930"
[flash.flash_cluster]
refresh_interval = 20
update_rule_interval = 5
master_ttl = 60
cluster_manager_path = "/tiflash/flash_cluster_manager"
[flash.proxy]
addr = "0.0.0.0:20170"
advertise-addr = "tiflash0:20170"
data-dir = "/data"
config = "/tikv.toml"
log-file = "/log/tikv.log"
[logger]
count = 10
errorlog = "/tmp/tiflash/log/error.log"
size = "1000M"
log = "/tmp/tiflash/log/server.log"
level = "trace"
[application]
runAsDaemon = true

[raft]
pd_addr = "pd0:2379"
# specify which storage engine we use. tmt or dt
storage_engine = "dt"
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,5 @@ interserver_http_port = 9009

[raft]
pd_addr = "pd0:2379"
# specify which storage engine we use. tmt or dm
# specify which storage engine we use. tmt or dt
storage_engine = "tmt"
disable_bg_flush = false
Loading