Skip to content

Commit

Permalink
FLASH-181: Fix region table restore (#17)
Browse files Browse the repository at this point in the history
* Fix null region client after region restored

* Fix region table restore

* Fix typo
  • Loading branch information
zanmato1984 authored Mar 22, 2019
1 parent 1a19638 commit e3cf23e
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 28 deletions.
50 changes: 25 additions & 25 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,31 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeSystemLogs();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper);
/// Load raft related configs ahead of loading metadata, as TMT storage relies on TMT context, which needs these configs.
if (config().has("raft"))
{
String raft_service_addr = config().getString("raft.service_addr");
if (config().has("raft.pd_addr"))
{
String pd_service_addrs = config().getString("raft.pd_addr");
Poco::StringTokenizer string_tokens(pd_service_addrs, ";");
std::vector<std::string> pd_addrs;
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) {
pd_addrs.push_back(*it);
}
global_context->setPDAddrs(pd_addrs);
LOG_INFO(log, "Found pd addrs.");
} else {
LOG_INFO(log, "Not found pd addrs.");
}
global_context->initializeRaftService(raft_service_addr);
}
if (config().has("tidb"))
{
String service_ip = config().getString("tidb.service_ip");
String status_port = config().getString("tidb.status_port");
global_context->initializeTiDBService(service_ip, status_port);
}
/// Then, load remaining databases
loadMetadata(*global_context);
LOG_DEBUG(log, "Loaded metadata.");
Expand All @@ -331,31 +356,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl"));
}

if (config().has("raft"))
{
String raft_service_addr = config().getString("raft.service_addr");
if (config().has("raft.pd_addr"))
{
String pd_service_addrs = config().getString("raft.pd_addr");
Poco::StringTokenizer string_tokens(pd_service_addrs, ";");
std::vector<std::string> pd_addrs;
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) {
pd_addrs.push_back(*it);
}
global_context->setPDAddrs(pd_addrs);
LOG_INFO(log, "Found pd addrs.");
} else {
LOG_INFO(log, "Not found pd addrs.");
}
global_context->initializeRaftService(raft_service_addr);
}

if (config().has("tidb"))
{
String service_ip = config().getString("tidb.service_ip");
String status_port = config().getString("tidb.status_port");
global_context->initializeTiDBService(service_ip, status_port);
}

{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,16 @@ static const Seconds FTH_PERIOD_2(60 * 5); // 5 minutes
static const Seconds FTH_PERIOD_3(60); // 1 minute
static const Seconds FTH_PERIOD_4(5); // 5 seconds

RegionTable::RegionTable(Context & context_, const std::string & parent_path_, std::function<RegionPtr(RegionID)> region_fetcher)
RegionTable::RegionTable(Context & context_, const std::string & parent_path_)
: parent_path(parent_path_),
flush_thresholds(RegionTable::FlushThresholds::FlushThresholdsData{
{FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}),
context(context_),
log(&Logger::get("RegionTable"))
{
}

void RegionTable::restore(std::function<RegionPtr(RegionID)> region_fetcher)
{
Poco::File dir(parent_path + "tables/");
if (!dir.exists())
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ class RegionTable : private boost::noncopyable
void flushRegion(TableID table_id, RegionID partition_id, size_t & rest_cache_size);

public:
RegionTable(Context & context_, const std::string & parent_path_, std::function<RegionPtr(RegionID)> region_fetcher);
RegionTable(Context & context_, const std::string & parent_path_);
void restore(std::function<RegionPtr(RegionID)> region_fetcher);

void setFlushThresholds(const FlushThresholds::FlushThresholdsData & flush_thresholds_);

/// After the region is updated (insert or delete KVs).
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace DB

TMTContext::TMTContext(Context & context, std::vector<String> addrs)
: kvstore(std::make_shared<KVStore>(context.getPath() + "kvstore/")),
region_table(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)),
region_table(context, context.getPath() + "regmap/"),
schema_syncer(std::make_shared<HttpJsonSchemaSyncer>()),
pd_client(addrs.size() == 0 ? static_cast<pingcap::pd::IClient *>(new pingcap::pd::MockPDClient())
: static_cast<pingcap::pd::IClient *>(new pingcap::pd::Client(addrs))),
Expand All @@ -18,6 +18,7 @@ TMTContext::TMTContext(Context & context, std::vector<String> addrs)
kvstore->restore([&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr {
return this->createRegionClient(id);
}, &regions_to_remove);
region_table.restore(std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1));
for (RegionID id : regions_to_remove)
kvstore->removeRegion(id, &context);
regions_to_remove.clear();
Expand Down

0 comments on commit e3cf23e

Please sign in to comment.