diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 452f84b0c65..38872db9b87 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -307,6 +307,31 @@ int Server::main(const std::vector & /*args*/) global_context->initializeSystemLogs(); /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper); + /// Load raft related configs ahead of loading metadata, as TMT storage relies on TMT context, which needs these configs. + if (config().has("raft")) + { + String raft_service_addr = config().getString("raft.service_addr"); + if (config().has("raft.pd_addr")) + { + String pd_service_addrs = config().getString("raft.pd_addr"); + Poco::StringTokenizer string_tokens(pd_service_addrs, ";"); + std::vector pd_addrs; + for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) { + pd_addrs.push_back(*it); + } + global_context->setPDAddrs(pd_addrs); + LOG_INFO(log, "Found pd addrs."); + } else { + LOG_INFO(log, "Not found pd addrs."); + } + global_context->initializeRaftService(raft_service_addr); + } + if (config().has("tidb")) + { + String service_ip = config().getString("tidb.service_ip"); + String status_port = config().getString("tidb.status_port"); + global_context->initializeTiDBService(service_ip, status_port); + } /// Then, load remaining databases loadMetadata(*global_context); LOG_DEBUG(log, "Loaded metadata."); @@ -331,31 +356,6 @@ int Server::main(const std::vector & /*args*/) global_context->setDDLWorker(std::make_shared(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl")); } - if (config().has("raft")) - { - String raft_service_addr = config().getString("raft.service_addr"); - if (config().has("raft.pd_addr")) - { - String pd_service_addrs = config().getString("raft.pd_addr"); - Poco::StringTokenizer string_tokens(pd_service_addrs, ";"); - std::vector pd_addrs; - for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) { - pd_addrs.push_back(*it); - } - global_context->setPDAddrs(pd_addrs); - LOG_INFO(log, "Found pd addrs."); - } else { - LOG_INFO(log, "Not found pd addrs."); - } - global_context->initializeRaftService(raft_service_addr); - } - - if (config().has("tidb")) - { - String service_ip = config().getString("tidb.service_ip"); - String status_port = config().getString("tidb.status_port"); - global_context->initializeTiDBService(service_ip, status_port); - } { Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0); diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index a90cc5a5db5..05c53f96496 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -224,12 +224,16 @@ static const Seconds FTH_PERIOD_2(60 * 5); // 5 minutes static const Seconds FTH_PERIOD_3(60); // 1 minute static const Seconds FTH_PERIOD_4(5); // 5 seconds -RegionTable::RegionTable(Context & context_, const std::string & parent_path_, std::function region_fetcher) +RegionTable::RegionTable(Context & context_, const std::string & parent_path_) : parent_path(parent_path_), flush_thresholds(RegionTable::FlushThresholds::FlushThresholdsData{ {FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}), context(context_), log(&Logger::get("RegionTable")) +{ +} + +void RegionTable::restore(std::function region_fetcher) { Poco::File dir(parent_path + "tables/"); if (!dir.exists()) diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 1461a5aa5fc..78655fbb10a 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -154,7 +154,9 @@ class RegionTable : private boost::noncopyable void flushRegion(TableID table_id, RegionID partition_id, size_t & rest_cache_size); public: - RegionTable(Context & context_, const std::string & parent_path_, std::function region_fetcher); + RegionTable(Context & context_, const std::string & parent_path_); + void restore(std::function region_fetcher); + void setFlushThresholds(const FlushThresholds::FlushThresholdsData & flush_thresholds_); /// After the region is updated (insert or delete KVs). diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index 5ec91fbfe5e..f718ad322cb 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -8,7 +8,7 @@ namespace DB TMTContext::TMTContext(Context & context, std::vector addrs) : kvstore(std::make_shared(context.getPath() + "kvstore/")), - region_table(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)), + region_table(context, context.getPath() + "regmap/"), schema_syncer(std::make_shared()), pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) : static_cast(new pingcap::pd::Client(addrs))), @@ -18,6 +18,7 @@ TMTContext::TMTContext(Context & context, std::vector addrs) kvstore->restore([&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr { return this->createRegionClient(id); }, ®ions_to_remove); + region_table.restore(std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)); for (RegionID id : regions_to_remove) kvstore->removeRegion(id, &context); regions_to_remove.clear();