Commit

Merge branch 'master' of github.com:pingcap/tiflash into fix_cn_server_info
guo-shaoge committed Jun 1, 2023
2 parents 333bde6 + c65dd72 commit f00bc79
Showing 11 changed files with 88 additions and 18 deletions.
1 change: 1 addition & 0 deletions dbms/src/Core/SpillHandler.cpp
@@ -98,6 +98,7 @@ void SpillHandler::spillBlocks(Blocks && blocks)
continue;
/// erase constant column
spiller->removeConstantColumns(block);
RUNTIME_CHECK_MSG(block.columns() > 0, "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns");
if (unlikely(writer == nullptr))
{
std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile();
11 changes: 10 additions & 1 deletion dbms/src/Core/Spiller.cpp
@@ -78,6 +78,16 @@ void SpilledFiles::makeAllSpilledFilesImmutable()
mutable_spilled_files.clear();
}

bool Spiller::supportSpill(const Block & header)
{
for (const auto & column_with_type_and_name : header)
{
if (column_with_type_and_name.column == nullptr || !column_with_type_and_name.column->isColumnConst())
return true;
}
return false;
}

Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 partition_num_, const Block & input_schema_, const LoggerPtr & logger_, Int64 spill_version_, bool release_spilled_file_on_restore_)
: config(config_)
, is_input_sorted(is_input_sorted_)
@@ -106,7 +116,6 @@ Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 part
if (input_schema.getByPosition(i).column != nullptr && input_schema.getByPosition(i).column->isColumnConst())
const_column_indexes.push_back(i);
}
RUNTIME_CHECK_MSG(const_column_indexes.size() < input_schema.columns(), "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns");
header_without_constants = input_schema;
removeConstantColumns(header_without_constants);
}
1 change: 1 addition & 0 deletions dbms/src/Core/Spiller.h
@@ -85,6 +85,7 @@ struct SpilledFiles
class Spiller
{
public:
static bool supportSpill(const Block & header);
Spiller(const SpillConfig & config, bool is_input_sorted, UInt64 partition_num, const Block & input_schema, const LoggerPtr & logger, Int64 spill_version = 1, bool release_spilled_file_on_restore = true);
void spillBlocks(Blocks && blocks, UInt64 partition_id);
SpillHandler createSpillHandler(UInt64 partition_id);
3 changes: 2 additions & 1 deletion dbms/src/Core/tests/gtest_spiller.cpp
@@ -503,11 +503,12 @@ try
type_and_name.column = type_and_name.type->createColumnConst(1, Field(static_cast<Int64>(1)));

Spiller spiller(*spill_config_ptr, false, 1, constant_header, logger);
spiller.spillBlocks({constant_header}, 0);
GTEST_FAIL();
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check const_column_indexes.size() < input_schema.columns() failed: Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns");
GTEST_ASSERT_EQ(e.message().find("Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns") != std::string::npos, true);
}

TEST_F(SpillerTest, SpillWithConstantSchemaAndNonConstantData)
19 changes: 15 additions & 4 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
@@ -45,7 +45,18 @@ MergeSortingBlockInputStream::MergeSortingBlockInputStream(
header_without_constants = header;
SortHelper::removeConstantsFromBlock(header_without_constants);
SortHelper::removeConstantsFromSortDescription(header, description);
spiller = std::make_unique<Spiller>(spill_config, true, 1, header_without_constants, log);
if (max_bytes_before_external_sort > 0)
{
if (Spiller::supportSpill(header_without_constants))
{
spiller = std::make_unique<Spiller>(spill_config, true, 1, header_without_constants, log);
}
else
{
max_bytes_before_external_sort = 0;
LOG_WARNING(log, "Sort/TopN does not support spill, reason: input data contains only constant columns");
}
}
}


@@ -79,7 +90,7 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
if (!spiller->hasSpilledData())
if (!hasSpilledData())
{
LOG_INFO(log, "Begin spill in sort");
}
@@ -95,10 +106,10 @@
}
}

if (isCancelledOrThrowIfKilled() || (blocks.empty() && !spiller->hasSpilledData()))
if (isCancelledOrThrowIfKilled() || (blocks.empty() && !hasSpilledData()))
return Block();

if (!spiller->hasSpilledData())
if (!hasSpilledData())
{
impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description, log->identifier(), max_merged_block_size, limit);
}
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.h
@@ -55,6 +55,10 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream
void appendInfo(FmtBuffer & buffer) const override;

private:
bool hasSpilledData() const
{
return max_bytes_before_external_sort > 0 && spiller->hasSpilledData();
}
SortDescription description;
size_t max_merged_block_size;
size_t limit;
20 changes: 16 additions & 4 deletions dbms/src/Interpreters/Aggregator.cpp
@@ -280,11 +280,23 @@ Aggregator::Aggregator(const Params & params_, const String & req_id)

method_chosen = chooseAggregationMethod();
RUNTIME_CHECK_MSG(method_chosen != AggregatedDataVariants::Type::EMPTY, "Invalid aggregation method");
if (AggregatedDataVariants::isConvertibleToTwoLevel(method_chosen))
if (params.getMaxBytesBeforeExternalGroupBy() > 0)
{
/// for aggregation, the input block is sorted by bucket number
/// so it can work with MergingAggregatedMemoryEfficientBlockInputStream
spiller = std::make_unique<Spiller>(params.spill_config, true, 1, getHeader(false), log);
/// init spiller if needed
auto header = getHeader(false);
bool is_convertible_to_two_level = AggregatedDataVariants::isConvertibleToTwoLevel(method_chosen);
bool is_input_support_spill = Spiller::supportSpill(header);
if (is_convertible_to_two_level && is_input_support_spill)
{
/// for aggregation, the input block is sorted by bucket number
/// so it can work with MergingAggregatedMemoryEfficientBlockInputStream
spiller = std::make_unique<Spiller>(params.spill_config, true, 1, header, log);
}
else
{
params.setMaxBytesBeforeExternalGroupBy(0);
LOG_WARNING(log, "Aggregation does not support spill, reason: {}", is_convertible_to_two_level ? "aggregator hash table does not support two level" : "input data contains only constant columns");
}
}
}

3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Aggregator.h
@@ -1008,13 +1008,14 @@ class Aggregator
size_t getGroupByTwoLevelThreshold() const { return group_by_two_level_threshold; }
size_t getGroupByTwoLevelThresholdBytes() const { return group_by_two_level_threshold_bytes; }
size_t getMaxBytesBeforeExternalGroupBy() const { return max_bytes_before_external_group_by; }
void setMaxBytesBeforeExternalGroupBy(size_t threshold) { max_bytes_before_external_group_by = threshold; }

private:
/// Note these thresholds should not be used directly, they are only used to
/// init the threshold in Aggregator
const size_t group_by_two_level_threshold;
const size_t group_by_two_level_threshold_bytes;
const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
};


26 changes: 22 additions & 4 deletions dbms/src/Interpreters/Join.cpp
@@ -347,20 +347,26 @@ void Join::initBuild(const Block & sample_block, size_t build_concurrency_)
join_map_method = chooseJoinMapMethod(getKeyColumns(key_names_right, sample_block), key_sizes, collators);
setBuildConcurrencyAndInitJoinPartition(build_concurrency_);
build_sample_block = sample_block;
build_spiller = std::make_unique<Spiller>(build_spill_config, false, build_concurrency_, build_sample_block, log);
if (max_bytes_before_external_join > 0)
{
if (join_map_method == JoinMapMethod::CROSS)
{
/// todo support spill for cross join
max_bytes_before_external_join = 0;
LOG_WARNING(log, "Cross join does not support spilling, so set max_bytes_before_external_join = 0");
LOG_WARNING(log, "Join does not support spill, reason: cross join spill is not supported");
}
if (isNullAwareSemiFamily(kind))
{
max_bytes_before_external_join = 0;
LOG_WARNING(log, "null aware join does not support spilling, so set max_bytes_before_external_join = 0");
LOG_WARNING(log, "Join does not support spill, reason: null aware join spill is not supported");
}
if (!Spiller::supportSpill(build_sample_block))
{
max_bytes_before_external_join = 0;
LOG_WARNING(log, "Join does not support spill, reason: input data from build side contains only constant columns");
}
if (max_bytes_before_external_join > 0)
build_spiller = std::make_unique<Spiller>(build_spill_config, false, build_concurrency_, build_sample_block, log);
}
setSampleBlock(sample_block);
}
@@ -370,7 +376,19 @@ void Join::initProbe(const Block & sample_block, size_t probe_concurrency_)
std::unique_lock lock(rwlock);
setProbeConcurrency(probe_concurrency_);
probe_sample_block = sample_block;
probe_spiller = std::make_unique<Spiller>(probe_spill_config, false, build_concurrency, probe_sample_block, log);
if (max_bytes_before_external_join > 0)
{
if (!Spiller::supportSpill(probe_sample_block))
{
max_bytes_before_external_join = 0;
build_spiller = nullptr;
LOG_WARNING(log, "Join does not support spill, reason: input data from probe side contains only constant columns");
}
else
{
probe_spiller = std::make_unique<Spiller>(probe_spill_config, false, build_concurrency, probe_sample_block, log);
}
}
}

/// the block should be valid.
17 changes: 14 additions & 3 deletions dbms/src/Operators/MergeSortTransformOp.cpp
@@ -32,7 +32,18 @@ void MergeSortTransformOp::operatePrefix()
// For order by constants, generate LimitOperator instead of SortOperator.
assert(!order_desc.empty());

spiller = std::make_unique<Spiller>(spill_config, true, 1, header_without_constants, log);
if (max_bytes_before_external_sort > 0)
{
if (Spiller::supportSpill(header_without_constants))
{
spiller = std::make_unique<Spiller>(spill_config, true, 1, header_without_constants, log);
}
else
{
max_bytes_before_external_sort = 0;
LOG_WARNING(log, "Sort/TopN does not support spill, reason: input data contains only constant columns");
}
}
}

void MergeSortTransformOp::operateSuffix()
@@ -104,7 +115,7 @@ OperatorStatus MergeSortTransformOp::fromPartialToSpill()
// convert to restore phase.
status = MergeSortStatus::SPILL;
assert(!cached_handler);
if (!spiller->hasSpilledData())
if (!hasSpilledData())
LOG_INFO(log, "Begin spill in merge sort");
cached_handler = spiller->createCachedSpillHandler(
std::make_shared<MergeSortingBlocksBlockInputStream>(sorted_blocks, order_desc, log->identifier(), max_block_size, limit),
@@ -135,7 +146,7 @@ OperatorStatus MergeSortTransformOp::transformImpl(Block & block)
{
if unlikely (!block)
{
return spiller->hasSpilledData()
return hasSpilledData()
? fromPartialToRestore()
: fromPartialToMerge(block);
}
1 change: 1 addition & 0 deletions dbms/src/Operators/MergeSortTransformOp.h
@@ -70,6 +70,7 @@ class MergeSortTransformOp : public TransformOp
OperatorStatus fromPartialToMerge(Block & block);

private:
bool hasSpilledData() const { return max_bytes_before_external_sort > 0 && spiller->hasSpilledData(); }
SortDescription order_desc;
// 0 means no limit.
size_t limit;
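Taken together, the call sites changed above (MergeSortingBlockInputStream, MergeSortTransformOp, Aggregator, Join) apply one pattern: consult the new Spiller::supportSpill() before constructing a Spiller, and disable spilling when the input schema contains only constant columns. Below is a minimal sketch of that pattern; the SomeOperator class and its members (spill_config, max_bytes_before_external, spiller, log) are hypothetical stand-ins for the real operators, and only the Spiller API shown in this diff is assumed.

void SomeOperator::initSpillerIfNeeded(const Block & header_without_constants)
{
    /// Spilling is only relevant when a positive memory threshold was configured.
    if (max_bytes_before_external == 0)
        return;

    if (Spiller::supportSpill(header_without_constants))
    {
        /// At least one non-constant column exists, so blocks are still non-empty
        /// after Spiller::removeConstantColumns strips the constant columns.
        spiller = std::make_unique<Spiller>(spill_config, /*is_input_sorted=*/true, /*partition_num=*/1, header_without_constants, log);
    }
    else
    {
        /// All columns are constant: turn spilling off up front instead of hitting
        /// the RUNTIME_CHECK_MSG in SpillHandler::spillBlocks at runtime.
        max_bytes_before_external = 0;
        LOG_WARNING(log, "spill disabled, input data contains only constant columns");
    }
}

Because the threshold may now be reset to 0 and the spiller left unset, the changed callers check the threshold before dereferencing the spiller; the new hasSpilledData() helpers return max_bytes_before_external_sort > 0 && spiller->hasSpilledData().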
