Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved efficiency with a huge number of MergeTree tables. #2609

Merged
merged 10 commits into from
Dec 14, 2018
2 changes: 0 additions & 2 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

#define DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS 7500

#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
/// Maximum namber of http-connections between two endpoints
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/Cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ inline bool isLocal(const Cluster::Address & address, const Poco::Net::SocketAdd

/// Implementation of Cluster::Address class

Cluster::Address::Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix)
Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
UInt16 clickhouse_port = static_cast<UInt16>(config.getInt("tcp_port", 0));

Expand Down Expand Up @@ -125,7 +125,7 @@ String Cluster::Address::toStringFull() const

/// Implementation of Clusters class

Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
updateClusters(config, settings, config_name);
}
Expand All @@ -147,7 +147,7 @@ void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Clu
}


void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);
Expand All @@ -174,7 +174,7 @@ Clusters::Impl Clusters::getContainer() const

/// Implementation of `Cluster` class

Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace DB
class Cluster
{
public:
Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);
Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);

/// Construct a cluster by the names of shards and replicas.
/// Local are treated as well as remote ones if treat_local_as_remote is true.
Expand Down Expand Up @@ -66,7 +66,7 @@ class Cluster
Protocol::Secure secure = Protocol::Secure::Disable;

Address() = default;
Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port);

/// Returns 'escaped_host_name:port'
Expand Down Expand Up @@ -178,15 +178,15 @@ using ClusterPtr = std::shared_ptr<Cluster>;
class Clusters
{
public:
Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");

Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete;

ClusterPtr getCluster(const std::string & cluster_name) const;
void setCluster(const String & cluster_name, const ClusterPtr & cluster);

void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);

public:
using Impl = std::map<String, ClusterPtr>;
Expand Down
79 changes: 17 additions & 62 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ struct ContextShared
public:
size_t operator()(const Context::SessionKey & key) const
{
size_t seed = 0;
boost::hash_combine(seed, key.first);
boost::hash_combine(seed, key.second);
return seed;
SipHash hash;
hash.update(key.first);
hash.update(key.second);
return hash.get64();
}
};

Expand Down Expand Up @@ -549,7 +549,7 @@ void Context::setConfig(const ConfigurationPtr & config)
shared->config = config;
}

Poco::Util::AbstractConfiguration & Context::getConfigRef() const
const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
return shared->config ? *shared->config : Poco::Util::Application::instance().config();
Expand Down Expand Up @@ -1537,95 +1537,50 @@ Compiler & Context::getCompiler()
void Context::initializeSystemLogs()
{
auto lock = getLock();
system_logs = std::make_shared<SystemLogs>();

if (!global_context)
throw Exception("Logical error: no global context for system logs", ErrorCodes::LOGICAL_ERROR);

system_logs = std::make_shared<SystemLogs>(*global_context, getConfigRef());
}


QueryLog * Context::getQueryLog(bool create_if_not_exists)
QueryLog * Context::getQueryLog()
{
auto lock = getLock();

if (!system_logs)
if (!system_logs || !system_logs->query_log)
return nullptr;

if (!system_logs->query_log)
{
if (!create_if_not_exists)
return nullptr;

if (shared->shutdown_called)
throw Exception("Logical error: query log should be destroyed before tables shutdown", ErrorCodes::LOGICAL_ERROR);

if (!global_context)
throw Exception("Logical error: no global context for query log", ErrorCodes::LOGICAL_ERROR);

system_logs->query_log = createDefaultSystemLog<QueryLog>(*global_context, "system", "query_log", getConfigRef(), "query_log");
}

return system_logs->query_log.get();
}


