Skip to content

Commit

Permalink
[1/2] support multi-distinct (apache#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
stdpain authored and HappenLee committed Jul 13, 2021
1 parent 51ff865 commit 746e8e2
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 47 deletions.
5 changes: 3 additions & 2 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/vec")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/vec")

set(VEC_FILES
aggregate_functions/aggregate_function_avg.cpp
aggregate_functions/aggregate_function_count.cpp
aggregate_functions/aggregate_function_null.cpp
aggregate_functions/aggregate_function_sum.cpp
aggregate_functions/aggregate_function_min_max.cpp
aggregate_functions/aggregate_function_avg.cpp
aggregate_functions/aggregate_function_null.cpp
aggregate_functions/aggregate_function_uniq.cpp
columns/collator.cpp
columns/column.cpp
columns/column_const.cpp
Expand Down
18 changes: 12 additions & 6 deletions be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ namespace doris::vectorized {

class AggregateFunctionSimpleFactory;
void registerAggregateFunctionSum(AggregateFunctionSimpleFactory& factory);
void registerAggregateFunctionCount(AggregateFunctionSimpleFactory& factory);
void registerAggregateFunctionCombinatorNull(AggregateFunctionSimpleFactory& factory);
void registerAggregateFunctionMinMax(AggregateFunctionSimpleFactory& factory);
void registerAggregateFunctionAvg(AggregateFunctionSimpleFactory& factory);
Expand All @@ -52,6 +51,12 @@ class AggregateFunctionSimpleFactory {
AggregateFunctions nullable_aggregate_functions;

public:
void registerNullableFunctionCombinator(Creator creator) {
for (auto entity : aggregate_functions) {
nullable_aggregate_functions[entity.first] = creator;
}
}

void registerFunction(const std::string& name, Creator creator, bool nullable = false) {
if (nullable) {
nullable_aggregate_functions[name] = creator;
Expand All @@ -69,11 +74,13 @@ class AggregateFunctionSimpleFactory {
}
}
if (nullable) {
return nullable_aggregate_functions[name] == nullptr ? nullptr :
nullable_aggregate_functions[name](name, argument_types, parameters);
return nullable_aggregate_functions[name] == nullptr
? nullptr
: nullable_aggregate_functions[name](name, argument_types, parameters);
} else {
return aggregate_functions[name] == nullptr ? nullptr :
aggregate_functions[name](name, argument_types, parameters);
return aggregate_functions[name] == nullptr
? nullptr
: aggregate_functions[name](name, argument_types, parameters);
}
}

Expand All @@ -83,7 +90,6 @@ class AggregateFunctionSimpleFactory {
static AggregateFunctionSimpleFactory instance;
std::call_once(oc, [&]() {
registerAggregateFunctionSum(instance);
registerAggregateFunctionCount(instance);
registerAggregateFunctionMinMax(instance);
registerAggregateFunctionAvg(instance);
registerAggregateFunctionCombinatorNull(instance);
Expand Down
48 changes: 48 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_uniq.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include <vec/aggregate_functions/aggregate_function_simple_factory.h>
#include <vec/aggregate_functions/aggregate_function_uniq.h>
#include <vec/aggregate_functions/factory_helpers.h>
#include <vec/aggregate_functions/helpers.h>
#include <vec/data_types/data_type_string.h>

namespace doris::vectorized {

namespace ErrorCodes {
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
} // namespace ErrorCodes

template <template <typename> class Data, typename DataForVariadic>
AggregateFunctionPtr createAggregateFunctionUniq(const std::string& name,
const DataTypes& argument_types,
const Array& params) {
assertNoParameters(name, params);

if (argument_types.empty())
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

if (argument_types.size() == 1) {
const IDataType& argument_type = *argument_types[0];

AggregateFunctionPtr res(createWithNumericType<AggregateFunctionUniq, Data>(
*argument_types[0], argument_types));

WhichDataType which(argument_type);
// TODO: DateType
if (res)
return res;
else if (which.isStringOrFixedString())
return std::make_shared<AggregateFunctionUniq<String, Data<String>>>(argument_types);
}

return nullptr;
}

void registerAggregateFunctionsUniq(AggregateFunctionSimpleFactory& factory) {
AggregateFunctionCreator creator =
createAggregateFunctionUniq<AggregateFunctionUniqExactData,
AggregateFunctionUniqExactData<String>>;
factory.registerFunction("uniqExact", creator);
}

} // namespace doris::vectorized
121 changes: 121 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_uniq.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#pragma once

#include <city.h>

#include <type_traits>

#include "vec/common/bit_cast.h"

// #include <IO/WriteHelpers.h>
// #include <IO/ReadHelpers.h>

// #include <DataTypes/DataTypesNumber.h>
#include <vec/common/hash_table/hash_set.h>

#include "vec/common/aggregation_common.h"
#include "vec/data_types/data_types_number.h"
// #include <Common/HyperLogLogWithSmallSetOptimization.h>
// #include <Common/CombinedCardinalityEstimator.h>
#include <vec/common/assert_cast.h>
#include <vec/common/typeid_cast.h>

// #include <AggregateFunctions/UniquesHashSet.h>
#include "vec/aggregate_functions/aggregate_function.h"
// #include <AggregateFunctions/UniqVariadicHash.h>

namespace doris::vectorized {

/// uniqExact

template <typename T>
struct AggregateFunctionUniqExactData {
using Key = T;

/// When creating, the hash table must be small.
using Set = HashSet<Key, HashCRC32<Key>, HashTableGrower<4>,
HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 4)>>;

Set set;

static String getName() { return "uniqExact"; }
};

/// For rows, we put the SipHash values (128 bits) into the hash table.
template <>
struct AggregateFunctionUniqExactData<String> {
using Key = UInt128;

/// When creating, the hash table must be small.
using Set = HashSet<Key, UInt128TrivialHash, HashTableGrower<3>,
HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 3)>>;

Set set;

static String getName() { return "uniqExact"; }
};

namespace detail {

/** The structure for the delegation work to add one element to the `uniq` aggregate functions.
* Used for partial specialization to add strings.
*/
template <typename T, typename Data>
struct OneAdder {
static void ALWAYS_INLINE add(Data& data, const IColumn& column, size_t row_num) {
if constexpr (std::is_same_v<Data, AggregateFunctionUniqExactData<T>>) {
if constexpr (!std::is_same_v<T, String>) {
data.set.insert(assert_cast<const ColumnVector<T>&>(column).getData()[row_num]);
} else {
StringRef value = column.getDataAt(row_num);

UInt128 key;
SipHash hash;
hash.update(value.data, value.size);
hash.get128(key.low, key.high);

data.set.insert(key);
}
}
}
};

} // namespace detail

/// Calculates the number of different values approximately or exactly.
template <typename T, typename Data>
class AggregateFunctionUniq final
: public IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>> {
public:
AggregateFunctionUniq(const DataTypes& argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>(argument_types_,
{}) {}

String getName() const override { return Data::getName(); }

DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }

void add(AggregateDataPtr place, const IColumn** columns, size_t row_num,
Arena*) const override {
detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
}

void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) const override {
this->data(place).set.merge(this->data(rhs).set);
}

void serialize(ConstAggregateDataPtr place, std::ostream& buf) const override {
// this->data(place).set.write(buf);
}

void deserialize(AggregateDataPtr place, std::istream& buf, Arena*) const override {
// this->data(place).set.read(buf);
}

void insertResultInto(ConstAggregateDataPtr place, IColumn& to) const override {
assert_cast<ColumnUInt64&>(to).getData().push_back(this->data(place).set.size());
}

const char* getHeaderFilePath() const override { return __FILE__; }
};

} // namespace doris::vectorized
95 changes: 95 additions & 0 deletions be/src/vec/common/hash_table/hash_set.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#pragma once

#include <vec/common/hash_table/hash.h>
#include <vec/common/hash_table/hash_table.h>
#include <vec/common/hash_table/hash_table_allocator.h>

// #include <IO/WriteBuffer.h>
// #include <IO/WriteHelpers.h>
// #include <IO/ReadBuffer.h>
// #include <IO/ReadHelpers.h>
// #include <IO/VarInt.h>

/** NOTE HashSet could only be used for memmoveable (position independent) types.
* Example: std::string is not position independent in libstdc++ with C++11 ABI or in libc++.
* Also, key must be of type, that zero bytes is compared equals to zero key.
*/

template <typename Key, typename TCell, typename Hash = DefaultHash<Key>,
typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator>
class HashSetTable : public HashTable<Key, TCell, Hash, Grower, Allocator> {
public:
using Self = HashSetTable;
using Cell = TCell;

using Base = HashTable<Key, TCell, Hash, Grower, Allocator>;
using typename Base::LookupResult;

void merge(const Self& rhs) {
if (!this->hasZero() && rhs.hasZero()) {
this->setHasZero();
++this->m_size;
}

for (size_t i = 0; i < rhs.grower.bufSize(); ++i)
if (!rhs.buf[i].isZero(*this)) this->insert(Cell::getKey(rhs.buf[i].getValue()));
}

// void readAndMerge(DB::ReadBuffer & rb)
// {
// Cell::State::read(rb);

// size_t new_size = 0;
// DB::readVarUInt(new_size, rb);

// this->resize(new_size);

// for (size_t i = 0; i < new_size; ++i)
// {
// Cell x;
// x.read(rb);
// this->insert(Cell::getKey(x.getValue()));
// }
// }
};

template <typename Key, typename Hash, typename TState = HashTableNoState>
struct HashSetCellWithSavedHash : public HashTableCell<Key, Hash, TState> {
using Base = HashTableCell<Key, Hash, TState>;

size_t saved_hash;

HashSetCellWithSavedHash() : Base() {}
HashSetCellWithSavedHash(const Key& key_, const typename Base::State& state)
: Base(key_, state) {}

bool keyEquals(const Key& key_) const { return this->key == key_; }
bool keyEquals(const Key& key_, size_t hash_) const {
return saved_hash == hash_ && this->key == key_;
}
bool keyEquals(const Key& key_, size_t hash_, const typename Base::State&) const {
return keyEquals(key_, hash_);
}

void setHash(size_t hash_value) { saved_hash = hash_value; }
size_t getHash(const Hash& /*hash_function*/) const { return saved_hash; }
};

template <typename Key, typename Hash, typename State>
ALWAYS_INLINE inline auto lookupResultGetKey(HashSetCellWithSavedHash<Key, Hash, State>* cell) {
return &cell->key;
}

template <typename Key, typename Hash, typename State>
ALWAYS_INLINE inline void* lookupResultGetMapped(HashSetCellWithSavedHash<Key, Hash, State>*) {
return nullptr;
}

template <typename Key, typename Hash = DefaultHash<Key>, typename Grower = HashTableGrower<>,
typename Allocator = HashTableAllocator>
using HashSet = HashSetTable<Key, HashTableCell<Key, Hash>, Hash, Grower, Allocator>;

template <typename Key, typename Hash = DefaultHash<Key>, typename Grower = HashTableGrower<>,
typename Allocator = HashTableAllocator>
using HashSetWithSavedHash =
HashSetTable<Key, HashSetCellWithSavedHash<Key, Hash>, Hash, Grower, Allocator>;
Loading

0 comments on commit 746e8e2

Please sign in to comment.