# Aggregate

## Terminology

- Group: a set of unique column values, one for each column in the GROUP BY clause.
  - e.g. ("A", "a") and ("A", "b") are query groups in the example below.
  - If there is only one column in the GROUP BY clause, a group is equivalent to a unique value.

A high-level implementation is as follows:

```
Initialize an empty output table T
Get all the unique values of each column in the GROUP BY clause
Iterate over all possible groups G:
    Get filters for each unique column value in G
    Perform a logical AND on all the filters
    Apply the filter to the aggregate column
    If the filtered aggregate column is nonempty:
        Perform the aggregate on the filtered aggregate column
        Insert (G, aggregate) into T
Sort the rows of T by the columns in the ORDER BY clause
```

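As a minimal sketch of the steps above, the following C++ hard-codes exactly two GROUP BY columns and a SUM aggregate over plain `std::vector` columns. Both are assumptions made for illustration: the operator itself works on Arrow data and must handle any number of GROUP BY columns, which is what the dynamic-depth loop later on this page is for. The names `group_by_sum` and `Row` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <set>
#include <string>
#include <tuple>
#include <vector>

// One output row of T: (group1 value, group2 value, aggregate).
using Row = std::tuple<std::string, std::string, long>;

// Hypothetical sketch: GROUP BY on exactly two columns with a SUM aggregate.
std::vector<Row> group_by_sum(const std::vector<std::string>& group1,
                              const std::vector<std::string>& group2,
                              const std::vector<long>& data) {
    std::vector<Row> out;  // the output table T
    // Unique values of each GROUP BY column (std::set drops duplicates).
    const std::set<std::string> uniq1(group1.begin(), group1.end());
    const std::set<std::string> uniq2(group2.begin(), group2.end());
    // Iterate over all possible groups G = (v1, v2).
    for (const auto& v1 : uniq1) {
        for (const auto& v2 : uniq2) {
            // Filter for G: logical AND of the two equality filters.
            std::vector<bool> filter(data.size());
            for (std::size_t r = 0; r < data.size(); ++r)
                filter[r] = (group1[r] == v1) && (group2[r] == v2);
            // Apply the filter to the aggregate column.
            std::vector<long> filtered;
            for (std::size_t r = 0; r < data.size(); ++r)
                if (filter[r]) filtered.push_back(data[r]);
            // Only a nonempty filtered column produces an output row.
            if (!filtered.empty())
                out.emplace_back(v1, v2, std::accumulate(filtered.begin(), filtered.end(), 0L));
        }
    }
    // Stand-in for ORDER BY group1, group2 (assumed; the example query has none).
    std::sort(out.begin(), out.end());
    return out;
}
```

Called on the example table below (`group1 = {"A", "A", "B"}`, `group2 = {"a", "a", "b"}`, `data = {1, 10, 100}`), it returns the rows (A, a, 11) and (B, b, 100), skipping the empty groups (A, b) and (B, a).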
Consider the following example query:

```sql
SELECT R.group1, R.group2, sum(R.data) as sum_data
FROM R
GROUP BY R.group1, R.group2
```

over the table R:

| R.key | R.group1 | R.group2 | R.data |
|---|---|---|---|
| 0 | A | a | 1 |
| 1 | A | a | 10 |
| 2 | B | b | 100 |

- Get the unique values of R.group1 and R.group2:

| R.group1 unique values |
|---|
| A |
| B |

| R.group2 unique values |
|---|
| a |
| b |

- Iterate over all possible combinations of these unique values: (A, a), (A, b), (B, a), (B, b). First, we consider (A, a).
- Get a filter for R.group1 = A and a filter for R.group2 = a.

| R.group1 = A |
|---|
| 1 |
| 1 |
| 0 |

| R.group2 = a |
|---|
| 1 |
| 1 |
| 0 |

- AND the two filters. Now you have a filter corresponding to all rows of R that are part of group (A, a).

| R.group1 = A and R.group2 = a |
|---|
| 1 |
| 1 |
| 0 |

- Apply the filter to R.data:

| R.data |
|---|
| 1 |
| 10 |

- Since the filtered column is nonempty, we perform the aggregate over the filtered column and insert the aggregate and its corresponding group into the output table.

| R.group1 | R.group2 | sum_data |
|---|---|---|
| A | a | 11 |

- Now we consider the next query group, (A, b). Get the filters for R.group1 = A and R.group2 = b.

| R.group1 = A |
|---|
| 1 |
| 1 |
| 0 |

| R.group2 = b |
|---|
| 0 |
| 0 |
| 1 |

- AND the two filters. Now you have a filter corresponding to all rows of R that are part of group (A, b).

| R.group1 = A and R.group2 = b |
|---|
| 0 |
| 0 |
| 0 |

- Apply the filter to R.data. The result has no rows:

| R.data |
|---|

- Since the filtered column is empty, we skip to the next query group.
- The remaining groups to process are (B, a) and (B, b). Following the same procedure, we find that there are no tuples in group (B, a) and only one tuple in group (B, b). We output the following table:

| R.group1 | R.group2 | sum_data |
|---|---|---|
| A | a | 11 |
| B | b | 100 |

When the scheduler runs the Task that executes the Aggregate operator, this task spawns two additional tasks:

- run `compute_aggregate()` to compute the aggregate for each group
- run `sort()` to sort the aggregates with respect to the columns in the ORDER BY clause

The second task does not run until the first task finishes. We now consider the first task.
A natural first idea is to loop over all possible groups using nested for loops, one per GROUP BY column. However, because the operator must handle an arbitrary number of GROUP BY columns, the nesting depth is only known at runtime, so we need a nested loop with a dynamic depth.
The following code is a barebones implementation of a dynamic-depth nested loop.
```cpp
#include <iostream>
#include <vector>

int main() {
    int depth = 2;  // nested loop depth, e.g. 2
    if (depth > 0) {
        std::vector<int> group_id(depth, 0);  // group_id[i] is the loop variable at depth i
        std::vector<int> maxes(depth);        // maxes[i] is the number of iterations at depth i
        for (int i = 0; i < depth; i++) {
            maxes[i] = i + 2;  // e.g. {2, 3}
        }
        int index = depth - 1;  // depth of the loop currently being incremented
        bool done = false;
        while (!done) {
            // LOOP BODY START
            // An example loop body that prints group_id on one line
            for (int id : group_id) {
                std::cout << id << " ";
            }
            std::cout << std::endl;
            // LOOP BODY END
            // Increment group_id, carrying into shallower depths like an odometer
            group_id[depth - 1]++;
            while (group_id[index] == maxes[index]) {
                if (index == 0) {
                    done = true;
                    break;
                }
                group_id[index--] = 0;
                group_id[index]++;
            }
            index = depth - 1;
        }
    }
    return 0;
}
```

This loop prints:

```
0 0
0 1
0 2
1 0
1 1
1 2
```

Note that with `depth = 2` and `maxes = {2, 3}`, the above code is equivalent to:

```cpp
for (int i = 0; i < maxes[0]; i++) {
    for (int j = 0; j < maxes[1]; j++) {
        std::cout << i << " " << j << std::endl;
    }
}
```

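The odometer-style loop above is one way to get a dynamic depth. A different technique, not used in the original code and shown here only for comparison, is to number the groups 0, 1, ..., N-1 and decode each number into a `group_id` as a mixed-radix integer whose last digit varies fastest; this reproduces the same order as the output above. The names `decode_group_id` and `num_groups` are hypothetical.

```cpp
#include <cassert>
#include <vector>

// Alternative enumeration (not the approach above): decode a flat index k in
// [0, maxes[0] * ... * maxes[depth-1]) into a group_id, treating group_id as a
// mixed-radix number whose last digit varies fastest.
std::vector<int> decode_group_id(long k, const std::vector<int>& maxes) {
    std::vector<int> group_id(maxes.size());
    for (int i = static_cast<int>(maxes.size()) - 1; i >= 0; --i) {
        group_id[i] = static_cast<int>(k % maxes[i]);  // digit at depth i
        k /= maxes[i];                                 // shift to the next digit
    }
    return group_id;
}

// Total number of groups: the product of the per-column unique counts
// (1 when there is no GROUP BY column, matching the single iteration case).
long num_groups(const std::vector<int>& maxes) {
    long total = 1;
    for (int m : maxes) total *= m;
    return total;
}
```

With `maxes = {2, 3}`, `num_groups` returns 6 and `decode_group_id(3, maxes)` returns `{1, 0}`, the fourth line of the output above. Because each index decodes independently, groups could be handed to tasks by index without sharing loop state, at the cost of a division and a modulo per digit.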
In the context of the Aggregate operator, `depth` equals the number of GROUP BY columns, `maxes[i]` equals the number of unique values in the i-th GROUP BY column, and the loop body is the code that computes the aggregate for a single group. However, we still want the loop body to execute once even if the query has no GROUP BY clause, i.e. even if `depth = 0`. We can turn this loop into a do-while-style loop, whose body always runs at least once, by removing the `if (depth > 0)` check and adding `if (depth == 0) break;` immediately after the loop body:
```cpp
#include <iostream>
#include <vector>

int main() {
    int depth = 2;  // nested loop depth, e.g. 2; 0 means no GROUP BY clause
    std::vector<int> group_id(depth, 0);  // group_id[i] is the loop variable at depth i
    std::vector<int> maxes(depth);        // maxes[i] is the number of iterations at depth i
    for (int i = 0; i < depth; i++) {
        maxes[i] = i + 2;  // e.g. {2, 3}
    }
    int index = depth - 1;  // depth of the loop currently being incremented
    bool done = false;
    while (!done) {
        // LOOP BODY START
        // An example loop body that prints group_id on one line
        for (int id : group_id) {
            std::cout << id << " ";
        }
        std::cout << std::endl;
        // LOOP BODY END
        // If depth == 0, we have no GROUP BY clause and should exit after one iteration.
        if (depth == 0) break;
        // Increment group_id, carrying into shallower depths like an odometer
        group_id[depth - 1]++;
        while (group_id[index] == maxes[index]) {
            if (index == 0) {
                done = true;
                break;
            }
            group_id[index--] = 0;
            group_id[index]++;
        }
        index = depth - 1;
    }
    return 0;
}
```

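For reuse, the same loop can be wrapped in a function that takes the loop body as a callback. This is a minimal sketch: `for_each_group` is a hypothetical name, and the early return when some entry of `maxes` is zero (a GROUP BY column with no values, hence no groups) is an addition that the code above does not have.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// The dynamic-depth loop above, packaged as a function. body receives the
// current group_id; with an empty maxes (no GROUP BY) it runs exactly once.
void for_each_group(const std::vector<int>& maxes,
                    const std::function<void(const std::vector<int>&)>& body) {
    // Added guard (hypothetical): an empty GROUP BY column yields no groups.
    for (int m : maxes)
        if (m == 0) return;
    const int depth = static_cast<int>(maxes.size());
    std::vector<int> group_id(depth, 0);
    bool done = false;
    while (!done) {
        body(group_id);
        if (depth == 0) break;  // no GROUP BY clause: a single iteration
        int index = depth - 1;
        group_id[index]++;
        // Carry into shallower depths like an odometer.
        while (group_id[index] == maxes[index]) {
            if (index == 0) {
                done = true;
                break;
            }
            group_id[index--] = 0;
            group_id[index]++;
        }
    }
}
```

`for_each_group({2, 3}, body)` calls `body` six times, from `{0, 0}` to `{1, 2}`, while `for_each_group({}, body)` calls it exactly once with an empty `group_id`.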
The actual loop body of the Aggregate operator is shown below. As previously mentioned, each task computes the aggregate over one group; here is the body of a single task:
```cpp
auto group_filter = get_group_filter(group_id);
auto aggregate = compute_aggregate(kernel, aggregate_col, group_filter);
std::unique_lock<std::mutex> lock(builder_mutex_);
insert_group_aggregate(aggregate);
insert_group(group_id);
```

`get_group_filter(group_id)` returns a filter corresponding to all rows of the table that are part of the group with this particular `group_id`. Internally, it calls `get_unique_value_filter(group_ref, value)` for each group column, which returns a filter corresponding to all rows of a particular group column (specified by `group_ref`) that have a particular `value`. It then performs a logical AND on all of these filters.
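A minimal sketch of these two functions, assuming plain `std::vector<std::string>` columns and `std::vector<bool>` filters rather than Arrow arrays. The signatures are hypothetical: the real `get_unique_value_filter` takes a `group_ref`, and the real `get_group_filter` takes only a `group_id`, so here the columns, their unique values, and the row count are passed explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

using Column = std::vector<std::string>;
using Filter = std::vector<bool>;

// Hypothetical stand-in for get_unique_value_filter(group_ref, value):
// true for each row of the column equal to value, false otherwise.
Filter get_unique_value_filter(const Column& column, const std::string& value) {
    Filter filter(column.size());
    for (std::size_t r = 0; r < column.size(); ++r) filter[r] = (column[r] == value);
    return filter;
}

// Hypothetical stand-in for get_group_filter(group_id): group_id[i] indexes
// into unique_values[i], the unique values of the i-th GROUP BY column.
Filter get_group_filter(const std::vector<int>& group_id,
                        const std::vector<Column>& group_columns,
                        const std::vector<Column>& unique_values,
                        std::size_t num_rows) {
    Filter result(num_rows, true);  // all-true is the identity for logical AND
    for (std::size_t i = 0; i < group_id.size(); ++i) {
        Filter f = get_unique_value_filter(group_columns[i], unique_values[i][group_id[i]]);
        for (std::size_t r = 0; r < num_rows; ++r) result[r] = result[r] && f[r];
    }
    return result;
}
```

On the example table, `group_id = {0, 0}` (group (A, a)) yields the filter 1, 1, 0 and `{0, 1}` (group (A, b)) yields 0, 0, 0. Starting from an all-true filter means that an empty `group_id` (no GROUP BY clause) selects every row.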
`compute_aggregate(kernel, aggregate_col, group_filter)` applies `group_filter` to `aggregate_col` and then computes the aggregate (specified by `kernel`) over the filtered column.
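Under the same assumptions (plain vectors instead of Arrow arrays; the signature is hypothetical), `compute_aggregate` can be sketched as a filter step followed by a fold with a binary `kernel`. Returning `std::nullopt` for an empty filtered column is an assumed way of signalling a group that should be skipped.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical stand-in for compute_aggregate(kernel, aggregate_col, group_filter).
std::optional<long> compute_aggregate(const std::function<long(long, long)>& kernel,
                                      const std::vector<long>& aggregate_col,
                                      const std::vector<bool>& group_filter) {
    // Apply the filter: keep only the rows that belong to the group.
    std::vector<long> filtered;
    for (std::size_t r = 0; r < aggregate_col.size(); ++r)
        if (group_filter[r]) filtered.push_back(aggregate_col[r]);
    // An empty filtered column means this group produces no output row.
    if (filtered.empty()) return std::nullopt;
    // Fold the filtered column with the binary kernel.
    long acc = filtered[0];
    for (std::size_t i = 1; i < filtered.size(); ++i) acc = kernel(acc, filtered[i]);
    return acc;
}
```

With `aggregate_col = {1, 10, 100}` and the (A, a) filter `{true, true, false}`, a sum kernel gives 11 and a max kernel gives 10. A binary fold covers kernels such as sum, min, and max; kernels like mean or count would need a different interface.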
The last two function calls insert the group and aggregate values into builders, which are Arrow data structures that behave similarly to standard library vectors. Only one thread can insert into the builders at a time to guarantee that the group and its corresponding aggregate appear on the same row of the table.
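The locking pattern can be sketched with `std::vector` standing in for the Arrow builders (an assumption) and one `std::thread` per group. Because both appends happen under a single `std::unique_lock`, no thread can slip its row in between another thread's group and aggregate, so row i of `groups` always matches row i of `aggregates`, although the order of the rows varies between runs. `OutputBuilders` and `insert_concurrently` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Plain vectors stand in for the Arrow builders (assumption).
struct OutputBuilders {
    std::mutex builder_mutex_;
    std::vector<std::string> groups;
    std::vector<long> aggregates;

    // One lock covers both appends, keeping group and aggregate on the same row.
    void insert(const std::string& group, long aggregate) {
        std::unique_lock<std::mutex> lock(builder_mutex_);
        aggregates.push_back(aggregate);  // mirrors insert_group_aggregate
        groups.push_back(group);          // mirrors insert_group
    }
};

// Runs one thread per group, each appending its (group, aggregate) pair.
void insert_concurrently(OutputBuilders& out,
                         const std::vector<std::string>& groups,
                         const std::vector<long>& aggregates) {
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < groups.size(); ++i)
        workers.emplace_back([&out, &groups, &aggregates, i] {
            out.insert(groups[i], aggregates[i]);
        });
    for (auto& w : workers) w.join();
}
```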
NOTE: We can avoid the mutex if we pre-specify the index of the builder at which each group and aggregate should be stored. This guarantees that different threads will never write to the same memory location, but then it becomes difficult to exclude groups that have an empty filter, since we already reserved a space for that group in the builder. So we must first eliminate all "false" groups before we enter the loop body. But to do this, we must fetch all the group filters and check if any of them are empty, which of course requires us to iterate over all groups! In brief, we have the following tradeoff:
- Eliminate all "false" groups from the output, but use a mutex.
- Get rid of the mutex, but remove all "false" groups after the output table has been constructed. This requires a memory copy to create a new table, whose cost depends on how many groups are in the output table.
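The second option can be sketched as follows, again with plain vectors and `std::thread` as assumptions: each group writes only to its own pre-assigned slot, so no lock is needed, and a compaction pass afterwards copies the nonempty slots into the final table. That copy is the extra cost described above. The aggregates are passed in precomputed (`std::nullopt` marks an empty group) to keep the sketch short, and `build_without_mutex` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

std::vector<std::pair<std::string, long>> build_without_mutex(
        const std::vector<std::string>& groups,
        const std::vector<std::optional<long>>& aggregates) {
    // One pre-assigned slot per group: threads never write the same element.
    std::vector<std::optional<std::pair<std::string, long>>> slots(groups.size());
    std::vector<std::thread> workers;
    for (std::size_t g = 0; g < groups.size(); ++g)
        workers.emplace_back([&, g] {
            // Empty groups leave their slot unset: a "false" group.
            if (aggregates[g]) slots[g] = std::make_pair(groups[g], *aggregates[g]);
        });
    for (auto& w : workers) w.join();

    // Compaction: the extra copy that removes the "false" groups.
    std::vector<std::pair<std::string, long>> table;
    for (auto& slot : slots)
        if (slot) table.push_back(std::move(*slot));
    return table;
}
```

With the four groups of the worked example and aggregates `{11, nullopt, nullopt, 100}`, the result is (A, a, 11) followed by (B, b, 100), in slot order.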