From b5e21f2006fc05fcfce9f7d4b8de68a951c3a6da Mon Sep 17 00:00:00 2001
From: fzhedu
Date: Mon, 30 Aug 2021 11:43:00 +0800
Subject: [PATCH] address comments

---
 .../AggregateFunctionGroupConcat.h            | 271 ++++++++++++++++++
 .../AggregateFunctionNull.cpp                 |   6 +-
 .../AggregateFunctionNull.h                   | 264 -----------------
 dbms/src/AggregateFunctions/CMakeLists.txt    |   2 +-
 .../Coprocessor/DAGExpressionAnalyzer.cpp     |   1 +
 5 files changed, 276 insertions(+), 268 deletions(-)
 create mode 100644 dbms/src/AggregateFunctions/AggregateFunctionGroupConcat.h

diff --git a/dbms/src/AggregateFunctions/AggregateFunctionGroupConcat.h b/dbms/src/AggregateFunctions/AggregateFunctionGroupConcat.h
new file mode 100644
index 00000000000..8216e88f1aa
--- /dev/null
+++ b/dbms/src/AggregateFunctions/AggregateFunctionGroupConcat.h
@@ -0,0 +1,271 @@
+#include
+#include
+
+
+namespace DB
+{
+
+/// a wrapper function on top of groupArray and groupUniqArray, similar to AggregateFunctionNull
+///
+/// the input argument takes one of the following two forms:
+/// 1. only one column with its original data type and without order-by items, for example: group_concat(c)
+/// 2. one column combining several columns, including concat items and order-by items, of the form tuple(concat0, concat1 ... order0, order1 ...), for example:
+///    all columns = concat items + order-by items
+///    (c0,c1,o0,o1) = group_concat(c0,c1 order by o0,o1)
+///    group_concat(distinct c0,c1 order by b0,b1) = groupUniqArray(tuple(c0,c1,b0,b1)) -> distinct (c0, c1), i.e., remove duplicates further
+
+template <bool result_is_nullable, bool only_one_column>
+class AggregateFunctionGroupConcat final : public AggregateFunctionNullBase<result_is_nullable, AggregateFunctionGroupConcat<result_is_nullable, only_one_column>>
+{
+    using State = AggregateFunctionGroupUniqArrayGenericData;
+
+public:
+    AggregateFunctionGroupConcat(AggregateFunctionPtr nested_function, const DataTypes & input_args, const String & sep, const UInt64 & max_len_, const SortDescription & sort_desc_, const NamesAndTypes & all_columns_names_and_types_, const TiDB::TiDBCollators & collators_, const bool has_distinct)
+        : AggregateFunctionNullBase<result_is_nullable, AggregateFunctionGroupConcat<result_is_nullable, only_one_column>>(nested_function),
+          separator(sep), max_len(max_len_), sort_desc(sort_desc_),
+          all_columns_names_and_types(all_columns_names_and_types_), collators(collators_)
+    {
+        if (input_args.size() != 1)
+            throw Exception("Logical error: more than 1 arguments are passed to AggregateFunctionGroupConcat", ErrorCodes::LOGICAL_ERROR);
+        nested_type = std::make_shared<DataTypeArray>(removeNullable(input_args[0]));
+
+        number_of_concat_items = all_columns_names_and_types.size() - sort_desc.size();
+
+        is_nullable.resize(number_of_concat_items);
+        for (size_t i = 0; i < number_of_concat_items; ++i)
+        {
+            is_nullable[i] = all_columns_names_and_types[i].type->isNullable();
+            /// the inputs of the nested agg reject null, but for more than one arg, tuple(args...) is already not nullable,
+            /// so we only remove nullability here for the only_one_column case
+            if constexpr (only_one_column)
+            {
+                all_columns_names_and_types[i].type = removeNullable(all_columns_names_and_types[i].type);
+            }
+        }
+
+        /// an extra deduplication pass is needed when there are extra sort items (which do not occur in the concat list) or when collations are involved
+        if (has_distinct)
+        {
+            for (auto & desc : sort_desc)
+            {
+                bool is_extra = true;
+                for (size_t i = 0; i < number_of_concat_items; ++i)
+                {
+                    if (desc.column_name == all_columns_names_and_types[i].name)
+                    {
+                        is_extra = false;
+                        break;
+                    }
+                }
+                if (is_extra)
+                {
+                    to_get_unique = true;
+                    break;
+                }
+            }
+            /// because groupUniqArray does not consider collations, if there are collations
+            /// we should additionally remove redundant rows with collations taken into account
+            if (!to_get_unique)
+            {
+                bool has_collation = false;
+                for (size_t i = 0; i < number_of_concat_items; ++i)
+                {
+                    if (collators[i] != nullptr)
+                    {
+                        has_collation = true;
+                        break;
+                    }
+                }
+                to_get_unique = has_collation;
+            }
+        }
+    }
+
+    DataTypePtr getReturnType() const override
+    {
+        return result_is_nullable
+            ? makeNullable(ret_type)
+            : ret_type;
+    }
+
+    /// reject nulls before calling add() of the nested agg
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
+    {
+        if constexpr (only_one_column)
+        {
+            if (is_nullable[0])
+            {
+                const ColumnNullable * column = static_cast<const ColumnNullable *>(columns[0]);
+                if (!column->isNullAt(row_num))
+                {
+                    this->setFlag(place);
+                    const IColumn * nested_column = &column->getNestedColumn();
+                    this->nested_function->add(this->nestedPlace(place), &nested_column, row_num, arena);
+                }
+                return;
+            }
+        }
+        else
+        {
+            /// skip rows that contain null in any concat column; nulls in sort-only columns are kept
+            const ColumnTuple & tuple = static_cast<const ColumnTuple &>(*columns[0]);
+            for (size_t i = 0; i < number_of_concat_items; ++i)
+            {
+                if (is_nullable[i])
+                {
+                    const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(tuple.getColumn(i));
+                    if (nullable_col.isNullAt(row_num))
+                    {
+                        /// if at least one concat column has a null value in the current row,
+                        /// we don't process this row.
+                        return;
+                    }
+                }
+            }
+        }
+        this->setFlag(place);
+        this->nested_function->add(this->nestedPlace(place), columns, row_num, arena);
+    }
+
+    void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
+    {
+        ColumnString * col_str = nullptr;
+        ColumnNullable * col_null = nullptr;
+        if constexpr (result_is_nullable)
+        {
+            col_null = &static_cast<ColumnNullable &>(to);
+            col_str = &static_cast<ColumnString &>(col_null->getNestedColumn());
+        }
+        else
+        {
+            col_str = &static_cast<ColumnString &>(to);
+        }
+
+        if (this->getFlag(place))
+        {
+            if constexpr (result_is_nullable)
+            {
+                col_null->getNullMapData().push_back(0);
+            }
+
+            /// get the result of the nested function, an Array column
+            auto mutable_nested_cols = nested_type->createColumn();
+            this->nested_function->insertResultInto(this->nestedPlace(place), *mutable_nested_cols, arena);
+            const auto column_array = checkAndGetColumn<ColumnArray>(mutable_nested_cols.get());
+
+            /// nested_cols are not nullable, because rows with nulls are removed in add()
+            Columns nested_cols;
+            if constexpr (only_one_column)
+            {
+                nested_cols.push_back(column_array->getDataPtr());
+            }
+            else
+            {
+                auto & cols = checkAndGetColumn<ColumnTuple>(&column_array->getData())->getColumns();
+                nested_cols.insert(nested_cols.begin(), cols.begin(), cols.end());
+            }
+
+            /// sort the nested columns when an order-by is required
+            if (!sort_desc.empty())
+                sortColumns(nested_cols);
+
+            /// get unique flags
+            std::vector<bool> unique;
+            if (to_get_unique)
+                getUnique(nested_cols, unique);
+
+            writeToStringColumn(nested_cols, col_str, unique);
+        }
+        else
+        {
+            if constexpr (result_is_nullable)
+                col_null->insertDefault();
+            else
+                col_str->insertDefault();
+        }
+    }
+
+    bool allocatesMemoryInArena() const override
+    {
+        return this->nested_function->allocatesMemoryInArena();
+    }
+
+private:
+    /// construct a block and sort it when there is an order-by requirement
+    void sortColumns(Columns & nested_cols) const
+    {
+        Block res;
+        int concat_size = nested_cols.size();
+        for (int i = 0; i < concat_size; ++i)
+        {
+            res.insert(ColumnWithTypeAndName(nested_cols[i], all_columns_names_and_types[i].type, all_columns_names_and_types[i].name));
+        }
+        /// sort a block with collation
+        sortBlock(res, sort_desc);
+        nested_cols = res.getColumns();
+    }
+
+    /// compute unique flags by inserting, for each row, the first N (concat) of the N + M (concat + sort) internal columns into a set
+    void getUnique(const Columns & cols, std::vector<bool> & unique) const
+    {
+        std::unique_ptr<State> state = std::make_unique<State>();
+        Arena arena1;
+        auto size = cols[0]->size();
+        unique.resize(size);
+        std::vector<String> containers(collators.size());
+        for (size_t i = 0; i < size; ++i)
+        {
+            bool inserted = false;
+            State::Set::LookupResult it;
+            const char * begin = nullptr;
+            size_t values_size = 0;
+            for (size_t j = 0; j < number_of_concat_items; ++j)
+                values_size += cols[j]->serializeValueIntoArena(i, arena1, begin, collators[j], containers[j]).size;
+
+            StringRef str_serialized = StringRef(begin, values_size);
+            state->value.emplace(str_serialized, it, inserted);
+            unique[i] = inserted;
+        }
+    }
+
+    /// write each row's cells to the string column, separated by the separator
+    void writeToStringColumn(const Columns & cols, ColumnString * const col_str, const std::vector<bool> & unique) const
+    {
+        WriteBufferFromOwnString write_buffer;
+        auto size = cols[0]->size();
+        for (size_t i = 0; i < size; ++i)
+        {
+            if (unique.empty() || unique[i])
+            {
+                if (i != 0)
+                {
+                    writeString(separator, write_buffer);
+                }
+                for (size_t j = 0; j < number_of_concat_items; ++j)
+                {
+                    all_columns_names_and_types[j].type->serializeText(*cols[j], i, write_buffer);
+                }
+            }
+            /// TODO(FZH) output just one warning ("Some rows were cut by GROUPCONCAT()") if this happens
+            if (write_buffer.count() >= max_len)
+            {
+                break;
+            }
+        }
+        col_str->insertData(write_buffer.str().c_str(), std::min(max_len, write_buffer.count()));
+    }
+
+    bool to_get_unique = false;
+    DataTypePtr ret_type = std::make_shared<DataTypeString>();
+    DataTypePtr nested_type;
+    size_t number_of_concat_items = 0;
+    String separator = ",";
+    UInt64 max_len;
+    SortDescription sort_desc;
+    NamesAndTypes all_columns_names_and_types;
+    TiDB::TiDBCollators collators;
+    BoolVec is_nullable;
+};
+}
+
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp b/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp
index d2f995dc61f..e967875fa9b 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp
@@ -35,9 +35,9 @@ class AggregateFunctionCombinatorNull final : public IAggregateFunctionCombinator
     AggregateFunctionPtr transformAggregateFunction(
         const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array &) const override
     {
-        /// group_concat reuses groupArray and groupUniqArray, it has the special warp function `AggregateFunctionGroupConcat` to process
-        /// the issues of null, but the warp function needs more complex arguments, it is specially added outside,
-        /// instead of being added here, so directly return in this function.
+        /// group_concat reuses groupArray and groupUniqArray, wrapped by the special `AggregateFunctionGroupConcat` to handle nulls,
+        /// but the wrapper needs more complex arguments (collators, sort descriptions and others) that are hard to deliver via the Array parameter,
+        /// so it is added specially at the call site instead of here; therefore just return the nested function in this case.
         if (nested_function && (nested_function->getName() == "groupArray" || nested_function->getName() == "groupUniqArray"))
             return nested_function;
         bool has_nullable_types = false;
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionNull.h b/dbms/src/AggregateFunctions/AggregateFunctionNull.h
index a889da7f3e4..285cae85e84 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionNull.h
+++ b/dbms/src/AggregateFunctions/AggregateFunctionNull.h
@@ -4,7 +4,6 @@
 #include
 #include
-#include
 #include
 #include
 #include
 
@@ -516,267 +515,4 @@ class AggregateFunctionNullVariadic final : public AggregateFunctionNullBase
     is_nullable;    /// Plain array is better than std::vector due to one indirection less.
 };
 
-/// a warp function on the top of groupArray and groupUniqArray, like the AggregateFunctionNull
-
-/// the input argument is in following two types:
-/// 1. only one column with original data type and without order_by items, for example: group_concat(c)
-/// 2. one column combined with more than one columns including concat items and order_by items, it should be like tuple(concat0, concat1... order0, order1 ...), for example:
-///    all columns = concat items + order_by items
-///    (c0,c1,o0,o1) = group_concat(c0,c1 order by o0,o1)
-///    group_concat(distinct c0,c1 order by b0,b1) = groupUniqArray(tuple(c0,c1,b0,b1)) -> distinct (c0, c1) , i.e., remove duplicates once more
-
-template <bool result_is_nullable, bool only_one_column>
-class AggregateFunctionGroupConcat final : public AggregateFunctionNullBase<result_is_nullable, AggregateFunctionGroupConcat<result_is_nullable, only_one_column>>
-{
-    using State = AggregateFunctionGroupUniqArrayGenericData;
-
-public:
-    AggregateFunctionGroupConcat(AggregateFunctionPtr nested_function, const DataTypes & input_args, const String sep, const UInt64& max_len_, const SortDescription & sort_desc_, const NamesAndTypes& all_columns_names_and_types_, const TiDB::TiDBCollators& collators_, const bool has_distinct)
-        : AggregateFunctionNullBase<result_is_nullable, AggregateFunctionGroupConcat<result_is_nullable, only_one_column>>(nested_function),
-          separator(sep),max_len(max_len_), sort_desc(sort_desc_),
-          all_columns_names_and_types(all_columns_names_and_types_), collators(collators_)
-    {
-        if (input_args.size() != 1)
-            throw Exception("Logical error: more than 1 arguments are passed to AggregateFunctionGroupConcat", ErrorCodes::LOGICAL_ERROR);
-        nested_type = std::make_shared<DataTypeArray>(removeNullable(input_args[0]));
-
-        number_of_concat_items = all_columns_names_and_types.size() - sort_desc.size();
-
-        is_nullable.resize(number_of_concat_items);
-        for (size_t i = 0; i < number_of_concat_items; ++i)
-        {
-            is_nullable[i] = all_columns_names_and_types[i].type->isNullable();
-            /// the inputs of a nested agg reject null, but for more than one args, tuple(args...) is already not nullable,
-            /// so here just remove null for the only_one_column case
-            if constexpr (only_one_column)
-            {
-                all_columns_names_and_types[i].type = removeNullable(all_columns_names_and_types[i].type);
-            }
-        }
-
-        /// remove redundant rows excluding extra sort items (which do not occur in the concat list) or considering collation
-        if(has_distinct)
-        {
-            for (auto & desc : sort_desc)
-            {
-                bool is_extra = true;
-                for (size_t i = 0; i < number_of_concat_items; ++i)
-                {
-                    if (desc.column_name == all_columns_names_and_types[i].name)
-                    {
-                        is_extra = false;
-                        break;
-                    }
-                }
-                if (is_extra)
-                {
-                    to_get_unique = true;
-                    break;
-                }
-            }
-            /// because GroupUniqArray does consider collations, so if there are collations,
-            /// we should additionally remove redundant rows with consideration of collations
-            if(!to_get_unique)
-            {
-                bool has_collation = false;
-                for (size_t i = 0; i < number_of_concat_items; ++i)
-                {
-                    if (collators[i] != nullptr)
-                    {
-                        has_collation = true;
-                        break;
-                    }
-                }
-                to_get_unique = has_collation;
-            }
-        }
-    }
-
-    DataTypePtr getReturnType() const override
-    {
-        return result_is_nullable
-            ? makeNullable(ret_type)
-            : ret_type;
-    }
-
-    /// reject nulls before add() of nested agg
-    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
-    {
-        if constexpr (only_one_column)
-        {
-            if(is_nullable[0])
-            {
-                const ColumnNullable * column = static_cast<const ColumnNullable *>(columns[0]);
-                if (!column->isNullAt(row_num))
-                {
-                    this->setFlag(place);
-                    const IColumn * nested_column = &column->getNestedColumn();
-                    this->nested_function->add(this->nestedPlace(place), &nested_column, row_num, arena);
-                }
-                return;
-            }
-        }
-        else
-        {
-            /// remove the row with null, except for sort columns
-            const ColumnTuple & tuple = static_cast<const ColumnTuple &>(*columns[0]);
-            for (size_t i = 0; i < number_of_concat_items; ++i)
-            {
-                if (is_nullable[i])
-                {
-                    const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(tuple.getColumn(i));
-                    if (nullable_col.isNullAt(row_num))
-                    {
-                        /// If at least one column has a null value in the current row,
-                        /// we don't process this row.
-                        return;
-                    }
-                }
-            }
-        }
-        this->setFlag(place);
-        this->nested_function->add(this->nestedPlace(place), columns, row_num, arena);
-    }
-
-    void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
-    {
-        ColumnString * col_str= nullptr;
-        ColumnNullable * col_null= nullptr;
-        if constexpr (result_is_nullable)
-        {
-            col_null = &static_cast<ColumnNullable &>(to);
-            col_str = &static_cast<ColumnString &>(col_null->getNestedColumn());
-        }
-        else
-        {
-            col_str = & static_cast<ColumnString &>(to);
-        }
-
-        if (this->getFlag(place))
-        {
-            if constexpr (result_is_nullable)
-            {
-                col_null->getNullMapData().push_back(0);
-            }
-
-            /// get results from nested function, named nested_results
-            auto mutable_nested_cols = nested_type->createColumn();
-            this->nested_function->insertResultInto(this->nestedPlace(place), *mutable_nested_cols, arena);
-            const auto column_array = checkAndGetColumn<ColumnArray>(mutable_nested_cols.get());
-
-            /// nested_columns are not nullable, because the nullable rows are removed in add()
-            Columns nested_cols;
-            if constexpr (only_one_column)
-            {
-                nested_cols.push_back(column_array->getDataPtr());
-            }
-            else
-            {
-                auto & cols = checkAndGetColumn<ColumnTuple>(&column_array->getData())->getColumns();
-                nested_cols.insert(nested_cols.begin(),cols.begin(),cols.end());
-            }
-
-            /// sort the nested_col of Array type
-            if(!sort_desc.empty())
-                sortColumns(nested_cols);
-
-            /// get unique flags
-            std::vector<bool> unique;
-            if (to_get_unique)
-                getUnique(nested_cols, unique);
-
-            writeToStringColumn(nested_cols,col_str, unique);
-
-        }
-        else
-        {
-            if constexpr (result_is_nullable)
-                col_null->insertDefault();
-            else
-                col_str->insertDefault();
-        }
-    }
-
-    bool allocatesMemoryInArena() const override
-    {
-        return this->nested_function->allocatesMemoryInArena();
-    }
-
-private:
-    /// construct a block to sort in the case with order-by requirement
-    void sortColumns(Columns& nested_cols) const
-    {
-        Block res;
-        int concat_size = nested_cols.size();
-        for(int i = 0 ; i < concat_size; ++i )
-        {
-            res.insert(ColumnWithTypeAndName(nested_cols[i], all_columns_names_and_types[i].type, all_columns_names_and_types[i].name));
-        }
-        /// sort a block with collation
-        sortBlock(res, sort_desc);
-        nested_cols = res.getColumns();
-    }
-
-    /// get unique argument columns by inserting the unique of the first N of (N + M sort) internal columns within tuple
-    void getUnique(const Columns & cols, std::vector<bool> & unique) const
-    {
-        std::unique_ptr<State> state = std::make_unique<State>();
-        Arena arena1;
-        auto size = cols[0]->size();
-        unique.resize(size);
-        std::vector<String> containers(collators.size());
-        for (size_t i = 0; i < size; ++i)
-        {
-            bool inserted=false;
-            State::Set::LookupResult it;
-            const char * begin = nullptr;
-            size_t values_size = 0;
-            for (size_t j = 0; j< number_of_concat_items; ++j)
-                values_size += cols[j]->serializeValueIntoArena(i, arena1, begin, collators[j],containers[j]).size;
-
-            StringRef str_serialized= StringRef(begin, values_size);
-            state->value.emplace(str_serialized, it, inserted);
-            unique[i] = inserted;
-        }
-    }
-
-    /// write each column cell to string with separator
-    void writeToStringColumn(const Columns& cols, ColumnString * const col_str, const std::vector<bool> & unique) const
-    {
-        WriteBufferFromOwnString write_buffer;
-        auto size = cols[0]->size();
-        for (size_t i = 0; i < size; ++i)
-        {
-            if(unique.empty() || unique[i])
-            {
-                if (i != 0)
-                {
-                    writeString(separator, write_buffer);
-                }
-                for (size_t j = 0; j < number_of_concat_items; ++j)
-                {
-                    all_columns_names_and_types[j].type->serializeText(*cols[j], i, write_buffer);
-                }
-            }
-            /// TODO(FZH) output just one warning ("Some rows were cut by GROUPCONCAT()") if this happen
-            if(write_buffer.count() >=max_len)
-            {
-                break;
-            }
-        }
-        col_str->insertData(write_buffer.str().c_str(),std::min(max_len,write_buffer.count()));
-    }
-
-    bool to_get_unique =false;
-    DataTypePtr ret_type = std::make_shared<DataTypeString>();
-    DataTypePtr nested_type;
-    size_t number_of_concat_items = 0;
-    String separator =",";
-    UInt64 max_len;
-    SortDescription sort_desc;
-    NamesAndTypes all_columns_names_and_types;
-    TiDB::TiDBCollators collators;
-    std::vector<bool> is_nullable;
-};
-
 }
diff --git a/dbms/src/AggregateFunctions/CMakeLists.txt b/dbms/src/AggregateFunctions/CMakeLists.txt
index d81f3e668b5..ecfeecfa5f0 100644
--- a/dbms/src/AggregateFunctions/CMakeLists.txt
+++ b/dbms/src/AggregateFunctions/CMakeLists.txt
@@ -19,6 +19,6 @@ list(REMOVE_ITEM clickhouse_aggregate_functions_headers
     FactoryHelpers.h
 )
 
-add_library(clickhouse_aggregate_functions ${clickhouse_aggregate_functions_sources})
+add_library(clickhouse_aggregate_functions ${clickhouse_aggregate_functions_sources} AggregateFunctionGroupConcat.h)
 target_link_libraries(clickhouse_aggregate_functions dbms)
 target_include_directories (clickhouse_aggregate_functions BEFORE PRIVATE ${COMMON_INCLUDE_DIR})
diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
index 8bf98872ec2..0dcf6c14d6f 100644
--- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
@@ -1,5 +1,6 @@
 #include
 #include
+#include
 #include
 #include
 #include
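Note on usage (not part of the patch): since the Null combinator now returns the nested groupArray/groupUniqArray untouched, the wrapper has to be instantiated explicitly at the call site (DAGExpressionAnalyzer, which gains the new include above). The sketch below shows what such a call site might look like, assuming the <result_is_nullable, only_one_column> template parameters as reconstructed in the header; the helper name wrapGroupConcat and the fact that the two boolean flags are handed in by the caller are assumptions for illustration, not code from this patch.

#include <AggregateFunctions/AggregateFunctionGroupConcat.h>

namespace DB
{
/// Hypothetical helper (illustration only): wrap a nested groupArray/groupUniqArray
/// into AggregateFunctionGroupConcat, dispatching on the two compile-time flags.
AggregateFunctionPtr wrapGroupConcat(
    const AggregateFunctionPtr & nested,   // groupArray or groupUniqArray
    const DataTypes & types,               // exactly one type: the column itself or tuple(concat..., order...)
    const String & separator,              // group_concat separator
    UInt64 max_len,                        // group_concat_max_len
    const SortDescription & sort_desc,
    const NamesAndTypes & all_columns,     // concat items followed by order-by items
    const TiDB::TiDBCollators & collators,
    bool has_distinct,
    bool result_is_nullable,
    bool only_one_column)
{
    if (result_is_nullable)
    {
        if (only_one_column)
            return std::make_shared<AggregateFunctionGroupConcat<true, true>>(nested, types, separator, max_len, sort_desc, all_columns, collators, has_distinct);
        return std::make_shared<AggregateFunctionGroupConcat<true, false>>(nested, types, separator, max_len, sort_desc, all_columns, collators, has_distinct);
    }
    if (only_one_column)
        return std::make_shared<AggregateFunctionGroupConcat<false, true>>(nested, types, separator, max_len, sort_desc, all_columns, collators, has_distinct);
    return std::make_shared<AggregateFunctionGroupConcat<false, false>>(nested, types, separator, max_len, sort_desc, all_columns, collators, has_distinct);
}
} // namespace DB

Here only_one_column corresponds to form 1 in the header comment (a single concat column, no order-by), while result_is_nullable decides whether getReturnType() wraps the String result in Nullable.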