Fixed incompatibility when versions prior to 18.12.17 are used on remote servers and a newer version is used on the initiating server, the query does GROUP BY over both fixed-size and non-fixed-size keys, and the two-level GROUP BY method is activated #3254

Merged
merged 3 commits on Oct 12, 2018
8 changes: 8 additions & 0 deletions dbms/src/Client/Connection.cpp
@@ -231,6 +231,14 @@ void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64
revision = server_revision;
}

UInt64 Connection::getServerRevision()
{
if (!connected)
connect();

return server_revision;
}

const String & Connection::getServerTimezone()
{
if (!connected)
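The new accessor lets callers branch on the peer's protocol revision without requesting the full version tuple. A minimal sketch of the intended usage, assuming the constant added in Defines.h below is visible; the free function itself is hypothetical and not part of this PR:

```cpp
// Hypothetical caller: check whether the peer is new enough to share the
// current aggregation-variant selection rules. getServerRevision() connects
// lazily, so it is safe to call before any query has been sent.
bool peerSupportsCurrentAggregationVariantSelection(Connection & connection)
{
    return connection.getServerRevision()
        >= DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD;
}
```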
1 change: 1 addition & 0 deletions dbms/src/Client/Connection.h
@@ -106,6 +106,7 @@ class Connection : private boost::noncopyable
void setDefaultDatabase(const String & database);

void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision);
UInt64 getServerRevision();

const String & getServerTimezone();
const String & getServerDisplayName();
8 changes: 2 additions & 6 deletions dbms/src/Client/ConnectionPoolWithFailover.cpp
@@ -153,13 +153,9 @@ ConnectionPoolWithFailover::tryGetEntry(
{
result.entry = pool.get(settings, /* force_connected = */ false);

String server_name;
UInt64 server_version_major;
UInt64 server_version_minor;
UInt64 server_version_patch;
UInt64 server_revision;
UInt64 server_revision = 0;
if (table_to_check)
result.entry->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
server_revision = result.entry->getServerRevision();

if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
{
36 changes: 22 additions & 14 deletions dbms/src/Client/MultiplexedConnections.cpp
@@ -87,28 +87,36 @@ void MultiplexedConnections::sendQuery(
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);

if (replica_states.size() > 1)
Settings modified_settings = settings;

for (auto & replica : replica_states)
{
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_states.size();
if (!replica.connection)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);

for (size_t i = 0; i < replica_states.size(); ++i)
if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{
Connection * connection = replica_states[i].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
/// Disable two-level aggregation due to version incompatibility.
modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0;
}
}

query_settings.parallel_replica_offset = i;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
size_t num_replicas = replica_states.size();
if (num_replicas > 1)
{
/// Use multiple replicas for parallel query processing.
modified_settings.parallel_replicas_count = num_replicas;
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
}
}
else
{
Connection * connection = replica_states[0].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);

connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data);
/// Use single replica.
replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
}

sent_query = true;
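Stated outside the diff, the core of the fix is a per-query settings downgrade driven by the revision of every replica that will run the query. A standalone sketch of that idea, assuming ClickHouse's Connection and Settings types are in scope (the helper name is made up for illustration; field and constant names follow the diff):

```cpp
#include <vector>

// Sketch only: disable two-level GROUP BY if any participating replica is too
// old to use the same bucketing rules. A threshold of 0 means "never switch
// to the two-level method", so every replica stays on single-level aggregation.
Settings adjustSettingsForReplicas(const Settings & settings,
                                   const std::vector<Connection *> & replicas)
{
    Settings modified_settings = settings;
    for (Connection * replica : replicas)
    {
        if (replica->getServerRevision()
            < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
        {
            modified_settings.group_by_two_level_threshold = 0;
            modified_settings.group_by_two_level_threshold_bytes = 0;
        }
    }
    return modified_settings;
}
```

The downgrade applies to the whole query, not just the outdated replica, because every replica must bucket identically for the initiator's per-bucket merge to be correct.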
4 changes: 4 additions & 0 deletions dbms/src/Core/Defines.h
@@ -47,6 +47,10 @@
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54408

/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226
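To see why inconsistent bucketing rules break distributed GROUP BY, consider a toy model (illustrative only, not ClickHouse's real hash or bucket count): each server splits its partial aggregation state into buckets by key hash, and the initiator merges bucket N from every server. If two servers map the same key to different buckets, the merge never combines their states, and the key shows up twice, partially aggregated, in the result.

```cpp
#include <functional>
#include <iostream>
#include <string>

// Toy bucketing rules standing in for an "old" and a "new" server.
// Real ClickHouse logic differs; this only illustrates the failure mode.
static size_t bucketOldServer(const std::string & key) { return std::hash<std::string>{}(key) % 256; }
static size_t bucketNewServer(const std::string & key) { return (std::hash<std::string>{}(key) >> 8) % 256; }

int main()
{
    const std::string key = "user_42";
    const size_t old_bucket = bucketOldServer(key);
    const size_t new_bucket = bucketNewServer(key);
    std::cout << "old server puts '" << key << "' in bucket " << old_bucket
              << ", new server in bucket " << new_bucket << '\n';
    if (old_bucket != new_bucket)
        std::cout << "per-bucket merge leaves the key split across two result rows\n";
}
```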
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Aggregator.cpp
@@ -2085,7 +2085,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV

/** `minus one` means the absence of information about the bucket
* - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
* If there is at least one block with a bucket number greater than zero, then there was a two-level aggregation.
* If there is at least one block with a bucket number greater than or equal to zero, then there was a two-level aggregation.
*/
auto max_bucket = bucket_to_blocks.rbegin()->first;
size_t has_two_level = max_bucket >= 0;
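The corrected comment matters because bucket 0 is a valid two-level bucket; only bucket -1 means "no bucket information" (single-level blocks and blocks with overflow rows). A minimal sketch of the detection logic, with the container shape and the Int32/BlocksList types assumed for illustration:

```cpp
#include <map>

// Sketch: decide whether the merged stream came from two-level aggregation.
// Bucket -1 carries no bucket information; any bucket >= 0, including 0,
// proves the source performed two-level aggregation.
bool streamWasTwoLevel(const std::map<Int32, BlocksList> & bucket_to_blocks)
{
    if (bucket_to_blocks.empty())
        return false;
    auto max_bucket = bucket_to_blocks.rbegin()->first;
    return max_bucket >= 0;
}
```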