QueryThreadLog * Context::getQueryThreadLog(bool create_if_not_exists)
QueryThreadLog * Context::getQueryThreadLog()
{
auto lock = getLock();

if (!system_logs)
if (!system_logs || !system_logs->query_thread_log)
return nullptr;

if (!system_logs->query_thread_log)
{
if (!create_if_not_exists)
return nullptr;

if (shared->shutdown_called)
throw Exception("Logical error: query log should be destroyed before tables shutdown", ErrorCodes::LOGICAL_ERROR);

if (!global_context)
throw Exception("Logical error: no global context for query thread log", ErrorCodes::LOGICAL_ERROR);

system_logs->query_thread_log = createDefaultSystemLog<QueryThreadLog>(
*global_context, "system", "query_thread_log", getConfigRef(), "query_thread_log");
}

return system_logs->query_thread_log.get();
}


PartLog * Context::getPartLog(const String & part_database, bool create_if_not_exists)
PartLog * Context::getPartLog(const String & part_database)
{
auto lock = getLock();

auto & config = getConfigRef();
if (!config.has("part_log"))
return nullptr;

/// System logs are shutting down.
if (!system_logs)
if (!system_logs || !system_logs->part_log)
return nullptr;

String database = config.getString("part_log.database", "system");

/// Will not log operations on system tables (including part_log itself).
/// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing,
/// and also make troubles on startup.
if (!part_database.empty() && part_database == database)
if (part_database == system_logs->part_log_database)
return nullptr;

if (!system_logs->part_log)
{
if (!create_if_not_exists)
return nullptr;

if (shared->shutdown_called)
throw Exception("Logical error: part log should be destroyed before tables shutdown", ErrorCodes::LOGICAL_ERROR);

if (!global_context)
throw Exception("Logical error: no global context for part log", ErrorCodes::LOGICAL_ERROR);

system_logs->part_log = createDefaultSystemLog<PartLog>(*global_context, "system", "part_log", getConfigRef(), "part_log");
}

return system_logs->part_log.get();
}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class Context

/// Global application configuration settings.
void setConfig(const ConfigurationPtr & config);
Poco::Util::AbstractConfiguration & getConfigRef() const;
const Poco::Util::AbstractConfiguration & getConfigRef() const;

