Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow empty parts as a result of a merge #2815

Merged
merged 4 commits into from
Aug 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 28 additions & 43 deletions dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,60 +40,45 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
}


void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t & merged_rows, bool last_in_stream)
void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t & merged_rows)
{
if (count_positive == 0 && count_negative == 0)
{
/// No input rows have been read.
return;
}

if (count_positive == count_negative && !last_is_positive)
{
/// If all the rows in the input streams were collapsed, we still want to give at least one block in the result.
if (last_in_stream && merged_rows == 0 && !blocks_written)
{
LOG_INFO(log, "All rows collapsed");
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_negative.columns)[i], last_negative.row_num);

if (out_row_sources_buf)
{
/// true flag value means "skip row"
current_row_sources[last_positive_pos].setSkipFlag(false);
current_row_sources[last_negative_pos].setSkipFlag(false);
}
}
/// Input rows exactly cancel out.
return;
}
else

if (count_positive <= count_negative)
{
if (count_positive <= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);

if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
}
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
}

if (count_positive >= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (count_positive >= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);

if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
}
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
}

if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportIncorrectData();
++count_incorrect_data;
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportIncorrectData();
++count_incorrect_data;
}

if (out_row_sources_buf)
Expand Down Expand Up @@ -211,7 +196,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
}

/// Write data for last primary key.
insertRows(merged_columns, merged_rows, true);
insertRows(merged_columns, merged_rows);

finished = true;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/CollapsingSortedBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);

/// Output to result rows for the current primary key.
void insertRows(MutableColumns & merged_columns, size_t & merged_rows, bool last_in_stream = false);
void insertRows(MutableColumns & merged_columns, size_t & merged_rows);

void reportIncorrectData();
};
Expand Down
13 changes: 6 additions & 7 deletions dbms/src/DataStreams/SummingSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
}


