From 98d1e0c74770a6fa6c932877ec9177bf0e58019a Mon Sep 17 00:00:00 2001 From: Michael Erickson Date: Wed, 17 Aug 2022 16:11:04 -0700 Subject: [PATCH 1/7] sql/stats: remove NumRange-stealing behavior from histogram prediction We should be able to handle NumEq=0 just fine everywhere that uses histograms, so delete this NumRange-stealing code. Fixes: #86344 Release justification: low-risk updates to new functionality. Release note: None --- .../opt/exec/execbuilder/testdata/forecast | 139 +++++++++++++++++- pkg/sql/stats/forecast_test.go | 10 +- pkg/sql/stats/quantile.go | 9 -- pkg/sql/stats/quantile_test.go | 16 +- 4 files changed, 151 insertions(+), 23 deletions(-) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/forecast b/pkg/sql/opt/exec/execbuilder/testdata/forecast index 17d009ac4d28..8d5d5b71efbe 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/forecast +++ b/pkg/sql/opt/exec/execbuilder/testdata/forecast @@ -592,8 +592,145 @@ scan c ├── columns: h:1 ├── constraint: /1: [/'1988-08-07 00:00:00+00:00' - ] ├── stats: [rows=24, distinct(1)=24, null(1)=0, avgsize(1)=7] - │ histogram(1)= 0 1 5 1 5 1 5 1 4 1 + │ histogram(1)= 0 1 5 1 5 1 5 1 5 0 │ <--- '1988-08-07 00:00:00+00:00' --- '1988-08-07 06:00:00+00:00' --- '1988-08-07 12:00:00+00:00' --- '1988-08-07 18:00:00+00:00' --- '1988-08-08 00:00:00+00:00' ├── cost: 39.7 ├── key: (1) └── distribution: test + +# Test for issue 86344. + +statement ok +CREATE TABLE x (a INT PRIMARY KEY) WITH (sql_stats_automatic_collection_enabled = false); + +statement ok +ALTER TABLE x INJECT STATISTICS '[ + { + "avg_size": 1, + "columns": [ + "a" + ], + "created_at": "2020-03-13 00:00:00.000000", + "distinct_count": 4, + "histo_buckets": [ + { + "distinct_range": 0, + "num_eq": 0, + "num_range": 0, + "upper_bound": "4" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "7" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "10" + } + ], + "histo_col_type": "INT8", + "histo_version": 2, + "name": "__auto__", + "null_count": 0, + "row_count": 4 + }, + { + "avg_size": 1, + "columns": [ + "a" + ], + "created_at": "2020-03-14 00:00:00.000000", + "distinct_count": 4, + "histo_buckets": [ + { + "distinct_range": 0, + "num_eq": 0, + "num_range": 0, + "upper_bound": "7" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "10" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "13" + } + ], + "histo_col_type": "INT8", + "histo_version": 2, + "name": "__auto__", + "null_count": 0, + "row_count": 4 + }, + { + "avg_size": 1, + "columns": [ + "a" + ], + "created_at": "2020-03-15 00:00:00.000000", + "distinct_count": 4, + "histo_buckets": [ + { + "distinct_range": 0, + "num_eq": 0, + "num_range": 0, + "upper_bound": "10" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "13" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "16" + } + ], + "histo_col_type": "INT8", + "histo_version": 2, + "name": "__auto__", + "null_count": 0, + "row_count": 4 + } +]'; + +query T +SELECT jsonb_pretty(stat->'histo_buckets') +FROM ( + SELECT jsonb_array_elements(statistics) AS stat + FROM [SHOW STATISTICS USING JSON FOR TABLE x WITH FORECAST] +) +WHERE stat->>'name' = '__forecast__'; +---- +[ + { + "distinct_range": 0, + "num_eq": 0, + "num_range": 0, + "upper_bound": "13" + }, + { + "distinct_range": 2, + "num_eq": 0, + "num_range": 2, + "upper_bound": "16" + }, + { + "distinct_range": 2, + "num_eq": 0, + 
"num_range": 2, + "upper_bound": "19" + } +] diff --git a/pkg/sql/stats/forecast_test.go b/pkg/sql/stats/forecast_test.go index 65d384711964..271461e6b80c 100644 --- a/pkg/sql/stats/forecast_test.go +++ b/pkg/sql/stats/forecast_test.go @@ -490,22 +490,22 @@ func TestForecastColumnStatistics(t *testing.T) { hist: testHistogram{}, }, { - at: 2, row: 5, dist: 2, null: 3, size: 2, + at: 2, row: 5, dist: 3, null: 3, size: 2, hist: testHistogram{{1, 0, 0, 200}, {0, 1, 1, 800}}, }, { - at: 3, row: 7, dist: 3, null: 3, size: 2, + at: 3, row: 7, dist: 4, null: 3, size: 2, hist: testHistogram{{2, 0, 0, 200}, {0, 2, 2, 800}}, }, { - at: 4, row: 9, dist: 4, null: 3, size: 2, + at: 4, row: 9, dist: 5, null: 3, size: 2, hist: testHistogram{{3, 0, 0, 200}, {0, 3, 3, 800}}, }, }, at: 5, forecast: &testStat{ - at: 5, row: 11, dist: 5, null: 3, size: 2, - hist: testHistogram{{4, 0, 0, 200}, {1, 3, 2, 800}}, + at: 5, row: 11, dist: 6, null: 3, size: 2, + hist: testHistogram{{4, 0, 0, 200}, {0, 4, 4, 800}}, }, }, // Histogram, constant numbers but changing shape diff --git a/pkg/sql/stats/quantile.go b/pkg/sql/stats/quantile.go index 2e69d6cd154c..e8f68d5e8a66 100644 --- a/pkg/sql/stats/quantile.go +++ b/pkg/sql/stats/quantile.go @@ -282,15 +282,6 @@ func (q quantile) toHistogram(colType *types.T, rowCount float64) (histogram, er if !isValidCount(numEq) { return errors.AssertionFailedf("invalid histogram NumEq: %v", numEq) } - if numEq < 1 && currentBucket.NumRange+numEq >= 2 { - // Steal from NumRange so that NumEq is at least 1, if it wouldn't make - // NumRange 0. This makes the histogram look more like something - // EquiDepthHistogram would produce. - // TODO(michae2): Consider removing this logic if statistics_builder - // doesn't need it. - currentBucket.NumRange -= 1 - numEq - numEq = 1 - } currentBucket.NumEq = numEq // Calculate DistinctRange for this bucket now that NumRange is finalized. diff --git a/pkg/sql/stats/quantile_test.go b/pkg/sql/stats/quantile_test.go index 48381cd1898f..320d5b3dc4ae 100644 --- a/pkg/sql/stats/quantile_test.go +++ b/pkg/sql/stats/quantile_test.go @@ -479,9 +479,9 @@ func TestQuantileToHistogram(t *testing.T) { hist: testHistogram{{0, 0, 0, 0}, {1, 1, 1, 100}}, }, { - qfun: quantile{{0, 0}, {0.9, 100}, {1, 100}}, - rows: 10, - hist: testHistogram{{0, 0, 0, 0}, {1, 9, 9, 100}}, + qfun: quantile{{0, 0}, {0.9375, 100}, {1, 100}}, + rows: 16, + hist: testHistogram{{0, 0, 0, 0}, {1, 15, 15, 100}}, }, { qfun: quantile{{0, 100}, {0.25, 100}, {0.75, 200}, {1, 200}}, @@ -503,26 +503,26 @@ func TestQuantileToHistogram(t *testing.T) { rows: 32, hist: testHistogram{{4, 0, 0, 310}, {4, 0, 0, 320}, {8, 0, 0, 330}, {4, 0, 0, 340}, {4, 0, 0, 350}, {4, 0, 0, 360}, {4, 0, 0, 370}}, }, - // Cases where we steal a row from NumRange to give to NumEq. + // Cases with 0 NumEq. 
{ qfun: quantile{{0, 0}, {1, 100}}, rows: 2, - hist: testHistogram{{0, 0, 0, 0}, {1, 1, 1, 100}}, + hist: testHistogram{{0, 0, 0, 0}, {0, 2, 2, 100}}, }, { qfun: quantile{{0, 100}, {0.5, 100}, {1, 200}, {1, 300}}, rows: 4, - hist: testHistogram{{2, 0, 0, 100}, {1, 1, 1, 200}}, + hist: testHistogram{{2, 0, 0, 100}, {0, 2, 2, 200}}, }, { qfun: quantile{{0, 0}, {0.875, 87.5}, {1, 100}}, rows: 8, - hist: testHistogram{{0, 0, 0, 0}, {1, 6, 6, 87.5}, {0, 1, 1, 100}}, + hist: testHistogram{{0, 0, 0, 0}, {0, 7, 7, 87.5}, {0, 1, 1, 100}}, }, { qfun: quantile{{0, 400}, {0.5, 600}, {0.75, 700}, {1, 800}}, rows: 16, - hist: testHistogram{{0, 0, 0, 400}, {1, 7, 7, 600}, {1, 3, 3, 700}, {1, 3, 3, 800}}, + hist: testHistogram{{0, 0, 0, 400}, {0, 8, 8, 600}, {0, 4, 4, 700}, {0, 4, 4, 800}}, }, // Error cases. { From 4a3a3023296d8fc421b6e9fe66a8536567789fc6 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Thu, 18 Aug 2022 09:49:32 -0400 Subject: [PATCH 2/7] sql: move UDF execution tests to bottom of test file This commit moves UDF execution logic tests to the bottom of the UDF test file so that execution-related tests add in the future will not change the output of schema-related tests. Release justification: This is a test-only change. Release note: None --- pkg/sql/logictest/testdata/logic_test/udf | 522 +++++++++++----------- 1 file changed, 262 insertions(+), 260 deletions(-) diff --git a/pkg/sql/logictest/testdata/logic_test/udf b/pkg/sql/logictest/testdata/logic_test/udf index fa662480abe7..bd7dd3e6fb16 100644 --- a/pkg/sql/logictest/testdata/logic_test/udf +++ b/pkg/sql/logictest/testdata/logic_test/udf @@ -612,211 +612,6 @@ CREATE FUNCTION public.test_vf_f() SELECT lower('hello'); $$ -subtest execution - -statement ok -INSERT INTO ab VALUES (1, 1), (2, 2), (3, 3), (4, 1), (5, 1) - -statement ok -CREATE FUNCTION one() RETURNS INT LANGUAGE SQL AS 'SELECT 2-1'; - -query I -SELECT one() ----- -1 - -query I colnames -SELECT * FROM one() ----- -one -1 - -query III colnames -SELECT *, one() FROM ab WHERE a = one() ----- -a b one -1 1 1 - -query III colnames -SELECT *, one() FROM ab WHERE b = one() ----- -a b one -1 1 1 -4 1 1 -5 1 1 - -query II colnames -SELECT * FROM ab WHERE b = one() + 1 ----- -a b -2 2 - -statement ok -CREATE FUNCTION max_in_values() RETURNS INT LANGUAGE SQL AS $$ -SELECT i FROM (VALUES (1, 0), (2, 0), (3, 0)) AS v(i, j) ORDER BY i DESC -$$ - -query I -SELECT max_in_values() ----- -3 - -statement ok -CREATE FUNCTION fetch_one_then_two() RETURNS INT LANGUAGE SQL AS $$ -SELECT b FROM ab WHERE a = 1; -SELECT b FROM ab WHERE a = 2; -$$ - -query II -SELECT i, fetch_one_then_two() -FROM (VALUES (1), (2), (3)) AS v(i) -WHERE i = fetch_one_then_two() ----- -2 2 - -query I colnames -SELECT * FROM fetch_one_then_two() ----- -fetch_one_then_two -2 - -statement ok -CREATE TABLE empty (e INT); -CREATE FUNCTION empty_result() RETURNS INT LANGUAGE SQL AS $$ -SELECT e FROM empty -$$ - -query I -SELECT empty_result() ----- -NULL - -statement ok -CREATE FUNCTION int_identity(i INT) RETURNS INT LANGUAGE SQL AS 'SELECT i'; - -query I -SELECT int_identity(1) ----- -1 - -query I -SELECT int_identity(10 + int_identity(1)) ----- -11 - -query II -SELECT a+b, int_identity(a+b) FROM ab WHERE a = int_identity(a) AND b = int_identity(b) ----- -2 2 -4 4 -6 6 -5 5 -6 6 - -# Define some custom arithmetic functions that we can write interesting tests -# with that use builtin operators as oracles. 
-statement ok -CREATE FUNCTION add(x INT, y INT) RETURNS INT LANGUAGE SQL AS 'SELECT x+y'; - -statement ok -CREATE FUNCTION sub(x INT, y INT) RETURNS INT LANGUAGE SQL AS 'SELECT x-y'; - -statement ok -CREATE FUNCTION mult(x INT, y INT) RETURNS INT LANGUAGE SQL AS 'SELECT x*y'; - -query II -SELECT a + a + a + b + b + b, add(a, add(a, add(a, add(b, add(b, b))))) FROM ab ----- -6 6 -12 12 -18 18 -15 15 -18 18 - -query II -SELECT (a * (a + b)) - b, sub(mult(a, add(a, b)), b) FROM ab ----- -1 1 -6 6 -15 15 -19 19 -29 29 - -query II -SELECT a * (3 + b - a) + a * b * a, add(mult(a, add(3, sub(b, a))), mult(a, mult(b, a))) FROM ab ----- -4 4 -14 14 -36 36 -16 16 -20 20 - -statement ok -CREATE FUNCTION fetch_b(arg_a INT) RETURNS INT LANGUAGE SQL AS $$ -SELECT b FROM ab WHERE a = arg_a -$$ - -query II -SELECT b, fetch_b(a) FROM ab ----- -1 1 -2 2 -3 3 -1 1 -1 1 - -query II -SELECT b + (a * 7) - (a * b), add(fetch_b(a), sub(mult(a, 7), mult(a, fetch_b(a)))) FROM ab ----- -7 7 -12 12 -15 15 -25 25 -31 31 - -query I -SELECT fetch_b(99999999) ----- -NULL - -subtest volatility - -statement ok -CREATE TABLE kv (k INT PRIMARY KEY, v INT); -INSERT INTO kv VALUES (1, 1), (2, 2), (3, 3); -CREATE FUNCTION get_l(i INT) RETURNS INT IMMUTABLE LEAKPROOF LANGUAGE SQL AS $$ -SELECT v FROM kv WHERE k = i; -$$; -CREATE FUNCTION get_i(i INT) RETURNS INT IMMUTABLE LANGUAGE SQL AS $$ -SELECT v FROM kv WHERE k = i; -$$; -CREATE FUNCTION get_s(i INT) RETURNS INT STABLE LANGUAGE SQL AS $$ -SELECT v FROM kv WHERE k = i; -$$; -CREATE FUNCTION get_v(i INT) RETURNS INT VOLATILE LANGUAGE SQL AS $$ -SELECT v FROM kv WHERE k = i; -$$; -CREATE FUNCTION int_identity_v(i INT) RETURNS INT VOLATILE LANGUAGE SQL AS $$ -SELECT i; -$$; - -# Only the volatile functions should see the changes made by the UPDATE in the -# CTE. 
-query IIIIIIII colnames -WITH u AS ( - UPDATE kv SET v = v + 10 RETURNING k -) -SELECT -get_l(k) l1, get_l(int_identity_v(k)) l2, -get_i(k) i1, get_i(int_identity_v(k)) i2, -get_s(k) s1, get_s(int_identity_v(k)) s2, -get_v(k) v1, get_v(int_identity_v(k)) v2 -FROM u; ----- -l1 l2 i1 i2 s1 s2 v1 v2 -1 1 1 1 1 1 11 11 -2 2 2 2 2 2 12 12 -3 3 3 3 3 3 13 13 subtest grant_revoke @@ -833,9 +628,9 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL root test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES statement ok GRANT EXECUTE ON FUNCTION test_priv_f1(), test_priv_f2(int), test_priv_sc1.test_priv_f3 TO udf_test_user WITH GRANT OPTION; @@ -846,12 +641,12 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES -NULL udf_test_user test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL udf_test_user test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL udf_test_user test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL root test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL udf_test_user test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL udf_test_user test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL udf_test_user test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES statement error pq: cannot drop role/user udf_test_user: grants still exist on.* DROP USER udf_test_user; @@ -865,12 +660,12 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES -NULL udf_test_user test public test_priv_f1_100157 test public test_priv_f1 EXECUTE NO -NULL udf_test_user test public test_priv_f2_100158 test public test_priv_f2 EXECUTE NO -NULL udf_test_user test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE NO +NULL root test public 
test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL udf_test_user test public test_priv_f1_100141 test public test_priv_f1 EXECUTE NO +NULL udf_test_user test public test_priv_f2_100142 test public test_priv_f2 EXECUTE NO +NULL udf_test_user test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE NO statement ok REVOKE EXECUTE ON FUNCTION test_priv_f1(), test_priv_f2(int), test_priv_sc1.test_priv_f3 FROM udf_test_user; @@ -881,9 +676,9 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL root test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES statement ok GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public, test_priv_sc1 TO udf_test_user WITH GRANT OPTION; @@ -894,12 +689,12 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES -NULL udf_test_user test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL udf_test_user test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL udf_test_user test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL root test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL udf_test_user test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL udf_test_user test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL udf_test_user test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES statement ok REVOKE GRANT OPTION FOR EXECUTE ON ALL FUNCTIONS in schema public, test_priv_sc1 FROM udf_test_user; @@ -910,12 +705,12 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES -NULL udf_test_user test public test_priv_f1_100157 test public 
test_priv_f1 EXECUTE NO -NULL udf_test_user test public test_priv_f2_100158 test public test_priv_f2 EXECUTE NO -NULL udf_test_user test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE NO +NULL root test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL udf_test_user test public test_priv_f1_100141 test public test_priv_f1 EXECUTE NO +NULL udf_test_user test public test_priv_f2_100142 test public test_priv_f2 EXECUTE NO +NULL udf_test_user test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE NO statement ok REVOKE EXECUTE ON ALL FUNCTIONS IN SCHEMA public, test_priv_sc1 FROM udf_test_user; @@ -926,9 +721,9 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100157 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100158 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100159 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL root test public test_priv_f1_100141 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100142 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100143 test test_priv_sc1 test_priv_f3 EXECUTE YES statement ok DROP FUNCTION test_priv_f1; @@ -948,7 +743,7 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100160 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f1_100144 test public test_priv_f1 EXECUTE YES # Add default privilege and make sure new function statement ok @@ -964,11 +759,11 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100160 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100161 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100162 test test_priv_sc1 test_priv_f3 EXECUTE YES -NULL udf_test_user test public test_priv_f2_100161 test public test_priv_f2 EXECUTE YES -NULL udf_test_user test test_priv_sc1 test_priv_f3_100162 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL root test public test_priv_f1_100144 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100145 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100146 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL udf_test_user test public test_priv_f2_100145 test public test_priv_f2 EXECUTE YES +NULL udf_test_user test test_priv_sc1 test_priv_f3_100146 test test_priv_sc1 test_priv_f3 EXECUTE YES statement ok DROP FUNCTION test_priv_f2; @@ -980,7 +775,7 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog 
routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100160 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f1_100144 test public test_priv_f1 EXECUTE YES statement ok ALTER DEFAULT PRIVILEGES IN SCHEMA public, test_priv_sc1 REVOKE EXECUTE ON FUNCTIONS FROM udf_test_user; @@ -995,9 +790,9 @@ WHERE routine_name IN ('test_priv_f1', 'test_priv_f2', 'test_priv_f3') ORDER BY grantee, routine_name; ---- grantor grantee specific_catalog specific_schema specific_name routine_catalog routine_schema routine_name privilege_type is_grantable -NULL root test public test_priv_f1_100160 test public test_priv_f1 EXECUTE YES -NULL root test public test_priv_f2_100163 test public test_priv_f2 EXECUTE YES -NULL root test test_priv_sc1 test_priv_f3_100164 test test_priv_sc1 test_priv_f3 EXECUTE YES +NULL root test public test_priv_f1_100144 test public test_priv_f1 EXECUTE YES +NULL root test public test_priv_f2_100147 test public test_priv_f2 EXECUTE YES +NULL root test test_priv_sc1 test_priv_f3_100148 test test_priv_sc1 test_priv_f3 EXECUTE YES subtest alter_function_options @@ -1201,9 +996,9 @@ query TTT SELECT oid, proname, prosrc FROM pg_catalog.pg_proc WHERE proname IN ('f_test_sc'); ---- -100170 f_test_sc SELECT 1; -100171 f_test_sc SELECT 2; -100173 f_test_sc SELECT 3; +100154 f_test_sc SELECT 1; +100155 f_test_sc SELECT 2; +100157 f_test_sc SELECT 3; query TT WITH fns AS ( @@ -1224,9 +1019,9 @@ SELECT fn->>'id' AS id, fn->'parentSchemaId' FROM fns ORDER BY id; ---- -170 105 -171 105 -173 172 +154 105 +155 105 +157 156 statement error pq: cannot move objects into or out of virtual schemas ALTER FUNCTION f_test_sc() SET SCHEMA pg_catalog; @@ -1257,9 +1052,9 @@ SELECT fn->>'id' AS id, fn->'parentSchemaId' FROM fns ORDER BY id; ---- -170 105 -171 105 -173 172 +154 105 +155 105 +157 156 query T SELECT @2 FROM [SHOW CREATE FUNCTION public.f_test_sc]; @@ -1307,9 +1102,9 @@ SELECT fn->>'id' AS id, fn->'parentSchemaId' FROM fns ORDER BY id; ---- -170 105 -171 172 -173 172 +154 105 +155 156 +157 156 query T SELECT @2 FROM [SHOW CREATE FUNCTION public.f_test_sc]; @@ -1455,3 +1250,210 @@ CREATE FUNCTION public.f_test_cor_implicit() AS $$ SELECT a, b FROM test.public.t_implicit_type; $$ + + +subtest execution + +statement ok +INSERT INTO ab VALUES (1, 1), (2, 2), (3, 3), (4, 1), (5, 1) + +statement ok +CREATE FUNCTION one() RETURNS INT LANGUAGE SQL AS 'SELECT 2-1'; + +query I +SELECT one() +---- +1 + +query I colnames +SELECT * FROM one() +---- +one +1 + +query III colnames +SELECT *, one() FROM ab WHERE a = one() +---- +a b one +1 1 1 + +query III colnames +SELECT *, one() FROM ab WHERE b = one() +---- +a b one +1 1 1 +4 1 1 +5 1 1 + +query II colnames +SELECT * FROM ab WHERE b = one() + 1 +---- +a b +2 2 + +statement ok +CREATE FUNCTION max_in_values() RETURNS INT LANGUAGE SQL AS $$ +SELECT i FROM (VALUES (1, 0), (2, 0), (3, 0)) AS v(i, j) ORDER BY i DESC +$$ + +query I +SELECT max_in_values() +---- +3 + +statement ok +CREATE FUNCTION fetch_one_then_two() RETURNS INT LANGUAGE SQL AS $$ +SELECT b FROM ab WHERE a = 1; +SELECT b FROM ab WHERE a = 2; +$$ + +query II +SELECT i, fetch_one_then_two() +FROM (VALUES (1), (2), (3)) AS v(i) +WHERE i = fetch_one_then_two() +---- +2 2 + +query I colnames +SELECT * FROM fetch_one_then_two() +---- +fetch_one_then_two +2 + +statement ok +CREATE TABLE empty (e INT); +CREATE FUNCTION empty_result() RETURNS INT LANGUAGE SQL AS $$ +SELECT e FROM empty +$$ + +query I +SELECT empty_result() +---- +NULL + +statement ok 
+CREATE FUNCTION int_identity(i INT) RETURNS INT LANGUAGE SQL AS 'SELECT i'; + +query I +SELECT int_identity(1) +---- +1 + +query I +SELECT int_identity(10 + int_identity(1)) +---- +11 + +query II +SELECT a+b, int_identity(a+b) FROM ab WHERE a = int_identity(a) AND b = int_identity(b) +---- +2 2 +4 4 +6 6 +5 5 +6 6 + +# Define some custom arithmetic functions that we can write interesting tests +# with that use builtin operators as oracles. +statement ok +CREATE FUNCTION add(x INT, y INT) RETURNS INT LANGUAGE SQL AS 'SELECT x+y'; + +statement ok +CREATE FUNCTION sub(x INT, y INT) RETURNS INT LANGUAGE SQL AS 'SELECT x-y'; + +statement ok +CREATE FUNCTION mult(x INT, y INT) RETURNS INT LANGUAGE SQL AS 'SELECT x*y'; + +query II +SELECT a + a + a + b + b + b, add(a, add(a, add(a, add(b, add(b, b))))) FROM ab +---- +6 6 +12 12 +18 18 +15 15 +18 18 + +query II +SELECT (a * (a + b)) - b, sub(mult(a, add(a, b)), b) FROM ab +---- +1 1 +6 6 +15 15 +19 19 +29 29 + +query II +SELECT a * (3 + b - a) + a * b * a, add(mult(a, add(3, sub(b, a))), mult(a, mult(b, a))) FROM ab +---- +4 4 +14 14 +36 36 +16 16 +20 20 + +statement ok +CREATE FUNCTION fetch_b(arg_a INT) RETURNS INT LANGUAGE SQL AS $$ +SELECT b FROM ab WHERE a = arg_a +$$ + +query II +SELECT b, fetch_b(a) FROM ab +---- +1 1 +2 2 +3 3 +1 1 +1 1 + +query II +SELECT b + (a * 7) - (a * b), add(fetch_b(a), sub(mult(a, 7), mult(a, fetch_b(a)))) FROM ab +---- +7 7 +12 12 +15 15 +25 25 +31 31 + +query I +SELECT fetch_b(99999999) +---- +NULL + +subtest volatility + +statement ok +CREATE TABLE kv (k INT PRIMARY KEY, v INT); +INSERT INTO kv VALUES (1, 1), (2, 2), (3, 3); +CREATE FUNCTION get_l(i INT) RETURNS INT IMMUTABLE LEAKPROOF LANGUAGE SQL AS $$ +SELECT v FROM kv WHERE k = i; +$$; +CREATE FUNCTION get_i(i INT) RETURNS INT IMMUTABLE LANGUAGE SQL AS $$ +SELECT v FROM kv WHERE k = i; +$$; +CREATE FUNCTION get_s(i INT) RETURNS INT STABLE LANGUAGE SQL AS $$ +SELECT v FROM kv WHERE k = i; +$$; +CREATE FUNCTION get_v(i INT) RETURNS INT VOLATILE LANGUAGE SQL AS $$ +SELECT v FROM kv WHERE k = i; +$$; +CREATE FUNCTION int_identity_v(i INT) RETURNS INT VOLATILE LANGUAGE SQL AS $$ +SELECT i; +$$; + +# Only the volatile functions should see the changes made by the UPDATE in the +# CTE. +query IIIIIIII colnames +WITH u AS ( + UPDATE kv SET v = v + 10 RETURNING k +) +SELECT +get_l(k) l1, get_l(int_identity_v(k)) l2, +get_i(k) i1, get_i(int_identity_v(k)) i2, +get_s(k) s1, get_s(int_identity_v(k)) s2, +get_v(k) v1, get_v(int_identity_v(k)) v2 +FROM u; +---- +l1 l2 i1 i2 s1 s2 v1 v2 +1 1 1 1 1 1 11 11 +2 2 2 2 2 2 12 12 +3 3 3 3 3 3 13 13 From c863dea24555b23022e9a5abf469347795aa5b9e Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Thu, 18 Aug 2022 10:13:15 -0400 Subject: [PATCH 3/7] opt: clarify the return type of Index.Ordinal() The return type of `Index.Ordinal()` is now the type alias `cat.IndexOrdinal` to be consistent with other functions that use an index ordinal, like `Table.Index(i cat.IndexOrdinal)`. It is now more clear that `idx == Table().Index(idx.Ordinal())` Release justification: This is a very small, low-risk change. 
Release note: None --- pkg/sql/opt/cat/index.go | 2 +- pkg/sql/opt/exec/explain/plan_gist_factory.go | 2 +- pkg/sql/opt/indexrec/hypothetical_index.go | 2 +- pkg/sql/opt/testutils/testcat/test_catalog.go | 2 +- pkg/sql/opt_catalog.go | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/sql/opt/cat/index.go b/pkg/sql/opt/cat/index.go index cbe70228a82b..a7ec6cfdc1d3 100644 --- a/pkg/sql/opt/cat/index.go +++ b/pkg/sql/opt/cat/index.go @@ -48,7 +48,7 @@ type Index interface { // Ordinal returns the ordinal of this index within the context of its Table. // Specifically idx = Table().Index(idx.Ordinal). - Ordinal() int + Ordinal() IndexOrdinal // IsUnique returns true if this index is declared as UNIQUE in the schema. IsUnique() bool diff --git a/pkg/sql/opt/exec/explain/plan_gist_factory.go b/pkg/sql/opt/exec/explain/plan_gist_factory.go index 15c8085fbc34..8199f1266875 100644 --- a/pkg/sql/opt/exec/explain/plan_gist_factory.go +++ b/pkg/sql/opt/exec/explain/plan_gist_factory.go @@ -591,7 +591,7 @@ func (u *unknownIndex) Table() cat.Table { panic(errors.AssertionFailedf("not implemented")) } -func (u *unknownIndex) Ordinal() int { +func (u *unknownIndex) Ordinal() cat.IndexOrdinal { return 0 } diff --git a/pkg/sql/opt/indexrec/hypothetical_index.go b/pkg/sql/opt/indexrec/hypothetical_index.go index b9a6398192b8..906e3b011245 100644 --- a/pkg/sql/opt/indexrec/hypothetical_index.go +++ b/pkg/sql/opt/indexrec/hypothetical_index.go @@ -198,7 +198,7 @@ func (hi *hypotheticalIndex) Table() cat.Table { } // Ordinal is part of the cat.Index interface. -func (hi *hypotheticalIndex) Ordinal() int { +func (hi *hypotheticalIndex) Ordinal() cat.IndexOrdinal { return hi.indexOrdinal } diff --git a/pkg/sql/opt/testutils/testcat/test_catalog.go b/pkg/sql/opt/testutils/testcat/test_catalog.go index f337fe857927..747a90bd203f 100644 --- a/pkg/sql/opt/testutils/testcat/test_catalog.go +++ b/pkg/sql/opt/testutils/testcat/test_catalog.go @@ -986,7 +986,7 @@ func (ti *Index) Table() cat.Table { } // Ordinal is part of the cat.Index interface. -func (ti *Index) Ordinal() int { +func (ti *Index) Ordinal() cat.IndexOrdinal { return ti.ordinal } diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index 351bfe334e6e..6af95fb072ed 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -1541,7 +1541,7 @@ func (oi *optIndex) Table() cat.Table { } // Ordinal is part of the cat.Index interface. -func (oi *optIndex) Ordinal() int { +func (oi *optIndex) Ordinal() cat.IndexOrdinal { return oi.indexOrdinal } @@ -2352,7 +2352,7 @@ func (oi *optVirtualIndex) Table() cat.Table { } // Ordinal is part of the cat.Index interface. -func (oi *optVirtualIndex) Ordinal() int { +func (oi *optVirtualIndex) Ordinal() cat.IndexOrdinal { return oi.indexOrdinal } From db2cbd9722ae51bde56a74bff153954747d6c225 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Wed, 17 Aug 2022 15:16:37 -0400 Subject: [PATCH 4/7] ui/cluster-ui: show status as waiting when txn is waiting for lock Now that we have surfaced contention information in the UI, we can update the stmt / txn status field for active executions to be 'Waiting' when the stmt or txn is waiting to acquire a lock. 
Release justification: low risk update to existing functionality

Release note (ui change): Transactions and statements on the active
execution pages that are waiting to acquire a lock now show the status
'Waiting'.
---
 .../src/selectors/activeExecutionsCommon.selectors.ts        | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pkg/ui/workspaces/cluster-ui/src/selectors/activeExecutionsCommon.selectors.ts b/pkg/ui/workspaces/cluster-ui/src/selectors/activeExecutionsCommon.selectors.ts
index 9bae6f0c1a01..25e28c267d77 100644
--- a/pkg/ui/workspaces/cluster-ui/src/selectors/activeExecutionsCommon.selectors.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/selectors/activeExecutionsCommon.selectors.ts
@@ -24,7 +24,7 @@ import {
 export const selectExecutionID = (
   _state: unknown,
   props: RouteComponentProps,
-) => getMatchParamByName(props.match, executionIdAttr);
+): string | null => getMatchParamByName(props.match, executionIdAttr);
 
 export const selectActiveExecutionsCombiner = (
   sessions: SessionsResponse | null,
@@ -39,10 +39,12 @@ export const selectActiveExecutionsCombiner = (
   return {
     statements: execs.statements.map(s => ({
       ...s,
+      status: waitTimeByTxnID[s.transactionID] != null ? "Waiting" : s.status,
       timeSpentWaiting: waitTimeByTxnID[s.transactionID],
     })),
     transactions: execs.transactions.map(t => ({
       ...t,
+      status: waitTimeByTxnID[t.transactionID] != null ? "Waiting" : t.status,
       timeSpentWaiting: waitTimeByTxnID[t.transactionID],
     })),
   };

From 592ce269177f923acdb03f81d48f5c6608aa24d3 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Fri, 12 Aug 2022 14:16:06 -0400
Subject: [PATCH 5/7] changefeedccl: make core changefeeds more resilient

This change updates core changefeeds to save checkpoints inside the
EvalCtx. With this change, core changefeeds can retry from the last
checkpoint instead of restarting from the beginning.

Fixes: https://github.com/cockroachdb/cockroach/issues/84511

Release note (general change): This change updates core/experimental
changefeeds to be more resilient to transient errors (e.g. network
blips) by adding checkpointing. Previously, a transient error would
cause a core changefeed to stop and terminate the underlying SQL
statement, requiring a user to restart the SQL statement. Furthermore,
if the core changefeed were restarted during an initial scan, the
initial scan would start over from the beginning. For large initial
scans, transient errors are more likely, so restarting from the
beginning would likely hit more transient errors and restarts,
preventing the changefeed from ever making progress. Now, an
experimental changefeed will automatically take frequent checkpoints
and retry from the last checkpoint when a transient error occurs.

Release justification: This change updates an experimental feature.
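
As a rough, self-contained Go sketch of the behavior this enables
(illustrative only: `checkpoint` and `runFlow` are stand-ins invented
for the example; the actual implementation keeps its state in the
EvalCtx as a coreChangefeedProgress):

    package main

    import (
    	"errors"
    	"fmt"
    )

    // checkpoint models the minimal state a core changefeed must keep
    // to resume: a resolved "high-water" mark for the whole feed.
    type checkpoint struct{ highWater int }

    var errTransient = errors.New("transient error")

    // runFlow stands in for one attempt at running the changefeed flow.
    // It advances the checkpoint as it scans and may fail partway through.
    func runFlow(cp *checkpoint, failAt int) error {
    	for cp.highWater < 10 {
    		cp.highWater++ // frontier advanced; checkpoint updated in place
    		if cp.highWater == failAt {
    			return errTransient
    		}
    	}
    	return nil
    }

    func main() {
    	cp := &checkpoint{}
    	failAt := 4 // inject one transient failure mid-scan
    	for {
    		if err := runFlow(cp, failAt); err == nil {
    			break
    		} else if !errors.Is(err, errTransient) {
    			panic(err) // non-retryable: surface to the user
    		}
    		failAt = 0 // the transient condition has cleared
    		// Retrying resumes from cp.highWater, not from zero.
    	}
    	fmt.Println("finished at high-water", cp.highWater) // 10
    }

Each retry resumes from the saved frontier instead of repeating
completed work, which is what TestCoreChangefeedBackfillScanCheckpoint
below exercises with injected transient errors.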
--- .../changefeedccl/changefeed_processors.go | 163 +++++++++++------- pkg/ccl/changefeedccl/changefeed_stmt.go | 18 +- pkg/ccl/changefeedccl/changefeed_test.go | 59 +++++++ .../changefeedccl/changefeedbase/BUILD.bazel | 1 - .../changefeedccl/changefeedbase/errors.go | 27 +-- pkg/ccl/changefeedccl/testing_knobs.go | 2 + pkg/sql/sem/eval/context.go | 3 + pkg/sql/sem/eval/deps.go | 12 ++ 8 files changed, 188 insertions(+), 97 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 7b9b39f0cf38..bc46b8c98e56 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -720,6 +720,8 @@ type changeFrontier struct { // metricsID is used as the unique id of this changefeed in the // metrics.MaxBehindNanos map. metricsID int + + knobs TestingKnobs } const ( @@ -729,7 +731,11 @@ const ( // jobState encapsulates changefeed job state. type jobState struct { - job *jobs.Job + // job is set for changefeeds other than core/sinkless changefeeds. + job *jobs.Job + // coreProgress is set for only core/sinkless changefeeds. + coreProgress *coreChangefeedProgress + settings *cluster.Settings metrics *Metrics ts timeutil.TimeSource @@ -744,11 +750,36 @@ type jobState struct { progressUpdatesSkipped bool } +type coreChangefeedProgress struct { + progress jobspb.Progress +} + +// SetHighwater implements the eval.ChangefeedState interface. +func (cp *coreChangefeedProgress) SetHighwater(frontier *hlc.Timestamp) { + cp.progress.Progress = &jobspb.Progress_HighWater{ + HighWater: frontier, + } +} + +// SetCheckpoint implements the eval.ChangefeedState interface. +func (cp *coreChangefeedProgress) SetCheckpoint(spans []roachpb.Span, timestamp hlc.Timestamp) { + changefeedProgress := cp.progress.Details.(*jobspb.Progress_Changefeed).Changefeed + changefeedProgress.Checkpoint = &jobspb.ChangefeedProgress_Checkpoint{ + Spans: spans, + Timestamp: timestamp, + } +} + func newJobState( - j *jobs.Job, st *cluster.Settings, metrics *Metrics, ts timeutil.TimeSource, + j *jobs.Job, + coreProgress *coreChangefeedProgress, + st *cluster.Settings, + metrics *Metrics, + ts timeutil.TimeSource, ) *jobState { return &jobState{ job: j, + coreProgress: coreProgress, settings: st, metrics: metrics, ts: ts, @@ -830,6 +861,7 @@ func newChangeFrontierProcessor( if err != nil { return nil, err } + cf := &changeFrontier{ flowCtx: flowCtx, spec: spec, @@ -838,6 +870,11 @@ func newChangeFrontierProcessor( frontier: sf, slowLogEveryN: log.Every(slowSpanMaxFrequency), } + + if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok { + cf.knobs = *cfKnobs + } + if err := cf.Init( cf, post, @@ -937,7 +974,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.MoveToDraining(err) return } - cf.js = newJobState(job, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{}) + cf.js = newJobState(job, nil, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{}) if changefeedbase.FrontierCheckpointFrequency.Get(&cf.flowCtx.Cfg.Settings.SV) == 0 { log.Warning(ctx, @@ -968,6 +1005,10 @@ func (cf *changeFrontier) Start(ctx context.Context) { // running status around for a while before we override it. 
cf.js.lastRunStatusUpdate = timeutil.Now() } + } else { + cf.js = newJobState(nil, + cf.EvalCtx.ChangefeedState.(*coreChangefeedProgress), + cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{}) } cf.metrics.mu.Lock() @@ -1038,7 +1079,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad // Detect whether this boundary should be used to kill or restart the // changefeed. if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART { - err = changefeedbase.MarkRetryableErrorWithTimestamp(err, cf.frontier.boundaryTime) + err = changefeedbase.MarkRetryableError(err) } } @@ -1139,22 +1180,16 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { // If frontier changed, we emit resolved timestamp. emitResolved := frontierChanged - // Checkpoint job record progress if needed. - // NB: Sinkless changefeeds will not have a job state (js). In fact, they - // have no distributed state whatsoever. Because of this they also do not - // use protected timestamps. - if cf.js != nil { - checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged) - if err != nil { - return err - } - - // Emit resolved timestamp only if we have checkpointed the job. - // Usually, this happens every time frontier changes, but we can skip some updates - // if we update frontier too rapidly. - emitResolved = checkpointed + checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged) + if err != nil { + return err } + // Emit resolved timestamp only if we have checkpointed the job. + // Usually, this happens every time frontier changes, but we can skip some updates + // if we update frontier too rapidly. + emitResolved = checkpointed + if emitResolved { // Keeping this after the checkpointJobProgress call will avoid // some duplicates if a restart happens. @@ -1239,53 +1274,59 @@ func (cf *changeFrontier) checkpointJobProgress( } cf.metrics.FrontierUpdates.Inc(1) var updateSkipped error - if err := cf.js.job.Update(cf.Ctx, nil, func( - txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, - ) error { - // If we're unable to update the job due to the job state, such as during - // pause-requested, simply skip the checkpoint - if err := md.CheckRunningOrReverting(); err != nil { - updateSkipped = err - return nil - } + if cf.js.job != nil { + + if err := cf.js.job.Update(cf.Ctx, nil, func( + txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, + ) error { + // If we're unable to update the job due to the job state, such as during + // pause-requested, simply skip the checkpoint + if err := md.CheckRunningOrReverting(); err != nil { + updateSkipped = err + return nil + } - // Advance resolved timestamp. - progress := md.Progress - progress.Progress = &jobspb.Progress_HighWater{ - HighWater: &frontier, - } + // Advance resolved timestamp. 
+ progress := md.Progress + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &frontier, + } - changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed - changefeedProgress.Checkpoint = &checkpoint + changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed + changefeedProgress.Checkpoint = &checkpoint - timestampManager := cf.manageProtectedTimestamps - // TODO(samiskin): Remove this conditional and the associated deprecated - // methods once we're confident in ActiveProtectedTimestampsEnabled - if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { - timestampManager = cf.deprecatedManageProtectedTimestamps - } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { - log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) - return err - } + timestampManager := cf.manageProtectedTimestamps + // TODO(samiskin): Remove this conditional and the associated deprecated + // methods once we're confident in ActiveProtectedTimestampsEnabled + if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { + timestampManager = cf.deprecatedManageProtectedTimestamps + } + if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { + log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) + return err + } - if updateRunStatus { - md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier) - } + if updateRunStatus { + md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier) + } - ju.UpdateProgress(progress) + ju.UpdateProgress(progress) - // Reset RunStats.NumRuns to 1 since the changefeed is - // now running. By resetting the NumRuns, we avoid - // future job system level retries from having large - // backoffs because of past failures. - if md.RunStats != nil { - ju.UpdateRunStats(1, md.RunStats.LastRun) - } + // Reset RunStats.NumRuns to 1 since the changefeed is + // now running. By resetting the NumRuns, we avoid + // future job system level retries from having large + // backoffs because of past failures. + if md.RunStats != nil { + ju.UpdateRunStats(1, md.RunStats.LastRun) + } - return nil - }); err != nil { - return false, err + return nil + }); err != nil { + return false, err + } + } else { + cf.js.coreProgress.SetHighwater(&frontier) + cf.js.coreProgress.SetCheckpoint(checkpoint.Spans, checkpoint.Timestamp) } if updateSkipped != nil { @@ -1293,6 +1334,12 @@ func (cf *changeFrontier) checkpointJobProgress( return false, nil } + if cf.knobs.RaiseRetryableError != nil { + if err := cf.knobs.RaiseRetryableError(); err != nil { + return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError")) + } + } + return true, nil } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index c536f0c7023f..44dff1e4a3fe 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -191,6 +191,11 @@ func changefeedPlanHook( } if details.SinkURI == `` { + + p.ExtendedEvalContext().ChangefeedState = &coreChangefeedProgress{ + progress: progress, + } + // If this is a sinkless changefeed, then we should not hold on to the // descriptor leases accessed to plan the changefeed. If changes happen // to descriptors, they will be addressed during the execution. 
@@ -220,18 +225,7 @@ func changefeedPlanHook( return err } - // Check for a schemachange boundary timestamp returned via a - // retryable error. Retrying without updating the changefeed progress - // will result in the changefeed performing the schema change again, - // causing an infinite loop. - if ts, ok := changefeedbase.MaybeGetRetryableErrorTimestamp(err); ok { - progress = jobspb.Progress{ - Progress: &jobspb.Progress_HighWater{HighWater: &ts}, - Details: &jobspb.Progress_Changefeed{ - Changefeed: &jobspb.ChangefeedProgress{}, - }, - } - } + progress = p.ExtendedEvalContext().ChangefeedState.(*coreChangefeedProgress).progress } telemetry.Count(`changefeed.core.error`) return changefeedbase.MaybeStripRetryableErrorMarker(err) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 48c1ee88fd60..c8d636e167b3 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -5752,6 +5752,64 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { } } +// TestCoreChangefeedBackfillScanCheckpoint tests that a core changefeed +// successfully completes the initial scan of a table when transient errors occur. +// This test only succeeds if checkpoints are taken. +func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) + skip.UnderShort(t) + + rnd, _ := randutil.NewPseudoRand() + + rowCount := 10000 + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo(a INT PRIMARY KEY)`) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO foo (a) SELECT * FROM generate_series(%d, %d)`, 0, rowCount)) + + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + + // Ensure Scan Requests are always small enough that we receive multiple + // resolved events during a backfill. Also ensure that checkpoint frequency + // and size are large enough to induce several checkpoints when + // writing `rowCount` rows. 
+ knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error { + b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25) + return nil + } + changefeedbase.FrontierCheckpointFrequency.Override( + context.Background(), &s.Server.ClusterSettings().SV, 1) + changefeedbase.FrontierCheckpointMaxBytes.Override( + context.Background(), &s.Server.ClusterSettings().SV, 100<<20) + + emittedCount := 0 + knobs.RaiseRetryableError = func() error { + emittedCount++ + if emittedCount%200 == 0 { + return changefeedbase.MarkRetryableError(errors.New("test transient error")) + } + return nil + } + + foo := feed(t, f, `CREATE CHANGEFEED FOR TABLE foo`) + defer closeFeed(t, foo) + + payloads := make([]string, rowCount+1) + for i := 0; i < rowCount+1; i++ { + payloads[i] = fmt.Sprintf(`foo: [%d]->{"after": {"a": %d}}`, i, i) + } + assertPayloads(t, foo, payloads) + } + + cdcTest(t, testFn, feedTestForceSink("sinkless")) +} + func TestCheckpointFrequency(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -5765,6 +5823,7 @@ func TestCheckpointFrequency(t *testing.T) { ts := timeutil.NewManualTime(timeutil.Now()) js := newJobState( nil, /* job */ + nil, /* core progress */ cluster.MakeTestingClusterSettings(), MakeMetrics(time.Second).(*Metrics), ts, ) diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel index f34ecdaffd42..64d92e46876e 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel +++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//pkg/sql", "//pkg/sql/catalog/descpb", "//pkg/sql/flowinfra", - "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index 5dd16ca3debe..a10415b4ad4c 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -78,14 +77,7 @@ func (e *taggedError) Unwrap() error { return e.wrapped } const retryableErrorString = "retryable changefeed error" type retryableError struct { - // A schema change may result in a changefeed returning retryableError, - // which can signal the changefeed to restart. - // boundaryTimestamp can be returned inside this error so - // the changefeed knows where to restart from. Note this is - // only useful for sinkless/core changefeeds because they do not have - // the ability to read/write their state to jobs tables during restarts. - boundaryTimestamp hlc.Timestamp - wrapped error + wrapped error } // MarkRetryableError wraps the given error, marking it as retryable to @@ -94,12 +86,6 @@ func MarkRetryableError(e error) error { return &retryableError{wrapped: e} } -// MarkRetryableErrorWithTimestamp wraps the given error, marks it as -// retryable, and attaches a timestamp to the error. -func MarkRetryableErrorWithTimestamp(e error, ts hlc.Timestamp) error { - return &retryableError{boundaryTimestamp: ts, wrapped: e} -} - // Error implements the error interface. 
func (e *retryableError) Error() string { return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error()) @@ -139,17 +125,6 @@ func IsRetryableError(err error) bool { errors.Is(err, sql.ErrPlanChanged)) } -// MaybeGetRetryableErrorTimestamp will get the timestamp of an error if -// the error is a retryableError and the timestamp field is populated. -func MaybeGetRetryableErrorTimestamp(err error) (timestamp hlc.Timestamp, ok bool) { - if retryableErr := (*retryableError)(nil); errors.As(err, &retryableErr) { - if !retryableErr.boundaryTimestamp.IsEmpty() { - return retryableErr.boundaryTimestamp, true - } - } - return hlc.Timestamp{}, false -} - // MaybeStripRetryableErrorMarker performs some minimal attempt to clean the // RetryableError marker out. This won't do anything if the RetryableError // itself has been wrapped, but that's okay, we'll just have an uglier string. diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index cafb5c04986a..f7a993da2340 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -45,6 +45,8 @@ type TestingKnobs struct { OnDistflowSpec func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, frontierSpec *execinfrapb.ChangeFrontierSpec) // ShouldReplan is used to see if a replan for a changefeed should be triggered ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool + // RaiseRetryableError is a knob used to possibly return an error. + RaiseRetryableError func() error } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index 8d92cf6973c7..3c4c019f128e 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -230,6 +230,9 @@ type Context struct { // RangeStatsFetcher is used to fetch RangeStats. RangeStatsFetcher RangeStatsFetcher + + // ChangefeedState stores the state (progress) of core changefeeds. + ChangefeedState ChangefeedState } // DescIDGenerator generates unique descriptor IDs. diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index e2e15a3a22ff..f42fd291303a 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" @@ -502,6 +503,17 @@ type SequenceOperators interface { SetSequenceValueByID(ctx context.Context, seqID uint32, newVal int64, isCalled bool) error } +// ChangefeedState is used to track progress and checkpointing for sinkless/core changefeeds. +// Because a CREATE CHANGEFEED statement for a sinkless changefeed will hang and return data +// over the SQL connection, this state belongs in the EvalCtx. +type ChangefeedState interface { + // SetHighwater sets the frontier timestamp for the changefeed. + SetHighwater(frontier *hlc.Timestamp) + + // SetCheckpoint sets the checkpoint for the changefeed. + SetCheckpoint(spans []roachpb.Span, timestamp hlc.Timestamp) +} + // TenantOperator is capable of interacting with tenant state, allowing SQL // builtin functions to create, configure, and destroy tenants. The methods will // return errors when run by any tenant other than the system tenant. 
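
The eval.ChangefeedState interface added above is deliberately small. A
self-contained toy implementation makes the contract concrete
(illustrative only: Timestamp, Span, and memProgress are stand-ins
invented here for hlc.Timestamp, roachpb.Span, and the patch's
coreChangefeedProgress):

    package main

    import "fmt"

    // Stand-ins for hlc.Timestamp and roachpb.Span.
    type Timestamp struct{ WallTime int64 }
    type Span struct{ Key, EndKey string }

    // ChangefeedState mirrors the two-method interface added to
    // pkg/sql/sem/eval: sinkless changefeeds record progress here
    // instead of in a job record.
    type ChangefeedState interface {
    	SetHighwater(frontier *Timestamp)
    	SetCheckpoint(spans []Span, timestamp Timestamp)
    }

    // memProgress is a minimal in-memory implementation.
    type memProgress struct {
    	highWater *Timestamp
    	spans     []Span
    	ts        Timestamp
    }

    func (p *memProgress) SetHighwater(frontier *Timestamp) {
    	p.highWater = frontier
    }

    func (p *memProgress) SetCheckpoint(spans []Span, timestamp Timestamp) {
    	p.spans, p.ts = spans, timestamp
    }

    func main() {
    	var st ChangefeedState = &memProgress{}
    	st.SetHighwater(&Timestamp{WallTime: 42})
    	st.SetCheckpoint([]Span{{Key: "a", EndKey: "m"}}, Timestamp{WallTime: 42})
    	fmt.Printf("%+v\n", st)
    }

Keeping the interface this narrow lets the frontier write progress
through it without caring whether the backing store is a job record or
in-memory state on the SQL connection.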
From b6dc58b2b4253967fceebfeabecb5881290283db Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Sun, 17 Jul 2022 23:02:55 -0400
Subject: [PATCH 6/7] sql: make sequence integer bound consistent with
 `default_int_size`

Previously, the default bounds of a sequence were always `math.MaxInt64`
or `math.MinInt64` (depending on the sequence's order). This is
inconsistent with the cluster setting `default_int_size`. This commit
fixes that.

Fixes #84554

Release note (bug fix): Sequence integer bounds are now consistent with
the cluster setting `default_int_size`.

Release justification: bug fix
---
 .../catalog/schemaexpr/sequence_options.go    | 22 ++++---
 pkg/sql/catalog/table_elements.go             |  2 +-
 pkg/sql/catalog/tabledesc/column.go           |  9 ++-
 pkg/sql/information_schema.go                 |  2 +-
 .../testdata/logic_test/information_schema    | 59 +++++++++++++++++++
 .../logictest/testdata/logic_test/pg_catalog  | 38 ++++++++++++
 pkg/sql/sequence.go                           | 22 ++++++-
 7 files changed, 136 insertions(+), 18 deletions(-)

diff --git a/pkg/sql/catalog/schemaexpr/sequence_options.go b/pkg/sql/catalog/schemaexpr/sequence_options.go
index a25a99f106c8..8634a931c01a 100644
--- a/pkg/sql/catalog/schemaexpr/sequence_options.go
+++ b/pkg/sql/catalog/schemaexpr/sequence_options.go
@@ -25,7 +25,9 @@ import (
 // descriptor to a descpb.TableDescriptor_SequenceOpts.
 // Note that this function is used to acquire the sequence option for the
 // information schema table, so it doesn't parse for the sequence owner info.
-func ParseSequenceOpts(s string) (*descpb.TableDescriptor_SequenceOpts, error) {
+func ParseSequenceOpts(
+	s string, defaultIntSize int32,
+) (*descpb.TableDescriptor_SequenceOpts, error) {
 	stmt, err := parser.ParseOne("CREATE SEQUENCE fake_seq " + s)
 	if err != nil {
 		return nil, errors.Wrap(err, "cannot parse sequence option")
@@ -42,6 +44,7 @@ func ParseSequenceOpts(s string) (*descpb.TableDescriptor_SequenceOpts, error) {
 	if err := AssignSequenceOptions(
 		opts,
 		createSeqNode.Options,
+		defaultIntSize,
 		true, /* setDefaults */
 		nil,  /* existingType */
 	); err != nil {
@@ -125,13 +128,14 @@ func setSequenceIntegerBounds(
 func AssignSequenceOptions(
 	opts *descpb.TableDescriptor_SequenceOpts,
 	optsNode tree.SequenceOptions,
+	defaultIntSize int32,
 	setDefaults bool,
 	existingType *types.T,
 ) error {
 	wasAscending := opts.Increment > 0
 
 	// Set the default integer type of a sequence.
-	var integerType = types.Int
+	integerType := parser.NakedIntTypeFromDefaultIntSize(defaultIntSize)
 	// All other defaults are dependent on the value of increment
 	// and the AS integerType. (i.e. whether the sequence is ascending
 	// or descending, bigint vs. smallint)
@@ -148,25 +152,25 @@ func AssignSequenceOptions(
 	}
 	isAscending := opts.Increment > 0
 
+	lowerIntBound, upperIntBound, err := getSequenceIntegerBounds(integerType)
+	if err != nil {
+		return err
+	}
+
 	// Set increment-dependent defaults.
 	if setDefaults {
 		if isAscending {
 			opts.MinValue = 1
-			opts.MaxValue = math.MaxInt64
+			opts.MaxValue = upperIntBound
 			opts.Start = opts.MinValue
 		} else {
-			opts.MinValue = math.MinInt64
+			opts.MinValue = lowerIntBound
 			opts.MaxValue = -1
 			opts.Start = opts.MaxValue
 		}
 		opts.CacheSize = 1
 	}
 
-	lowerIntBound, upperIntBound, err := getSequenceIntegerBounds(integerType)
-	if err != nil {
-		return err
-	}
-
 	// Set default MINVALUE and MAXVALUE if AS option value for integer type is specified.
 	if opts.AsIntegerType != "" {
 		// We change MINVALUE and MAXVALUE if it is the originally set to the default during ALTER.
diff --git a/pkg/sql/catalog/table_elements.go b/pkg/sql/catalog/table_elements.go index e3e03a1946d1..66249fe61421 100644 --- a/pkg/sql/catalog/table_elements.go +++ b/pkg/sql/catalog/table_elements.go @@ -385,7 +385,7 @@ type Column interface { // If the column is not an identity column, return nil for both sequence option // and the error. // Note it doesn't return the sequence owner info. - GetGeneratedAsIdentitySequenceOption() (*descpb.TableDescriptor_SequenceOpts, error) + GetGeneratedAsIdentitySequenceOption(defaultIntSize int32) (*descpb.TableDescriptor_SequenceOpts, error) } // ConstraintToUpdate is an interface around a constraint mutation. diff --git a/pkg/sql/catalog/tabledesc/column.go b/pkg/sql/catalog/tabledesc/column.go index 2571f62e92aa..055bad5d97c5 100644 --- a/pkg/sql/catalog/tabledesc/column.go +++ b/pkg/sql/catalog/tabledesc/column.go @@ -249,14 +249,13 @@ func (w column) GetGeneratedAsIdentitySequenceOptionStr() string { // If the column is not an identity column, return nil for both sequence option // and the error. // Note it doesn't return the sequence owner info. -func (w column) GetGeneratedAsIdentitySequenceOption() ( - *descpb.TableDescriptor_SequenceOpts, - error, -) { +func (w column) GetGeneratedAsIdentitySequenceOption( + defaultIntSize int32, +) (*descpb.TableDescriptor_SequenceOpts, error) { if !w.HasGeneratedAsIdentitySequenceOption() { return nil, nil } - seqOpts, err := schemaexpr.ParseSequenceOpts(*w.desc.GeneratedAsIdentitySequenceOption) + seqOpts, err := schemaexpr.ParseSequenceOpts(*w.desc.GeneratedAsIdentitySequenceOption, defaultIntSize) if err != nil { return nil, err } diff --git a/pkg/sql/information_schema.go b/pkg/sql/information_schema.go index 4a3a26ab1b5c..33adce65f1d1 100644 --- a/pkg/sql/information_schema.go +++ b/pkg/sql/information_schema.go @@ -488,7 +488,7 @@ https://www.postgresql.org/docs/9.5/infoschema-columns.html`, identityIncrement := tree.DNull identityMax := tree.DNull identityMin := tree.DNull - generatedAsIdentitySeqOpt, err := column.GetGeneratedAsIdentitySequenceOption() + generatedAsIdentitySeqOpt, err := column.GetGeneratedAsIdentitySequenceOption(column.GetType().Width()) if err != nil { return err } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index d1c3a4bc603c..5719b33cdf30 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -5051,3 +5051,62 @@ query TTTTT select identity_start, identity_increment, identity_maximum, identity_minimum, identity_cycle from information_schema.columns where table_name = 't' and column_name='id27'; ---- 6 3 12 6 NULL + + +subtest get_seq_opt_with_default_int_size + +statement ok +DROP TABLE IF EXISTS t + +statement ok +SET default_int_size=4 + +statement ok +CREATE TABLE t ( + id1 INT GENERATED BY DEFAULT AS IDENTITY (START WITH 10) +) + +query TTTTT +select identity_start, identity_increment, identity_maximum, identity_minimum, identity_cycle from information_schema.columns where table_name = 't' and column_name='id1'; +---- +10 1 2147483647 1 NULL + +statement ok +DROP TABLE IF EXISTS t + +statement ok +SET default_int_size=8 + +statement ok +CREATE TABLE t ( + id1 INT GENERATED BY DEFAULT AS IDENTITY (START WITH 10) +) + +query TTTTT +select identity_start, identity_increment, identity_maximum, identity_minimum, identity_cycle from information_schema.columns where table_name = 't' and column_name='id1'; +---- +10 1 
+
+statement ok
+DROP TABLE IF EXISTS t
+
+statement ok
+SET default_int_size=8
+
+statement ok
+CREATE TABLE t (
+  id1 INT GENERATED BY DEFAULT AS IDENTITY (START WITH 10)
+)
+
+query TTTTT
+select identity_start, identity_increment, identity_maximum, identity_minimum, identity_cycle from information_schema.columns where table_name = 't' and column_name='id1';
+----
+10 1 9223372036854775807 1 NULL
+
+statement ok
+SET default_int_size=4
+
+query TTTTT
+select identity_start, identity_increment, identity_maximum, identity_minimum, identity_cycle from information_schema.columns where table_name = 't' and column_name='id1';
+----
+10 1 9223372036854775807 1 NULL
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index 46d4e0a363ff..5f643990f086 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -5947,3 +5947,41 @@ oid castsource casttarget castfunc castcontext castmethod
 3922032069 18 1043 NULL a NULL
 3989142581 18 23 NULL e NULL
 3989142587 18 25 NULL i NULL
+
+subtest seq_bounds_should_be_consistent_with_session_var
+
+statement ok
+DROP SEQUENCE serial
+
+statement ok
+SET default_int_size=4
+
+statement ok
+CREATE SEQUENCE serial START 101 INCREMENT 5
+
+query TTTOIIIIBII colnames
+SELECT * FROM pg_sequences WHERE sequencename = 'serial'
+----
+schemaname sequencename sequenceowner data_type start_value min_value max_value increment_by cycle cache_size last_value
+public serial root 20 101 1 2147483647 5 false 1 NULL
+
+statement ok
+SET default_int_size=8
+
+query TTTOIIIIBII colnames
+SELECT * FROM pg_sequences WHERE sequencename = 'serial'
+----
+schemaname sequencename sequenceowner data_type start_value min_value max_value increment_by cycle cache_size last_value
+public serial root 20 101 1 2147483647 5 false 1 NULL
+
+statement ok
+DROP SEQUENCE serial
+
+statement ok
+CREATE SEQUENCE serial START 101 INCREMENT 5
+
+query TTTOIIIIBII colnames
+SELECT * FROM pg_sequences WHERE sequencename = 'serial'
+----
+schemaname sequencename sequenceowner data_type start_value min_value max_value increment_by cycle cache_size last_value
+public serial root 20 101 1 9223372036854775807 5 false 1 NULL
diff --git a/pkg/sql/sequence.go b/pkg/sql/sequence.go
index 3e068b6732f0..0679ba61332c 100644
--- a/pkg/sql/sequence.go
+++ b/pkg/sql/sequence.go
@@ -480,10 +480,28 @@ func assignSequenceOptions(
 	if err := checkDupSeqOption(optsNode); err != nil {
 		return err
 	}
-	if err := schemaexpr.AssignSequenceOptions(opts, optsNode, setDefaults, existingType); err != nil {
+
+	defaultIntSize := int32(64)
+	if p != nil && p.SessionData() != nil {
+		defaultIntSize = p.SessionData().DefaultIntSize
+	}
+	if err := schemaexpr.AssignSequenceOptions(
+		opts,
+		optsNode,
+		defaultIntSize,
+		setDefaults,
+		existingType,
+	); err != nil {
 		return pgerror.WithCandidateCode(err, pgcode.InvalidParameterValue)
 	}
-	if err := assignSequenceOwner(ctx, p, opts, optsNode, sequenceID, sequenceParentID); err != nil {
+	if err := assignSequenceOwner(
+		ctx,
+		p,
+		opts,
+		optsNode,
+		sequenceID,
+		sequenceParentID,
+	); err != nil {
 		return pgerror.WithCandidateCode(err, pgcode.InvalidParameterValue)
 	}
 	return nil

From cf996d26866415a6bb38f3002d5501d8dc1e15a4 Mon Sep 17 00:00:00 2001
From: Matthew Todd
Date: Tue, 16 Aug 2022 14:46:57 -0400
Subject: [PATCH 7/7] insights: execution_insights_capacity cluster setting

Fixes #79450.

Whereas we previously retained only the 10 most recent insights, we now
let the user choose how many they'd like to hang onto.

It may make sense to be more sophisticated than a simple FIFO cache
here: perhaps we won't want a single fingerprint to dominate the list.
That work is captured in #86271.
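
For example, an operator can raise or lower the per-node retention with
the new setting (the value below is illustrative; the default is 1000):

    SET CLUSTER SETTING sql.insights.execution_insights_capacity = 500;

Note that lowering the setting does not shrink the cache immediately;
eviction only happens as new insights are observed.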

Release justification: Category 2: Bug fixes and low-risk updates to
new functionality.

Release note (ops change): The `sql.insights.execution_insights_capacity`
cluster setting was introduced, limiting the number of SQL execution
insights retained in memory per node.
---
 .../settings/settings-for-tenants.txt      |  1 +
 docs/generated/settings/settings.html      |  1 +
 pkg/sql/sqlstats/insights/insights.go      | 10 +++++
 pkg/sql/sqlstats/insights/registry.go      |  9 +----
 pkg/sql/sqlstats/insights/registry_test.go | 37 +++++++++++++++++++
 5 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 69fe43009f8c..78fb5a83d6b1 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -232,6 +232,7 @@ sql.distsql.max_running_flows integer -128 the value - when positive - used as i
 sql.distsql.temp_storage.workmem byte size 64 MiB maximum amount of memory in bytes a processor can use before falling back to temp storage
 sql.guardrails.max_row_size_err byte size 512 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable
 sql.guardrails.max_row_size_log byte size 64 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable
+sql.insights.execution_insights_capacity integer 1000 the size of the per-node store of execution insights
 sql.insights.latency_threshold duration 100ms amount of time after which an executing statement is considered slow. Use 0 to disable.
 sql.log.slow_query.experimental_full_table_scans.enabled boolean false when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.
 sql.log.slow_query.internal_queries.enabled boolean false when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 9aefdc75ff19..79b50190d8b0 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -163,6 +163,7 @@
 <tr><td><code>sql.guardrails.max_row_size_err</code></td><td>byte size</td><td><code>512 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable</td></tr>
 <tr><td><code>sql.guardrails.max_row_size_log</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable</td></tr>
 <tr><td><code>sql.hash_sharded_range_pre_split.max</code></td><td>integer</td><td><code>16</code></td><td>max pre-split ranges to have when adding hash sharded index to an existing table</td></tr>
+<tr><td><code>sql.insights.execution_insights_capacity</code></td><td>integer</td><td><code>1000</code></td><td>the size of the per-node store of execution insights</td></tr>
 <tr><td><code>sql.insights.latency_threshold</code></td><td>duration</td><td><code>100ms</code></td><td>amount of time after which an executing statement is considered slow. Use 0 to disable.</td></tr>
 <tr><td><code>sql.log.slow_query.experimental_full_table_scans.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.</td></tr>
 <tr><td><code>sql.log.slow_query.internal_queries.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.</td></tr>
diff --git a/pkg/sql/sqlstats/insights/insights.go b/pkg/sql/sqlstats/insights/insights.go
index fc65980c42b0..fd6cf1d76507 100644
--- a/pkg/sql/sqlstats/insights/insights.go
+++ b/pkg/sql/sqlstats/insights/insights.go
@@ -22,6 +22,16 @@ import (
 	prometheus "github.com/prometheus/client_model/go"
 )
 
+// ExecutionInsightsCapacity limits the number of execution insights retained in memory per node.
+// As new insights arrive, the oldest ones are evicted.
+var ExecutionInsightsCapacity = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"sql.insights.execution_insights_capacity",
+	"the size of the per-node store of execution insights",
+	1000,
+	settings.NonNegativeInt,
+).WithPublic()
+
 // LatencyThreshold configures the execution time beyond which a statement is
 // considered slow. A LatencyThreshold of 0 (the default) disables this
 // detection.
diff --git a/pkg/sql/sqlstats/insights/registry.go b/pkg/sql/sqlstats/insights/registry.go
index 2036a6456e4a..6c5fb1531f2b 100644
--- a/pkg/sql/sqlstats/insights/registry.go
+++ b/pkg/sql/sqlstats/insights/registry.go
@@ -20,13 +20,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 )
 
-// maxCacheSize is the number of detected outliers we will retain in memory.
-// We choose a small value for the time being to allow us to iterate without
-// worrying about memory usage. See #79450.
-const (
-	maxCacheSize = 10
-)
-
 // registry is the central object in the outliers subsystem. It observes
 // statement execution to determine which statements are outliers and
 // exposes the set of currently retained outliers.
@@ -49,7 +42,7 @@ func newRegistry(st *cluster.Settings, metrics Metrics) Registry {
 	config := cache.Config{
 		Policy: cache.CacheFIFO,
 		ShouldEvict: func(size int, key, value interface{}) bool {
-			return size > maxCacheSize
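+			// Evict oldest-first until the cache fits within the capacity
+			// currently configured by the cluster setting.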
+			return int64(size) > ExecutionInsightsCapacity.Get(&st.SV)
 		},
 	}
 	r := &registry{
diff --git a/pkg/sql/sqlstats/insights/registry_test.go b/pkg/sql/sqlstats/insights/registry_test.go
index 6e02fd62062e..d1d6c9bc451e 100644
--- a/pkg/sql/sqlstats/insights/registry_test.go
+++ b/pkg/sql/sqlstats/insights/registry_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
+	"github.com/cockroachdb/cockroach/pkg/util/uint128"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/stretchr/testify/require"
 )
@@ -142,4 +143,40 @@ func TestRegistry(t *testing.T) {
 
 		require.Equal(t, expected, actual)
 	})
+
+	t.Run("retention", func(t *testing.T) {
+		st := cluster.MakeTestingClusterSettings()
+		LatencyThreshold.Override(ctx, &st.SV, 100*time.Millisecond)
+		slow := 2 * LatencyThreshold.Get(&st.SV).Seconds()
+		r := newRegistry(st, NewMetrics())
+
+		// With ExecutionInsightsCapacity set to 5, we retain the 5 most recently-seen insights.
+		ExecutionInsightsCapacity.Override(ctx, &st.SV, 5)
+		for id := 0; id < 10; id++ {
+			observeStatementExecution(r, uint64(id), slow)
+		}
+		assertInsightStatementIDs(t, r, []uint64{9, 8, 7, 6, 5})
+
+		// Lowering ExecutionInsightsCapacity requires a new insight to arrive before the others are evicted.
+		ExecutionInsightsCapacity.Override(ctx, &st.SV, 2)
+		assertInsightStatementIDs(t, r, []uint64{9, 8, 7, 6, 5})
+		observeStatementExecution(r, 10, slow)
+		assertInsightStatementIDs(t, r, []uint64{10, 9})
+	})
+}
+
+func observeStatementExecution(registry Registry, idBase uint64, latencyInSeconds float64) {
+	sessionID := clusterunique.ID{Uint128: uint128.FromInts(2, 0)}
+	txnID := uuid.FromUint128(uint128.FromInts(1, idBase))
+	stmtID := clusterunique.ID{Uint128: uint128.FromInts(0, idBase)}
+	registry.ObserveStatement(sessionID, &Statement{ID: stmtID, LatencyInSeconds: latencyInSeconds})
+	registry.ObserveTransaction(sessionID, &Transaction{ID: txnID})
+}
+
+func assertInsightStatementIDs(t *testing.T, registry Registry, expected []uint64) {
+	var actual []uint64
+	registry.IterateInsights(context.Background(), func(ctx context.Context, insight *Insight) {
+		actual = append(actual, insight.Statement.ID.Lo)
+	})
+	require.ElementsMatch(t, expected, actual)
 }