ISSUES-2581 fix SELECT with sequential consistency #2863
@@ -403,6 +403,14 @@ class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicat
*/
bool queueTask();

std::unordered_map<String, Int64> getMaxBlocksWithQuorum(const std::unordered_map<String, String> & last_parts);

/// Get parts, wich were wrote with quorum.
written
I don't understand what this function returns.
std::unordered_map<String, String> getLastPartsWithQuorum(const String & last_parts_str);

/// Rewrite last parts with quorum
String rewriteLastParts(const String & old_last_parts, const String & new_part_name);
This needs a more descriptive comment.
Is it about data in ZooKeeper? (no)
@@ -403,6 +403,14 @@ class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicat
*/
bool queueTask();

std::unordered_map<String, Int64> getMaxBlocksWithQuorum(const std::unordered_map<String, String> & last_parts);
Missing comment. Is the return value keyed by partition id?

- if (last_part.empty()) /// If no part has been written with quorum.
+ if (last_parts.empty()) /// If no part has been written with quorum.
?
@@ -137,11 +137,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
  const Context & context,
  const size_t max_block_size,
  const unsigned num_streams,
- Int64 max_block_number_to_read) const
+ const std::unordered_map<String, Int64> & max_blocks_number_to_read) const
Why not use a type alias (`using`)?
@@ -27,7 +27,7 @@ class StorageFromMergeTreeDataPart : public ext::shared_ptr_helper<StorageFromMe
  unsigned num_streams) override
  {
  return MergeTreeDataSelectExecutor(part->storage).readFromParts(
- {part}, column_names, query_info, context, max_block_size, num_streams, 0);
+ {part}, column_names, query_info, context, max_block_size, num_streams, std::unordered_map<String, Int64>());
BTW, you can write `{}` as a function argument.
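A minimal sketch of this suggestion, under stated assumptions: `countLimits` and the `PartitionToBlockNum` alias are hypothetical stand-ins, and `std::string` and `long long` replace ClickHouse's `String` and `Int64` so the snippet compiles on its own. It shows that an empty braced list can be passed where a `const std::unordered_map<...> &` parameter is expected.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical alias standing in for std::unordered_map<String, Int64>.
using PartitionToBlockNum = std::unordered_map<std::string, long long>;

// Hypothetical function that takes the map by const reference,
// like the last parameter of read() in the diff above.
std::size_t countLimits(const PartitionToBlockNum & max_blocks)
{
    return max_blocks.size();
}

// Spelling out the type constructs an empty temporary map.
std::size_t withExplicitType() { return countLimits(PartitionToBlockNum()); }

// `{}` initializes the same empty temporary without repeating the type.
std::size_t withBraces() { return countLimits({}); }
```

Both calls bind the parameter to an empty temporary; the braced form just avoids naming the type at every call site.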
@@ -115,7 +115,7 @@ BlockInputStreams StorageMergeTree::read(
  const unsigned num_streams)
  {
  checkQueryProcessingStage(processed_stage, context);
- return reader.read(column_names, query_info, context, max_block_size, num_streams, 0);
+ return reader.read(column_names, query_info, context, max_block_size, num_streams, std::unordered_map<String, Int64>());
.
Some cosmetic changes are necessary. Also missing tests.
added_parts[partition_id] = part_name;
}
}
catch (Exception e)
An exception is used in a non-exceptional situation (also, the error code is not checked and the exception is caught by value). Better to use the `checkString` function from IO/ReadHelpers.h.
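To illustrate the idea without depending on ClickHouse headers, here is a rough sketch on top of `std::istream`. `checkPrefix` and `hasPartsCountHeader` are hypothetical names, not the real `checkString` API, and the sketch assumes the older format is a bare part name with no `parts_count ` header. The format is detected through a boolean result, so no exception is thrown or caught on the expected path.

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical stand-in for a checkString-style helper: consumes `expected`
// from the stream and returns true if it is present; otherwise rewinds the
// stream and returns false.
bool checkPrefix(std::istream & in, const std::string & expected)
{
    const std::istream::pos_type start = in.tellg();
    std::string actual(expected.size(), '\0');
    in.read(&actual[0], static_cast<std::streamsize>(expected.size()));
    if (in.gcount() == static_cast<std::streamsize>(expected.size()) && actual == expected)
        return true;
    in.clear();      // reset eof/fail bits left by a short read
    in.seekg(start); // rewind so the caller can parse the other format
    return false;
}

// Returns true if the data starts with the "parts_count " header.
// Assumption: data without the header is in the older single-part format.
bool hasPartsCountHeader(const std::string & data)
{
    std::istringstream in(data);
    return checkPrefix(in, "parts_count ");
}
```

If an exception does have to be caught somewhere, catching it by const reference (`catch (const Exception & e)`) avoids copying and slicing.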
{
try
{
in >> "parts_count " >> count >> "\n";
If we have two serialization versions of the same struct, it is better to have an explicit version string (see e.g. the `MergeTreeDataPartChecksums` class).
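A possible shape for this, as a sketch only: the `version: N` header line, the `FormatInfo` struct and `detectFormat` are all assumptions made for illustration and are not taken from `MergeTreeDataPartChecksums`. Data without the header is treated as the legacy version 1.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical result of inspecting the first line of the serialized data.
struct FormatInfo
{
    int version;      // 1 = legacy data without a header, 2+ = explicit header
    std::string body; // everything after the header line (all data for version 1)
};

// Assumed header line: "version: N\n". Data without it is treated as version 1.
FormatInfo detectFormat(const std::string & data)
{
    static const std::string prefix = "version: ";
    if (data.compare(0, prefix.size(), prefix) != 0)
        return {1, data};

    std::size_t eol = data.find('\n');
    if (eol == std::string::npos)
        eol = data.size();

    // std::stoi throws on a malformed number, which is a genuinely exceptional case.
    const int version = std::stoi(data.substr(prefix.size(), eol - prefix.size()));
    const std::string body = eol < data.size() ? data.substr(eol + 1) : std::string();
    return {version, body};
}
```

With an explicit version, the reader can dispatch on `version` and the format can evolve again later without guessing from the payload.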
}


String StorageReplicatedMergeTree::rewriteAddedParts(const String & old_added_parts_str, const String & new_part_name)
Better to split the logic responsible for serialization of the struct from the logic responsible for its update. That is, there should be a generic write() method.
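One way this split could look, as a sketch with assumed names and an assumed tab-separated format (`QuorumAddedParts`, `updateAddedParts` and the line layout are hypothetical): `write()` and `read()` only convert between the struct and text, and the update becomes a plain read-modify-write.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <sstream>
#include <string>

// Hypothetical struct for the parts written with quorum, keyed by partition ID.
struct QuorumAddedParts
{
    std::map<std::string, std::string> parts; // partition ID -> part name

    // Serialization only: one "partition_id<TAB>part_name" line per entry (assumed format).
    std::string write() const
    {
        std::ostringstream out;
        for (const auto & entry : parts)
            out << entry.first << '\t' << entry.second << '\n';
        return out.str();
    }

    // Deserialization only: the inverse of write(). Lines without a tab are skipped.
    static QuorumAddedParts read(const std::string & data)
    {
        QuorumAddedParts result;
        std::istringstream in(data);
        std::string line;
        while (std::getline(in, line))
        {
            const std::size_t tab = line.find('\t');
            if (tab != std::string::npos)
                result.parts[line.substr(0, tab)] = line.substr(tab + 1);
        }
        return result;
    }
};

// The update no longer knows anything about the text format.
std::string updateAddedParts(
    const std::string & old_data, const std::string & partition_id, const std::string & part_name)
{
    QuorumAddedParts added = QuorumAddedParts::read(old_data);
    added.parts[partition_id] = part_name;
    return added.write();
}
```

`std::map` is used here only to make the output order deterministic; the real code may have reasons to prefer `std::unordered_map`.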
}


PartsNames StorageReplicatedMergeTree::getAddedPartsWithQuorum(const String & parts_str)
Should be called `read...` or `parse...`, not `get...`.
@@ -403,6 +403,20 @@ class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicat
*/
bool queueTask();

using MaxAddedBlocks = std::unordered_map<String, Int64>;
using PartsNames = std::unordered_map<String, String>;
No information about the meaning of keys. Also, `MaxAddedBlocks` is too specific. I would call these `PartitionToBlockNum` and `PartitionToPartName`.
- auto part_info = MergeTreePartInfo::fromPartName(last_part, data.format_version);
- max_block_number_to_read = part_info.max_block;
+ auto added_parts = getAddedPartsWithQuorum(added_parts_str);
+ for (auto & added_part : added_parts)
Why non-const reference?

/// Rewrite data with inserted partiton with quorum.
/// This is added new partition name in data for ZooKeeper.
String rewriteAddedParts(const String & old_added_parts_str, const String & new_added_part_name);
I think these methods deserve to be separated into another class. StorageReplicatedMergeTree is already overloaded.
@@ -267,7 +267,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
  part->minmax_idx.parallelogram, data.minmax_idx_column_types))
  continue;

- if (max_block_number_to_read && part->info.max_block > max_block_number_to_read)
+ if (!max_blocks_number_to_read.empty() && max_blocks_number_to_read.find(part->info.partition_id) != max_blocks_number_to_read.end()
The `empty()` check is unnecessary. The call to `at()` is unnecessary too (can use the results of `find()`).
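A sketch of the simplified condition, assuming the part of the check not shown in the diff compares the part's max block against `at(partition_id)`. `exceedsQuorumLimit` and the `PartitionToBlockNum` alias are hypothetical, with `std::string` and `long long` standing in for `String` and `Int64`.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical alias: partition ID -> max block number readable in that partition.
using PartitionToBlockNum = std::unordered_map<std::string, long long>;

// Returns true if a part should be skipped because its max block exceeds the
// limit recorded for its partition. Partitions without an entry have no limit.
bool exceedsQuorumLimit(
    const PartitionToBlockNum & max_blocks, const std::string & partition_id, long long part_max_block)
{
    // find() on an empty map simply returns end(), so no empty() check is needed.
    const auto it = max_blocks.find(partition_id);
    // Reuse the iterator instead of doing a second lookup through at().
    return it != max_blocks.end() && part_max_block > it->second;
}
```

This performs a single hash lookup per part instead of two, and the condition fits on one line.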
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en