From 8068295691940c2f4efe3841e709eceb7a7bd14c Mon Sep 17 00:00:00 2001 From: Max Neverov Date: Fri, 4 Feb 2022 06:59:08 +0100 Subject: [PATCH] physicalplan: add support for multi-stage execution of regr_avgx, regr_avgy, regr_intercept, regr_r2, and regr_slope aggregate functions. See #58347. Release note (performance improvement): regr_avgx, regr_avgy, regr_intercept, regr_r2, and regr_slope aggregate functions are now evaluated more efficiently in a distributed setting --- pkg/sql/distsql/columnar_operators_test.go | 5 + pkg/sql/execinfra/version.go | 10 +- pkg/sql/execinfrapb/aggregate_funcs.go | 5 + pkg/sql/execinfrapb/processors_sql.proto | 5 + .../logictest/testdata/logic_test/distsql_agg | 10 + .../opt/exec/execbuilder/testdata/distsql_agg | 23 +- pkg/sql/physicalplan/aggregator_funcs.go | 50 ++++ pkg/sql/sem/builtins/aggregate_builtins.go | 264 +++++++++++++++--- 8 files changed, 318 insertions(+), 54 deletions(-) diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index 082239f2799b..86c160bb2e5b 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -93,6 +93,11 @@ var aggregateFuncToNumArguments = map[execinfrapb.AggregatorSpec_Func]int{ execinfrapb.FinalRegrSxx: 1, execinfrapb.FinalRegrSxy: 1, execinfrapb.FinalRegrSyy: 1, + execinfrapb.FinalRegrAvgx: 1, + execinfrapb.FinalRegrAvgy: 1, + execinfrapb.FinalRegrIntercept: 1, + execinfrapb.FinalRegrR2: 1, + execinfrapb.FinalRegrSlope: 1, } // TestAggregateFuncToNumArguments ensures that all aggregate functions are diff --git a/pkg/sql/execinfra/version.go b/pkg/sql/execinfra/version.go index 8fff24396d63..5fd80a774804 100644 --- a/pkg/sql/execinfra/version.go +++ b/pkg/sql/execinfra/version.go @@ -39,7 +39,7 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" // // ATTENTION: When updating these fields, add a brief description of what // changed to the version history below. -const Version execinfrapb.DistSQLVersion = 60 +const Version execinfrapb.DistSQLVersion = 61 // MinAcceptedVersion is the oldest version that the server is compatible with. // A server will not accept flows with older versions. @@ -51,6 +51,14 @@ const MinAcceptedVersion execinfrapb.DistSQLVersion = 60 Please add new entries at the top. +- Version: 61 (MinAcceptedVersion: 60) + - final_regr_avgx, final_regr_avgy, final_regr_intercept, final_regr_r2, and + final_regr_slope aggregate functions were introduced to support local and + final aggregation of the corresponding builtin functions. It would be + unrecognized by a server running older versions, hence the version bump. + However, a server running v61 can still process all plans from servers + running v60, thus the MinAcceptedVersion is kept at 60. + - Version: 60 (MinAcceptedVersion: 60): - Deprecated ExportWriterSpec and ParquetWriterSpec and merged them into ExportSpec diff --git a/pkg/sql/execinfrapb/aggregate_funcs.go b/pkg/sql/execinfrapb/aggregate_funcs.go index 2a5a5ce8e23e..028e47cee91e 100644 --- a/pkg/sql/execinfrapb/aggregate_funcs.go +++ b/pkg/sql/execinfrapb/aggregate_funcs.go @@ -65,4 +65,9 @@ const ( FinalRegrSxx = AggregatorSpec_FINAL_REGR_SXX FinalRegrSxy = AggregatorSpec_FINAL_REGR_SXY FinalRegrSyy = AggregatorSpec_FINAL_REGR_SYY + FinalRegrAvgx = AggregatorSpec_FINAL_REGR_AVGX + FinalRegrAvgy = AggregatorSpec_FINAL_REGR_AVGY + FinalRegrIntercept = AggregatorSpec_FINAL_REGR_INTERCEPT + FinalRegrR2 = AggregatorSpec_FINAL_REGR_R2 + FinalRegrSlope = AggregatorSpec_FINAL_REGR_SLOPE ) diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index dc165a3477a1..4b68f42979d3 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -832,6 +832,11 @@ message AggregatorSpec { FINAL_REGR_SXX = 50; FINAL_REGR_SXY = 51; FINAL_REGR_SYY = 52; + FINAL_REGR_AVGX = 53; + FINAL_REGR_AVGY = 54; + FINAL_REGR_INTERCEPT = 55; + FINAL_REGR_R2 = 56; + FINAL_REGR_SLOPE = 57; } enum Type { diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_agg b/pkg/sql/logictest/testdata/logic_test/distsql_agg index df7b5606ca1b..fd77754c2371 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_agg +++ b/pkg/sql/logictest/testdata/logic_test/distsql_agg @@ -622,11 +622,21 @@ SELECT covar_pop(y, x)::decimal FROM statistics_agg_test ---- 3.75 +query FFF +SELECT regr_intercept(y, x), regr_r2(y, x), regr_slope(y, x) FROM statistics_agg_test +---- +48.4545454545455 0.00204565911136568 0.454545454545455 + query FFF SELECT regr_sxx(y, x), regr_sxy(y, x), regr_syy(y, x) FROM statistics_agg_test ---- 825 375 83325 +query FF +SELECT regr_avgx(y, x), regr_avgy(y, x) FROM statistics_agg_test +---- +4.5 50.5 + # Regression test for #37211 (incorrect ordering between aggregator stages). statement ok CREATE TABLE uv (u INT PRIMARY KEY, v INT); diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_agg b/pkg/sql/opt/exec/execbuilder/testdata/distsql_agg index daafb6f630f0..726c87cf59a0 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_agg +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_agg @@ -54,7 +54,12 @@ EXPLAIN (DISTSQL) SELECT covar_pop(a, b), regr_sxx(a, b), regr_sxy(a, b), - regr_syy(a, b) + regr_syy(a, b), + regr_avgx(a, b), + regr_avgy(a, b), + regr_intercept(a, b), + regr_r2(a, b), + regr_slope(a, b) FROM data GROUP BY b ---- distribution: full @@ -68,7 +73,7 @@ vectorized: true table: data@data_pkey spans: FULL SCAN · -Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmNFvujoUx9_vX9H0id3UaAHd5hP8NmdIHOwHbHfLzUKqNI5EqRdw2bLsf78pIoLbr9XhIw8b55R-T4_nWz8PfsD0vwUcwtHj3cS0bKBcW57v_Z6cAW80GV35YIoAeZ0r5AyBKWOLgMShEvBou8CSMp-xdZzlW5fkbfOM4vyZZmFIXythkJLlqpqvWJGul9tnEBXVXkkSkXhGt8lOzJOt8o0lAZnPlWD6ntG0bOdv3miUFY1HWZHmbefZjJVVEJieIZDQeRKkb297-Xs9fy9ycOM6tyAkGQFj17m_A7-ewBQiGLOQ2mRJUzj8F2KIoAoR1CCCOkSwD58RXCVsRtOUJXzLRy6wwjc47CEYxat1xpefEZyxhMLhB8yibEHhEPpkuqAuJSFNuj2IYEgzEi3yY3gXxiqJliR5hwhescV6GadDwBtFILeJP_h8-DPKIILeivAtnS4GJA4BBix7oQl8_kSQrbNdG2lG5hQO8Sc6vFVzPk_onGQs6fbrnRp8Gqb9FNiOH9j3k4liqGe8m_tbxcA8unLubb-IfznOJDDta8XQytRxiyzfGLjOP57C01vzsVDdWnYReb_da-vmZpvd3wZWWfvRcQNzPFYMPS9t-ZuD-tuMn5MnvmvanuVbjh24o7E78jwemuOxOxqb_kgxMOKfoT643Sym7-CFpC97Y8Dw-XM3XPWPw93VWccsCWlCw1qlvIpg_Lj35eD9-eNy_mptSlrdAb3mQL-2dVAacF4acMGjG8s2J4HnX1-PHhTjEhkq2tStvgjunLvay51Rvd3eB9O1TPtq9E2ZB9P9UqO0F-Oav1itGowrVa6csg7Wd8vc88B7fPx29em71ady1aVxSJMhMDAChto1NAQMHQGjj4AxQMA4R8C4QMC4LP5wj-_jARdgtfzHhVgvVJjLcR7xAphXwFyu9v749dVOe8Ns1mGrLu7v7fz-bL12Nj6ccvhklOviTlf9AeckzVa-aIOWc8dyDp-Sc7jlXMu5fc6ph7NGPR1r1E5X-wFrJM1WLvt5y5pjWaOekjVqy5qWNfus0Q5njXY61midrv4D1kiarVz2i5Y1x7JGOyVrtJY1LWv2WaMfzhr9dKzRO93-D1gjabZy2S9b1hzLGv2UrNFb1rSsEf1W9M39cWm6YnFKD_olqMdvIA3ndHNdU7ZOZvQuYbP8mE3q5Lp8IaRptnmLN4kVb17xBqtivC_GVbFaE-PjxIMm4ssmYtyob9wXq1XhvDWxWBObNRC7pQvVfbG438RqsVhitVgssVosllktUUusHjSx-lwovhCbddHELLFYYpZYLDFLLJaZJVFLzLpsYhaWUFSG0WYcbQbSZiRtiNJmLMWNYIolNNUlpn3B6VGmidUy08RqmWlitdQ0iVxm2heoCk17_vzr_wAAAP__GJu0AA== +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmUFvqkoUx_fvU0xmZV_G6AxgW1dwLTUkFrxA-9q8NARl4iVRxgfY2Nz0u78MKoL1MlpcdMGiZc5h_mdOz7_zW-hvmPw3h32oP49HmmGC1p3huM7P0RVw9JE-cMEEAf9t1vKvEJgwNvf8KGh5fLVLsDiPp2wVpdnWhb_ePMMoeyZpENC3wtJL_MWyGC_ZNlwtdk8v3FZ78-PQj6Z0F-zFPNgp1yz2_Nms5U3eU5rk7fzNGw3TbeNhug2ztrNoyvIqCEyuEIjpLPaS9fogfi_H7-XYf5utDxPlHWGU0nhKl2kpG5Ny2Tlb0k0G3NvWAwj81AdD23ocgx8vYAIRjFhATX9BE9j_F2KIIIEIShBBGSKowFcElzGb0iRhMd_yOxMYwRr2uwiG0XKV8vQrglMWU9j_DdMwnVPYh64_mVOb-gGNO12IYEBTP5xnx_Au1GUcLvz4HSI4YPPVIkr6gDeKQOY_f_DB82eYQgSdpc-3tDsY-FEAMGDpLxrD1w8E2Srdt5Gk_ozCPv5Ap7eqzWYxnfkpiztKuVOVT0MzXzzTcj3zcTRqqeSKd_P40FIxXw2sR9Pdrn9Y1sjTzLuWKuWhZW-jbKNnW_84LR4-aM9b1YNhblfOT_vOuL_fRY8PnpHXfrZsTxsOW6qclTbczUHKLuLnZIFra6ZjuIZlerY-tHXH4UttOLT1oebqLRUj_jeUB7efxeQd_PKTXwdjwPD1Yz9c8sfh7uusIhYHNKZBqVJWpWL8uPvp4MP543z-pDQlqeyAXHJAKW3t5QZc5wbc8NW9YWojz3Hv7vSnlnqLVII2dYsvvLE1Lr3cG9Xd733SbEMzB_qRMk-a_alGbi_GJX8xKRqMC1UGVl4Hy_s099xznp-PZl-OZV-OZbWn4bES2tPw2G7DdHV7oI_dI-9scuzQkTXWd3mbRgGN-0DFCKiko0oIqDICqoKA2kNAvUZAvUFAvd3-4C7fxxdcgEn-iwuxvFVhLsfZihfAvALmcpLJs7O4inAV4Sqi_BEn0mX_403WZssOVg52Hj9bLp2NT6cuvhh1O7jdIV_grqDZwsXvNdw9l7v4ktzFDXcb7n537pLT2Ucuxz7S7khfYJ-g2cLlu27Ydy77yCXZRxr2Nez77uyTTmefdDn2Se2O_AX2CZotXL6bhn3nsk-6JPukhn0N-747--TT2Sdfjn1yu6N8gX2CZguX77Zh37nsky_JPrlhX8O-784-wVcXNk2WLEroSZ8kdvmNoMGMbq5PwlbxlI5jNs2O2YRWpssSAU3SzVu8CYxo84o3WBTjQzEuiklJjM8T9-qIb-uIca2-sVKtJpXzlqrFUrVZvWq35Eq1Ui1W6lhdLRZYXS0WWF0tFlktUAus7tWx-rpSfFNt1k0ds6rFArOqxQKzqsUiswRqgVm3dczCAoqKMFqPo_VAWo-kNVFaj6W4FkyxgKaywLRPOD3LtGq1yLRqtci0arXQNIFcZNonqFaa9vrx1_8BAAD__yCBgnk= statement ok @@ -266,7 +271,7 @@ Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJy0lNGK4jAUhu_3K # Regression aggregate functions have two local, and one final stage aggregations. # Calculation and rendering are happening at the end. query T -EXPLAIN (DISTSQL) SELECT covar_pop(a, c), regr_sxx(a, c), regr_sxy(a, c), regr_syy(a, c) FROM data +EXPLAIN (DISTSQL) SELECT covar_pop(a, c), regr_sxx(a, c), regr_sxy(a, c), regr_syy(a, c), regr_avgx(a, c), regr_avgy(a, c), regr_intercept(a, c), regr_r2(a, c), regr_slope(a, c) FROM data ---- distribution: full vectorized: true @@ -278,12 +283,13 @@ vectorized: true table: data@data_pkey spans: FULL SCAN · -Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJy8lNGrokAUxt_3rxjOU8GEjVq361PSekNoq1VZuiwhszq4QTnujC1F9L8v5oVbUaMY7VvnTJ_fN78znAPIP2uwwFnMJ7Y7Ra2vrh_43ydt5DsTZxSgiP-lIsx41qIYRW2MBEtEKHe7q3p_We8_avTmzb6hmOYUMKQ8ZlO6YRKsn0AAgw4YDMBgAoYeLDFkgkdMSi6KvxxOAjfegdXFsEqzbV60lxgiLhhYB8hX-ZqBBQH9tWYeozETWhcwxCynq_XJprAeZmK1oWIPGEZ8vd2k0kJFOsDgZ7SoOhpBNI0RQTz_zQQsjxj4Nv90lDlNGFjkiOunspNEsITmXGi9y1CBZ099N3Bn09Bzxp7j-8VPezz2nLEdOK0hwUO9fTeFfjfFp_k25SJmgsUXzsujOie5ovfmTu1JOJr9sL1wPpu3hqQN-KNbJA_9xeJW8_1G871s3ruUcXEpUn_gpMnANdLR9AYjr8h1hrL_xJHr9enojejoHc1oQKci1xmdlyfSMerTMRrRMTqa2YBORa4zOoMn0jHr0zEb0TE7Wq8BnYpcZ3Re_9MyvZHCYzLjqWRXS_X2l7vFsmVxwsrNLPlWRGwueHSyKcvZSXdqxEzm5SkpCzctj4qA52KiFOsXYnIt1tXOFdaGUm2qxeYjuXtKcV_t3H_E-UUpHqidB484v6pn1a14JupHdu29PH75FwAA__9Lzzph +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJy8lV9ro0AUxd_3U8h9SmCCGTX941Mka4OQTbIqpWUJMqsXN2Add5yUlJLvvqgLja7VYOm-5R7neA4_L5NXyH8nYIL9sF1ZzloZfXU83_u-GiuevbIXvhLyZyaCjGcjRpRwTBSBsQjy47Exv9Tnl_rMnuNjU6if2KcSRYiZrKlCq7824RlWinLnbr4pEZMMCKQ8wjV7whzMH0CBgAYEdCBgAIEZ7AhkgoeY51wUR15LgxMdwZwS2KfZQRbyjkDIBYL5CnIvEwQTfPYzQRdZhEKdAoEIJdsnZUwRPc_E_omJFyCw4MnhKc1NpWgHBLyMFdNEpQpLI4UqXP5CAbsTAX6Qb4m5ZDGCSU_k8lZWHAuMmeRCndVL-a619hzf2awD1166tucVP63l0rWXlm-P5pTMtfG7LbR3W7yFH1IuIhQY1ZJ3p-6etEHvzllbq2CxubfcYLvZjuZ0DOSvWjQPvIeHNvGxRXxsEa37ZYvful-2nHXWvu0u7K3_7yNXa8lbbbZ2Kb-HUa9hpJevGB2yYiqdqNqAJevpdfbxrj5xybTL6WiD6GgTVR9Ap6fXGZ3rT6SjX05HH0RHn6jGADo9vc7o3HwiHeNyOsYgOsZEnQ2g09PrjM7tf7q-W1q4mGc8zbFxjbe_eVpc7xjFWP0X5PwgQtwKHpYx1bgpfaUQYS6rp7QanLR6VBQ8N9NOs1Yz06ZZ607uidY73Ua32fhI71mn-ao7-eojyded5pvu5JuPJN92f6tpz5p0L1kze3f68icAAP__g2h04w== + # Test various combinations of aggregation functions and verify that the # aggregation processors are set up correctly. query T -EXPLAIN (DISTSQL) SELECT sum(a), avg(b), sum(c), avg(d), stddev(a), stddev_samp(a), stddev_pop(a), variance(b), var_samp(b), var_pop(b), sum(a+b+c::INT+d), covar_pop(a, c), regr_sxx(a, c), regr_sxy(a, c), regr_syy(a, c) FROM data +EXPLAIN (DISTSQL) SELECT sum(a), avg(b), sum(c), avg(d), stddev(a), stddev_samp(a), stddev_pop(a), variance(b), var_samp(b), var_pop(b), sum(a+b+c::INT+d), covar_pop(a, c), regr_sxx(a, c), regr_sxy(a, c), regr_syy(a, c), regr_avgx(a, c), regr_avgy(a, c), regr_intercept(a, c), regr_r2(a, c), regr_slope(a, c) FROM data ---- distribution: full vectorized: true @@ -297,11 +303,11 @@ vectorized: true table: data@data_pkey spans: FULL SCAN · -Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzcVl-LskoYvz-fYniujCZs1NrWq5HWXYRe61Xfl305REw6dILSzmixy9J3P4xaWbQWJ-iiC-X595vnz-8ZmC9I_12ACfb7aGA5LlJeHD_wfw4ayLcHdj9A6XqpsAZGbDNTpg2c62GpR1LPoohv8pBCnKRsuarqq6RQN0zMWRzy_JgNE0XgTpFRu_MVhaEmmjZQE4Wm6bhBT4oyW5jsYhlGsgzBZ2KSfnyc6J_H-mepo1dv-ANFLGOAIU4i7rIlT8H8Gwhg0ACDDhgMwNCBMYaVSEKepomQIV85wIk-wGxjmMerdSbNYwxhIjiYX5DNswUHEwI2XXCPs4gLtQ0YIp6x-SJPI1PTlZgvmfgEDP1ksV7GqYkYRlOMQowiwOCvmLS1VIJYHCGCkuwfLgCDx-OICxNRo6kolDSp1mhSvZwQRpRgRDWMqI4RNWC8xZCss0OdacZmHEyyxdf3Ys1mgs9Ylgi1c9yK_-uHQrWGrFdKupT6w19uUMq51dhLnYo_l_2f3ovz-lqeUXq0I8_hHCKlwLNc3wmcoTvx7DfP9n0pWm9vnv1mBbZCNSwTfte39m3fh3bXcSIiLnh01Ot4Wz8Z0j4zGrIvfjekiVM7nH1EV2qvjmsNJn7w8mL_VugTpgTT3qljMhqOzjh_W55juX1boc9YDkU_chWgiqcot30I6g_3YYQczHLoE__9_az1zznrn711v7r5jqrFimJEOyrtYkSfyq-HEX0uP9KWPxlP5FITCSESQzrfcqwfcUyuv6fk_99TlbRU7Q439UI3lX3sPtRN1a5nUbuBRa2l6ndg8UI3FRafHopF_XoW9RtY1FuqcQcWL3RTYbH3UCwa17No3MCi0VI7d2DxQjcVFp8fisULbz6Pp6skTvnJG-j8yW35NuLRjBcPqTRZi5CPRBLmaQp1mONyQ8TTrPCSQnHiwiULrIJJLVg7ApNTsFaf-UJqvRZt1IONW-ru1IK79Zm7t2R-qgX36jP3bsn8XM9V-8Ka1C_Zae7x9q__AgAA___FoV71 +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzcVt1vsjoYvz9_RfNcYazBAjrHVYljhsQXfYF32XJiTAeNx0SBU9BsWfa_nxT8wjA0x2QXu4A8X78-H7-26Qdk_67ABPt5OrYcFykPjh_4v8ct5NtjexigbLNWWAsjtl0ory1c6OFOj6SeRxHfFiGlOM_YOj3V06RUt0wsWRzyYpktE2XgXpFR-_UVhaE2em2hNgpN03GDgRRltjDZxzKMZBmCL8Q8e3s709-r-ntVZ9vF27mhGrGMcy5CnuYVq9Cqy66SlJcW9OhNfqGI5QwwxEnEXbbmGZh_AwEMGmDQAYMBGHoww5CKJORZlggZ8lEAnOgNzC6GZZxucmmeYQgTwcH8gHyZrziYELDXFfc4i7hQu4Ah4jlbroo0MjVNxXLNxDtgGCarzTrOTMQwesUoxCgCDH7KpK2jEsTiCBGU5P9wARg8HkdcmIgabUWhpE21Vpvqu9FjRAlGVMOI6hhRA2afGJJNfqwzy9mCg0k-8fW9WIuF4AuWJ0LtVVvx__xSqNaS9UpJl9Jw8scNdnJhNQ5S78RfyP5v78F5fNytsfNoFc9xHSKlwLNc3wmciTv37JFn-74UrdHIs0dWYCtUwzLhV31rX_Z9bHcTJyLigkeVXmefzZMh3ZrRkEPx-yHNncbhHCL6Unt0XGs894OHB_tJoXeYEkwH5475dDKtcT5ZnmO5Q1uh91gORa-4StCJpyy3ewwaTg5hhBzNcuhz__m51vpSZ32ps1pPo7olrKdRXbTjBrY3tKdBjc_T6pKOJ1N7bz-cmOJoqOXJwIj2VNrHiN7tvgFG9H73ka78yXgizxKRECIxpCd_EkYkhkgQkQit--We0yt7jlx_b5D_f2-opKNq33BzXOjm5Hz0f9TNoV3PonYDi1pH1b-BxQvdnLB496NY1K9nUb-BRb2jGt_A4oVuTlgc_CgWjetZNG5g0eiovW9g8UI3Jyze_ygWL7xBPZ6lSZzxszdZ_cpd-Vbj0YKXD7ss2YiQT0USFmlKdVLgCkPEs7z0klJx4tIlCzwFk0awVgGTc7DWnPlCar0RbTSDjVvq7jWC-82Z-7dkvmsED5ozD27JfN_MVffCNmneZOe5Z59__RcAAP__8oSfLA== query T -EXPLAIN (DISTSQL) SELECT sum(a), min(b), max(c), count(d), avg(a+b+c::INT+d), stddev(a+b), variance(c::INT+d), covar_pop(b, d), regr_sxx(a, c), regr_sxy(a, c), regr_syy(a, c) FROM data +EXPLAIN (DISTSQL) SELECT sum(a), min(b), max(c), count(d), avg(a+b+c::INT+d), stddev(a+b), variance(c::INT+d), covar_pop(b, d), regr_sxx(a, c), regr_sxy(a, c), regr_syy(a, c), regr_avgx(a, c), regr_avgy(b, c), regr_intercept(a, b), regr_r2(b, c), regr_slope(a, c) FROM data ---- distribution: full vectorized: true @@ -315,7 +321,8 @@ vectorized: true table: data@data_pkey spans: FULL SCAN · -Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsVl1v6jgQfd9fYc2TEYPASfhonhzRtIrUht4kt9urFUImsVgkSFgnVK2q_veVAwXSbQN7eehLX8Bz5kzGc3ws-QXyfxZgg_twd-N4PqGXXhiFP24aJHRv3GFE8vWSigaS5TylU_0vnmjcQBJn67SgSQOJeJxRSgVpkmmDNEls254fDfRSZ_MiSeTjNo3kUai5SGNJt7QtK84ehZqsshWdYgkoOVOT_OmJCiTxPn6uxs_bmFwFo1uSiEIAQpol0hdLmYP9FzBAMADBBAQLELowRlipLJZ5nilNeSkLvOQJ7A7CPF2tCw2PEeJMSbBfoJgXCwk2RGK6kIEUiVTtDiAkshDzRdlGt-YrNV8K9QwIw2yxXqa5TQSSKZIYSQII4UporNVmRKQJYSQr_pYKEAKZJlLZhFtNSjlrcqPR5OZWRyQlgjr7BmoMSQmaOgHjV4RsXez3nhdiJsFmr3j6fM5spuRMFJlqd6vjhT9vKbcagHDr-ZR3y5XzQHlPr4ajn340CUZ_hlSHJZntEtt1-CO49K6uKDd2HOOAY1Q45o5jHnDKdRQ4fuhF3sifBO514IahXjrX14F77UQu5V3k_ZOIFurtf6ac8alye8HWaaYSqWRSUWv8Wq8t63wgLtuJa-zEfVNh4unprZ0m3QpeHsGV5zs3kzC6vHTvKe8jHyC_2CfuncBz_KFLOesgZww5M_bZ4ejeCSZ3ozvKmbmHtWqT8OGBcmb9B_31Efprh-4cXbEpEt5t8x4S3kfCtYsvtJM7-kcTmfHpcZiV42CnX1T2-xe1zVpt44uu6pEJD-zU-76qFeWM071hnOENo9U2v8gbRyY88Eb_2xsV5czTvWGe4Q2z1ba-yBtHJjzwxuDbGxXlrNO9YZ3hDavV7n6RN45MeOCNi29v_J-HcyDzVZbm8t0z8OMvd_TzUCYzuXlL5tlaxfJOZXHZZhOOyroSSGRebLJsE3jpJqU3eFjMaouNSjF7X2zUdz7S2qyttuqLrXP23a0t7tV37p3TuV9bPKjvPDin80X9WXWO2KTeZO97j1__-DcAAP__rUWnTA== +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsV11v6jgQfd9fYc2TUV2B8wFtnhzRFEWigZvksr1aIWQSi0WChHUCoqr631cOFAhLA1rU25e-BPvMGY9n5tgyr5D9MwMLnOd-13Y9hB_cIAx-dGsocLpOO0TZco55jaD5NMFj9cvXOKoRFKXLJMdxjSC-mmCMObpB4xq6QZFluV54p4bKmuVxLFZbM0ErLqc8iQTe0rasKF1xOVqkCzwmBSDFRI6y9RpzgqL9_KU8fynP-WqyPgZe1Io7YJrkQkZikSva-B2VWomUzdKF2KyDHv3eE4p5zoFAksbC43ORgfUXUCCgAQEdCBhAwIQhgYVMI5FlqVSU18LBjddgNQhMk8UyV_CQQJRKAdYr5NN8JsCCkI9nwhc8FrLeAAKxyPl0VoRRodlCTudcvgCBdjpbzpPMQmr7BEUExUAgWHCF3dYp4kmMKErzv4UEAr5IYiEtxIwbjBm9YVrthunbBhFUIERZ30GFEVSAujLA8I1Ausz3e89yPhFg0TdyeX72ZCLFhOeprJvl9IKfT5gZNSDw5HqYmcXIfsasqUbt3k8vHPm9PwOspgWZ7gzbcfDDf3AfHzHTdhztgKOVOPqOox9winHo217ghm7PG_lOx3eCQA3tTsd3OnboYGYS1rqIaJDN9i9Z8TKiQVRpPuqF9mEv9i1YJqmMhRRxqf7Dt-pu0caJdtFdu7Rdu97rOnJVPY1dlc0SXiT76Hp2dxSEDw_OALMWYXeE3e8NA9t3ba_tYEYbhFFKGNX21nZvYPujfq-PGdX3sKraKHh-xowa_0F_nUJ_nULtQefUEvago9jmEex6oeO3nb4SYvPI5msnHIJur--8r787mqXzRhAz66xJEGsRxNRxvFdHsqE-ikgVkyoqVVxqqo-i09aH8tBL8qCXX0X0_19FdXpb177oMjqT4YG8m9-X0SdfRtrlatOuUJt2W9e_SG1nMjxQW-tbbZ-sNv1ytelXqE2_rRtfpLYzGR6o7e5bbZ-sNuNytRlXqM24rZtfpLYzGR6o7f5bbb_xWX-iF77IFmmSiaPn_emVG-rZL-KJ2PxHyNKljERfplERZjPtFX4FEIss31jpZuImG5Pa4KEzrXTWSs702FmrjnwmtF7pbVQ7G9fs26x0blZHbl4TuVXpfFcd-e6ayPfVvWqckUm1yI5jD9_--DcAAP__ERJXMA== + # Verify that local and final aggregation is correctly shared and de-duplicated. query T diff --git a/pkg/sql/physicalplan/aggregator_funcs.go b/pkg/sql/physicalplan/aggregator_funcs.go index 8a720e3831f8..18a67917ded0 100644 --- a/pkg/sql/physicalplan/aggregator_funcs.go +++ b/pkg/sql/physicalplan/aggregator_funcs.go @@ -368,4 +368,54 @@ var DistAggregationTable = map[execinfrapb.AggregatorSpec_Func]DistAggregationIn }, }, }, + + execinfrapb.RegrAvgx: { + LocalStage: []execinfrapb.AggregatorSpec_Func{execinfrapb.TransitionRegrAggregate}, + FinalStage: []FinalStageInfo{ + { + Fn: execinfrapb.FinalRegrAvgx, + LocalIdxs: passThroughLocalIdxs, + }, + }, + }, + + execinfrapb.RegrAvgy: { + LocalStage: []execinfrapb.AggregatorSpec_Func{execinfrapb.TransitionRegrAggregate}, + FinalStage: []FinalStageInfo{ + { + Fn: execinfrapb.FinalRegrAvgy, + LocalIdxs: passThroughLocalIdxs, + }, + }, + }, + + execinfrapb.RegrIntercept: { + LocalStage: []execinfrapb.AggregatorSpec_Func{execinfrapb.TransitionRegrAggregate}, + FinalStage: []FinalStageInfo{ + { + Fn: execinfrapb.FinalRegrIntercept, + LocalIdxs: passThroughLocalIdxs, + }, + }, + }, + + execinfrapb.RegrR2: { + LocalStage: []execinfrapb.AggregatorSpec_Func{execinfrapb.TransitionRegrAggregate}, + FinalStage: []FinalStageInfo{ + { + Fn: execinfrapb.FinalRegrR2, + LocalIdxs: passThroughLocalIdxs, + }, + }, + }, + + execinfrapb.RegrSlope: { + LocalStage: []execinfrapb.AggregatorSpec_Func{execinfrapb.TransitionRegrAggregate}, + FinalStage: []FinalStageInfo{ + { + Fn: execinfrapb.FinalRegrSlope, + LocalIdxs: passThroughLocalIdxs, + }, + }, + }, } diff --git a/pkg/sql/sem/builtins/aggregate_builtins.go b/pkg/sql/sem/builtins/aggregate_builtins.go index 0ed982f71745..8a10b951165f 100644 --- a/pkg/sql/sem/builtins/aggregate_builtins.go +++ b/pkg/sql/sem/builtins/aggregate_builtins.go @@ -204,6 +204,41 @@ var aggregates = map[string]builtinDefinition{ ), )), + "final_regr_avgx": makePrivate(makeBuiltin(aggProps(), + makeAggOverload([]*types.T{types.DecimalArray}, types.Float, newFinalRegressionAvgXAggregate, + "Calculates the average of the independent variable (sum(X)/N) in final stage.", + tree.VolatilityImmutable, + ), + )), + + "final_regr_avgy": makePrivate(makeBuiltin(aggProps(), + makeAggOverload([]*types.T{types.DecimalArray}, types.Float, newFinalRegressionAvgYAggregate, + "Calculates the average of the dependent variable (sum(Y)/N) in final stage.", + tree.VolatilityImmutable, + ), + )), + + "final_regr_intercept": makePrivate(makeBuiltin(aggProps(), + makeAggOverload([]*types.T{types.DecimalArray}, types.Float, newFinalRegressionInterceptAggregate, + "Calculates y-intercept of the least-squares-fit linear equation determined by the (X, Y) pairs in final stage.", + tree.VolatilityImmutable, + )), + ), + + "final_regr_r2": makePrivate(makeBuiltin(aggProps(), + makeAggOverload([]*types.T{types.DecimalArray}, types.Float, newFinalRegressionR2Aggregate, + "Calculates square of the correlation coefficient in final stage.", + tree.VolatilityImmutable, + )), + ), + + "final_regr_slope": makePrivate(makeBuiltin(aggProps(), + makeAggOverload([]*types.T{types.DecimalArray}, types.Float, newFinalRegressionSlopeAggregate, + "Calculates slope of the least-squares-fit linear equation determined by the (X, Y) pairs in final stage.", + tree.VolatilityImmutable, + )), + ), + "transition_regression_aggregate": makePrivate(makeTransitionRegressionAggregateBuiltin()), "covar_samp": makeRegressionAggregateBuiltin( @@ -1171,6 +1206,11 @@ var _ tree.AggregateFunc = &finalCovarPopAggregate{} var _ tree.AggregateFunc = &finalRegrSXXAggregate{} var _ tree.AggregateFunc = &finalRegrSXYAggregate{} var _ tree.AggregateFunc = &finalRegrSYYAggregate{} +var _ tree.AggregateFunc = &finalRegressionAvgXAggregate{} +var _ tree.AggregateFunc = &finalRegressionAvgYAggregate{} +var _ tree.AggregateFunc = &finalRegressionInterceptAggregate{} +var _ tree.AggregateFunc = &finalRegressionR2Aggregate{} +var _ tree.AggregateFunc = &finalRegressionSlopeAggregate{} var _ tree.AggregateFunc = &covarSampAggregate{} var _ tree.AggregateFunc = ®ressionInterceptAggregate{} var _ tree.AggregateFunc = ®ressionR2Aggregate{} @@ -2153,6 +2193,87 @@ func (a *regressionAccumulatorDecimalBase) regrSYYLastStage() (tree.Datum, error return mapToDFloat(&a.syy, a.ed.Err()) } +// regressionAvgXLastStage computes SQL:2003 average of the independent variable +// (sum(X)/N) from the precalculated transition values. +func (a *regressionAccumulatorDecimalBase) regressionAvgXLastStage() (tree.Datum, error) { + if a.n.Cmp(decimalOne) < 0 { + return tree.DNull, nil + } + + // a.sx / a.n + a.ed.Quo(&a.tmp, &a.sx, &a.n) + return mapToDFloat(&a.tmp, a.ed.Err()) +} + +// regressionAvgYLastStage computes SQL:2003 average of the dependent variable +// (sum(Y)/N) from the precalculated transition values. +func (a *regressionAccumulatorDecimalBase) regressionAvgYLastStage() (tree.Datum, error) { + if a.n.Cmp(decimalOne) < 0 { + return tree.DNull, nil + } + + // a.sy / a.n + a.ed.Quo(&a.tmp, &a.sy, &a.n) + return mapToDFloat(&a.tmp, a.ed.Err()) +} + +// regressionInterceptLastStage computes y-intercept from the precalculated +// transition values. +func (a *regressionAccumulatorDecimalBase) regressionInterceptLastStage() (tree.Datum, error) { + if a.n.Cmp(decimalOne) < 0 { + return tree.DNull, nil + } + if a.sxx.Cmp(decimalZero) == 0 { + return tree.DNull, nil + } + + // (a.sy - a.sx*a.sxy/a.sxx) / a.n + a.ed.Quo( + &a.tmp, + a.ed.Sub(&a.tmp, &a.sy, a.ed.Mul(&a.tmp, &a.sx, a.ed.Quo(&a.tmp, &a.sxy, &a.sxx))), + &a.n, + ) + return mapToDFloat(&a.tmp, a.ed.Err()) +} + +// regressionR2LastStage computes square of the correlation coefficient from the +// precalculated transition values. +func (a *regressionAccumulatorDecimalBase) regressionR2LastStage() (tree.Datum, error) { + if a.n.Cmp(decimalOne) < 0 { + return tree.DNull, nil + } + if a.sxx.Cmp(decimalZero) == 0 { + return tree.DNull, nil + } + if a.syy.Cmp(decimalZero) == 0 { + return tree.NewDFloat(tree.DFloat(1.0)), nil + } + + // (a.sxy * a.sxy) / (a.sxx * a.syy) + a.ed.Quo( + &a.tmp, + a.ed.Mul(&a.tmp, &a.sxy, &a.sxy), + a.ed.Mul(&a.tmpN, &a.sxx, &a.syy), + ) + return mapToDFloat(&a.tmp, a.ed.Err()) +} + +// regressionSlopeLastStage computes slope of the least-squares-fit linear +// equation determined by the (X, Y) pairs from the precalculated transition +// values. +func (a *regressionAccumulatorDecimalBase) regressionSlopeLastStage() (tree.Datum, error) { + if a.n.Cmp(decimalOne) < 0 { + return tree.DNull, nil + } + if a.sxx.Cmp(decimalZero) == 0 { + return tree.DNull, nil + } + + // a.sxy / a.sxx + a.ed.Quo(&a.tmp, &a.sxy, &a.sxx) + return mapToDFloat(&a.tmp, a.ed.Err()) +} + type finalRegressionAccumulatorDecimalBase struct { regressionAccumulatorDecimalBase otherTransitionValues [regrFieldsTotal]*apd.Decimal @@ -2503,13 +2624,28 @@ func newRegressionAvgXAggregate( // Result implements tree.AggregateFunc interface. func (a *regressionAvgXAggregate) Result() (tree.Datum, error) { - if a.n.Cmp(decimalOne) < 0 { - return tree.DNull, nil + return a.regressionAvgXLastStage() +} + +// finalRegressionAvgXAggregate represents SQL:2003 average of the independent +// variable (sum(X)/N). +type finalRegressionAvgXAggregate struct { + finalRegressionAccumulatorDecimalBase +} + +func newFinalRegressionAvgXAggregate( + _ []*types.T, ctx *tree.EvalContext, _ tree.Datums, +) tree.AggregateFunc { + return &finalRegressionAvgXAggregate{ + finalRegressionAccumulatorDecimalBase{ + regressionAccumulatorDecimalBase: makeRegressionAccumulatorDecimalBase(ctx), + }, } +} - // a.sx / a.n - a.ed.Quo(&a.tmp, &a.sx, &a.n) - return mapToDFloat(&a.tmp, a.ed.Err()) +// Result implements tree.AggregateFunc interface. +func (a *finalRegressionAvgXAggregate) Result() (tree.Datum, error) { + return a.regressionAvgXLastStage() } // regressionAvgYAggregate represents SQL:2003 average of the dependent @@ -2528,13 +2664,28 @@ func newRegressionAvgYAggregate( // Result implements tree.AggregateFunc interface. func (a *regressionAvgYAggregate) Result() (tree.Datum, error) { - if a.n.Cmp(decimalOne) < 0 { - return tree.DNull, nil + return a.regressionAvgYLastStage() +} + +// finalRegressionAvgYAggregate represents SQL:2003 average of the independent +// variable (sum(Y)/N). +type finalRegressionAvgYAggregate struct { + finalRegressionAccumulatorDecimalBase +} + +func newFinalRegressionAvgYAggregate( + _ []*types.T, ctx *tree.EvalContext, _ tree.Datums, +) tree.AggregateFunc { + return &finalRegressionAvgYAggregate{ + finalRegressionAccumulatorDecimalBase{ + regressionAccumulatorDecimalBase: makeRegressionAccumulatorDecimalBase(ctx), + }, } +} - // a.sy / a.n - a.ed.Quo(&a.tmp, &a.sy, &a.n) - return mapToDFloat(&a.tmp, a.ed.Err()) +// Result implements tree.AggregateFunc interface. +func (a *finalRegressionAvgYAggregate) Result() (tree.Datum, error) { + return a.regressionAvgYLastStage() } // regressionInterceptAggregate represents y-intercept. @@ -2552,20 +2703,27 @@ func newRegressionInterceptAggregate( // Result implements tree.AggregateFunc interface. func (a *regressionInterceptAggregate) Result() (tree.Datum, error) { - if a.n.Cmp(decimalOne) < 0 { - return tree.DNull, nil - } - if a.sxx.Cmp(decimalZero) == 0 { - return tree.DNull, nil + return a.regressionInterceptLastStage() +} + +// finalRegressionInterceptAggregate represents y-intercept. +type finalRegressionInterceptAggregate struct { + finalRegressionAccumulatorDecimalBase +} + +func newFinalRegressionInterceptAggregate( + _ []*types.T, ctx *tree.EvalContext, _ tree.Datums, +) tree.AggregateFunc { + return &finalRegressionInterceptAggregate{ + finalRegressionAccumulatorDecimalBase{ + regressionAccumulatorDecimalBase: makeRegressionAccumulatorDecimalBase(ctx), + }, } +} - // (a.sy - a.sx*a.sxy/a.sxx) / a.n - a.ed.Quo( - &a.tmp, - a.ed.Sub(&a.tmp, &a.sy, a.ed.Mul(&a.tmp, &a.sx, a.ed.Quo(&a.tmp, &a.sxy, &a.sxx))), - &a.n, - ) - return mapToDFloat(&a.tmp, a.ed.Err()) +// Result implements tree.AggregateFunc interface. +func (a *finalRegressionInterceptAggregate) Result() (tree.Datum, error) { + return a.regressionInterceptLastStage() } // regressionR2Aggregate represents square of the correlation coefficient. @@ -2583,23 +2741,27 @@ func newRegressionR2Aggregate( // Result implements tree.AggregateFunc interface. func (a *regressionR2Aggregate) Result() (tree.Datum, error) { - if a.n.Cmp(decimalOne) < 0 { - return tree.DNull, nil - } - if a.sxx.Cmp(decimalZero) == 0 { - return tree.DNull, nil - } - if a.syy.Cmp(decimalZero) == 0 { - return tree.NewDFloat(tree.DFloat(1.0)), nil + return a.regressionR2LastStage() +} + +// finalRegressionR2Aggregate represents square of the correlation coefficient. +type finalRegressionR2Aggregate struct { + finalRegressionAccumulatorDecimalBase +} + +func newFinalRegressionR2Aggregate( + _ []*types.T, ctx *tree.EvalContext, _ tree.Datums, +) tree.AggregateFunc { + return &finalRegressionR2Aggregate{ + finalRegressionAccumulatorDecimalBase{ + regressionAccumulatorDecimalBase: makeRegressionAccumulatorDecimalBase(ctx), + }, } +} - // (a.sxy * a.sxy) / (a.sxx * a.syy) - a.ed.Quo( - &a.tmp, - a.ed.Mul(&a.tmp, &a.sxy, &a.sxy), - a.ed.Mul(&a.tmpN, &a.sxx, &a.syy), - ) - return mapToDFloat(&a.tmp, a.ed.Err()) +// Result implements tree.AggregateFunc interface. +func (a *finalRegressionR2Aggregate) Result() (tree.Datum, error) { + return a.regressionR2LastStage() } // regressionSlopeAggregate represents slope of the least-squares-fit linear @@ -2618,16 +2780,28 @@ func newRegressionSlopeAggregate( // Result implements tree.AggregateFunc interface. func (a *regressionSlopeAggregate) Result() (tree.Datum, error) { - if a.n.Cmp(decimalOne) < 0 { - return tree.DNull, nil - } - if a.sxx.Cmp(decimalZero) == 0 { - return tree.DNull, nil + return a.regressionSlopeLastStage() +} + +// finalRegressionSlopeAggregate represents slope of the least-squares-fit +// linear equation determined by the (X, Y) pairs. +type finalRegressionSlopeAggregate struct { + finalRegressionAccumulatorDecimalBase +} + +func newFinalRegressionSlopeAggregate( + _ []*types.T, ctx *tree.EvalContext, _ tree.Datums, +) tree.AggregateFunc { + return &finalRegressionSlopeAggregate{ + finalRegressionAccumulatorDecimalBase{ + regressionAccumulatorDecimalBase: makeRegressionAccumulatorDecimalBase(ctx), + }, } +} - // a.sxy / a.sxx - a.ed.Quo(&a.tmp, &a.sxy, &a.sxx) - return mapToDFloat(&a.tmp, a.ed.Err()) +// Result implements tree.AggregateFunc interface. +func (a *finalRegressionSlopeAggregate) Result() (tree.Datum, error) { + return a.regressionSlopeLastStage() } // regressionSXXAggregate represents sum of squares of the independent variable.