Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.*: Refine FilterBlockInputStream #6250

Merged
merged 9 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 6 additions & 149 deletions dbms/src/DataStreams/FilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,36 +34,18 @@ FilterBlockInputStream::FilterBlockInputStream(
const ExpressionActionsPtr & expression_,
const String & filter_column_name,
const String & req_id)
: expression(expression_)
: filter_transform_action(input->getHeader(), expression_, filter_column_name)
, log(Logger::get(req_id))
{
children.push_back(input);

/// Determine position of filter column.
header = input->getHeader();
expression->execute(header);

filter_column = header.getPositionByName(filter_column_name);
auto & column_elem = header.safeGetByPosition(filter_column);

/// Isn't the filter already constant?
if (column_elem.column)
constant_filter_description = ConstantFilterDescription(*column_elem.column);

if (!constant_filter_description.always_false && !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), UInt64(1));
}
}

Block FilterBlockInputStream::getTotals()
{
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
expression->executeOnTotals(totals);
filter_transform_action.getExperssion()->executeOnTotals(totals);
}

return totals;
Expand All @@ -72,152 +54,27 @@ Block FilterBlockInputStream::getTotals()

Block FilterBlockInputStream::getHeader() const
{
return header;
return filter_transform_action.getHeader();
}


Block FilterBlockInputStream::readImpl()
{
Block res;

if (constant_filter_description.always_false)
if (filter_transform_action.alwaysFalse())
return res;

/// Until non-empty block after filtering or end of stream.
while (true)
{
IColumn::Filter * child_filter = nullptr;

res = children.back()->read(child_filter, true);
res = children.back()->read();

if (!res)
return res;

expression->execute(res);

if (constant_filter_description.always_true && !child_filter)
return res;

size_t columns = res.columns();
size_t rows = res.rows();
ColumnPtr column_of_filter = res.safeGetByPosition(filter_column).column;

if (unlikely(child_filter && child_filter->size() != rows))
throw Exception("Unexpected child filter size", ErrorCodes::LOGICAL_ERROR);

/** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
* and now - are calculated. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
constant_filter_description = ConstantFilterDescription(*column_of_filter);

if (constant_filter_description.always_false)
{
res.clear();
return res;
}

IColumn::Filter * filter;
ColumnPtr filter_holder;

if (constant_filter_description.always_true)
{
if (child_filter)
filter = child_filter;
else
return res;
}
else
{
FilterDescription filter_and_holder(*column_of_filter);
filter = const_cast<IColumn::Filter *>(filter_and_holder.data);
filter_holder = filter_and_holder.data_holder;

if (child_filter)
{
/// Merge child_filter
UInt8 * a = filter->data();
UInt8 * b = child_filter->data();
for (size_t i = 0; i < rows; ++i)
{
*a = *a > 0 && *b != 0;
++a;
++b;
}
}
}

/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.
*/
size_t first_non_constant_column = 0;
for (size_t i = 0; i < columns; ++i)
{
if (!res.safeGetByPosition(i).column->isColumnConst())
{
first_non_constant_column = i;

if (first_non_constant_column != static_cast<size_t>(filter_column))
break;
}
}

size_t filtered_rows = 0;
if (first_non_constant_column != static_cast<size_t>(filter_column))
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column);
current_column.column = current_column.column->filter(*filter, -1);
filtered_rows = current_column.column->size();
}
else
{
filtered_rows = countBytesInFilter(*filter);
}

/// If the current block is completely filtered out, let's move on to the next one.
if (filtered_rows == 0)
continue;

/// If all the rows pass through the filter.
if (filtered_rows == rows)
{
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column
= res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1));
/// No need to touch the rest of the columns.
if (filter_transform_action.transform(res))
return res;
}

/// Filter the rest of the columns.
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(i);

if (i == static_cast<size_t>(filter_column))
{
/// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain.
/// NOTE User could pass column with something different than 0 and 1 for filter.
/// Example:
/// SELECT materialize(100) AS x WHERE x
/// will work incorrectly.
current_column.column = current_column.type->createColumnConst(filtered_rows, UInt64(1));
continue;
}

if (i == first_non_constant_column)
continue;

if (current_column.column->isColumnConst())
current_column.column = current_column.column->cut(0, filtered_rows);
else
current_column.column = current_column.column->filter(*filter, filtered_rows);
}

return res;
}
}


} // namespace DB
12 changes: 2 additions & 10 deletions dbms/src/DataStreams/FilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@

#pragma once

#include <Columns/FilterDescription.h>
#include <DataStreams/FilterTransformAction.h>
#include <DataStreams/IProfilingBlockInputStream.h>


namespace DB
{
class ExpressionActions;


/** Implements WHERE, HAVING operations.
* A stream of blocks and an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions, are passed as input.
* The expression is evaluated and a stream of blocks is returned, which contains only the filtered rows.
Expand All @@ -49,11 +45,7 @@ class FilterBlockInputStream : public IProfilingBlockInputStream
Block readImpl() override;

private:
ExpressionActionsPtr expression;
Block header;
ssize_t filter_column;

ConstantFilterDescription constant_filter_description;
FilterTransformAction filter_transform_action;

const LoggerPtr log;
};
Expand Down
Loading