Extend the simple UDAF interface with function-level states #9167
base: main
Changes from 7 commits
34e7e42
6fbe730
3906ab7
75a908b
cb89b74
a40de62
9072f7c
28855c3
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -152,13 +152,31 @@ A simple aggregation function is implemented as a class as the following. | |||||
using IntermediateType = Array<Generic<T1>>; | ||||||
using OutputType = Array<Generic<T1>>; | ||||||
|
||||||
// If UDAF does not require the use of FunctionState, it is necessary | ||||||
// to declare an empty FunctionState struct. | ||||||
struct FunctionState { | ||||||
// Optional. | ||||||
TypePtr resultType; | ||||||
}; | ||||||
|
||||||
// Optional. Used only when the UDAF needs to use FunctionState. | ||||||
Review comment: Let's add: This method is called once when the aggregation function is created. |
||||||
static void initialize( | ||||||
core::AggregationNode::Step step, | ||||||
FunctionState& state, | ||||||
const std::vector<TypePtr>& rawInputTypes, | ||||||
const TypePtr& resultType, | ||||||
const std::vector<VectorPtr>& constantInputs, | ||||||
std::optional<core::AggregationNode::Step> companionStep) { | ||||||
Review comment: Can you document all the parameters? What is 'companionStep'? It seems strange that we have such a parameter, as function implementations should be agnostic to whether they are used as a "regular" or a "companion" function. CC: @kagamiori
Review comment: I have added documentation for all parameters in the definition of For the partial companion function, we need to know that its companion step is
Review comment: @liujiayi771 Thank you for updating the PR description. It sounds like we want to allow stateful simple aggregate functions. This makes sense, but I wonder if we can make it work similarly to the Simple Function API for scalar functions. There, the function author defines a struct with a call-once initialize method and a call-per-row call method. The author is then free to add member variables to hold state and initialize them however they want from 'initialize'. Would it make sense to follow this pattern for aggregate functions as well?
Can we expose this?
Would you clarify what the "heavy computation" done by approx_most_frequent is, to help readers understand a bit better?
I wonder if a more accurate term would be "per-instance state". There is only one function, foo, but there are many instances of 'foo' and each has its own state, no?
Review comment: Hi @mbasmanova,
The SimpleAggregateAdapter currently doesn't hold an instance of the user-defined simple UDAF class (i.e., it only creates instances of the AccumulatorType struct inside the UDAF class). We can change SimpleAggregateAdapter to hold an instance of the UDAF class if we want to allow authors to freely access member variables in it.
Review comment: This would be nice. Thanks.
Review comment: Hi @kagamiori, I will try to understand this method and see how it can be modified.
Review comment: Hi @liujiayi771, sorry for the delay. Here is a code pointer to how the simple scalar function interface calls the initialize() method:
velox/velox/functions/lib/Re2Functions.h, line 262 in ffc28ac
What @mbasmanova suggested is that we can do it similarly in the SimpleAggregateAdapter so that UDAF authors don't have to keep function-level states. Specifically, below is what I'm thinking:
I'll try to make a prototype to see if this works. Let's discuss and review this design before coding in #8711 first.
Review comment: Thanks for your reply @kagamiori. Let's discuss in #8711 further once the prototype has been validated for feasibility.
Review comment: Hi @kagamiori. Do you have time to make a prototype for this design? cc @rui-mo. |
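A rough, stand-alone sketch of the "per-instance state" idea raised in this thread, in which the adapter owns one instance of the function class and calls initialize() on it once. Everything below is hypothetical and is not Velox API: `InstanceHoldingAdapter`, `CountAtLeastAggregate`, and the use of raw `const int64_t*` pointers to stand in for constant inputs are assumptions made only to keep the example self-contained. The real SimpleAggregateAdapter may be shaped quite differently.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical aggregate written in the per-instance style: state is kept in
// ordinary member variables instead of a separate FunctionState struct.
struct CountAtLeastAggregate {
  int64_t threshold_{0};

  // Call-once hook. Assumption: constant inputs are modelled as pointers,
  // with nullptr meaning "this argument is not constant".
  void initialize(const std::vector<const int64_t*>& constantInputs) {
    if (constantInputs.size() > 1 && constantInputs[1] != nullptr) {
      threshold_ = *constantInputs[1];
    }
  }

  struct AccumulatorType {
    int64_t count{0};

    // Call-per-row method. The accumulator reads state from the owning
    // function instance rather than from a FunctionState argument.
    void addInput(int64_t value, const CountAtLeastAggregate& fn) {
      if (value >= fn.threshold_) {
        ++count;
      }
    }
  };
};

// Hypothetical adapter that holds an instance of the function class and
// initializes it exactly once, in its constructor.
template <typename Fn>
class InstanceHoldingAdapter {
 public:
  explicit InstanceHoldingAdapter(
      const std::vector<const int64_t*>& constantInputs) {
    fn_.initialize(constantInputs);
  }

  // Aggregates all values into a single group and returns the count.
  int64_t run(const std::vector<int64_t>& values) {
    typename Fn::AccumulatorType accumulator;
    for (auto value : values) {
      accumulator.addInput(value, fn_);
    }
    return accumulator.count;
  }

 private:
  Fn fn_;
};

// Usage example: counts the values that are at least 5.
inline int64_t exampleUsage() {
  const int64_t threshold = 5;
  InstanceHoldingAdapter<CountAtLeastAggregate> adapter({nullptr, &threshold});
  const int64_t result = adapter.run({1, 5, 9, 4});
  assert(result == 2);
  return result;
}
```

Compared with passing `const FunctionState&` to every accumulator method, this variant keeps the state next to the code that computes it, at the cost of the adapter having to construct and own a function instance.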
||||||
state.resultType = resultType; | ||||||
} | ||||||
|
||||||
// Optional. Default is true. | ||||||
static constexpr bool default_null_behavior_ = false; | ||||||
|
||||||
// Optional. | ||||||
static bool toIntermediate( | ||||||
exec::out_type<Array<Generic<T1>>>& out, | ||||||
exec::optional_arg_type<Generic<T1>> in); | ||||||
exec::out_type<Array<Generic<T1>>>& out, | ||||||
exec::optional_arg_type<Generic<T1>> in); | ||||||
|
||||||
struct AccumulatorType { ... }; | ||||||
}; | ||||||
|
@@ -169,6 +187,15 @@ function's argument type(s) wrapped in a Row<> even if the function only takes | |||||
one argument. This is needed for the SimpleAggregateAdapter to parse input | ||||||
types for arbitrary aggregation functions properly. | ||||||
|
||||||
A FunctionState struct needs to be declared in the simple aggregation function | ||||||
class, it is used to hold the function-level variables that are typically | ||||||
Review comment: nit: ...in the simple aggregation function class. FunctionState is initialized once when the aggregation function is created and used at every row when adding inputs to accumulators or extracting values from accumulators... |
||||||
computed once and used at every row when adding inputs to accumulators or | ||||||
extracting values from accumulators. For example, if the UDAF needs to get the | ||||||
result type or the raw input type of the aggregaiton function, the author can | ||||||
hold them in the FunctionState struct, and initialize them in the initialize() | ||||||
method. If the UDAF does not require the use ofFunctionState, it is necessary | ||||||
Review comment: nit: Let's use aggregation function instead of UDAF in this documentation to be consistent with the existing content. ofFunctionState --> of FunctionState |
||||||
to declare an empty FunctionState struct. | ||||||
|
||||||
The author can define an optional flag `default_null_behavior_` indicating | ||||||
whether the aggregation function has default-null behavior. This flag is true | ||||||
by default. Next, the class can have an optional method `toIntermediate()` | ||||||
|
@@ -257,17 +284,21 @@ For aggregaiton functions of default-null behavior, the author defines an | |||||
// Optional. Default is false. | ||||||
static constexpr bool is_aligned_ = true; | ||||||
|
||||||
explicit AccumulatorType(HashStringAllocator* allocator); | ||||||
explicit AccumulatorType(HashStringAllocator* allocator, const FunctionState& state); | ||||||
|
||||||
void addInput(HashStringAllocator* allocator, exec::arg_type<T1> value1, ...); | ||||||
void addInput( | ||||||
HashStringAllocator* allocator, | ||||||
exec::arg_type<T1> value1, ..., | ||||||
const FunctionState& state); | ||||||
|
||||||
void combine( | ||||||
HashStringAllocator* allocator, | ||||||
exec::arg_type<IntermediateType> other); | ||||||
exec::arg_type<IntermediateType> other, | ||||||
const FunctionState& state); | ||||||
|
||||||
bool writeIntermediateResult(exec::out_type<IntermediateType>& out); | ||||||
bool writeIntermediateResult(exec::out_type<IntermediateType>& out, const FunctionState& state); | ||||||
|
||||||
bool writeFinalResult(exec::out_type<OutputType>& out); | ||||||
bool writeFinalResult(exec::out_type<OutputType>& out, const FunctionState& state); | ||||||
|
||||||
// Optional. Called during destruction. | ||||||
void destroy(HashStringAllocator* allocator); | ||||||
|
@@ -296,7 +327,8 @@ addInput | |||||
|
||||||
This method adds raw input values to *this* accumulator. It receives a | ||||||
`HashStringAllocator*` followed by `exec::arg_type<T1>`-typed values, one for | ||||||
each argument type `Ti` wrapped in InputType. | ||||||
each argument type `Ti` wrapped in InputType. `const FunctionState&` holds the | ||||||
function-level variables. | ||||||
|
||||||
With default-null behavior, raw-input rows where at least one column is null are | ||||||
ignored before `addInput` is called. After `addInput` is called, *this* | ||||||
|
@@ -306,31 +338,32 @@ combine | |||||
""""""" | ||||||
|
||||||
This method adds an input intermediate state to *this* accumulator. It receives | ||||||
a `HashStringAllocator*` and one `exec::arg_type<IntermediateType>` value. With | ||||||
default-null behavior, nulls among the input intermediate states are ignored | ||||||
before `combine` is called. After `combine` is called, *this* accumulator is | ||||||
assumed to be non-null. | ||||||
a `HashStringAllocator*` and one `exec::arg_type<IntermediateType>` value. | ||||||
`const FunctionState&` holds the function-level variables. With default-null | ||||||
behavior, nulls among the input intermediate states are ignored before `combine` | ||||||
is called. After `combine` is called, *this* accumulator is assumed to be non-null. | ||||||
|
||||||
writeIntermediateResult | ||||||
""""""""""""""""""""""" | ||||||
|
||||||
This method writes *this* accumulator out to an intermediate state vector. It | ||||||
has an out-parameter of the type `exec::out_type<IntermediateType>&`. This | ||||||
method returns true if it writes a non-null value to `out`, or returns false | ||||||
meaning a null should be written to the intermediate state vector. Accumulators | ||||||
that are nulls (i.e., no value has been added to them) automatically become | ||||||
nulls in the intermediate state vector without `writeIntermediateResult` being | ||||||
called. | ||||||
has an out-parameter of the type `exec::out_type<IntermediateType>&`. | ||||||
`const FunctionState&` holds the function-level variables. This method returns | ||||||
true if it writes a non-null value to `out`, or returns false meaning a null | ||||||
should be written to the intermediate state vector. Accumulators that are | ||||||
nulls (i.e., no value has been added to them) automatically become nulls in | ||||||
the intermediate state vector without `writeIntermediateResult` being called. | ||||||
|
||||||
writeFinalResult | ||||||
"""""""""""""""" | ||||||
|
||||||
This method writes *this* accumulator out to a final result vector. It | ||||||
has an out-parameter of the type `exec::out_type<OutputType>&`. This | ||||||
method returns true if it writes a non-null value to `out`, or returns false | ||||||
meaning a null should be written to the final result vector. Accumulators | ||||||
that are nulls (i.e., no value has been added to them) automatically become | ||||||
nulls in the final result vector without `writeFinalResult` being called. | ||||||
has an out-parameter of the type `exec::out_type<OutputType>&`. | ||||||
`const FunctionState&` holds the function-level variables. This method returns | ||||||
true if it writes a non-null value to `out`, or returns false meaning a null | ||||||
should be written to the final result vector. Accumulators that are | ||||||
nulls (i.e., no value has been added to them) automatically become nulls in the | ||||||
final result vector without `writeFinalResult` being called. | ||||||
|
||||||
AccumulatorType of Non-Default-Null Behavior | ||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||||||
|
@@ -355,15 +388,25 @@ For aggregaiton functions of non-default-null behavior, the author defines an | |||||
|
||||||
explicit AccumulatorType(HashStringAllocator* allocator); | ||||||
|
||||||
bool addInput(HashStringAllocator* allocator, exec::optional_arg_type<T1> value1, ...); | ||||||
bool addInput( | ||||||
HashStringAllocator* allocator, | ||||||
exec::optional_arg_type<T1> value1, ..., | ||||||
const FunctionState& state); | ||||||
|
||||||
bool combine( | ||||||
HashStringAllocator* allocator, | ||||||
exec::optional_arg_type<IntermediateType> other); | ||||||
exec::optional_arg_type<IntermediateType> other, | ||||||
const FunctionState& state); | ||||||
|
||||||
bool writeIntermediateResult(bool nonNullGroup, exec::out_type<IntermediateType>& out); | ||||||
bool writeIntermediateResult( | ||||||
bool nonNullGroup, | ||||||
exec::out_type<IntermediateType>& out, | ||||||
const FunctionState& state); | ||||||
|
||||||
bool writeFinalResult(bool nonNullGroup, exec::out_type<OutputType>& out); | ||||||
bool writeFinalResult( | ||||||
bool nonNullGroup, | ||||||
exec::out_type<OutputType>& out, | ||||||
const FunctionState& state); | ||||||
|
||||||
// Optional. | ||||||
void destroy(HashStringAllocator* allocator); | ||||||
|
@@ -384,7 +427,7 @@ addInput | |||||
|
||||||
This method receives a `HashStringAllocator*` followed by | ||||||
`exec::optional_arg_type<T1>` values, one for each argument type `Ti` wrapped | ||||||
in InputType. | ||||||
in InputType. `const FunctionState&` holds the function-level variables. | ||||||
|
||||||
This method is called on all raw-input rows even if some columns may be null. | ||||||
It returns a boolean meaning whether *this* accumulator is non-null after the | ||||||
|
@@ -397,26 +440,29 @@ combine | |||||
""""""" | ||||||
|
||||||
This method receives a `HashStringAllocator*` and an | ||||||
`exec::optional_arg_type<IntermediateType>` value. This method is called on | ||||||
all intermediate states even if some are nulls. Same as `addInput`, this method | ||||||
returns a boolean meaning whether *this* accumulator is non-null after the call. | ||||||
`exec::optional_arg_type<IntermediateType>` value. `const FunctionState&` holds | ||||||
the function-level variables. This method is called on all intermediate states | ||||||
even if some are nulls. Same as `addInput`, this method returns a boolean | ||||||
meaning whether *this* accumulator is non-null after the call. | ||||||
|
||||||
writeIntermediateResult | ||||||
""""""""""""""""""""""" | ||||||
|
||||||
This method has an out-parameter of the type `exec::out_type<IntermediateType>&` | ||||||
and a boolean flag `nonNullGroup` indicating whether *this* accumulator is | ||||||
non-null. This method returns true if it writes a non-null value to `out`, or | ||||||
return false meaning a null should be written to the intermediate state vector. | ||||||
non-null. `const FunctionState&` holds the function-level variables. This method | ||||||
returns true if it writes a non-null value to `out`, or returns false meaning a | ||||||
null should be written to the intermediate state vector. | ||||||
|
||||||
writeFinalResult | ||||||
"""""""""""""""" | ||||||
|
||||||
This method writes *this* accumulator out to a final result vector. It has an | ||||||
out-parameter of the type `exec::out_type<OutputType>&` and a boolean flag | ||||||
`nonNullGroup` indicating whether *this* accumulator is non-null. This method | ||||||
returns true if it writes a non-null value to `out`, or return false meaning a | ||||||
null should be written to the final result vector. | ||||||
`nonNullGroup` indicating whether *this* accumulator is non-null. | ||||||
`const FunctionState&` holds the function-level variables. This method returns | ||||||
true if it writes a non-null value to `out`, or returns false meaning a null | ||||||
should be written to the final result vector. | ||||||
|
||||||
Limitations | ||||||
^^^^^^^^^^^ | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,6 +129,27 @@ class Aggregate { | |
rowSizeOffset); | ||
} | ||
|
||
// Initialize the function-level state of the simple function interface for | ||
Review comment: Hmm... this API should not be aware of the Simple Function Interface. It looks like there might be a leak in the design.
Review comment: Hi @mbasmanova, thank you for the feedback. I saw that for simple scalar functions, we call initialize() in the constructor of SimpleFunctionAdapter. It can do this because ExprCompiler passes constantInputs to the function factory. Aggregate::create() and the aggregation function factory currently do not receive constantInputs as an argument. What if we pass constantInputs to them and move the call of initialize() into the constructor of SimpleAggregateAdapter? We'll have to pass constantInputs to all aggregation function factories though, since we cannot tell simple UDAFs apart from regular UDAFs in the function registry.
Review comment: That sounds good. Thanks. |
||
// UDAF. | ||
// @param step The aggregation step. | ||
// @param rawInputType The raw input type of the UDAF. | ||
// @param resultType The result type of the current aggregation step. | ||
// @param constantInputs Optional constant input values for aggregate | ||
// function. constantInputs should be empty if there are no constant inputs, | ||
// aligned with inputTypes if there is at least one constant input, with | ||
// non-constant inputs represented as nullptr, and must be instances of | ||
// ConstantVector. | ||
// @param companionStep The step used to register aggregate companion | ||
// functions. kPartial for partial companion function, kIntermediate for merge | ||
// and merge extract companion function. | ||
virtual void initialize( | ||
core::AggregationNode::Step step, | ||
const std::vector<TypePtr>& rawInputType, | ||
const TypePtr& resultType, | ||
const std::vector<VectorPtr>& constantInputs, | ||
std::optional<core::AggregationNode::Step> companionStep = std::nullopt) { | ||
} | ||
|
||
// Initializes null flags and accumulators for newly encountered groups. This | ||
// function should be called only once for each group. | ||
// | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,13 +124,41 @@ void AggregateCompanionFunctionBase::extractAccumulators( | |
fn_->extractAccumulators(groups, numGroups, result); | ||
} | ||
|
||
void AggregateCompanionAdapter::PartialFunction::initialize( | ||
core::AggregationNode::Step step, | ||
const std::vector<TypePtr>& rawInputType, | ||
const facebook::velox::TypePtr& resultType, | ||
const std::vector<VectorPtr>& constantInputs, | ||
std::optional<core::AggregationNode::Step> /*companionStep*/) { | ||
fn_->initialize( | ||
step, | ||
rawInputType, | ||
resultType, | ||
constantInputs, | ||
core::AggregationNode::Step::kPartial); | ||
} | ||
|
||
void AggregateCompanionAdapter::PartialFunction::extractValues( | ||
char** groups, | ||
int32_t numGroups, | ||
VectorPtr* result) { | ||
fn_->extractAccumulators(groups, numGroups, result); | ||
} | ||
|
||
void AggregateCompanionAdapter::MergeFunction::initialize( | ||
core::AggregationNode::Step step, | ||
const std::vector<TypePtr>& rawInputType, | ||
const facebook::velox::TypePtr& resultType, | ||
const std::vector<VectorPtr>& constantInputs, | ||
std::optional<core::AggregationNode::Step> /*companionStep*/) { | ||
fn_->initialize( | ||
step, | ||
rawInputType, | ||
resultType, | ||
constantInputs, | ||
core::AggregationNode::Step::kIntermediate); | ||
Review comment: Let's also override the initialize() method of MergeExtractFunction to pass kFinal as the companion step. |
||
} | ||
|
||
void AggregateCompanionAdapter::MergeFunction::addRawInput( | ||
char** groups, | ||
const SelectivityVector& rows, | ||
|
@@ -156,6 +184,20 @@ void AggregateCompanionAdapter::MergeFunction::extractValues( | |
fn_->extractAccumulators(groups, numGroups, result); | ||
} | ||
|
||
void AggregateCompanionAdapter::MergeExtractFunction::initialize( | ||
core::AggregationNode::Step step, | ||
const std::vector<TypePtr>& rawInputType, | ||
const facebook::velox::TypePtr& resultType, | ||
const std::vector<VectorPtr>& constantInputs, | ||
std::optional<core::AggregationNode::Step> /*companionStep*/) { | ||
fn_->initialize( | ||
step, | ||
rawInputType, | ||
resultType, | ||
constantInputs, | ||
core::AggregationNode::Step::kFinal); | ||
} | ||
|
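The forwarding pattern used by the three companion overrides above can be illustrated outside Velox. This is a simplified sketch under stated assumptions: `Step`, `Aggregate`, `RecordingAggregate`, and `PartialCompanion` are hypothetical stand-ins, and the parameter list is reduced to the step and the companion step. It only shows how a wrapper ignores the companion step it receives and passes a fixed one to the wrapped function.

```cpp
#include <cassert>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical stand-in for core::AggregationNode::Step.
enum class Step { kPartial, kIntermediate, kFinal, kSingle };

// Hypothetical base class with a no-op default initialize().
class Aggregate {
 public:
  virtual ~Aggregate() = default;

  virtual void initialize(
      Step /*step*/,
      std::optional<Step> /*companionStep*/ = std::nullopt) {}
};

// Test double that records the arguments it was initialized with.
class RecordingAggregate : public Aggregate {
 public:
  void initialize(Step step, std::optional<Step> companionStep = std::nullopt)
      override {
    step_ = step;
    companionStep_ = companionStep;
  }

  Step step_{Step::kSingle};
  std::optional<Step> companionStep_;
};

// Wrapper in the style of a partial companion function: the incoming
// companion step is ignored and kPartial is always forwarded.
class PartialCompanion : public Aggregate {
 public:
  explicit PartialCompanion(std::shared_ptr<Aggregate> fn)
      : fn_(std::move(fn)) {}

  void initialize(
      Step step,
      std::optional<Step> /*companionStep*/ = std::nullopt) override {
    fn_->initialize(step, Step::kPartial);
  }

 private:
  std::shared_ptr<Aggregate> fn_;
};

// Usage example: the wrapped function sees the caller's step together with
// the companion step chosen by the wrapper.
inline bool exampleUsage() {
  auto inner = std::make_shared<RecordingAggregate>();
  PartialCompanion companion(inner);
  companion.initialize(Step::kFinal);
  assert(inner->companionStep_.has_value());
  return inner->step_ == Step::kFinal &&
      *inner->companionStep_ == Step::kPartial;
}
```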
||
void AggregateCompanionAdapter::MergeExtractFunction::extractValues( | ||
char** groups, | ||
int32_t numGroups, | ||
|
@@ -229,6 +271,25 @@ void AggregateCompanionAdapter::ExtractFunction::apply( | |
// Perform per-row aggregation. | ||
std::vector<vector_size_t> allSelectedRange; | ||
rows.applyToSelected([&](auto row) { allSelectedRange.push_back(row); }); | ||
|
||
// Get the raw input types. | ||
std::vector<TypePtr> rawInputTypes{args.size()}; | ||
std::vector<VectorPtr> constantInputs{args.size()}; | ||
for (auto i = 0; i < args.size(); i++) { | ||
rawInputTypes[i] = args[i]->type(); | ||
if (args[i]->isConstantEncoding()) { | ||
constantInputs[i] = args[i]; | ||
} else { | ||
constantInputs[i] = nullptr; | ||
} | ||
} | ||
|
||
fn_->initialize( | ||
core::AggregationNode::Step::kFinal, | ||
rawInputTypes, | ||
outputType, | ||
constantInputs, | ||
core::AggregationNode::Step::kFinal); | ||
fn_->initializeNewGroups(groups, allSelectedRange); | ||
fn_->enableValidateIntermediateInputs(); | ||
fn_->addIntermediateResults(groups, rows, args, false); | ||
|
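The loop above that fills `constantInputs` can be reproduced in isolation. This is a minimal sketch, assuming a hypothetical `FakeVector` type in place of Velox vectors and a hypothetical helper named `collectConstantInputs`. Like the loop in the hunk, it always returns a vector aligned with the arguments, with `nullptr` in the positions of non-constant inputs.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for a Velox vector; only the encoding check is
// modelled.
struct FakeVector {
  bool constant{false};

  bool isConstantEncoding() const {
    return constant;
  }
};

using FakeVectorPtr = std::shared_ptr<FakeVector>;

// Returns a vector with one entry per argument: the argument itself if it is
// constant-encoded, nullptr otherwise.
inline std::vector<FakeVectorPtr> collectConstantInputs(
    const std::vector<FakeVectorPtr>& args) {
  // Value-initialized shared_ptrs are null, so non-constant slots need no
  // explicit assignment.
  std::vector<FakeVectorPtr> constantInputs(args.size());
  for (std::size_t i = 0; i < args.size(); ++i) {
    if (args[i] != nullptr && args[i]->isConstantEncoding()) {
      constantInputs[i] = args[i];
    }
  }
  return constantInputs;
}

// Usage example: one flat argument followed by one constant argument.
inline bool exampleUsage() {
  auto flat = std::make_shared<FakeVector>();
  auto constant = std::make_shared<FakeVector>();
  constant->constant = true;

  const auto result = collectConstantInputs({flat, constant});
  assert(result.size() == 2);
  return result[0] == nullptr && result[1] == constant;
}
```

The sketch sizes the vector with parentheses and uses an unsigned index, which avoids the signed/unsigned comparison that `auto i = 0` would introduce against `args.size()`.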
Review comment: nit: Define a struct for function-level states. Even if the aggregation function doesn't use function-level states, it is still necessary to define an empty FunctionState struct.
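To make the FunctionState contract described in this PR concrete, here is a small stand-alone sketch. It is not the Velox interface: `Step`, `ScaledSumAggregate`, `CountAggregate`, and the `aggregate` helper are hypothetical, the allocator and type parameters are omitted, and a plain string stands in for the result type. It shows one function that fills its FunctionState once in `initialize()` and reads it on every row, and one function that declares an empty FunctionState because it needs no state.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for core::AggregationNode::Step.
enum class Step { kPartial, kIntermediate, kFinal, kSingle };

// A function that needs function-level state.
struct ScaledSumAggregate {
  struct FunctionState {
    int64_t scale{1};
  };

  // Called once, before any accumulator is created. Assumption: the result
  // type is modelled as a name for the sake of the example.
  static void initialize(
      Step /*step*/,
      FunctionState& state,
      const std::string& resultTypeName) {
    state.scale = (resultTypeName == "cents") ? 100 : 1;
  }

  struct AccumulatorType {
    int64_t sum{0};

    void addInput(int64_t value, const FunctionState& state) {
      sum += value * state.scale;
    }

    bool writeFinalResult(int64_t& out, const FunctionState& /*state*/) const {
      out = sum;
      return true;
    }
  };
};

// A function without state still declares an empty FunctionState.
struct CountAggregate {
  struct FunctionState {};

  struct AccumulatorType {
    int64_t count{0};

    void addInput(int64_t /*value*/, const FunctionState& /*state*/) {
      ++count;
    }

    bool writeFinalResult(int64_t& out, const FunctionState& /*state*/) const {
      out = count;
      return true;
    }
  };
};

// Hypothetical single-group driver that passes the same state object to
// every accumulator call.
template <typename Fn>
int64_t aggregate(
    const std::vector<int64_t>& values,
    const typename Fn::FunctionState& state) {
  typename Fn::AccumulatorType accumulator;
  for (auto value : values) {
    accumulator.addInput(value, state);
  }
  int64_t out = 0;
  const bool nonNull = accumulator.writeFinalResult(out, state);
  assert(nonNull);
  (void)nonNull;
  return out;
}
```

A caller would create the state, run `ScaledSumAggregate::initialize(Step::kSingle, state, "cents")` once, and then reuse the same `state` for every group.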