merge master branch #556

Merged 14 commits on Mar 26, 2020
6 changes: 4 additions & 2 deletions dbms/src/Common/FailPoint.h
@@ -15,11 +15,12 @@ extern const int FAIL_POINT_ERROR;

#define FAIL_POINT_REGISTER(name) static constexpr char name[] = #name "";

#define FAIL_POINT_ENABLE(trigger, name) else if (trigger == name) fiu_enable(name, 1, nullptr, FIU_ONETIME);
#define FAIL_POINT_ENABLE(trigger, name) else if (trigger == name) { fiu_enable(name, 1, nullptr, FIU_ONETIME); }

FAIL_POINT_REGISTER(exception_between_drop_meta_and_data)
FAIL_POINT_REGISTER(exception_between_alter_data_and_meta)
FAIL_POINT_REGISTER(exception_drop_table_during_remove_meta)
FAIL_POINT_REGISTER(exception_between_rename_table_data_and_metadata);

#define FAIL_POINT_TRIGGER_EXCEPTION(fail_point) \
fiu_do_on(fail_point, throw Exception("Fail point " #fail_point " is triggered.", ErrorCodes::FAIL_POINT_ERROR);)
@@ -33,7 +34,8 @@ class FailPointHelper
FAIL_POINT_ENABLE(fail_point_name, exception_between_alter_data_and_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_drop_meta_and_data)
FAIL_POINT_ENABLE(fail_point_name, exception_drop_table_during_remove_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_rename_table_data_and_metadata)
else throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}
};
} // namespace DB
} // namespace DB
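For readers unfamiliar with the fail-point mechanism touched above, here is a minimal standalone sketch of the same idea: named fail points that, once enabled, throw an exception exactly once at the matching trigger site. This is an illustration only; the real code delegates to libfiu (fiu_enable / fiu_do_on with FIU_ONETIME), and the class and method names below are hypothetical.

#include <set>
#include <stdexcept>
#include <string>

// Hypothetical simplified registry; the real implementation is backed by libfiu.
class SimpleFailPoints
{
public:
    static void enable(const std::string & name) { enabled().insert(name); }

    // Throws once if the named fail point was enabled, then disarms it,
    // mirroring the FIU_ONETIME behaviour used in the diff above.
    static void trigger(const std::string & name)
    {
        if (enabled().erase(name) > 0)
            throw std::runtime_error("Fail point " + name + " is triggered.");
    }

private:
    static std::set<std::string> & enabled()
    {
        static std::set<std::string> s;
        return s;
    }
};

// Usage in a test:
//   SimpleFailPoints::enable("exception_between_rename_table_data_and_metadata");
//   ...
//   SimpleFailPoints::trigger("exception_between_rename_table_data_and_metadata"); // throws once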
20 changes: 7 additions & 13 deletions dbms/src/Core/ColumnWithTypeAndName.cpp
@@ -1,38 +1,30 @@
#include <Core/ColumnsWithTypeAndName.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>


namespace DB
{

ColumnWithTypeAndName ColumnWithTypeAndName::cloneEmpty() const
{
ColumnWithTypeAndName res;

res.name = name;
res.type = type;
res.column_id = column_id;
if (column)
res.column = column->cloneEmpty();

ColumnWithTypeAndName res{(column != nullptr ? column->cloneEmpty() : nullptr), type, name, column_id, default_value};
return res;
}


bool ColumnWithTypeAndName::operator==(const ColumnWithTypeAndName & other) const
{
// TODO should we check column_id here?
return name == other.name
&& ((!type && !other.type) || (type && other.type && type->equals(*other.type)))
return name == other.name && ((!type && !other.type) || (type && other.type && type->equals(*other.type)))
&& ((!column && !other.column) || (column && other.column && column->getName() == other.column->getName()));
}


void ColumnWithTypeAndName::dumpStructure(WriteBuffer & out) const
{
out << name;
out << name << ' ' << column_id;

if (type)
out << ' ' << type->getName();
@@ -43,6 +35,8 @@ void ColumnWithTypeAndName::dumpStructure(WriteBuffer & out) const
out << ' ' << column->dumpStructure();
else
out << " nullptr";

out << " " << column_id;
}

String ColumnWithTypeAndName::dumpStructure() const
@@ -52,4 +46,4 @@ String ColumnWithTypeAndName::dumpStructure() const
return out.str();
}

}
} // namespace DB
7 changes: 7 additions & 0 deletions dbms/src/Databases/DatabaseOrdinary.cpp
@@ -328,6 +328,9 @@ void DatabaseOrdinary::removeTable(

try
{
// If TiFlash crashes before the metadata is removed, then on the next restart
// it will fully apply the schema from TiDB, and the old table's metadata and
// data will be removed.
FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta);
Poco::File(table_metadata_path).remove();
}
@@ -410,13 +413,17 @@ void DatabaseOrdinary::renameTable(
throw Exception{e};
}

// TODO: Renaming a table is not yet atomic.
FAIL_POINT_TRIGGER_EXCEPTION(exception_between_rename_table_data_and_metadata);

ASTPtr ast = getQueryFromMetadata(detail::getTableMetadataPath(metadata_path, table_name));
if (!ast)
throw Exception("There is no metadata file for table " + table_name, ErrorCodes::FILE_DOESNT_EXIST);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.table = to_table_name;

/// NOTE Non-atomic.
// Create new metadata and remove old metadata.
to_database_concrete->createTable(context, to_table_name, table, ast);
removeTable(context, table_name);
}
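As a rough illustration of the non-atomic "create new metadata, then remove old metadata" step described in the comments above, the following sketch shows the same two-phase pattern with std::filesystem instead of Poco. All paths, file layout, and helper names here are hypothetical; a crash between the two steps leaves both files on disk, and a later schema sync has to clean up.

#include <filesystem>
#include <fstream>
#include <string>

namespace fs = std::filesystem;

// Hypothetical sketch: rename a table's on-disk metadata file by writing the
// new file first and removing the old one afterwards.
void renameTableMetadata(const fs::path & metadata_dir, const std::string & from, const std::string & to)
{
    const fs::path old_path = metadata_dir / (from + ".sql");
    const fs::path new_path = metadata_dir / (to + ".sql");

    std::ifstream in(old_path);
    std::string definition((std::istreambuf_iterator<char>(in)), std::istreambuf_iterator<char>());
    in.close();

    std::ofstream out(new_path); // step 1: create the new metadata
    out << definition;
    out.close();

    fs::remove(old_path); // step 2: remove the old metadata (a crash here leaves both files)
}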
10 changes: 6 additions & 4 deletions dbms/src/Debug/DBGInvoker.cpp
@@ -1,18 +1,18 @@
#include <cstring>
#include <thread>

#include <DataStreams/StringStreamBlockInputStream.h>
#include <Debug/ClusterManage.h>
#include <Debug/DBGInvoker.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Debug/dbgFuncFailPoint.h>
#include <Debug/dbgFuncMockRaftCommand.h>
#include <Debug/dbgFuncMockTiDBData.h>
#include <Debug/dbgFuncMockTiDBTable.h>
#include <Debug/dbgFuncRegion.h>
#include <Debug/dbgFuncSchema.h>
#include <Debug/dbgFuncFailPoint.h>
#include <Parsers/ASTLiteral.h>

#include <cstring>
#include <thread>

namespace DB
{

@@ -88,6 +88,8 @@ DBGInvoker::DBGInvoker()

regSchemafulFunc("dag", dbgFuncDAG);
regSchemafulFunc("mock_dag", dbgFuncMockDAG);

regSchemalessFunc("region_mock_ingest_sst", dbgFuncIngestSST);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
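The registration of "region_mock_ingest_sst" above follows a plain name-to-callback registry. A stripped-down sketch of that pattern is shown below; the class, method names, and argument types are simplified stand-ins, not the real DBGInvoker API.

#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical mini-invoker: maps a command name to a handler that receives
// raw string arguments, similar in spirit to regSchemalessFunc above.
class MiniInvoker
{
public:
    using Handler = std::function<void(const std::vector<std::string> & args)>;

    void registerFunc(const std::string & name, Handler handler) { funcs[name] = std::move(handler); }

    void invoke(const std::string & name, const std::vector<std::string> & args) const
    {
        auto it = funcs.find(name);
        if (it == funcs.end())
            std::cerr << "Unknown debug function: " << name << '\n';
        else
            it->second(args);
    }

private:
    std::map<std::string, Handler> funcs;
};

// Usage:
//   MiniInvoker invoker;
//   invoker.registerFunc("region_mock_ingest_sst", [](const auto & args) { /* ... */ });
//   invoker.invoke("region_mock_ingest_sst", {"db", "tbl", "4", "0", "100"});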
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.cpp
@@ -191,7 +191,7 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
else if (engine_type == "buggy")
table_info.engine_type = TiDB::StorageEngine::DEBUGGING_MEMORY;
else
throw Exception("Unknown engine type : " + engine_type + ", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Unknown engine type : " + engine_type + ", must be 'tmt' or 'dt'", ErrorCodes::BAD_ARGUMENTS);

auto table = std::make_shared<Table>(database_name, table_name, std::move(table_info));
tables_by_id.emplace(table->table_info.id, table);
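The one-character fix above ('dm' to 'dt') only touches the error message; the surrounding code maps an engine-name string to a storage-engine enum. A minimal sketch of that mapping is below. The enum names are illustrative guesses, not the real TiDB::StorageEngine values, and only the three names visible in this diff (tmt, dt, buggy) are handled.

#include <stdexcept>
#include <string>

// Hypothetical mirror of the string-to-engine mapping in MockTiDB::newTable.
enum class MockStorageEngine { TMT, DeltaTree, DebuggingMemory };

MockStorageEngine parseEngineType(const std::string & engine_type)
{
    if (engine_type == "tmt")
        return MockStorageEngine::TMT;
    if (engine_type == "dt")
        return MockStorageEngine::DeltaTree;
    if (engine_type == "buggy")
        return MockStorageEngine::DebuggingMemory;
    throw std::invalid_argument("Unknown engine type : " + engine_type + ", must be 'tmt' or 'dt'");
}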
7 changes: 6 additions & 1 deletion dbms/src/Debug/MockTiKV.h
@@ -15,7 +15,12 @@ class MockTiKV : public ext::singleton<MockTiKV>
std::lock_guard lock(mutex);
auto it = raft_index.find(region_id);
if (it == raft_index.end())
it = raft_index.emplace_hint(it, region_id, RAFT_INIT_LOG_INDEX);
{
// Usually index 6 is empty and we ignore it.
// https://github.com/tikv/tikv/issues/7047
auto init_index = RAFT_INIT_LOG_INDEX + 1;
it = raft_index.emplace_hint(it, region_id, init_index);
}
++(it->second);
return it->second;
}
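To make the off-by-one in the hunk above concrete, here is a self-contained sketch of the per-region index bookkeeping. It assumes RAFT_INIT_LOG_INDEX is 5 (as in TiKV), so seeding the map with RAFT_INIT_LOG_INDEX + 1 means the first index handed out for a region is 7, skipping the usually-empty entry at index 6 mentioned in the linked tikv issue. Class and member names are hypothetical.

#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical simplified version of MockTiKV's raft-index bookkeeping.
class MockRaftIndex
{
public:
    uint64_t next(uint64_t region_id)
    {
        std::lock_guard lock(mutex);
        auto it = index_by_region.find(region_id);
        if (it == index_by_region.end())
            it = index_by_region.emplace_hint(it, region_id, RAFT_INIT_LOG_INDEX + 1);
        return ++(it->second); // first call for a region returns 7, then 8, 9, ...
    }

private:
    static constexpr uint64_t RAFT_INIT_LOG_INDEX = 5; // assumed value, as in TiKV
    std::mutex mutex;
    std::unordered_map<uint64_t, uint64_t> index_by_region;
};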
2 changes: 2 additions & 0 deletions dbms/src/Debug/dbgFuncMockRaftCommand.h
@@ -29,6 +29,8 @@ struct MockRaftCommand
// Usage:
// ./storages-client.sh "DBGInvoke region_rollback_merge(region_id, database_name, table_name, start1, end1, start2, end2)"
static void dbgFuncRollbackMerge(Context & context, const ASTs & args, DBGInvoker::Printer output);

static void dbgFuncIngestSST(Context & context, const ASTs & args, DBGInvoker::Printer output);
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.cpp
@@ -26,7 +26,7 @@ extern const int LOGICAL_ERROR;
void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3 && args.size() != 4 && args.size() != 5)
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name], [, engine-type(tmt|dm|buggy)]", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name], [, engine-type(tmt|dt|buggy)]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.h
@@ -15,7 +15,7 @@ struct MockTiDBTable
// Inject mocked TiDB table.
// Usage:
// ./storages-client.sh "DBGInvoke mock_tidb_table(database_name, table_name, 'col1 type1, col2 type2, ...'[, engine])"
// engine: [tmt, dm, buggy], tmt by default
// engine: [tmt, dt, buggy], tmt by default
static void dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Inject mocked TiDB table.
77 changes: 66 additions & 11 deletions dbms/src/Debug/dbgFuncRegion.cpp
@@ -182,9 +182,6 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri
TMTContext & tmt = context.getTMTContext();

metapb::Region region_info;
SnapshotDataView lock_cf;
SnapshotDataView write_cf;
SnapshotDataView default_cf;

region_info.set_id(region_id);
region_info.set_start_key(RecordKVFormat::genKey(table_id, start).getStr());
@@ -194,14 +191,8 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri
*region_info.add_peers() = createPeer(2, true);
auto peer_id = 1;

tmt.getKVStore()->handleApplySnapshot(std::move(region_info),
peer_id,
(lock_cf),
(write_cf),
(default_cf),
MockTiKV::instance().getRaftIndex(region_id),
RAFT_INIT_LOG_TERM,
tmt);
tmt.getKVStore()->handleApplySnapshot(
std::move(region_info), peer_id, SnapshotViewArray(), MockTiKV::instance().getRaftIndex(region_id), RAFT_INIT_LOG_TERM, tmt);

std::stringstream ss;
ss << "put region #" << region_id << ", range[" << start << ", " << end << ")"
@@ -335,4 +326,68 @@ void dbgFuncRemoveRegion(Context & context, const ASTs & args, DBGInvoker::Print
output(ss.str());
}

void dbgFuncIngestSST(Context & context, const ASTs & args, DBGInvoker::Printer)
{
const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
RegionID region_id = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
RegionID start_handle = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[3]).value);
RegionID end_handle = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[4]).value);
MockTiDB::TablePtr table = MockTiDB::instance().getTableByName(database_name, table_name);

std::vector<std::pair<TiKVKey, TiKVValue>> write_kv_list, default_kv_list;

for (auto handle_id = start_handle; handle_id < end_handle; ++handle_id)
{
// Build a row with only one Int64 column, just for testing.
std::vector<Field> fields;
fields.emplace_back(-handle_id);
{
TiKVKey key = RecordKVFormat::genKey(table->id(), handle_id);
std::stringstream ss;
RegionBench::encodeRow(table->table_info, fields, ss);
TiKVValue prewrite_value(ss.str());
UInt64 commit_ts = handle_id;
UInt64 prewrite_ts = commit_ts;
TiKVValue commit_value = RecordKVFormat::encodeWriteCfValue(Region::PutFlag, prewrite_ts);
TiKVKey commit_key = RecordKVFormat::appendTs(key, commit_ts);
TiKVKey prewrite_key = RecordKVFormat::appendTs(key, prewrite_ts);

write_kv_list.emplace_back(std::make_pair(std::move(commit_key), std::move(commit_value)));
default_kv_list.emplace_back(std::make_pair(std::move(prewrite_key), std::move(prewrite_value)));
}
}

{
std::vector<BaseBuffView> keys;
std::vector<BaseBuffView> vals;
for (const auto & kv : write_kv_list)
{
keys.push_back({kv.first.data(), kv.first.dataSize()});
vals.push_back({kv.second.data(), kv.second.dataSize()});
}
std::vector<SnapshotView> snaps;
snaps.push_back(SnapshotView{keys.data(), vals.data(), ColumnFamilyType::Write, keys.size()});

auto & tmt = context.getTMTContext();
tmt.getKVStore()->handleIngestSST(region_id, SnapshotViewArray{snaps.data(), snaps.size()},
MockTiKV::instance().getRaftIndex(region_id), MockTiKV::instance().getRaftTerm(region_id), tmt);
}

{
std::vector<BaseBuffView> keys;
std::vector<BaseBuffView> vals;
for (const auto & kv : default_kv_list)
{
keys.push_back({kv.first.data(), kv.first.dataSize()});
vals.push_back({kv.second.data(), kv.second.dataSize()});
}
std::vector<SnapshotView> snaps;
snaps.push_back(SnapshotView{keys.data(), vals.data(), ColumnFamilyType::Default, keys.size()});
auto & tmt = context.getTMTContext();
tmt.getKVStore()->handleIngestSST(region_id, SnapshotViewArray{snaps.data(), snaps.size()},
MockTiKV::instance().getRaftIndex(region_id), MockTiKV::instance().getRaftTerm(region_id), tmt);
}
}

} // namespace DB
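For context on the key/value pairs built by dbgFuncIngestSST above: for each mocked row, the default column family holds the row value keyed by (row key, prewrite_ts), while the write column family holds a commit record keyed by (row key, commit_ts) that points back at prewrite_ts. The sketch below restates that layout with plain strings; the key and value encodings are illustrative only, not the real TiKV memcomparable format produced by RecordKVFormat.

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical illustration of the two-column-family layout built above.
struct MockKVPairs
{
    std::vector<std::pair<std::string, std::string>> write_cf;   // (key + commit_ts, commit record)
    std::vector<std::pair<std::string, std::string>> default_cf; // (key + prewrite_ts, row value)
};

MockKVPairs buildMockRows(int64_t table_id, int64_t start_handle, int64_t end_handle)
{
    MockKVPairs res;
    for (int64_t handle = start_handle; handle < end_handle; ++handle)
    {
        const std::string row_key = "t_" + std::to_string(table_id) + "_r_" + std::to_string(handle);
        const uint64_t prewrite_ts = static_cast<uint64_t>(handle);
        const uint64_t commit_ts = prewrite_ts; // the debug function uses the same ts for both

        res.default_cf.emplace_back(row_key + "@" + std::to_string(prewrite_ts),
                                    "row_value(" + std::to_string(-handle) + ")");
        res.write_cf.emplace_back(row_key + "@" + std::to_string(commit_ts),
                                  "PUT@" + std::to_string(prewrite_ts));
    }
    return res;
}

Judging from the argument parsing in the implementation, the new command would presumably be invoked as DBGInvoke region_mock_ingest_sst(database_name, table_name, region_id, start_handle, end_handle), in the same style as the other debug commands documented in these headers.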
1 change: 1 addition & 0 deletions dbms/src/Debug/dbgFuncRegion.h
@@ -50,5 +50,6 @@ void dbgFuncTryFlushRegion(Context & context, const ASTs & args, DBGInvoker::Pri
// ./storage-client.sh "DBGInvoke remove_region(region_id)"
void dbgFuncRemoveRegion(Context & context, const ASTs & args, DBGInvoker::Printer output);

void dbgFuncIngestSST(Context & context, const ASTs & args, DBGInvoker::Printer output);

} // namespace DB
13 changes: 13 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
@@ -451,6 +451,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
query_info.mvcc_query_info->concurrent = 0.0;
<<<<<<< HEAD
if (ts.next_read_engine() == tipb::EngineType::Local) {
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size,
max_streams);
@@ -483,6 +484,18 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
>>>>>>> c2d620553... support batch cop

LOG_INFO(log, "dag execution stream size: " << dag.getRegions().size());
=======
try
{
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
}
catch (DB::Exception & e)
{
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ")");
throw;
}
>>>>>>> b7776d3a8452a4288445a91a364f2e88c66f0fe6

if (pipeline.streams.empty())
{
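One of the merged changes wraps the storage->read call in a try/catch that appends table context to the exception before rethrowing. The sketch below shows the same "annotate and rethrow" pattern with standard exceptions; the function and exception names are hypothetical, and the real code uses DB::Exception::addMessage rather than building a new exception.

#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical sketch of the annotate-and-rethrow pattern used above.
struct AnnotatedError : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

std::vector<int> readFromStorage()
{
    throw std::runtime_error("simulated read failure");
}

std::vector<int> readWithContext(const std::string & database, const std::string & table, int64_t table_id)
{
    try
    {
        return readFromStorage();
    }
    catch (const std::exception & e)
    {
        // Re-throw with the table identity attached so the caller sees where the failure happened.
        throw AnnotatedError(std::string(e.what()) + " (while creating InputStreams from storage `" + database
            + "`.`" + table + "`, table_id: " + std::to_string(table_id) + ")");
    }
}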
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterDescribeQuery.cpp
@@ -109,7 +109,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
Block sample_block = getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();

OrderedNameSet filtered_names = MutableSupport::instance().hiddenColumns(table->getName());
const OrderedNameSet filtered_names = MutableSupport::instance().hiddenColumns(table->getName());

for (const auto & column : columns)
{