void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns)
{
for (auto & desc : columns_to_aggregate)
{
Expand Down Expand Up @@ -237,9 +237,9 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
desc.merged_column->insertDefault();
}

/// If it is "zero" row and it is not the last row of the result block, then
/// rollback the insertion (at this moment we need rollback only cols from columns_to_aggregate)
if (!force_insertion && current_row_is_zero)
/// If it is "zero" row, then rollback the insertion
/// (at this moment we need rollback only cols from columns_to_aggregate)
if (current_row_is_zero)
{
for (auto & desc : columns_to_aggregate)
desc.merged_column->popBack(1);
Expand All @@ -252,7 +252,6 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me

/// Update per-block and per-group flags
++merged_rows;
output_is_non_empty = true;
}


Expand Down Expand Up @@ -333,7 +332,7 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
{
if (!current_key.empty())
/// Write the data for the previous group.
insertCurrentRowIfNeeded(merged_columns, false);
insertCurrentRowIfNeeded(merged_columns);

if (merged_rows >= max_block_size)
{
Expand Down Expand Up @@ -393,7 +392,7 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::

/// We will write the data for the last group only if it is non-zero.
/// If it is zero, it is not written, even if that leaves the output stream empty.
insertCurrentRowIfNeeded(merged_columns, !output_is_non_empty);
insertCurrentRowIfNeeded(merged_columns);
finished = true;
}

Expand Down
4 changes: 1 addition & 3 deletions dbms/src/DataStreams/SummingSortedBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.

bool output_is_non_empty = false; /// Have we given out at least one row as a result.
size_t merged_rows = 0; /// Number of rows merged into current result block

/** We support two different cursors - with Collation and without.
Expand All @@ -143,8 +142,7 @@ class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);

/// Inserts the summed row for the current group into the result and updates some per-block flags if the row is not "zero".
/// If force_insertion=true, then the row will be inserted even if it is "zero"
void insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion);
void insertCurrentRowIfNeeded(MutableColumns & merged_columns);

/// Returns true if merge result is not empty
bool mergeMap(const MapDescription & map, Row & row, SortCursor & cursor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ namespace ErrorCodes

VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
const String & sign_column_, size_t max_block_size_,
WriteBuffer * out_row_sources_buf_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
, current_keys(max_rows_in_queue + 1)
{
sign_column_number = header.getPositionByName(sign_column_);
}
Expand Down Expand Up @@ -130,10 +130,7 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
{
update_queue(current);

/// If all the rows were collapsed, we still want to give at least one block in the result.
/// If queue is empty then don't collapse two last rows.
if (sign == sign_in_queue || (!can_collapse_all_rows && blocks_written == 0
&& merged_rows == 0 && queue.empty() && current_keys.size() == 1))
if (sign == sign_in_queue)
current_keys.pushBack(next_key);
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class VersionedCollapsingSortedBlockInputStream : public MergingSortedBlockInput
/// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr).
VersionedCollapsingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
const String & sign_column_, size_t max_block_size_,
WriteBuffer * out_row_sources_buf_ = nullptr);

String getName() const override { return "VersionedCollapsingSorted"; }
Expand All @@ -203,8 +203,6 @@ class VersionedCollapsingSortedBlockInputStream : public MergingSortedBlockInput
/// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
std::queue<RowSourcePart> current_row_sources;

const bool can_collapse_all_rows;

void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);

/// Output to result row for the current primary key.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor

case MergeTreeData::MergingParams::VersionedCollapsing:
merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>(
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal

case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
to_merge, sort_description, data.merging_params.sign_column, max_block_size, true);
to_merge, sort_description, data.merging_params.sign_column, max_block_size);
break;

case MergeTreeData::MergingParams::Graphite:
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->partition.store(storage, part_path, checksums);
if (new_part->minmax_idx.initialized)
new_part->minmax_idx.store(storage, part_path, checksums);
else if (rows_count)
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

WriteBufferFromFile count_out(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(count_out);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
2015-01-01 1 0
2015-01-01 2 -9
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
-------------------------
table with 2 blocks final
2018-01-31 str_0 0 -1
Expand Down Expand Up @@ -48,8 +46,6 @@ table with 2 blocks optimized
-------------------------
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
-------------------------
table with 2 blocks final
2018-01-31 str_0 0 -1
Expand Down Expand Up @@ -96,8 +92,6 @@ table with 2 blocks optimized
-------------------------
table with 4 blocks final
table with 4 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
-------------------------
table with 5 blocks final
2018-01-31 str_0 1 -1
Expand All @@ -124,8 +118,6 @@ table with 5 blocks optimized
-------------------------
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_999999 0 1
2018-01-31 str_999999 0 -1
-------------------------
table with 2 blocks final
2018-01-31 0 0 1
Expand Down Expand Up @@ -261,15 +253,11 @@ table with 2 blocks final
2018-01-31 126 0 1
2018-01-31 127 0 1
table with 2 blocks optimized
2018-01-31 0 0 -1
2018-01-31 127 0 1
-------------------------
Vertival merge
-------------------------
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
-------------------------
table with 2 blocks final
2018-01-31 str_0 0 -1
Expand Down Expand Up @@ -316,8 +304,6 @@ table with 2 blocks optimized
-------------------------
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
-------------------------
table with 2 blocks final
2018-01-31 str_0 0 -1
Expand Down Expand Up @@ -364,8 +350,6 @@ table with 2 blocks optimized
-------------------------
table with 4 blocks final
table with 4 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
-------------------------
table with 5 blocks final
2018-01-31 str_0 1 -1
Expand All @@ -392,8 +376,6 @@ table with 5 blocks optimized
-------------------------
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_999999 0 1
2018-01-31 str_999999 0 -1
-------------------------
table with 2 blocks final
2018-01-31 0 0 1
Expand Down Expand Up @@ -529,5 +511,3 @@ table with 2 blocks final
2018-01-31 126 0 1
2018-01-31 127 0 1
table with 2 blocks optimized
2018-01-31 0 0 -1
2018-01-31 127 0 1
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh

. $CURDIR/00652_mergetree_mutations.lib
. $CURDIR/mergetree_mutations.lib

${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh

. $CURDIR/00652_mergetree_mutations.lib
. $CURDIR/mergetree_mutations.lib

${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations_r2"
Expand Down
Empty file.
Loading