Skip to content

Commit

Permalink
physicalplan: add support for multi-stage execution of corr, covar_samp,
sqrdiff, and regr_count aggregate functions.
Browse files Browse the repository at this point in the history

Fixes #58347.

Release note (performance improvement): corr, covar_samp, sqrdiff, and
regr_count aggregate functions are now evaluated more efficiently in a
distributed setting.
  • Loading branch information
mneverov authored and yuzefovich committed Feb 25, 2022
1 parent 0679c4c commit 6fb88ef
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 79 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/distsql/columnar_operators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ var aggregateFuncToNumArguments = map[execinfrapb.AggregatorSpec_Func]int{
execinfrapb.FinalRegrIntercept: 1,
execinfrapb.FinalRegrR2: 1,
execinfrapb.FinalRegrSlope: 1,
execinfrapb.FinalCovarSamp: 1,
execinfrapb.FinalCorr: 1,
execinfrapb.FinalSqrdiff: 3,
}

// TestAggregateFuncToNumArguments ensures that all aggregate functions are
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/execinfra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
//
// ATTENTION: When updating these fields, add a brief description of what
// changed to the version history below.
const Version execinfrapb.DistSQLVersion = 63
const Version execinfrapb.DistSQLVersion = 64

// MinAcceptedVersion is the oldest version that the server is compatible with.
// A server will not accept flows with older versions.
Expand All @@ -76,6 +76,14 @@ const MinAcceptedVersion execinfrapb.DistSQLVersion = 63
Please add new entries at the top.
- Version: 64 (MinAcceptedVersion: 63):
- final_covar_samp, final_corr, and final_sqrdiff aggregate functions were
introduced to support local and final aggregation of the corresponding
builtin functions. They would be unrecognized by a server running older
versions, hence the version bump.
However, a server running v64 can still process all plans from servers
running v63, thus the MinAcceptedVersion is kept at 63.
- Version: 63 (MinAcceptedVersion: 63):
- Changed JoinReaderSpec to use a descpb.IndexFetchSpec and a list of family
IDs instead of table and index descriptors.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@ const (
FinalRegrIntercept = AggregatorSpec_FINAL_REGR_INTERCEPT
FinalRegrR2 = AggregatorSpec_FINAL_REGR_R2
FinalRegrSlope = AggregatorSpec_FINAL_REGR_SLOPE
FinalCovarSamp = AggregatorSpec_FINAL_COVAR_SAMP
FinalCorr = AggregatorSpec_FINAL_CORR
FinalSqrdiff = AggregatorSpec_FINAL_SQRDIFF
)
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,9 @@ message AggregatorSpec {
FINAL_REGR_INTERCEPT = 55;
FINAL_REGR_R2 = 56;
FINAL_REGR_SLOPE = 57;
FINAL_COVAR_SAMP = 58;
FINAL_CORR = 59;
FINAL_SQRDIFF = 60;
}

enum Type {
Expand Down
178 changes: 147 additions & 31 deletions pkg/sql/logictest/testdata/logic_test/aggregate
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ NULL NULL NULL
statement OK
TRUNCATE statistics_agg_test

subtest covariance
subtest covar_pop

statement OK
INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx)
Expand All @@ -1516,46 +1516,132 @@ VALUES (0.0, 0.09561, 1, 10, 0.0, 0.09561),
(100.0, 99.097, 4, 100, 100.0, 99.097),
(NULL, NULL, NULL, NULL, NULL, NULL);

query FFFFFFFFFF
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(y, int_x), covar_pop(int_y, x), round(covar_pop(dy, dx), 7),
covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(y, int_x), covar_samp(int_y, x), round(covar_samp(dy, dx), 6)
query FFFFF
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(y, int_x), covar_pop(int_y, x), round(covar_pop(dy, dx), 7)
FROM statistics_agg_test
----
-149.7003372 33 1100.4 -25.336322 -149.7003372 -166.333708 36.6666666666667 1222.66666666667 -28.1514688888889 -166.333708
-149.7003372 33 1100.4 -25.336322 -149.7003372

query FFFFFFFF
SELECT covar_pop(y, dx), covar_pop(int_y, dx), covar_pop(dy, int_x), covar_pop(dy, x),
covar_samp(y, dx), covar_samp(int_y, dx), covar_samp(dy, int_x), covar_samp(dy, x)
query FFFF
SELECT covar_pop(y, dx), covar_pop(int_y, dx), covar_pop(dy, int_x), covar_pop(dy, x)
FROM statistics_agg_test
----
-149.7003372 -25.336322 1100.4 -149.7003372 -166.333708 -28.1514688888889 1222.66666666667 -166.333708
-149.7003372 -25.336322 1100.4 -149.7003372

