Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-181: Fix region table restore #17

Merged
merged 5 commits into from
Mar 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,31 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeSystemLogs();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper);
/// Load raft related configs ahead of loading metadata, as TMT storage relies on TMT context, which needs these configs.
if (config().has("raft"))
{
String raft_service_addr = config().getString("raft.service_addr");
if (config().has("raft.pd_addr"))
{
String pd_service_addrs = config().getString("raft.pd_addr");
Poco::StringTokenizer string_tokens(pd_service_addrs, ";");
std::vector<std::string> pd_addrs;
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) {
pd_addrs.push_back(*it);
}
global_context->setPDAddrs(pd_addrs);
LOG_INFO(log, "Found pd addrs.");
} else {
LOG_INFO(log, "Not found pd addrs.");
}
global_context->initializeRaftService(raft_service_addr);
}
if (config().has("tidb"))
{
String service_ip = config().getString("tidb.service_ip");
String status_port = config().getString("tidb.status_port");
global_context->initializeTiDBService(service_ip, status_port);
}
/// Then, load remaining databases
loadMetadata(*global_context);
LOG_DEBUG(log, "Loaded metadata.");
Expand All @@ -331,31 +356,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl"));
}

if (config().has("raft"))
{
String raft_service_addr = config().getString("raft.service_addr");
if (config().has("raft.pd_addr"))
{
String pd_service_addrs = config().getString("raft.pd_addr");
Poco::StringTokenizer string_tokens(pd_service_addrs, ";");
std::vector<std::string> pd_addrs;
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) {
pd_addrs.push_back(*it);
}
global_context->setPDAddrs(pd_addrs);
LOG_INFO(log, "Found pd addrs.");
} else {
LOG_INFO(log, "Not found pd addrs.");
}
global_context->initializeRaftService(raft_service_addr);
}

if (config().has("tidb"))
{
String service_ip = config().getString("tidb.service_ip");
String status_port = config().getString("tidb.status_port");
global_context->initializeTiDBService(service_ip, status_port);
}

{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,16 @@ static const Seconds FTH_PERIOD_2(60 * 5); // 5 minutes
static const Seconds FTH_PERIOD_3(60); // 1 minute
static const Seconds FTH_PERIOD_4(5); // 5 seconds

RegionTable::RegionTable(Context & context_, const std::string & parent_path_, std::function<RegionPtr(RegionID)> region_fetcher)
RegionTable::RegionTable(Context & context_, const std::string & parent_path_)
: parent_path(parent_path_),
flush_thresholds(RegionTable::FlushThresholds::FlushThresholdsData{
{FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}),
context(context_),
log(&Logger::get("RegionTable"))
{
}

void RegionTable::restore(std::function<RegionPtr(RegionID)> region_fetcher)
{
Poco::File dir(parent_path + "tables/");
if (!dir.exists())
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ class RegionTable : private boost::noncopyable
void flushRegion(TableID table_id, RegionID partition_id, size_t & rest_cache_size);

public:
RegionTable(Context & context_, const std::string & parent_path_, std::function<RegionPtr(RegionID)> region_fetcher);
RegionTable(Context & context_, const std::string & parent_path_);
void restore(std::function<RegionPtr(RegionID)> region_fetcher);

void setFlushThresholds(const FlushThresholds::FlushThresholdsData & flush_thresholds_);

/// After the region is updated (insert or delete KVs).
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace DB

TMTContext::TMTContext(Context & context, std::vector<String> addrs)
: kvstore(std::make_shared<KVStore>(context.getPath() + "kvstore/")),
region_table(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)),
region_table(context, context.getPath() + "regmap/"),
schema_syncer(std::make_shared<HttpJsonSchemaSyncer>()),
pd_client(addrs.size() == 0 ? static_cast<pingcap::pd::IClient *>(new pingcap::pd::MockPDClient())
: static_cast<pingcap::pd::IClient *>(new pingcap::pd::Client(addrs))),
Expand All @@ -18,6 +18,7 @@ TMTContext::TMTContext(Context & context, std::vector<String> addrs)
kvstore->restore([&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr {
return this->createRegionClient(id);
}, &regions_to_remove);
region_table.restore(std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1));
for (RegionID id : regions_to_remove)
kvstore->removeRegion(id, &context);
regions_to_remove.clear();
Expand Down