
Commit

* Backport ClickHouse#37978 to 22.3: Fix reading of sparse columns from s3

* Merge pull request ClickHouse#38239 from CurtizJ/fix-reading-from-s3

Fix reading from s3 in some corner cases

Co-authored-by: robot-clickhouse <[email protected]>
Co-authored-by: Nikita Taranov <[email protected]>
Co-authored-by: Kruglov Pavel <[email protected]>
4 people authored Aug 3, 2022
1 parent 2a83062 commit 25886f5
Showing 5 changed files with 111 additions and 56 deletions.
107 changes: 51 additions & 56 deletions src/Storages/MergeTree/MergeTreeReaderStream.cpp
@@ -125,63 +125,58 @@ size_t MergeTreeReaderStream::getRightOffset(size_t right_mark_non_included)
     size_t result_right_offset;
     if (0 < right_mark_non_included && right_mark_non_included < marks_count)
     {
-        auto right_mark = marks_loader.getMark(right_mark_non_included);
-        result_right_offset = right_mark.offset_in_compressed_file;
-
-        bool need_to_check_marks_from_the_right = false;
-
-        /// If the end of the range is inside the block, we will need to read it too.
-        if (right_mark.offset_in_decompressed_block > 0)
-        {
-            need_to_check_marks_from_the_right = true;
-        }
-        else if (is_low_cardinality_dictionary)
-        {
-            /// Also, in a LowCardinality dictionary several consecutive marks can point to
-            /// the same offset. So to get the true byte offset we have to find the first
-            /// non-equal mark.
-            /// Example:
-            ///  Mark 186, points to [2003111, 0]
-            ///  Mark 187, points to [2003111, 0]
-            ///  Mark 188, points to [2003111, 0] <--- for example, need to read until 188
-            ///  Mark 189, points to [2003111, 0] <--- not suitable, because it has the same offset
-            ///  Mark 190, points to [2003111, 0]
-            ///  Mark 191, points to [2003111, 0]
-            ///  Mark 192, points to [2081424, 0] <--- what we are looking for
-            ///  Mark 193, points to [2081424, 0]
-            ///  Mark 194, points to [2081424, 0]
-
-            /// Also, in some cases, when one granule is written non-atomically (which is possible during merges),
-            /// one granule may require reading two dictionaries which start from different marks.
-            /// The only correct way is to take the offset from at least the next different granule to the right.
-
-            /// Check test_s3_low_cardinality_right_border.
-
-            need_to_check_marks_from_the_right = true;
-        }
-
-
-        /// Let's go to the right and find a mark with a bigger offset in the compressed file.
-        if (need_to_check_marks_from_the_right)
-        {
-            bool found_bigger_mark = false;
-            for (size_t i = right_mark_non_included + 1; i < marks_count; ++i)
+        /// Find the right border of the last mark we need to read.
+        /// To do that, let's find the upper bound of the offset of the last
+        /// included mark.
+
+        /// In a LowCardinality dictionary and in values of Sparse columns,
+        /// several consecutive marks can point to the same offset.
+        ///
+        /// Example:
+        ///  Mark 186, points to [2003111, 0]
+        ///  Mark 187, points to [2003111, 0]
+        ///  Mark 188, points to [2003111, 0] <--- for example, need to read until 188
+        ///  Mark 189, points to [2003111, 0] <--- not suitable, because it has the same offset
+        ///  Mark 190, points to [2003111, 0]
+        ///  Mark 191, points to [2003111, 0]
+        ///  Mark 192, points to [2081424, 0] <--- what we are looking for
+        ///  Mark 193, points to [2081424, 0]
+        ///  Mark 194, points to [2081424, 0]
+
+        /// Also, in some cases, when one granule is written non-atomically (which is possible during merges),
+        /// one granule may require reading two dictionaries which start from different marks.
+        /// The only correct way is to take the offset from at least the next different granule to the right.
+        /// That's why we have to read one extra granule to the right
+        /// while reading the dictionary of a LowCardinality column.
+
+        /// If right_mark_non_included has a non-zero offset in the decompressed block, we have to
+        /// read its compressed block as a whole, because it may contain data from the previous granule.
+        ///
+        /// For example:
+        ///  Mark 10: (758287, 0)      <--- right_mark_included
+        ///  Mark 11: (908457, 53477)  <--- right_mark_non_included
+        ///  Mark 12: (1064746, 20742) <--- what we are looking for
+        ///  Mark 13: (2009333, 40123)
+        ///
+        /// Since mark 11 starts at offset 53477 in the decompressed block,
+        /// it contains some data from mark 10, and we have to read
+        /// the compressed block [908457; 1064746] as a whole.
+
+        size_t right_mark_included = right_mark_non_included - 1;
+        if (is_low_cardinality_dictionary || marks_loader.getMark(right_mark_non_included).offset_in_decompressed_block != 0)
+            ++right_mark_included;
+
+        auto indices = collections::range(right_mark_included, marks_count);
+        auto it = std::upper_bound(indices.begin(), indices.end(), right_mark_included,
+            [&](auto lhs, auto rhs)
             {
-                const auto & candidate_mark = marks_loader.getMark(i);
-                if (result_right_offset < candidate_mark.offset_in_compressed_file)
-                {
-                    result_right_offset = candidate_mark.offset_in_compressed_file;
-                    found_bigger_mark = true;
-                    break;
-                }
-            }
-
-            if (!found_bigger_mark)
-            {
-                /// If there are no marks after the end of the range, just use the file size.
-                result_right_offset = file_size;
-            }
-        }
+                return marks_loader.getMark(lhs).offset_in_compressed_file < marks_loader.getMark(rhs).offset_in_compressed_file;
+            });
+
+        if (it != indices.end())
+            result_right_offset = marks_loader.getMark(*it).offset_in_compressed_file;
+        else
+            result_right_offset = file_size;
     }
     else if (right_mark_non_included == 0)
         result_right_offset = marks_loader.getMark(right_mark_non_included).offset_in_compressed_file;
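
To illustrate the idea behind the new search, here is a minimal, self-contained C++ sketch (not the ClickHouse implementation): it models marks as a plain vector, applies the same "cover one extra granule for LowCardinality dictionaries or for a mark with a non-zero decompressed offset" rule, and uses std::upper_bound to find the first mark to the right whose compressed-file offset is strictly greater, falling back to the file size. The Mark struct, the name getRightOffsetSketch, and the sample values are illustrative assumptions derived from the code comments above.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

struct Mark
{
    size_t offset_in_compressed_file = 0;
    size_t offset_in_decompressed_block = 0;
};

/// Sketch of the new logic; assumes 0 < right_mark_non_included < marks.size(),
/// as in the branch shown in the diff above.
size_t getRightOffsetSketch(
    const std::vector<Mark> & marks,
    size_t right_mark_non_included,
    size_t file_size,
    bool is_low_cardinality_dictionary)
{
    /// Last mark whose granule we actually need to read.
    size_t right_mark_included = right_mark_non_included - 1;

    /// For a LowCardinality dictionary, or when the non-included mark starts in the
    /// middle of a compressed block, one extra granule to the right must be covered.
    if (is_low_cardinality_dictionary
        || marks[right_mark_non_included].offset_in_decompressed_block != 0)
        ++right_mark_included;

    /// First mark, searching from right_mark_included, whose compressed offset is
    /// strictly greater than the offset of right_mark_included itself.
    auto it = std::upper_bound(
        marks.begin() + right_mark_included, marks.end(),
        marks[right_mark_included].offset_in_compressed_file,
        [](size_t lhs, const Mark & rhs) { return lhs < rhs.offset_in_compressed_file; });

    /// If every remaining mark shares the same offset, read until the end of the file.
    return it != marks.end() ? it->offset_in_compressed_file : file_size;
}

int main()
{
    /// Marks 186..194 from the comment above, remapped to indices 0..8:
    /// six marks point to offset 2003111, the next three to 2081424.
    std::vector<Mark> marks(9);
    for (size_t i = 0; i < 6; ++i)
        marks[i] = {2003111, 0};
    for (size_t i = 6; i < 9; ++i)
        marks[i] = {2081424, 0};

    /// Reading a LowCardinality dictionary up to (not including) mark 3:
    /// the right border must be 2081424, not 2003111.
    std::printf("%zu\n", getRightOffsetSketch(marks, 3, /*file_size=*/2500000, true));
    return 0;
}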
2 changes: 2 additions & 0 deletions tests/queries/0_stateless/02336_sparse_columns_s3.reference
@@ -0,0 +1,2 @@
Sparse
50787
42 changes: 42 additions & 0 deletions tests/queries/0_stateless/02336_sparse_columns_s3.sql
@@ -0,0 +1,42 @@
-- Tags: no-parallel, no-fasttest, no-s3-storage

DROP TABLE IF EXISTS t_sparse_s3;

CREATE TABLE t_sparse_s3 (id UInt32, cond UInt8, s String)
engine = MergeTree ORDER BY id
settings ratio_of_defaults_for_sparse_serialization = 0.01, storage_policy = 's3_cache',
min_bytes_for_wide_part = 0, min_compress_block_size = 1;

INSERT INTO t_sparse_s3 SELECT 1, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 2, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 3, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 4, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 5, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 6, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 7, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 8, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 9, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 10, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 11, number % 2, '' FROM numbers(8000);
INSERT INTO t_sparse_s3 SELECT 12, number % 2, 'foo' FROM numbers(192);
INSERT INTO t_sparse_s3 SELECT 13, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 14, number % 2, 'foo' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 15, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 16, number % 2, 'foo' FROM numbers(4730);
INSERT INTO t_sparse_s3 SELECT 17, number % 2, 'foo' FROM numbers(3462);
INSERT INTO t_sparse_s3 SELECT 18, number % 2, '' FROM numbers(24576);

OPTIMIZE TABLE t_sparse_s3 FINAL;

SELECT serialization_kind FROM system.parts_columns
WHERE table = 't_sparse_s3' AND active AND column = 's'
AND database = currentDatabase();

SET max_threads = 1;

SELECT count() FROM t_sparse_s3
PREWHERE cond
WHERE id IN (1, 3, 5, 7, 9, 11, 13, 15, 17)
AND NOT ignore(s);

DROP TABLE t_sparse_s3;
1 change: 1 addition & 0 deletions tests/queries/0_stateless/02343_read_from_s3_compressed_blocks.reference
@@ -0,0 +1 @@
57344
15 changes: 15 additions & 0 deletions tests/queries/0_stateless/02343_read_from_s3_compressed_blocks.sql
@@ -0,0 +1,15 @@
-- Tags: no-parallel, no-fasttest, no-s3-storage

DROP TABLE IF EXISTS t_s3_compressed_blocks;

CREATE TABLE t_s3_compressed_blocks (id UInt64, s String CODEC(NONE))
ENGINE = MergeTree ORDER BY id
SETTINGS storage_policy = 's3_cache',
min_bytes_for_wide_part = 0;

INSERT INTO t_s3_compressed_blocks SELECT number, randomPrintableASCII(128) from numbers(57344);

SET max_threads = 1;
SELECT count() FROM t_s3_compressed_blocks WHERE NOT ignore(s);

DROP TABLE t_s3_compressed_blocks;