query FF
SELECT covar_pop(DISTINCT y, x), covar_samp(DISTINCT y, x)
query F
SELECT covar_pop(DISTINCT y, x)
FROM statistics_agg_test
----
653.62895125 871.505268333333
653.62895125

query FF
SELECT CAST(covar_pop(DISTINCT y, x) FILTER (WHERE x > 3 AND y < 100) AS decimal),
CAST(covar_samp(DISTINCT y, x) FILTER (WHERE x > 3 AND y < 100) AS decimal)
query F
SELECT CAST(covar_pop(DISTINCT y, x) FILTER (WHERE x > 3 AND y < 100) AS decimal)
FROM statistics_agg_test
----
-1109.4299999999998 -2218.8599999999997
-1109.4299999999998

query error pq: unknown signature: covar_pop\(string, string\)
SELECT covar_pop(y::string, x::string) FROM statistics_agg_test

query error pq: unknown signature: covar_samp\(string, string\)
SELECT covar_samp(y::string, x::string) FROM statistics_agg_test

statement OK
INSERT INTO statistics_agg_test (y, x, int_y, int_x) VALUES
(1.797693134862315708145274237317043567981e+308, 0, 0, 0)

query error float out of range
SELECT covar_pop(y, x), covar_pop(int_y, int_x) FROM statistics_agg_test

statement OK
TRUNCATE statistics_agg_test

statement OK
INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx) VALUES
(1.0, 10.0, 1, 10, 1.0, 10.0),
(2.0, 20.0, 2, 20, 2.0, 20.0)

query RRR
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(dy, dx)
FROM statistics_agg_test
----
2.5 2.5 2.5

statement OK
TRUNCATE statistics_agg_test

statement OK
INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx) VALUES
(1.0, 10.0, 1, 10, 1.0, 10.0),
(2.0, -20.0, 2, -20, 2.0, -20.0)

query RRR
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(dy, dx)
FROM statistics_agg_test
----
-7.5 -7.5 -7.5

statement OK
TRUNCATE statistics_agg_test

statement OK
INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx) VALUES
(1.0, -1.0, 1, -1, 1.0, -1.0),
(1.0, 1.0, 1, 1, 1.0, 1.0)

query RRR
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(dy, dx)
FROM statistics_agg_test
----
0 0 0

statement OK
TRUNCATE statistics_agg_test

subtest covar_samp

statement OK
INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx)
VALUES (0.0, 0.09561, 1, 10, 0.0, 0.09561),
(42.0, 324.78, 2, 25, 42.0, 324.78),
(42.0, 324.78, 2, 25, 42.0, 324.78),
(56.0, 7.8, 3, 40, 56.0, 7.8),
(56.0, 7.8, 3, 40, 56.0, 7.8),
(56.0, 7.8, 3, 40, 56.0, 7.8),
(100.0, 99.097, 4, 100, 100.0, 99.097),
(100.0, 99.097, 4, 100, 100.0, 99.097),
(100.0, 99.097, 4, 100, 100.0, 99.097),
(100.0, 99.097, 4, 100, 100.0, 99.097),
(NULL, NULL, NULL, NULL, NULL, NULL);

query FFFFF
SELECT covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(y, int_x), covar_samp(int_y, x), round(covar_samp(dy, dx), 6)
FROM statistics_agg_test
----
-166.333708 36.6666666666667 1222.66666666667 -28.1514688888889 -166.333708

query FFFF
SELECT covar_samp(y, dx), covar_samp(int_y, dx), covar_samp(dy, int_x), covar_samp(dy, x)
FROM statistics_agg_test
----
-166.333708 -28.1514688888889 1222.66666666667 -166.333708

query F
SELECT covar_samp(DISTINCT y, x)
FROM statistics_agg_test
----
871.505268333333

query F
SELECT CAST(covar_samp(DISTINCT y, x) FILTER (WHERE x > 3 AND y < 100) AS decimal)
FROM statistics_agg_test
----
-2218.8599999999997

query error pq: unknown signature: covar_samp\(string, string\)
SELECT covar_samp(y::string, x::string) FROM statistics_agg_test

statement OK
INSERT INTO statistics_agg_test (y, x, int_y, int_x) VALUES
(1.797693134862315708145274237317043567981e+308, 0, 0, 0)

query error float out of range
SELECT covar_samp(y, x), covar_samp(int_y, int_x) FROM statistics_agg_test