/** Take the list of users, quotas and configuration profiles from this config.
* The list of users is completely replaced.
Expand Down Expand Up @@ -389,12 +389,12 @@ class Context
void initializeSystemLogs();

/// Nullptr if the query log is not ready for this moment.
QueryLog * getQueryLog(bool create_if_not_exists = true);
QueryThreadLog * getQueryThreadLog(bool create_if_not_exists = true);
QueryLog * getQueryLog();
QueryThreadLog * getQueryThreadLog();

/// Returns an object used to log opertaions with parts if it possible.
/// Provide table name to make required cheks.
PartLog * getPartLog(const String & part_database, bool create_if_not_exists = true);
PartLog * getPartLog(const String & part_database);

const MergeTreeSettings & getMergeTreeSettings() const;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/ISecurityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ISecurityManager
public:
using UserPtr = std::shared_ptr<const User>;

virtual void loadFromConfig(Poco::Util::AbstractConfiguration & config) = 0;
virtual void loadFromConfig(const Poco::Util::AbstractConfiguration & config) = 0;

/// Find user and make authorize checks
virtual UserPtr authorizeAndGetUser(
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ BlockIO InterpreterSystemQuery::execute()
break;
case Type::FLUSH_LOGS:
executeCommandsAndThrowIfError(
[&] () { if (auto query_log = context.getQueryLog(false)) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("", false)) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog(false)) query_thread_log->flush(); }
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:
Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Interpreters/Quota.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ErrorCodes


template <typename Counter>
void QuotaValues<Counter>::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
void QuotaValues<Counter>::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config)
{
queries = config.getUInt64(config_elem + ".queries", 0);
errors = config.getUInt64(config_elem + ".errors", 0);
Expand All @@ -34,11 +34,12 @@ void QuotaValues<Counter>::initFromConfig(const String & config_elem, Poco::Util
execution_time_usec = config.getUInt64(config_elem + ".execution_time", 0) * 1000000ULL;
}

template void QuotaValues<size_t>::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
template void QuotaValues<std::atomic<size_t>>::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
template void QuotaValues<size_t>::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
template void QuotaValues<std::atomic<size_t>>::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);


void QuotaForInterval::initFromConfig(const String & config_elem, time_t duration_, bool randomize_, time_t offset_, Poco::Util::AbstractConfiguration & config)
void QuotaForInterval::initFromConfig(
const String & config_elem, time_t duration_, bool randomize_, time_t offset_, const Poco::Util::AbstractConfiguration & config)
{
rounded_time.store(0, std::memory_order_relaxed);
duration = duration_;
Expand Down Expand Up @@ -160,7 +161,7 @@ void QuotaForInterval::check(
}


void QuotaForIntervals::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config, pcg64 & rng)
void QuotaForIntervals::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, pcg64 & rng)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
Expand Down Expand Up @@ -251,7 +252,7 @@ String QuotaForIntervals::toString() const
}


void Quota::loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config, pcg64 & rng)
void Quota::loadFromConfig(const String & config_elem, const String & name_, const Poco::Util::AbstractConfiguration & config, pcg64 & rng)
{
name = name_;

Expand Down Expand Up @@ -307,7 +308,7 @@ QuotaForIntervalsPtr Quota::get(const String & quota_key, const String & user_na
}


void Quotas::loadFromConfig(Poco::Util::AbstractConfiguration & config)
void Quotas::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
pcg64 rng;

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Interpreters/Quota.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct QuotaValues
tuple() = std::make_tuple(0, 0, 0, 0, 0, 0, 0);
}

void initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
void initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);

bool operator== (const QuotaValues & rhs) const
{
Expand Down Expand Up @@ -109,7 +109,7 @@ struct QuotaForInterval
QuotaForInterval() = default;
QuotaForInterval(time_t duration_) : duration(duration_) {}

void initFromConfig(const String & config_elem, time_t duration_, bool randomize_, time_t offset_, Poco::Util::AbstractConfiguration & config);
void initFromConfig(const String & config_elem, time_t duration_, bool randomize_, time_t offset_, const Poco::Util::AbstractConfiguration & config);

/// Increase current value.
void addQuery() noexcept;
Expand Down Expand Up @@ -191,7 +191,7 @@ class QuotaForIntervals
return cont.empty();
}

void initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config, pcg64 & rng);
void initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, pcg64 & rng);

/// Set maximum values (limits) from passed argument.
/// Remove intervals that does not exist in argument. Add intervals from argument, that we don't have.
Expand Down Expand Up @@ -241,7 +241,7 @@ struct Quota

bool keyed_by_ip = false;

void loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config, pcg64 & rng);
void loadFromConfig(const String & config_elem, const String & name_, const Poco::Util::AbstractConfiguration & config, pcg64 & rng);
QuotaForIntervalsPtr get(const String & quota_key, const String & user_name, const Poco::Net::IPAddress & ip);
};

Expand All @@ -254,7 +254,7 @@ class Quotas
Container cont;

public:
void loadFromConfig(Poco::Util::AbstractConfiguration & config);
void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
QuotaForIntervalsPtr get(const String & name, const String & quota_key,
const String & user_name, const Poco::Net::IPAddress & ip);
};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace ErrorCodes

using UserPtr = SecurityManager::UserPtr;

void SecurityManager::loadFromConfig(Poco::Util::AbstractConfiguration & config)
void SecurityManager::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
Container new_users;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/SecurityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class SecurityManager : public ISecurityManager
Container users;

public:
void loadFromConfig(Poco::Util::AbstractConfiguration & config) override;
void loadFromConfig(const Poco::Util::AbstractConfiguration & config) override;

UserPtr authorizeAndGetUser(
const String & user_name,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ bool Settings::tryGet(const String & name, String & value) const
/** Set the settings from the profile (in the server configuration, many settings can be listed in one profile).
* The profile can also be set using the `set` functions, like the `profile` setting.
*/
void Settings::setProfile(const String & profile_name, Poco::Util::AbstractConfiguration & config)
void Settings::setProfile(const String & profile_name, const Poco::Util::AbstractConfiguration & config)
{
String elem = "profiles." + profile_name;

Expand Down
Loading