Skip to content

Commit

Permalink
.*: Refine FilterBlockInputStream (#6250)
Browse files Browse the repository at this point in the history
ref #5900
  • Loading branch information
SeaRise authored Nov 7, 2022
1 parent 9d7d060 commit 2e71675
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 174 deletions.
155 changes: 6 additions & 149 deletions dbms/src/DataStreams/FilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,36 +34,18 @@ FilterBlockInputStream::FilterBlockInputStream(
const ExpressionActionsPtr & expression_,
const String & filter_column_name,
const String & req_id)
: expression(expression_)
: filter_transform_action(input->getHeader(), expression_, filter_column_name)
, log(Logger::get(req_id))
{
children.push_back(input);

/// Determine position of filter column.
header = input->getHeader();
expression->execute(header);

filter_column = header.getPositionByName(filter_column_name);
auto & column_elem = header.safeGetByPosition(filter_column);

/// Isn't the filter already constant?
if (column_elem.column)
constant_filter_description = ConstantFilterDescription(*column_elem.column);

if (!constant_filter_description.always_false && !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), UInt64(1));
}
}

Block FilterBlockInputStream::getTotals()
{
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
expression->executeOnTotals(totals);
filter_transform_action.getExperssion()->executeOnTotals(totals);
}

return totals;
Expand All @@ -72,152 +54,27 @@ Block FilterBlockInputStream::getTotals()

Block FilterBlockInputStream::getHeader() const
{
return header;
return filter_transform_action.getHeader();
}


Block FilterBlockInputStream::readImpl()
{
Block res;

if (constant_filter_description.always_false)
if (filter_transform_action.alwaysFalse())
return res;

/// Until non-empty block after filtering or end of stream.
while (true)
{
IColumn::Filter * child_filter = nullptr;

res = children.back()->read(child_filter, true);
res = children.back()->read();

if (!res)
return res;

expression->execute(res);

if (constant_filter_description.always_true && !child_filter)
return res;

size_t columns = res.columns();
size_t rows = res.rows();
ColumnPtr column_of_filter = res.safeGetByPosition(filter_column).column;

if (unlikely(child_filter && child_filter->size() != rows))
throw Exception("Unexpected child filter size", ErrorCodes::LOGICAL_ERROR);

/** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
* and now - are calculated. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
constant_filter_description = ConstantFilterDescription(*column_of_filter);

if (constant_filter_description.always_false)
{
res.clear();
return res;
}

IColumn::Filter * filter;
ColumnPtr filter_holder;

if (constant_filter_description.always_true)
{
if (child_filter)
filter = child_filter;
else
return res;
}
else
{
FilterDescription filter_and_holder(*column_of_filter);
filter = const_cast<IColumn::Filter *>(filter_and_holder.data);
filter_holder = filter_and_holder.data_holder;

if (child_filter)
{
/// Merge child_filter
UInt8 * a = filter->data();
UInt8 * b = child_filter->data();
for (size_t i = 0; i < rows; ++i)
{
*a = *a > 0 && *b != 0;
++a;
++b;
}
}
}

/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.
*/
size_t first_non_constant_column = 0;
for (size_t i = 0; i < columns; ++i)
{
if (!res.safeGetByPosition(i).column->isColumnConst())
{
first_non_constant_column = i;

if (first_non_constant_column != static_cast<size_t>(filter_column))
break;
}
}

size_t filtered_rows = 0;
if (first_non_constant_column != static_cast<size_t>(filter_column))
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column);
current_column.column = current_column.column->filter(*filter, -1);
filtered_rows = current_column.column->size();
}
else
{
filtered_rows = countBytesInFilter(*filter);
}

/// If the current block is completely filtered out, let's move on to the next one.
if (filtered_rows == 0)
continue;

/// If all the rows pass through the filter.
if (filtered_rows == rows)
{
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column
= res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1));
/// No need to touch the rest of the columns.
if (filter_transform_action.transform(res))
return res;
}

/// Filter the rest of the columns.
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(i);

if (i == static_cast<size_t>(filter_column))
{
/// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain.
/// NOTE User could pass column with something different than 0 and 1 for filter.
/// Example:
/// SELECT materialize(100) AS x WHERE x
/// will work incorrectly.
current_column.column = current_column.type->createColumnConst(filtered_rows, UInt64(1));
continue;
}

if (i == first_non_constant_column)
continue;

if (current_column.column->isColumnConst())
current_column.column = current_column.column->cut(0, filtered_rows);
else
current_column.column = current_column.column->filter(*filter, filtered_rows);
}

return res;
}
}


} // namespace DB
12 changes: 2 additions & 10 deletions dbms/src/DataStreams/FilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@

#pragma once

#include <Columns/FilterDescription.h>
#include <DataStreams/FilterTransformAction.h>
#include <DataStreams/IProfilingBlockInputStream.h>


namespace DB
{
class ExpressionActions;


/** Implements WHERE, HAVING operations.
* A stream of blocks and an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions, are passed as input.
* The expression is evaluated and a stream of blocks is returned, which contains only the filtered rows.
Expand All @@ -49,11 +45,7 @@ class FilterBlockInputStream : public IProfilingBlockInputStream
Block readImpl() override;

private:
ExpressionActionsPtr expression;
Block header;
ssize_t filter_column;

ConstantFilterDescription constant_filter_description;
FilterTransformAction filter_transform_action;

const LoggerPtr log;
};
Expand Down
Loading

0 comments on commit 2e71675

Please sign in to comment.