Expand All @@ -1567,12 +1653,11 @@ INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx) VALUES
(1.0, 10.0, 1, 10, 1.0, 10.0),
(2.0, 20.0, 2, 20, 2.0, 20.0)

query RRRRRR
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(dy, dx),
covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(dy, dx)
query RRR
SELECT covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(dy, dx)
FROM statistics_agg_test
----
2.5 2.5 2.5 5 5 5
5 5 5

statement OK
TRUNCATE statistics_agg_test
Expand All @@ -1582,12 +1667,11 @@ INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx) VALUES
(1.0, 10.0, 1, 10, 1.0, 10.0),
(2.0, -20.0, 2, -20, 2.0, -20.0)

query RRRRRR
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(dy, dx),
covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(dy, dx)
query RRR
SELECT covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(dy, dx)
FROM statistics_agg_test
----
-7.5 -7.5 -7.5 -15 -15 -15
-15 -15 -15

statement OK
TRUNCATE statistics_agg_test
Expand All @@ -1597,12 +1681,11 @@ INSERT INTO statistics_agg_test (y, x, int_y, int_x, dy, dx) VALUES
(1.0, -1.0, 1, -1, 1.0, -1.0),
(1.0, 1.0, 1, 1, 1.0, 1.0)

query RRRRRR
SELECT covar_pop(y, x), covar_pop(int_y, int_x), covar_pop(dy, dx),
covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(dy, dx)
query RRR
SELECT covar_samp(y, x), covar_samp(int_y, int_x), covar_samp(dy, dx)
FROM statistics_agg_test
----
0 0 0 0 0 0
0 0 0

statement OK
TRUNCATE statistics_agg_test
Expand Down Expand Up @@ -3668,3 +3751,36 @@ query T
SELECT percentile_disc(0.95) WITHIN GROUP (ORDER BY current_database()) FROM osagg
----
test

subtest corrupt_combine

statement OK
CREATE TABLE corrupt_combine (
y float,
x float
)

statement OK
INSERT INTO corrupt_combine (y, x) VALUES
(1.0, 10.0),
(2.0, 25.0),
(3.0, 35.0),
(4.0, 50.0),
(5.0, 70.0),
(6.0, 70.0)

# PR #73062 introduced a bug that caused the
# finalRegressionAccumulatorDecimalBase.combine function to corrupt the values
# in regressionAccumulatorDecimalBase that are used across iterations (n, sx,
# sxx, sy, syy, sxy). Depending on the order of the two local accumulators, the
# result from the second accumulator could be assigned directly to those fields
# when "this.n == 0". In that case, two or more functions in the bucket shared
# the same values and repeated the calculation
# (see aggregator.accumulateRowIntoBucket).
# This test checks that multiple aggregate functions in the same bucket preserve
# their values across multiple "combine" calls.

query FFFF
select covar_pop(y, x), covar_samp(y, x), regr_sxx(y, x), regr_syy(y, x) from corrupt_combine
----
37.5 45 2983.333333333333 17.5
17 changes: 11 additions & 6 deletions pkg/sql/logictest/testdata/logic_test/distsql_agg
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,10 @@ SELECT regr_sxx(y, x), regr_sxy(y, x), regr_syy(y, x) FROM statistics_agg_test
----
825 375 83325

query I
SELECT regr_count(y, x) FROM statistics_agg_test
query IF
SELECT regr_count(y, x), sqrdiff(y) FROM statistics_agg_test
----
100
100 83325

query FF
SELECT regr_avgx(y, x), regr_avgy(y, x) FROM statistics_agg_test
Expand All @@ -617,10 +617,10 @@ statement ok
ALTER TABLE statistics_agg_test EXPERIMENTAL_RELOCATE
SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i)

query F
SELECT covar_pop(y, x)::decimal FROM statistics_agg_test
query FFF
SELECT corr(y, x)::decimal, covar_pop(y, x)::decimal, covar_samp(y, x)::decimal FROM statistics_agg_test
----
3.75
0.045228963191363145 3.75 3.787878787878788

query FFF
SELECT regr_intercept(y, x), regr_r2(y, x), regr_slope(y, x) FROM statistics_agg_test
Expand All @@ -632,6 +632,11 @@ SELECT regr_sxx(y, x), regr_sxy(y, x), regr_syy(y, x) FROM statistics_agg_test
----
825 375 83325

query IF
SELECT regr_count(y, x), sqrdiff(y) FROM statistics_agg_test
----
100 83325

query FF
SELECT regr_avgx(y, x), regr_avgy(y, x) FROM statistics_agg_test
----
Expand Down
Loading

0 comments on commit 6fb88ef

Please sign in to comment.