diff --git a/docs/generated/sql/bnf/alter_table.bnf b/docs/generated/sql/bnf/alter_table.bnf index 7f6bc5c4de66..3ea94d15bb4d 100644 --- a/docs/generated/sql/bnf/alter_table.bnf +++ b/docs/generated/sql/bnf/alter_table.bnf @@ -1,3 +1,3 @@ alter_onetable_stmt ::= - 'ALTER' 'TABLE' table_name ( ( ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) )* ) - | 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name ( ( ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) )* ) + 'ALTER' 'TABLE' table_name ( ( ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) )* ) + | 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name ( ( ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) )* ) diff --git a/docs/generated/sql/bnf/alter_type.bnf b/docs/generated/sql/bnf/alter_type.bnf new file mode 100644 index 000000000000..ccbd8b5526e8 --- /dev/null +++ b/docs/generated/sql/bnf/alter_type.bnf @@ -0,0 +1,3 @@ +alter_onetable_stmt ::= + 'ALTER' 'TABLE' table_name 'ALTER' column_name 'TYPE' new_type + | 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name 'ALTER' column_name 'TYPE' new_type diff --git a/docs/generated/sql/bnf/show_var.bnf b/docs/generated/sql/bnf/show_var.bnf index 3e5a38b4bb0c..93082503a4eb 100644 --- a/docs/generated/sql/bnf/show_var.bnf +++ b/docs/generated/sql/bnf/show_var.bnf @@ -6,7 +6,6 @@ show_stmt ::= | show_csettings_stmt | show_databases_stmt | show_grants_stmt - | show_histogram_stmt | show_indexes_stmt | show_jobs_stmt | show_queries_stmt @@ -15,7 +14,6 @@ show_stmt ::= | show_schemas_stmt | show_session_stmt | show_sessions_stmt - | show_stats_stmt | show_tables_stmt | show_trace_stmt | show_users_stmt diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index f272faa75577..6dd82283c4dd 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -58,7 +58,6 @@ create_stmt ::= create_user_stmt | create_role_stmt | create_ddl_stmt - | create_stats_stmt deallocate_stmt ::= 'DEALLOCATE' name @@ -157,7 +156,6 @@ show_stmt ::= | show_csettings_stmt | show_databases_stmt | show_grants_stmt - | show_histogram_stmt | show_indexes_stmt | show_jobs_stmt | show_queries_stmt @@ -166,7 +164,6 @@ show_stmt ::= | show_schemas_stmt | show_session_stmt | show_sessions_stmt - | show_stats_stmt | show_tables_stmt | show_trace_stmt | show_users_stmt @@ -262,9 +259,6 @@ create_ddl_stmt ::= | create_view_stmt | create_sequence_stmt -create_stats_stmt ::= - 'CREATE' 'STATISTICS' statistics_name 'ON' name_list 'FROM' table_name - name ::= 'identifier' | unreserved_keyword @@ -322,7 +316,6 @@ explainable_stmt ::= preparable_stmt | alter_ddl_stmt | create_ddl_stmt - | create_stats_stmt | drop_ddl_stmt | execute_stmt @@ -462,9 +455,6 @@ show_databases_stmt ::= show_grants_stmt ::= 'SHOW' 'GRANTS' opt_on_targets_roles for_grantee_clause -show_histogram_stmt ::= - 'SHOW' 'HISTOGRAM' 'ICONST' - show_indexes_stmt ::= 'SHOW' 'INDEX' 'FROM' table_name | 'SHOW' 'INDEXES' 'FROM' table_name @@ -498,10 +488,6 @@ show_sessions_stmt ::= | 'SHOW' 'CLUSTER' 'SESSIONS' | 'SHOW' 'LOCAL' 'SESSIONS' -show_stats_stmt ::= - 'SHOW' 'STATISTICS' 'FOR' 'TABLE' table_name - | 'SHOW' 'STATISTICS' 'USING' 'JSON' 'FOR' 'TABLE' table_name - show_tables_stmt ::= 'SHOW' 'TABLES' 'FROM' name '.' name | 'SHOW' 'TABLES' 'FROM' name @@ -895,9 +881,6 @@ create_sequence_stmt ::= 'CREATE' 'SEQUENCE' sequence_name opt_sequence_option_list | 'CREATE' 'SEQUENCE' 'IF' 'NOT' 'EXISTS' sequence_name opt_sequence_option_list -statistics_name ::= - name - with_clause ::= 'WITH' cte_list @@ -1755,7 +1738,6 @@ alter_table_cmd ::= | 'DROP' 'CONSTRAINT' constraint_name opt_drop_behavior | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by - | 'INJECT' 'STATISTICS' a_expr alter_index_cmd ::= partition_by diff --git a/docs/generated/sql/bnf/table_ref.bnf b/docs/generated/sql/bnf/table_ref.bnf index a226ade41c9d..8c41cc8e66ab 100644 --- a/docs/generated/sql/bnf/table_ref.bnf +++ b/docs/generated/sql/bnf/table_ref.bnf @@ -1,7 +1,7 @@ table_ref ::= - table_name opt_index_flags ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) + table_name ( '@' scan_parameters | ) ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) | '(' select_stmt ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) | joined_table | '(' joined_table ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) - | func_table ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) + | func_application ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) | '[' explainable_stmt ']' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) diff --git a/pkg/acceptance/decommission_test.go b/pkg/acceptance/decommission_test.go deleted file mode 100644 index 4605f3ec09dc..000000000000 --- a/pkg/acceptance/decommission_test.go +++ /dev/null @@ -1,528 +0,0 @@ -// Copyright 2015 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package acceptance - -import ( - "context" - "reflect" - "regexp" - "strconv" - "strings" - "testing" - "time" - - gosql "database/sql" - - "github.com/cockroachdb/cockroach/pkg/acceptance/cluster" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" - "github.com/cockroachdb/cockroach/pkg/util/httputil" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/kr/pretty" - "github.com/pkg/errors" -) - -// TestDecommission starts up an >3 node cluster and decomissions and -// recommissions nodes in various ways. -func TestDecommission(t *testing.T) { - RunLocal(t, func(t *testing.T) { - s := log.Scope(t) - defer s.Close(t) - - runTestWithCluster(t, testDecommissionInner) - }) -} - -func decommission( - ctx context.Context, - c cluster.Cluster, - runNode int, - targetNodes []roachpb.NodeID, - verbs ...string, -) (string, string, error) { - args := append([]string{"node"}, verbs...) - for _, target := range targetNodes { - args = append(args, strconv.Itoa(int(target))) - } - o, e, err := c.ExecCLI(ctx, runNode, args) - return o, e, err -} - -func matchCSV(csvStr string, matchColRow [][]string) (err error) { - defer func() { - if err != nil { - err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s", csvStr, pretty.Sprint(matchColRow), err) - } - }() - - reader := csv.NewReader(strings.NewReader(csvStr)) - reader.FieldsPerRecord = -1 - records, err := reader.ReadAll() - if err != nil { - return err - } - - lr, lm := len(records), len(matchColRow) - if lr < lm { - return errors.Errorf("csv has %d rows, but expected at least %d", lr, lm) - } - - // Compare only the last len(matchColRow) records. That is, if we want to - // match 4 rows and we have 100 records, we only really compare - // records[96:], that is, the last four rows. - records = records[lr-lm:] - - for i := range records { - if lr, lm := len(records[i]), len(matchColRow[i]); lr != lm { - return errors.Errorf("row #%d: csv has %d columns, but expected %d", i+1, lr, lm) - } - for j := range records[i] { - pat, str := matchColRow[i][j], records[i][j] - re := regexp.MustCompile(pat) - if !re.MatchString(str) { - err = errors.Errorf("%v\nrow #%d, col #%d: found %q which does not match %q", err, i+1, j+1, str, pat) - } - } - } - return err -} - -func testDecommissionInner( - ctx context.Context, t *testing.T, c cluster.Cluster, cfg cluster.TestConfig, -) { - if c.NumNodes() < 4 { - // TODO(tschottdorf): or we invent a way to change the ZoneConfig in - // this test and test less ambitiously (or split up the tests). - t.Skip("need at least four nodes") - } - - withDB := func(n int, stmt string) { - db, err := gosql.Open("postgres", c.PGUrl(ctx, n)) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := db.Close(); err != nil { - t.Error(err) - } - }() - if _, err := db.ExecContext(ctx, stmt); err != nil { - t.Fatal(err) - } - } - - withDB(1, "SET CLUSTER SETTING server.remote_debugging.mode = 'any';") - - // Get the ids for each node. - idMap := make(map[int]roachpb.NodeID) - for i := 0; i < c.NumNodes(); i++ { - var details serverpb.DetailsResponse - if err := httputil.GetJSON(cluster.HTTPClient, c.URL(ctx, i)+"/_status/details/local", &details); err != nil { - t.Fatal(err) - } - idMap[i] = details.NodeID - } - - decommissionHeader := []string{"id", "is_live", "replicas", "is_decommissioning", "is_draining"} - decommissionFooter := []string{"No more data reported on target nodes. Please verify cluster health before removing the nodes."} - waitLiveDeprecated := "--wait=live is deprecated and is treated as --wait=all" - - statusHeader := []string{"id", "address", "build", "started_at", "updated_at", "is_live"} - - log.Info(ctx, "decommissioning first node from the second, polling the status manually") - retryOpts := retry.Options{ - InitialBackoff: time.Second, - MaxBackoff: 5 * time.Second, - Multiplier: 1, - MaxRetries: 20, - } - for r := retry.Start(retryOpts); r.Next(); { - o, _, err := decommission(ctx, c, 1, []roachpb.NodeID{idMap[0]}, "decommission", "--wait", "none", "--format", "csv") - if err != nil { - t.Fatal(err) - } - - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(idMap[0])), "true", "0", "true", "false"}, - decommissionFooter, - } - log.Infof(ctx, o) - - if err := matchCSV(o, exp); err != nil { - continue - } - break - } - - // Check that even though the node is decommissioned, we still see it (since - // it remains live) in `node ls`. - { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "ls", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - exp := [][]string{ - {"id"}, - {"1"}, - {"2"}, - {"3"}, - {"4"}, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - // Ditto `node status`. - { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "status", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - exp := [][]string{ - statusHeader, - {`1`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - - log.Info(ctx, "recommissioning first node (from third node)") - { - o, _, err := decommission(ctx, c, 2, []roachpb.NodeID{idMap[0]}, "recommission") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - } - - log.Info(ctx, "decommissioning second node from third, using --wait=all") - { - target := idMap[1] - o, _, err := decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "all", "--format", "csv") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(target)), "true", "0", "true", "false"}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - - log.Info(ctx, "recommissioning second node from itself") - { - o, _, err := decommission(ctx, c, 1, []roachpb.NodeID{idMap[1]}, "recommission") - if err != nil { - t.Fatalf("could no recommission: %s\n%s", err, o) - } - log.Infof(ctx, o) - } - - log.Info(ctx, "decommissioning third node via `quit --decommission`") - { - // This should not take longer than five minutes, and if it does, it's - // likely stuck forever and we want to see the output. - timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - o, e, err := c.ExecCLI(timeoutCtx, 2, []string{"quit", "--decommission"}) - if err != nil { - if timeoutCtx.Err() != nil { - t.Fatalf("quit --decommission failed: %s\nstdout:\n%s\nstderr:\n%s", err, o, e) - } - // TODO(tschottdorf): grep the process output for the string announcing success? - log.Warningf(ctx, "ignoring error on quit --decommission: %s", err) - } else { - log.Infof(ctx, o, e) - } - - // Kill the node to generate the expected event (Kill is idempotent, so this works). - if err := c.Kill(ctx, 2); err != nil { - log.Warning(ctx, err) - } - } - - // Now that the third node is down and decommissioned, decommissioning it - // again should be a no-op. We do it from node one but as always it doesn't - // matter. - log.Info(ctx, "checking that other nodes see node three as successfully decommissioned") - { - target := idMap[2] - o, _, err := decommission(ctx, c, 1, []roachpb.NodeID{target}, "decommission", "--format", "csv") // wait=all is implied - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - - exp := [][]string{ - decommissionHeader, - // Expect the same as usual, except this time the node should be draining - // because it shut down cleanly (thanks to `quit --decommission`). - {strconv.Itoa(int(target)), "true", "0", "true", "true"}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - - // Bring the node back up. It's still decommissioned, so it won't be of much use. - if err := c.Restart(ctx, 2); err != nil { - t.Fatal(err) - } - - // Recommission. Welcome back! - o, _, err = decommission(ctx, c, 1, []roachpb.NodeID{target}, "recommission") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - } - - // Kill the first node and verify that we can decommission it while it's down, - // bringing it back up to verify that its replicas still get removed. - log.Info(ctx, "intentionally killing first node") - if err := c.Kill(ctx, 0); err != nil { - t.Fatal(err) - } - log.Info(ctx, "decommission first node, starting with it down but restarting it for verification") - { - target := idMap[0] - o, e, err := decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "live") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - if strings.Split(e, "\n")[1] != waitLiveDeprecated { - t.Fatal("missing deprecate message for --wait=live") - } - if err := c.Restart(ctx, 0); err != nil { - t.Fatal(err) - } - // Run a second time to wait until the replicas have all been GC'ed. - // Note that we specify "all" because even though the first node is - // now running, it may not be live by the time the command runs. - o, _, err = decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "all", "--format", "csv") - if err != nil { - t.Fatal(err) - } - - log.Info(ctx, o) - - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(target)), "true", "0", "true", "false"}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - - // Now we want to test decommissioning a truly dead node. Make sure we don't - // waste too much time waiting for the node to be recognized as dead. Note that - // we don't want to set this number too low or everything will seem dead to the - // allocator at all times, so nothing will ever happen. - withDB(1, "SET CLUSTER SETTING server.time_until_store_dead = '1m15s'") - - log.Info(ctx, "intentionally killing first node") - if err := c.Kill(ctx, 0); err != nil { - t.Fatal(err) - } - // It is being decommissioned in absentia, meaning that its replicas are - // being removed due to deadness. We can't see that reflected in the output - // since the current mechanism gets its replica counts from what the node - // reports about itself, so our assertion here is somewhat weak. - log.Info(ctx, "decommission first node in absentia using --wait=live") - { - target := idMap[0] - o, e, err := decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "live", "--format", "csv") - if err != nil { - t.Fatal(err) - } - - log.Infof(ctx, o) - - // Note we don't check precisely zero replicas (which the node would write - // itself, but it's dead). We do check that the node isn't live, though, which - // is essentially what `--wait=live` waits for. - // Note that the target node may still be "live" when it's marked as - // decommissioned, as its replica count may drop to zero faster than - // liveness times out. - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(target)), `true|false`, "0", `true`, `false`}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - if strings.Split(e, "\n")[1] != waitLiveDeprecated { - t.Fatal("missing deprecate message for --wait=live") - } - } - - // Check that (at least after a bit) the node disappears from `node ls` - // because it is decommissioned and not live. - for { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "ls", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - - log.Info(ctx, o) - - exp := [][]string{ - {"id"}, - {"2"}, - {"3"}, - {"4"}, - } - - if err := matchCSV(o, exp); err != nil { - time.Sleep(time.Second) - continue - } - break - } - for { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "status", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - - log.Info(ctx, o) - - exp := [][]string{ - statusHeader, - {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, - } - if err := matchCSV(o, exp); err != nil { - time.Sleep(time.Second) - continue - } - break - } - - var rows *gosql.Rows - if err := retry.ForDuration(time.Minute, func() error { - // Verify the event log has recorded exactly one decommissioned or - // recommissioned event for each commissioning operation. - // - // Spurious errors appear to be possible since we might be trying to - // send RPCs to the (relatively recently) down node: - // - // pq: rpc error: code = Unavailable desc = grpc: the connection is - // unavailable - // - // Seen in https://teamcity.cockroachdb.com/viewLog.html?buildId=344802. - db, err := gosql.Open("postgres", c.PGUrl(ctx, 1)) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := db.Close(); err != nil { - t.Error(err) - } - }() - - rows, err = db.Query(` - SELECT "eventType", "targetID" FROM system.eventlog - WHERE "eventType" IN ($1, $2) ORDER BY timestamp`, - sql.EventLogNodeDecommissioned, sql.EventLogNodeRecommissioned, - ) - if err != nil { - log.Warning(ctx, errors.Wrap(err, "retrying after")) - return err - } - return nil - }); err != nil { - t.Fatal(err) - } - - matrix, err := sqlutils.RowsToStrMatrix(rows) - if err != nil { - t.Fatal(err) - } - expMatrix := [][]string{ - {string(sql.EventLogNodeDecommissioned), idMap[0].String()}, - {string(sql.EventLogNodeRecommissioned), idMap[0].String()}, - {string(sql.EventLogNodeDecommissioned), idMap[1].String()}, - {string(sql.EventLogNodeRecommissioned), idMap[1].String()}, - {string(sql.EventLogNodeDecommissioned), idMap[2].String()}, - {string(sql.EventLogNodeRecommissioned), idMap[2].String()}, - {string(sql.EventLogNodeDecommissioned), idMap[0].String()}, - } - - if !reflect.DeepEqual(matrix, expMatrix) { - t.Fatalf("unexpected diff(matrix, expMatrix):\n%s", pretty.Diff(matrix, expMatrix)) - } - - // Last, verify that the operator can't shoot themselves in the foot by - // accidentally decommissioning all nodes. - var allNodeIDs []roachpb.NodeID - for _, nodeID := range idMap { - allNodeIDs = append(allNodeIDs, nodeID) - } - - // Specify wait=none because the command would block forever (the replicas have - // nowhere to go). - if _, _, err := decommission( - ctx, c, 1, allNodeIDs, "decommission", "--wait", "none", - ); err != nil { - t.Fatal(err) - } - - // Check that we can still do stuff. Creating a database should be good enough. - db, err := gosql.Open("postgres", c.PGUrl(ctx, 1)) - if err != nil { - t.Fatal(err) - } - defer func() { _ = db.Close() }() - - if _, err := db.Exec(`CREATE DATABASE still_working;`); err != nil { - t.Fatal(err) - } - - // Recommission all nodes. - if _, _, err := decommission( - ctx, c, 1, allNodeIDs, "recommission", - ); err != nil { - t.Fatal(err) - } - - // To verify that all nodes are actually accepting replicas again, decommission - // the first nodes (blocking until it's done). This proves that the other nodes - // absorb the first one's replicas. - if _, _, err := decommission( - ctx, c, 1, []roachpb.NodeID{idMap[0]}, "decommission", - ); err != nil { - t.Fatal(err) - } -} diff --git a/pkg/acceptance/init_test.go b/pkg/acceptance/init_test.go index 9896354a1541..c5f9bc3999cf 100644 --- a/pkg/acceptance/init_test.go +++ b/pkg/acceptance/init_test.go @@ -39,8 +39,7 @@ func TestInitModeBootstrapNodeZero(t *testing.T) { s := log.Scope(t) defer s.Close(t) - // TODO(tschottdorf): give LocalCluster support for the init modes and we should be able - // to switch this to RunLocal. Ditto below. + // TODO(peter): Move this test to a roachtest. RunDocker(t, func(t *testing.T) { runTestWithCluster(t, testInitModeInner, useInitMode(cluster.INIT_BOOTSTRAP_NODE_ZERO)) }) @@ -50,7 +49,7 @@ func TestInitModeCommand(t *testing.T) { s := log.Scope(t) defer s.Close(t) - // TODO(tschottdorf): see above. + // TODO(peter): Move this test to a roachtest. RunDocker(t, func(t *testing.T) { runTestWithCluster(t, testInitModeInner, useInitMode(cluster.INIT_COMMAND)) }) @@ -75,7 +74,7 @@ func TestInitModeNone(t *testing.T) { s := log.Scope(t) defer s.Close(t) - // TODO(tschottdorf): see above. + // TODO(peter): Move this test to a roachtest. RunDocker(t, func(t *testing.T) { runTestWithCluster(t, testInitModeNoneInner, useInitMode(cluster.INIT_NONE)) }) diff --git a/pkg/acceptance/util_cluster.go b/pkg/acceptance/util_cluster.go index 49b2f2d8a1d1..62898bfd88b8 100644 --- a/pkg/acceptance/util_cluster.go +++ b/pkg/acceptance/util_cluster.go @@ -16,8 +16,6 @@ package acceptance import ( "context" - "io/ioutil" - "os" "path/filepath" "regexp" "strings" @@ -25,23 +23,15 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/acceptance/cluster" - "github.com/cockroachdb/cockroach/pkg/acceptance/localcluster" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/binfetcher" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) const ( - localTest = "runMode=local" dockerTest = "runMode=docker" ) -// RunLocal runs the given acceptance test using a bare cluster. -func RunLocal(t *testing.T, testee func(t *testing.T)) { - t.Run(localTest, testee) -} - // RunDocker runs the given acceptance test using a Docker cluster. func RunDocker(t *testing.T, testee func(t *testing.T)) { t.Run(dockerTest, testee) @@ -87,15 +77,13 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c parts := strings.Split(t.Name(), "/") if len(parts) < 2 { - t.Fatal("must invoke RunLocal or RunDocker") + t.Fatal("must invoke RunDocker") } var runMode string for _, part := range parts[1:] { part = reStripTestEnumeration.ReplaceAllLiteralString(part, "") switch part { - case localTest: - fallthrough case dockerTest: if runMode != "" { t.Fatalf("test has more than one run mode: %s and %s", runMode, part) @@ -105,44 +93,6 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c } switch runMode { - case localTest: - pwd, err := os.Getwd() - if err != nil { - t.Fatal(err) - } - dataDir, err := ioutil.TempDir(pwd, ".localcluster") - if err != nil { - t.Fatal(err) - } - - logDir := *flagLogDir - if logDir != "" { - logDir = filepath.Join(logDir, filepath.Clean(t.Name())) - } - - perNodeCfg := localcluster.MakePerNodeFixedPortsCfg(len(cfg.Nodes)) - for i := 0; i < len(cfg.Nodes); i++ { - // TODO(tschottdorf): handle Nodes[i].Stores properly. - if cfg.Nodes[i].Version != "" { - nCfg := perNodeCfg[i] - nCfg.Binary = GetBinary(ctx, t, cfg.Nodes[i].Version) - perNodeCfg[i] = nCfg - } - } - clusterCfg := localcluster.ClusterConfig{ - Ephemeral: true, - DataDir: dataDir, - LogDir: logDir, - NumNodes: len(cfg.Nodes), - PerNodeCfg: perNodeCfg, - NoWait: cfg.NoWait, - } - - l := localcluster.New(clusterCfg) - - l.Start(ctx) - c = &localcluster.LocalCluster{Cluster: l} - case dockerTest: logDir := *flagLogDir if logDir != "" { @@ -153,7 +103,7 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c c = l default: - t.Fatalf("unable to run in mode %q, use either RunLocal or RunDocker", runMode) + t.Fatalf("unable to run in mode %q, use RunDocker", runMode) } // Don't wait for replication unless requested (usually it is). @@ -229,17 +179,3 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c completed = true return c } - -// GetBinary retrieves a binary for the specified version and returns it. -func GetBinary(ctx context.Context, t *testing.T, version string) string { - t.Helper() - bin, err := binfetcher.Download(ctx, binfetcher.Options{ - Binary: "cockroach", - Dir: ".localcluster_cache", - Version: version, - }) - if err != nil { - t.Fatalf("unable to set up binary for v%s: %s", version, err) - } - return bin -} diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 9b2f4470ce7c..841ab8216b2c 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -69,11 +69,12 @@ type emitEntry struct { // returns a closure that may be repeatedly called to advance the changefeed. // The returned closure is not threadsafe. func kvsToRows( - leaseManager *sql.LeaseManager, + leaseMgr *sql.LeaseManager, + tableHist *tableHistory, details jobspb.ChangefeedDetails, inputFn func(context.Context) (bufferEntry, error), ) func(context.Context) ([]emitEntry, error) { - rfCache := newRowFetcherCache(leaseManager) + rfCache := newRowFetcherCache(leaseMgr, tableHist) var kvs sqlbase.SpanKVFetcher appendEmitEntryForKV := func( diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index db9f10363b6b..9a347cbfb78e 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -34,11 +34,18 @@ type changeAggregator struct { // cancel shuts down the processor, both the `Next()` flow and the poller. cancel func() + // errCh contains the return values of poller and tableHistUpdater. + errCh chan error // poller runs in the background and puts kv changes and resolved spans into // a buffer, which is used by `Next()`. poller *poller - // pollerErrCh is written once with the poller error (or nil). - pollerErrCh chan error + // pollerDoneCh is closed when the poller exits. + pollerDoneCh chan struct{} + // tableHistUpdater runs in the background and continually advances the + // high-water of a tableHistory. + tableHistUpdater *tableHistoryUpdater + // tableHistUpdaterDoneCh is closed when the tableHistUpdater exits. + tableHistUpdaterDoneCh chan struct{} // sink is the Sink to write rows to. Resolved timestamps are never written // by changeAggregator. @@ -116,7 +123,25 @@ func newChangeAggregatorProcessor( ca.poller = makePoller( flowCtx.Settings, flowCtx.ClientDB, flowCtx.ClientDB.Clock(), flowCtx.Gossip, spans, spec.Feed, initialHighWater, buf) - rowsFn := kvsToRows(flowCtx.LeaseManager.(*sql.LeaseManager), spec.Feed, buf.Get) + + leaseMgr := flowCtx.LeaseManager.(*sql.LeaseManager) + tableHist := makeTableHistory(func(desc *sqlbase.TableDescriptor) error { + // NB: Each new `tableDesc.Version` is initially written with an mvcc + // timestamp equal to its `ModificationTime`. It might later update that + // `Version` with backfill progress, but we only validate a table + // descriptor through its `ModificationTime` before using it, so this + // validation function can't depend on anything that changes after a new + // `Version` of a table desc is written. + return validateChangefeedTable(spec.Feed.Targets, desc) + }, initialHighWater) + ca.tableHistUpdater = &tableHistoryUpdater{ + settings: flowCtx.Settings, + db: flowCtx.ClientDB, + targets: spec.Feed.Targets, + m: tableHist, + } + rowsFn := kvsToRows(leaseMgr, tableHist, spec.Feed, buf.Get) + ca.tickFn = emitEntries(spec.Feed, ca.sink, rowsFn) return ca, nil @@ -130,13 +155,25 @@ func (ca *changeAggregator) OutputTypes() []sqlbase.ColumnType { func (ca *changeAggregator) Start(ctx context.Context) context.Context { ctx, ca.cancel = context.WithCancel(ctx) - ca.pollerErrCh = make(chan error, 1) + // Give errCh enough buffer for both of these, but only the first one is + // ever used. + ca.errCh = make(chan error, 2) + ca.pollerDoneCh = make(chan struct{}) go func(ctx context.Context) { + defer close(ca.pollerDoneCh) err := ca.poller.Run(ctx) // Trying to call MoveToDraining here is racy (`MoveToDraining called in // state stateTrailingMeta`), so return the error via a channel. - ca.pollerErrCh <- err - close(ca.pollerErrCh) + ca.errCh <- err + ca.cancel() + }(ctx) + ca.tableHistUpdaterDoneCh = make(chan struct{}) + go func(ctx context.Context) { + defer close(ca.tableHistUpdaterDoneCh) + err := ca.tableHistUpdater.PollTableDescs(ctx) + // Trying to call MoveToDraining here is racy (`MoveToDraining called in + // state stateTrailingMeta`), so return the error via a channel. + ca.errCh <- err ca.cancel() }(ctx) @@ -145,11 +182,9 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { } func (ca *changeAggregator) close() { - // Wait for the poller to finish shutting down. If the poller errored first, - // then Next will have passed its error to MoveToDraining. Otherwise, the - // error will be related to the forced shutdown of the poller (probably - // context canceled) and we don't care what it is, so throw it away. - <-ca.pollerErrCh + // Wait for the poller and tableHistUpdater to finish shutting down. + <-ca.pollerDoneCh + <-ca.tableHistUpdaterDoneCh if err := ca.sink.Close(); err != nil { log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) } @@ -170,12 +205,13 @@ func (ca *changeAggregator) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerMet if err := ca.tick(); err != nil { select { - // If the poller errored first, that's the interesting one, so - // overwrite `err`. - case err = <-ca.pollerErrCh: + // If the poller or tableHistUpdater errored first, that's the + // interesting one, so overwrite `err`. + case err = <-ca.errCh: default: } - // Shut down the poller if it wasn't already. + // Shut down the poller and tableHistUpdater if they weren't + // already. ca.cancel() ca.MoveToDraining(err) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index aaeab223b834..4e2d3d0c61d7 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -166,12 +166,12 @@ func changefeedPlanHook( targets := make(jobspb.ChangefeedTargets, len(targetDescs)) for _, desc := range targetDescs { if tableDesc := desc.GetTable(); tableDesc != nil { - if err := validateChangefeedTable(tableDesc); err != nil { - return err - } targets[tableDesc.ID] = jobspb.ChangefeedTarget{ StatementTimeName: tableDesc.Name, } + if err := validateChangefeedTable(targets, tableDesc); err != nil { + return err + } } } @@ -275,7 +275,14 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails return details, nil } -func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error { +func validateChangefeedTable( + targets jobspb.ChangefeedTargets, tableDesc *sqlbase.TableDescriptor, +) error { + t, ok := targets[tableDesc.ID] + if !ok { + return errors.Errorf(`unwatched table: %s`, tableDesc.Name) + } + // Technically, the only non-user table known not to work is system.jobs // (which creates a cycle since the resolved timestamp high-water mark is // saved in it), but there are subtle differences in the way many of them @@ -298,6 +305,18 @@ func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error { `CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`, tableDesc.Name, len(tableDesc.Families)) } + + if tableDesc.State == sqlbase.TableDescriptor_DROP { + return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) + } + if tableDesc.Name != t.StatementTimeName { + return errors.Errorf(`"%s" was renamed to "%s"`, t.StatementTimeName, tableDesc.Name) + } + + if tableDesc.HasColumnBackfillMutation() { + return errors.Errorf(`CHANGEFEEDs cannot operate on tables being backfilled`) + } + return nil } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index f9bb52916c2e..170a007d56ac 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -218,37 +218,220 @@ func TestChangefeedSchemaChange(t *testing.T) { defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`) - sqlDB.Exec(t, `CREATE DATABASE d`) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) - - var start string - sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&start) - sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (0, '0')`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (1)`) - sqlDB.Exec(t, `ALTER TABLE foo ALTER COLUMN b SET DEFAULT 'after'`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (2)`) - sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (3)`) - sqlDB.Exec(t, `INSERT INTO foo (a, c) VALUES (4, 14)`) - rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, start) - defer closeFeedRowsHack(t, sqlDB, rows) - assertPayloads(t, rows, []string{ - `foo: [0]->{"a": 0, "b": "0"}`, - `foo: [1]->{"a": 1, "b": "before"}`, - `foo: [2]->{"a": 2, "b": "after"}`, - `foo: [3]->{"a": 3, "b": "after", "c": null}`, - `foo: [4]->{"a": 4, "b": "after", "c": 14}`, + + t.Run(`historical`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE historical (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) + var start string + sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&start) + sqlDB.Exec(t, `INSERT INTO historical (a, b) VALUES (0, '0')`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (1)`) + sqlDB.Exec(t, `ALTER TABLE historical ALTER COLUMN b SET DEFAULT 'after'`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (2)`) + sqlDB.Exec(t, `ALTER TABLE historical ADD COLUMN c INT`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (3)`) + sqlDB.Exec(t, `INSERT INTO historical (a, c) VALUES (4, 14)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR historical WITH cursor=$1`, start) + defer closeFeedRowsHack(t, sqlDB, rows) + assertPayloads(t, rows, []string{ + `historical: [0]->{"a": 0, "b": "0"}`, + `historical: [1]->{"a": 1, "b": "before"}`, + `historical: [2]->{"a": 2, "b": "after"}`, + `historical: [3]->{"a": 3, "b": "after", "c": null}`, + `historical: [4]->{"a": 4, "b": "after", "c": 14}`, + }) }) - sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT`) - sqlDB.Exec(t, `INSERT INTO foo (a, d) VALUES (5, 15)`) - assertPayloads(t, rows, []string{ - `foo: [5]->{"a": 5, "b": "after", "c": null, "d": 15}`, + t.Run(`add column`, func(t *testing.T) { + // NB: the default is a nullable column + sqlDB.Exec(t, `CREATE TABLE add_column (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO add_column VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column ADD COLUMN b STRING`) + sqlDB.Exec(t, `INSERT INTO add_column VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_column: [1]->{"a": 1}`, + `add_column: [2]->{"a": 2, "b": "2"}`, + }) + }) + + t.Run(`add column not null`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_notnull (a INT PRIMARY KEY)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_notnull WITH resolved`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_notnull ADD COLUMN b STRING NOT NULL`) + sqlDB.Exec(t, `INSERT INTO add_column_notnull VALUES (2, '2')`) + skipResolvedTimestamps(t, rows) + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add column with default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_def (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_def`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) + sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_column_def: [1]->{"a": 1}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add column computed`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) + sqlDB.Exec(t, `INSERT INTO add_column_comp VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_comp`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_comp ADD COLUMN c INT AS (a + 10) STORED`) + sqlDB.Exec(t, `INSERT INTO add_column_comp (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `add_column_comp: [1]->{"a": 1, "b": 6}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`rename column`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE rename_column (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO rename_column VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR rename_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE rename_column RENAME COLUMN b TO c`) + sqlDB.Exec(t, `INSERT INTO rename_column VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `rename_column: [1]->{"a": 1, "b": "1"}`, + `rename_column: [2]->{"a": 2, "c": "2"}`, + }) + }) + + t.Run(`drop column`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`) + sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2)`) + assertPayloads(t, rows, []string{ + `drop_column: [1]->{"a": 1, "b": "1"}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_default (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO add_default (a, b) VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_default ALTER COLUMN b SET DEFAULT 'd'`) + sqlDB.Exec(t, `INSERT INTO add_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `add_default: [1]->{"a": 1, "b": "1"}`, + `add_default: [2]->{"a": 2, "b": "d"}`, + }) + }) + + t.Run(`alter default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE alter_default (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) + sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR alter_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE alter_default ALTER COLUMN b SET DEFAULT 'after'`) + sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `alter_default: [1]->{"a": 1, "b": "before"}`, + `alter_default: [2]->{"a": 2, "b": "after"}`, + }) }) - // TODO(dan): Test a schema change that uses a backfill once we figure out - // the user facing semantics of that. + t.Run(`drop default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_default (a INT PRIMARY KEY, b STRING DEFAULT 'd')`) + sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_default ALTER COLUMN b DROP DEFAULT`) + sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `drop_default: [1]->{"a": 1, "b": "d"}`, + `drop_default: [2]->{"a": 2, "b": null}`, + }) + }) + + t.Run(`drop not null`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_notnull (a INT PRIMARY KEY, b STRING NOT NULL)`) + sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_notnull`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_notnull ALTER b DROP NOT NULL`) + sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (2, NULL)`) + assertPayloads(t, rows, []string{ + `drop_notnull: [1]->{"a": 1, "b": "1"}`, + `drop_notnull: [2]->{"a": 2, "b": null}`, + }) + }) + + t.Run(`checks`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE checks (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR checks`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE checks ADD CONSTRAINT c CHECK (a < 5) NOT VALID`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (2)`) + sqlDB.Exec(t, `ALTER TABLE checks VALIDATE CONSTRAINT c`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (3)`) + sqlDB.Exec(t, `ALTER TABLE checks DROP CONSTRAINT c`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (6)`) + assertPayloads(t, rows, []string{ + `checks: [1]->{"a": 1}`, + `checks: [2]->{"a": 2}`, + `checks: [3]->{"a": 3}`, + `checks: [6]->{"a": 6}`, + }) + }) + + t.Run(`add index`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_index (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO add_index VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_index`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `CREATE INDEX b_idx ON add_index (b)`) + sqlDB.Exec(t, `SELECT * FROM add_index@b_idx`) + sqlDB.Exec(t, `INSERT INTO add_index VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_index: [1]->{"a": 1, "b": "1"}`, + `add_index: [2]->{"a": 2, "b": "2"}`, + }) + }) + + t.Run(`unique`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE "unique" (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO "unique" VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR "unique"`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE "unique" ADD CONSTRAINT u UNIQUE (b)`) + sqlDB.Exec(t, `INSERT INTO "unique" VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `unique: [1]->{"a": 1, "b": "1"}`, + `unique: [2]->{"a": 2, "b": "2"}`, + }) + }) } func TestChangefeedInterleaved(t *testing.T) { @@ -689,6 +872,19 @@ func assertPayloads(t *testing.T, rows *gosql.Rows, expected []string) { } } +func skipResolvedTimestamps(t *testing.T, rows *gosql.Rows) { + for rows.Next() { + var table gosql.NullString + var key, value []byte + if err := rows.Scan(&table, &key, &value); err != nil { + t.Fatal(err) + } + if table.Valid { + t.Errorf(`unexpected row %s: %s->%s`, table.String, key, value) + } + } +} + func expectResolvedTimestampGreaterThan(t testing.TB, rows *gosql.Rows, ts string) { t.Helper() for { diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 7b9d74aec819..0dcb73553a9a 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -108,11 +108,19 @@ func createBenchmarkChangefeed( poller := makePoller( s.ClusterSettings(), s.DB(), feedClock, s.Gossip(), spans, details, initialHighWater, buf) - rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), details, buf.Get) + th := makeTableHistory(func(*sqlbase.TableDescriptor) error { return nil }, initialHighWater) + thUpdater := &tableHistoryUpdater{ + settings: s.ClusterSettings(), + db: s.DB(), + targets: details.Targets, + m: th, + } + rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get) tickFn := emitEntries(details, sink, rowsFn) ctx, cancel := context.WithCancel(ctx) go func() { _ = poller.Run(ctx) }() + go func() { _ = thUpdater.PollTableDescs(ctx) }() errCh := make(chan error, 1) var wg sync.WaitGroup diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index bcfff30d9081..2bae6dbe5172 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -81,22 +81,9 @@ func fetchSpansForTargets( spans = nil txn.SetFixedTimestamp(ctx, ts) // Note that all targets are currently guaranteed to be tables. - for tableID, t := range targets { + for tableID := range targets { tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID) if err != nil { - if errors.Cause(err) == sqlbase.ErrDescriptorNotFound { - return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) - } - return err - } - if tableDesc.State == sqlbase.TableDescriptor_DROP { - return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) - } - if tableDesc.Name != t.StatementTimeName { - return errors.Errorf( - `"%s" was renamed to "%s"`, t.StatementTimeName, tableDesc.Name) - } - if err := validateChangefeedTable(tableDesc); err != nil { return err } spans = append(spans, tableDesc.PrimaryIndexSpan()) @@ -137,11 +124,6 @@ func (p *poller) Run(ctx context.Context) error { log.VEventf(ctx, 1, `changefeed poll [%s,%s): %s`, p.highWater, nextHighWater, time.Duration(nextHighWater.WallTime-p.highWater.WallTime)) - _, err := fetchSpansForTargets(ctx, p.db, p.details.Targets, nextHighWater) - if err != nil { - return err - } - var ranges []roachpb.RangeDescriptor if err := p.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { var err error diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 4dffd98511bc..f56b7f3ab61a 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -25,41 +25,42 @@ import ( // StartScanFrom can be used to turn that key (or all the keys making up the // column families of one row) into a row. type rowFetcherCache struct { - leaseMgr *sql.LeaseManager - fetchers map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher + leaseMgr *sql.LeaseManager + tableHist *tableHistory + fetchers map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher a sqlbase.DatumAlloc } -func newRowFetcherCache(leaseMgr *sql.LeaseManager) *rowFetcherCache { +func newRowFetcherCache(leaseMgr *sql.LeaseManager, tableHist *tableHistory) *rowFetcherCache { return &rowFetcherCache{ - leaseMgr: leaseMgr, - fetchers: make(map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher), + leaseMgr: leaseMgr, + tableHist: tableHist, + fetchers: make(map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher), } } func (c *rowFetcherCache) TableDescForKey( ctx context.Context, key roachpb.Key, ts hlc.Timestamp, ) (*sqlbase.TableDescriptor, error) { - var skippedCols int - for { + var tableDesc *sqlbase.TableDescriptor + for skippedCols := 0; ; { remaining, tableID, _, err := sqlbase.DecodeTableIDIndexID(key) if err != nil { return nil, err } - // TODO(dan): We don't really need a lease, this is just a convenient way to - // get the right descriptor for a timestamp, so release it immediately after - // we acquire it. Avoid the lease entirely. - tableDesc, _, err := c.leaseMgr.Acquire(ctx, ts, tableID) + // No caching of these are attempted, since the lease manager does its + // own caching. + tableDesc, _, err = c.leaseMgr.Acquire(ctx, ts, tableID) if err != nil { return nil, err } + // Immediately release the lease, since we only need it for the exact + // timestamp requested. if err := c.leaseMgr.Release(tableDesc); err != nil { return nil, err } - if err := validateChangefeedTable(tableDesc); err != nil { - return nil, err - } + // Skip over the column data. for ; skippedCols < len(tableDesc.PrimaryIndex.ColumnIDs); skippedCols++ { l, err := encoding.PeekLength(remaining) @@ -71,10 +72,19 @@ func (c *rowFetcherCache) TableDescForKey( var interleaved bool remaining, interleaved = encoding.DecodeIfInterleavedSentinel(remaining) if !interleaved { - return tableDesc, nil + break } key = remaining } + + // Leasing invariant: each new `tableDesc.Version` of a descriptor is + // initially written with an mvcc timestamp equal to its modification time. + // It might be updated later with backfill progress, but (critically) the + // `validateFn` we passed to `tableHist` doesn't care about this. + if err := c.tableHist.WaitForTS(ctx, tableDesc.ModificationTime); err != nil { + return nil, err + } + return tableDesc, nil } func (c *rowFetcherCache) RowFetcherForTableDesc( diff --git a/pkg/ccl/changefeedccl/table_history.go b/pkg/ccl/changefeedccl/table_history.go new file mode 100644 index 000000000000..69dbc734c20e --- /dev/null +++ b/pkg/ccl/changefeedccl/table_history.go @@ -0,0 +1,300 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/pkg/errors" +) + +type tableHistoryWaiter struct { + ts hlc.Timestamp + errCh chan error +} + +// tableHistory tracks that a some invariants hold over a set of tables as time +// advances. +// +// Internally, two timestamps are tracked. The high-water is the highest +// timestamp such that every version of a TableDescriptor has met a provided +// invariant (via `validateFn`). An error timestamp is also kept, which is the +// lowest timestamp where at least one table doesn't meet the invariant. +// +// The `WaitForTS` method allows a user to block until some given timestamp is +// less (or equal) to either the high-water or the error timestamp. In the +// latter case, it returns the error. +type tableHistory struct { + validateFn func(*sqlbase.TableDescriptor) error + + mu struct { + syncutil.Mutex + + // the highest known valid timestamp + highWater hlc.Timestamp + + // the lowest known invalid timestamp + errTS hlc.Timestamp + + // the error associated with errTS + err error + + // callers waiting on a timestamp to be resolved as valid or invalid + waiters []tableHistoryWaiter + } +} + +// makeTableHistory creates tableHistory with the given initial high-water and +// invariant check function. It is expected that `validateFn` is deterministic. +func makeTableHistory( + validateFn func(*sqlbase.TableDescriptor) error, initialHighWater hlc.Timestamp, +) *tableHistory { + m := &tableHistory{validateFn: validateFn} + m.mu.highWater = initialHighWater + return m +} + +// HighWater returns the current high-water timestamp. +func (m *tableHistory) HighWater() hlc.Timestamp { + m.mu.Lock() + highWater := m.mu.highWater + m.mu.Unlock() + return highWater +} + +// WaitForTS blocks until the given timestamp is less than or equal to the +// high-water or error timestamp. In the latter case, the error is returned. +// +// If called twice with the same timestamp, two different errors may be returned +// (since the error timestamp can recede). However, the return for a given +// timestamp will never switch from nil to an error or vice-versa (assuming that +// `validateFn` is deterministic and the ingested descriptors are read +// transactionally). +func (m *tableHistory) WaitForTS(ctx context.Context, ts hlc.Timestamp) error { + var errCh chan error + + m.mu.Lock() + highWater := m.mu.highWater + var err error + if m.mu.errTS != (hlc.Timestamp{}) && !ts.Less(m.mu.errTS) { + err = m.mu.err + } + fastPath := err != nil || !highWater.Less(ts) + if !fastPath { + errCh = make(chan error, 1) + m.mu.waiters = append(m.mu.waiters, tableHistoryWaiter{ts: ts, errCh: errCh}) + } + m.mu.Unlock() + if fastPath { + if log.V(1) { + log.Infof(ctx, "fastpath for %s: %v", ts, err) + } + return err + } + + if log.V(1) { + log.Infof(ctx, "waiting for %s highwater", ts) + } + start := timeutil.Now() + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + if log.V(1) { + log.Infof(ctx, "waited %s for %s highwater: %v", timeutil.Since(start), ts, err) + } + return err + } +} + +// IngestDescriptors checks the given descriptors against the invariant check +// function and adjusts the high-water or error timestamp appropriately. It is +// required that the descriptors represent a transactional kv read between the +// two given timestamps. +func (m *tableHistory) IngestDescriptors( + startTS, endTS hlc.Timestamp, descs []*sqlbase.TableDescriptor, +) error { + sort.Slice(descs, func(i, j int) bool { + return descs[i].ModificationTime.Less(descs[j].ModificationTime) + }) + var validateErr error + for _, desc := range descs { + if err := m.validateFn(desc); validateErr == nil { + validateErr = err + } + } + return m.adjustTimestamps(startTS, endTS, validateErr) +} + +// adjustTimestamps adjusts the high-water or error timestamp appropriately. +func (m *tableHistory) adjustTimestamps(startTS, endTS hlc.Timestamp, validateErr error) error { + m.mu.Lock() + defer m.mu.Unlock() + + if validateErr != nil { + // don't care about startTS in the invalid case + if m.mu.errTS == (hlc.Timestamp{}) || endTS.Less(m.mu.errTS) { + m.mu.errTS = endTS + m.mu.err = validateErr + newWaiters := make([]tableHistoryWaiter, 0, len(m.mu.waiters)) + for _, w := range m.mu.waiters { + if w.ts.Less(m.mu.errTS) { + newWaiters = append(newWaiters, w) + continue + } + w.errCh <- validateErr + } + m.mu.waiters = newWaiters + } + return validateErr + } + + if m.mu.highWater.Less(startTS) { + return errors.Errorf(`gap between %s and %s`, m.mu.highWater, startTS) + } + if m.mu.highWater.Less(endTS) { + m.mu.highWater = endTS + newWaiters := make([]tableHistoryWaiter, 0, len(m.mu.waiters)) + for _, w := range m.mu.waiters { + if m.mu.highWater.Less(w.ts) { + newWaiters = append(newWaiters, w) + continue + } + w.errCh <- nil + } + m.mu.waiters = newWaiters + } + return nil +} + +type tableHistoryUpdater struct { + settings *cluster.Settings + db *client.DB + targets jobspb.ChangefeedTargets + m *tableHistory +} + +func (u *tableHistoryUpdater) PollTableDescs(ctx context.Context) error { + // TODO(dan): Replace this with a RangeFeed once it stabilizes. + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(changefeedPollInterval.Get(&u.settings.SV)): + } + + startTS, endTS := u.m.HighWater(), u.db.Clock().Now() + if !startTS.Less(endTS) { + continue + } + descs, err := fetchTableDescriptorVersions(ctx, u.db, startTS, endTS, u.targets) + if err != nil { + return err + } + if err := u.m.IngestDescriptors(startTS, endTS, descs); err != nil { + return err + } + } +} + +func fetchTableDescriptorVersions( + ctx context.Context, + db *client.DB, + startTS, endTS hlc.Timestamp, + targets jobspb.ChangefeedTargets, +) ([]*sqlbase.TableDescriptor, error) { + if log.V(2) { + log.Infof(ctx, `fetching table descs [%s,%s)`, startTS, endTS) + } + start := timeutil.Now() + span := roachpb.Span{Key: keys.MakeTablePrefix(keys.DescriptorTableID)} + span.EndKey = span.Key.PrefixEnd() + header := roachpb.Header{Timestamp: endTS} + req := &roachpb.ExportRequest{ + RequestHeader: roachpb.RequestHeaderFromSpan(span), + StartTime: startTS, + MVCCFilter: roachpb.MVCCFilter_All, + ReturnSST: true, + } + res, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req) + if log.V(2) { + log.Infof(ctx, `fetched table descs [%s,%s) took %s`, startTS, endTS, timeutil.Since(start)) + } + if pErr != nil { + return nil, errors.Wrapf( + pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey) + } + + var tableDescs []*sqlbase.TableDescriptor + for _, file := range res.(*roachpb.ExportResponse).Files { + if err := func() error { + it, err := engineccl.NewMemSSTIterator(file.SST, false /* verify */) + if err != nil { + return err + } + defer it.Close() + for it.Seek(engine.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + return nil + } + remaining, _, _, err := sqlbase.DecodeTableIDIndexID(it.UnsafeKey().Key) + if err != nil { + return err + } + _, tableID, err := encoding.DecodeUvarintAscending(remaining) + if err != nil { + return err + } + // WIP: I think targets currently doesn't contain interleaved + // parents if they are not watched by the changefeed, but this + // seems wrong. + origName, ok := targets[sqlbase.ID(tableID)] + if !ok { + // Uninteresting table. + continue + } + unsafeValue := it.UnsafeValue() + if unsafeValue == nil { + return errors.Errorf(`"%s" was dropped or truncated`, origName) + } + value := roachpb.Value{RawBytes: unsafeValue} + var desc sqlbase.Descriptor + if err := value.GetProto(&desc); err != nil { + return err + } + if tableDesc := desc.GetTable(); tableDesc != nil { + // WIP + log.Infof(ctx, "%s %d %s", desc.GetName(), tableDesc.Version, it.UnsafeKey().Timestamp) + tableDescs = append(tableDescs, tableDesc) + } + } + }(); err != nil { + return nil, err + } + } + return tableDescs, nil +} diff --git a/pkg/ccl/changefeedccl/table_history_test.go b/pkg/ccl/changefeedccl/table_history_test.go new file mode 100644 index 000000000000..d6b769c40966 --- /dev/null +++ b/pkg/ccl/changefeedccl/table_history_test.go @@ -0,0 +1,144 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestTableHistory(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + ts := func(wt int64) hlc.Timestamp { return hlc.Timestamp{WallTime: wt} } + + validateFn := func(desc *sqlbase.TableDescriptor) error { + if desc.Name != `` { + return errors.New(desc.Name) + } + return nil + } + requireChannelEmpty := func(t *testing.T, ch chan error) { + t.Helper() + select { + case err := <-ch: + t.Fatalf(`expected empty channel got %v`, err) + default: + } + } + + m := makeTableHistory(validateFn, ts(0)) + + require.Equal(t, ts(0), m.HighWater()) + + // advance + require.NoError(t, m.IngestDescriptors(ts(0), ts(1), nil)) + require.Equal(t, ts(1), m.HighWater()) + require.NoError(t, m.IngestDescriptors(ts(1), ts(2), nil)) + require.Equal(t, ts(2), m.HighWater()) + + // no-ops + require.NoError(t, m.IngestDescriptors(ts(0), ts(1), nil)) + require.Equal(t, ts(2), m.HighWater()) + require.NoError(t, m.IngestDescriptors(ts(1), ts(2), nil)) + require.Equal(t, ts(2), m.HighWater()) + + // overlap + require.NoError(t, m.IngestDescriptors(ts(1), ts(3), nil)) + require.Equal(t, ts(3), m.HighWater()) + + // gap + require.EqualError(t, m.IngestDescriptors(ts(4), ts(5), nil), + `gap between 0.000000003,0 and 0.000000004,0`) + require.Equal(t, ts(3), m.HighWater()) + + // validates + require.NoError(t, m.IngestDescriptors(ts(3), ts(4), []*sqlbase.TableDescriptor{ + {ID: 0}, + })) + require.Equal(t, ts(4), m.HighWater()) + + // high-water already high enough. fast-path + require.NoError(t, m.WaitForTS(ctx, ts(3))) + require.NoError(t, m.WaitForTS(ctx, ts(4))) + + // high-water not there yet. blocks + errCh6 := make(chan error, 1) + errCh7 := make(chan error, 1) + go func() { errCh7 <- m.WaitForTS(ctx, ts(7)) }() + go func() { errCh6 <- m.WaitForTS(ctx, ts(6)) }() + requireChannelEmpty(t, errCh6) + requireChannelEmpty(t, errCh7) + + // high-water advances, but not enough + require.NoError(t, m.IngestDescriptors(ts(4), ts(5), nil)) + requireChannelEmpty(t, errCh6) + requireChannelEmpty(t, errCh7) + + // high-water advances, unblocks only errCh6 + require.NoError(t, m.IngestDescriptors(ts(5), ts(6), nil)) + require.NoError(t, <-errCh6) + requireChannelEmpty(t, errCh7) + + // high-water advances again, unblocks errCh7 + require.NoError(t, m.IngestDescriptors(ts(6), ts(7), nil)) + require.NoError(t, <-errCh7) + + // validate ctx cancellation + errCh8 := make(chan error, 1) + ctxTS8, cancelTS8 := context.WithCancel(ctx) + go func() { errCh8 <- m.WaitForTS(ctxTS8, ts(8)) }() + requireChannelEmpty(t, errCh8) + cancelTS8() + require.EqualError(t, <-errCh8, `context canceled`) + + // does not validate, high-water does not change + require.EqualError(t, m.IngestDescriptors(ts(7), ts(10), []*sqlbase.TableDescriptor{ + {ID: 0, Name: `whoops!`}, + }), `whoops!`) + require.Equal(t, ts(7), m.HighWater()) + + // ts 10 has errored, so validate can return its error without blocking + require.EqualError(t, m.WaitForTS(ctx, ts(10)), `whoops!`) + + // ts 8 and 9 are still unknown + errCh8 = make(chan error, 1) + errCh9 := make(chan error, 1) + go func() { errCh8 <- m.WaitForTS(ctx, ts(8)) }() + go func() { errCh9 <- m.WaitForTS(ctx, ts(9)) }() + requireChannelEmpty(t, errCh8) + requireChannelEmpty(t, errCh9) + + // turns out ts 10 is not a tight bound. ts 9 also has an error + require.EqualError(t, m.IngestDescriptors(ts(7), ts(9), []*sqlbase.TableDescriptor{ + {ID: 0, Name: `oh no!`}, + }), `oh no!`) + require.Equal(t, ts(7), m.HighWater()) + require.EqualError(t, <-errCh9, `oh no!`) + + // ts 8 is still unknown + requireChannelEmpty(t, errCh8) + + // always return the earlist error seen (so waiting for ts 10 immediately + // returns the 9 error now, it returned the ts 10 error above) + require.EqualError(t, m.WaitForTS(ctx, ts(9)), `oh no!`) + + // something earlier than ts 10 can still be okay + require.NoError(t, m.IngestDescriptors(ts(7), ts(8), nil)) + require.Equal(t, ts(8), m.HighWater()) + require.NoError(t, <-errCh8) +} diff --git a/pkg/cmd/docgen/diagrams.go b/pkg/cmd/docgen/diagrams.go index 17d71638f265..0b412c21dc23 100644 --- a/pkg/cmd/docgen/diagrams.go +++ b/pkg/cmd/docgen/diagrams.go @@ -377,6 +377,12 @@ var specs = []stmtSpec{ unlink: []string{"table_name"}, nosplit: true, }, + { + name: "alter_type", + stmt: "alter_onetable_stmt", + replace: map[string]string{"relation_expr": "table_name", "alter_table_cmds": "'ALTER' column_name 'TYPE' new_type"}, + unlink: []string{"table_name"}, + }, { name: "alter_view", stmt: "alter_rename_view_stmt", @@ -888,17 +894,17 @@ var specs = []stmtSpec{ name: "table_ref", inline: []string{"opt_ordinality", "opt_alias_clause", "opt_expr_list", "opt_column_list", "name_list", "alias_clause"}, replace: map[string]string{ - "select_with_parens": "'(' select_stmt ')'", - "opt_index_hints": "( '@' scan_parameters | )", - "relation_expr": "table_name", - "func_name '(' ( expr_list | ) ')'": "func_application", + "select_with_parens": "'(' select_stmt ')'", + "opt_index_flags": "( '@' scan_parameters | )", + "relation_expr": "table_name", + "func_table": "func_application", // "| func_name '(' ( expr_list | ) ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | )": "", "| special_function ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | )": "", "| '(' joined_table ')' ( 'WITH' 'ORDINALITY' | ) ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) )": "| '(' joined_table ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | )", }, unlink: []string{"index_name"}, relink: map[string]string{ - "scan_parameters": "opt_index_hints", + "scan_parameters": "opt_index_flags", }, nosplit: true, }, diff --git a/pkg/cmd/github-pull-request-make/main.go b/pkg/cmd/github-pull-request-make/main.go index ab426699870c..c6a1921d388b 100644 --- a/pkg/cmd/github-pull-request-make/main.go +++ b/pkg/cmd/github-pull-request-make/main.go @@ -112,17 +112,19 @@ func pkgsFromDiff(r io.Reader) (map[string]pkg, error) { curBenchmarkName = string(currentGoBenchmarkRE.ReplaceAll(line, []byte(replacement))) curTestName = "" case bytes.HasPrefix(line, []byte{'-'}) && bytes.Contains(line, []byte(".Skip")): - switch { - case len(curTestName) > 0: - if !(curPkgName == "build" && curTestName == "TestStyle") { + if curPkgName != "" { + switch { + case len(curTestName) > 0: + if !(curPkgName == "build" && curTestName == "TestStyle") { + curPkg := pkgs[curPkgName] + curPkg.tests = append(curPkg.tests, curTestName) + pkgs[curPkgName] = curPkg + } + case len(curBenchmarkName) > 0: curPkg := pkgs[curPkgName] - curPkg.tests = append(curPkg.tests, curTestName) + curPkg.benchmarks = append(curPkg.benchmarks, curBenchmarkName) pkgs[curPkgName] = curPkg } - case len(curBenchmarkName) > 0: - curPkg := pkgs[curPkgName] - curPkg.benchmarks = append(curPkg.benchmarks, curBenchmarkName) - pkgs[curPkgName] = curPkg } } } diff --git a/pkg/cmd/roachtest/acceptance.go b/pkg/cmd/roachtest/acceptance.go index f53188fb0c52..bd7966ab9045 100644 --- a/pkg/cmd/roachtest/acceptance.go +++ b/pkg/cmd/roachtest/acceptance.go @@ -38,6 +38,7 @@ func registerAcceptance(r *registry) { {"bank/node-restart", runBankNodeRestart}, {"build-info", runBuildInfo}, {"cli/node-status", runCLINodeStatus}, + {"decommission", runDecommissionAcceptance}, {"event-log", runEventLog}, {"gossip/peerings", runGossipPeerings}, {"gossip/restart", runGossipRestart}, diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 28e387e217cb..022b0428bc4f 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -282,6 +282,7 @@ func registerCDC(r *registry) { }, }) r.Add(testSpec{ + Skip: "https://github.com/cockroachdb/cockroach/issues/29196", Name: "cdc/w=100/nodes=3/init=false/chaos=true", MinVersion: "2.1.0", Nodes: nodes(4, cpu(16)), diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index 1c2484ca4090..ddeb8868dbdc 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -17,12 +17,21 @@ package main import ( "context" + "encoding/csv" "fmt" + "reflect" + "regexp" "strconv" + "strings" "time" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + + "github.com/kr/pretty" _ "github.com/lib/pq" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) @@ -120,7 +129,7 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { if err != nil { t.Fatal(err) } - c.l.Printf(fmt.Sprintf("run: %s\n", stmt)) + c.l.Printf("run: %s\n", stmt) } var m *errgroup.Group // see comment in version.go @@ -229,9 +238,430 @@ func registerDecommission(r *registry) { Run: func(ctx context.Context, t *test, c *cluster) { if local { duration = 3 * time.Minute - fmt.Printf("running with duration=%s in local mode\n", duration) + c.l.Printf("running with duration=%s in local mode\n", duration) } runDecommission(t, c, numNodes, duration) }, }) } + +func runDecommissionAcceptance(ctx context.Context, t *test, c *cluster) { + args := startArgs("--sequential", "--env=COCKROACH_SCAN_MAX_IDLE_TIME=5ms") + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, args) + + execCLI := func( + ctx context.Context, + runNode int, + extraArgs ...string, + ) (string, error) { + args := []string{cockroach} + args = append(args, extraArgs...) + args = append(args, "--insecure") + args = append(args, fmt.Sprintf("--port={pgport:%d}", runNode)) + buf, err := c.RunWithBuffer(ctx, c.l, c.Node(runNode), args...) + c.l.Printf("%s\n", buf) + return string(buf), err + } + + decommission := func( + ctx context.Context, + runNode int, + targetNodes nodeListOption, + verbs ...string, + ) (string, error) { + args := []string{"node"} + args = append(args, verbs...) + for _, target := range targetNodes { + args = append(args, strconv.Itoa(target)) + } + return execCLI(ctx, runNode, args...) + } + + matchCSV := func(csvStr string, matchColRow [][]string) (err error) { + defer func() { + if err != nil { + err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s", + csvStr, pretty.Sprint(matchColRow), err) + } + }() + + reader := csv.NewReader(strings.NewReader(csvStr)) + reader.FieldsPerRecord = -1 + records, err := reader.ReadAll() + if err != nil { + return err + } + + lr, lm := len(records), len(matchColRow) + if lr < lm { + return errors.Errorf("csv has %d rows, but expected at least %d", lr, lm) + } + + // Compare only the last len(matchColRow) records. That is, if we want to + // match 4 rows and we have 100 records, we only really compare + // records[96:], that is, the last four rows. + records = records[lr-lm:] + + for i := range records { + if lr, lm := len(records[i]), len(matchColRow[i]); lr != lm { + return errors.Errorf("row #%d: csv has %d columns, but expected %d", i+1, lr, lm) + } + for j := range records[i] { + pat, str := matchColRow[i][j], records[i][j] + re := regexp.MustCompile(pat) + if !re.MatchString(str) { + err = errors.Errorf("%v\nrow #%d, col #%d: found %q which does not match %q", + err, i+1, j+1, str, pat) + } + } + } + return err + } + + decommissionHeader := []string{ + "id", "is_live", "replicas", "is_decommissioning", "is_draining", + } + decommissionFooter := []string{ + "No more data reported on target nodes. " + + "Please verify cluster health before removing the nodes.", + } + statusHeader := []string{ + "id", "address", "build", "started_at", "updated_at", "is_live", + } + waitLiveDeprecated := "--wait=live is deprecated and is treated as --wait=all" + + c.l.Printf("decommissioning first node from the second, polling the status manually\n") + retryOpts := retry.Options{ + InitialBackoff: time.Second, + MaxBackoff: 5 * time.Second, + Multiplier: 1, + MaxRetries: 20, + } + for r := retry.Start(retryOpts); r.Next(); { + o, err := decommission(ctx, 2, c.Node(1), + "decommission", "--wait", "none", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + {"1", "true", "0", "true", "false"}, + decommissionFooter, + } + + if err := matchCSV(o, exp); err != nil { + continue + } + break + } + + // Check that even though the node is decommissioned, we still see it (since + // it remains live) in `node ls`. + { + o, err := execCLI(ctx, 2, "node", "ls", "--format", "csv") + if err != nil { + t.Fatalf("node-ls failed: %v", err) + } + exp := [][]string{ + {"id"}, + {"1"}, + {"2"}, + {"3"}, + {"4"}, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + // Ditto `node status`. + { + o, err := execCLI(ctx, 2, "node", "status", "--format", "csv") + if err != nil { + t.Fatalf("node-status failed: %v", err) + } + exp := [][]string{ + statusHeader, + {`1`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + + c.l.Printf("recommissioning first node (from third node)\n") + if _, err := decommission(ctx, 3, c.Node(1), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + + c.l.Printf("decommissioning second node from third, using --wait=all\n") + { + o, err := decommission(ctx, 3, c.Node(2), + "decommission", "--wait", "all", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + {"2", "true", "0", "true", "false"}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + + c.l.Printf("recommissioning second node from itself\n") + if _, err := decommission(ctx, 2, c.Node(2), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + + c.l.Printf("decommissioning third node via `quit --decommission`\n") + func() { + // This should not take longer than five minutes, and if it does, it's + // likely stuck forever and we want to see the output. + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + if _, err := execCLI(timeoutCtx, 3, "quit", "--decommission"); err != nil { + if timeoutCtx.Err() != nil { + t.Fatalf("quit --decommission failed: %s", err) + } + // TODO(tschottdorf): grep the process output for the string announcing success? + c.l.Errorf("WARNING: ignoring error on quit --decommission: %s\n", err) + } + }() + + // Now that the third node is down and decommissioned, decommissioning it + // again should be a no-op. We do it from node one but as always it doesn't + // matter. + c.l.Printf("checking that other nodes see node three as successfully decommissioned\n") + { + o, err := decommission(ctx, 2, c.Node(3), + "decommission", "--format", "csv") // wait=all is implied + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + // Expect the same as usual, except this time the node should be draining + // because it shut down cleanly (thanks to `quit --decommission`). + {"3", "true", "0", "true", "true"}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + + // Bring the node back up. It's still decommissioned, so it won't be of much use. + c.Stop(ctx, c.Node(3)) + c.Start(ctx, c.Node(3), args) + + // Recommission. Welcome back! + if _, err = decommission(ctx, 2, c.Node(3), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + } + + // Kill the first node and verify that we can decommission it while it's down, + // bringing it back up to verify that its replicas still get removed. + c.l.Printf("intentionally killing first node\n") + c.Stop(ctx, c.Node(1)) + c.l.Printf("decommission first node, starting with it down but restarting it for verification\n") + { + o, err := decommission(ctx, 2, c.Node(1), + "decommission", "--wait", "live") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + if strings.Split(o, "\n")[1] != waitLiveDeprecated { + t.Fatal("missing deprecate message for --wait=live") + } + c.Start(ctx, c.Node(1), args) + // Run a second time to wait until the replicas have all been GC'ed. + // Note that we specify "all" because even though the first node is + // now running, it may not be live by the time the command runs. + o, err = decommission(ctx, 2, c.Node(1), + "decommission", "--wait", "all", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + {"1", "true|false", "0", "true", "false"}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + + // Now we want to test decommissioning a truly dead node. Make sure we don't + // waste too much time waiting for the node to be recognized as dead. Note that + // we don't want to set this number too low or everything will seem dead to the + // allocator at all times, so nothing will ever happen. + func() { + db := c.Conn(ctx, 2) + defer db.Close() + const stmt = "SET CLUSTER SETTING server.time_until_store_dead = '1m15s'" + if _, err := db.ExecContext(ctx, stmt); err != nil { + t.Fatal(err) + } + }() + + c.l.Printf("intentionally killing first node\n") + c.Stop(ctx, c.Node(1)) + // It is being decommissioned in absentia, meaning that its replicas are + // being removed due to deadness. We can't see that reflected in the output + // since the current mechanism gets its replica counts from what the node + // reports about itself, so our assertion here is somewhat weak. + c.l.Printf("decommission first node in absentia using --wait=live\n") + { + o, err := decommission(ctx, 3, c.Node(1), + "decommission", "--wait", "live", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + // Note we don't check precisely zero replicas (which the node would write + // itself, but it's dead). We do check that the node isn't live, though, which + // is essentially what `--wait=live` waits for. + // Note that the target node may still be "live" when it's marked as + // decommissioned, as its replica count may drop to zero faster than + // liveness times out. + exp := [][]string{ + decommissionHeader, + {"1", `true|false`, "0", `true`, `false`}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + if strings.Split(o, "\n")[1] != waitLiveDeprecated { + t.Fatal("missing deprecate message for --wait=live") + } + } + + // Check that (at least after a bit) the node disappears from `node ls` + // because it is decommissioned and not live. + for { + o, err := execCLI(ctx, 2, "node", "ls", "--format", "csv") + if err != nil { + t.Fatalf("node-ls failed: %v", err) + } + + exp := [][]string{ + {"id"}, + {"2"}, + {"3"}, + {"4"}, + } + + if err := matchCSV(o, exp); err != nil { + time.Sleep(time.Second) + continue + } + break + } + for { + o, err := execCLI(ctx, 2, "node", "status", "--format", "csv") + if err != nil { + t.Fatalf("node-status failed: %v", err) + } + + exp := [][]string{ + statusHeader, + {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, + } + if err := matchCSV(o, exp); err != nil { + time.Sleep(time.Second) + continue + } + break + } + + if err := retry.ForDuration(time.Minute, func() error { + // Verify the event log has recorded exactly one decommissioned or + // recommissioned event for each commissioning operation. + // + // Spurious errors appear to be possible since we might be trying to + // send RPCs to the (relatively recently) down node: + // + // pq: rpc error: code = Unavailable desc = grpc: the connection is + // unavailable + // + // Seen in https://teamcity.cockroachdb.com/viewLog.html?buildId=344802. + db := c.Conn(ctx, 2) + defer db.Close() + + rows, err := db.Query(` +SELECT "eventType", "targetID" FROM system.eventlog +WHERE "eventType" IN ($1, $2) ORDER BY timestamp`, + "node_decommissioned", "node_recommissioned", + ) + if err != nil { + c.l.Printf("retrying: %v\n", err) + return err + } + defer rows.Close() + + matrix, err := sqlutils.RowsToStrMatrix(rows) + if err != nil { + return err + } + + expMatrix := [][]string{ + {"node_decommissioned", "1"}, + {"node_recommissioned", "1"}, + {"node_decommissioned", "2"}, + {"node_recommissioned", "2"}, + {"node_decommissioned", "3"}, + {"node_recommissioned", "3"}, + {"node_decommissioned", "1"}, + } + + if !reflect.DeepEqual(matrix, expMatrix) { + t.Fatalf("unexpected diff(matrix, expMatrix):\n%s", pretty.Diff(matrix, expMatrix)) + } + return nil + }); err != nil { + t.Fatal(err) + } + + // Last, verify that the operator can't shoot themselves in the foot by + // accidentally decommissioning all nodes. + // + // Specify wait=none because the command would block forever (the replicas have + // nowhere to go). + if _, err := decommission(ctx, 2, c.All(), "decommission", "--wait", "none"); err != nil { + t.Fatalf("decommission failed: %v", err) + } + + // Check that we can still do stuff. Creating a database should be good enough. + db := c.Conn(ctx, 2) + defer db.Close() + + if _, err := db.Exec(`CREATE DATABASE still_working;`); err != nil { + t.Fatal(err) + } + + // Recommission all nodes. + if _, err := decommission(ctx, 2, c.All(), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + + // To verify that all nodes are actually accepting replicas again, decommission + // the first nodes (blocking until it's done). This proves that the other nodes + // absorb the first one's replicas. + if _, err := decommission(ctx, 2, c.Node(1), "decommission"); err != nil { + t.Fatalf("decommission failed: %v", err) + } +} diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index fafc2844eefd..46ec3f02511d 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -144,7 +144,7 @@ func (sc *SchemaChanger) runBackfill( case sqlbase.DescriptorMutation_ADD: switch t := m.Descriptor_.(type) { case *sqlbase.DescriptorMutation_Column: - if columnNeedsBackfill(m.GetColumn()) { + if sqlbase.ColumnNeedsBackfill(m.GetColumn()) { needColumnBackfill = true } case *sqlbase.DescriptorMutation_Index: @@ -582,10 +582,6 @@ func (sc *SchemaChanger) truncateAndBackfillColumns( backfill.ColumnMutationFilter) } -func columnNeedsBackfill(desc *sqlbase.ColumnDescriptor) bool { - return desc.DefaultExpr != nil || !desc.Nullable || desc.IsComputed() -} - // runSchemaChangesInTxn runs all the schema changes immediately in a // transaction. This is called when a CREATE TABLE is followed by // schema changes in the same transaction. The CREATE TABLE is @@ -626,7 +622,7 @@ func runSchemaChangesInTxn( case sqlbase.DescriptorMutation_ADD: switch m.Descriptor_.(type) { case *sqlbase.DescriptorMutation_Column: - if doneColumnBackfill || !columnNeedsBackfill(m.GetColumn()) { + if doneColumnBackfill || !sqlbase.ColumnNeedsBackfill(m.GetColumn()) { break } if err := columnBackfillInTxn(ctx, txn, tc, evalCtx, tableDesc, traceKV); err != nil { diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 5daf1426fa7f..746fa708b862 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -66,13 +66,12 @@ func SplitTable( targetNodeIdx int, vals ...interface{}, ) { - pik, err := sqlbase.TestingMakePrimaryIndexKey(desc, vals...) - if err != nil { - t.Fatal(err) + if tc.ReplicationMode() != base.ReplicationManual { + t.Fatal("SplitTable called on a test cluster that was not in manual replication mode") } - // Prevent the merge queue from immediately discarding our split. - if _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { + pik, err := sqlbase.TestingMakePrimaryIndexKey(desc, vals...) + if err != nil { t.Fatal(err) } @@ -328,8 +327,6 @@ func TestDistSQLRangeCachesIntegrationTest(t *testing.T) { // We're going to split one of the tables, but node 4 is unaware of this. _, err = db0.Exec(fmt.Sprintf(` - -- Prevent the merge queue from immediately discarding our splits. - SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE "right" SPLIT AT VALUES (1), (2), (3); ALTER TABLE "right" EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], 1), (ARRAY[%d], 2), (ARRAY[%d], 3); `, @@ -411,9 +408,6 @@ func TestDistSQLDeadHosts(t *testing.T) { r.Exec(t, "CREATE TABLE t (x INT PRIMARY KEY, xsquared INT)") - // Prevent the merge queue from immediately discarding our splits. - r.Exec(t, "SET CLUSTER SETTING kv.range_merge.queue_enabled = false") - for i := 0; i < numNodes; i++ { r.Exec(t, fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES (%d)", n*i/5)) } @@ -501,9 +495,6 @@ func TestDistSQLDrainingHosts(t *testing.T) { r := sqlutils.MakeSQLRunner(tc.ServerConn(0)) r.DB.SetMaxOpenConns(1) - // Prevent the merge queue from immediately discarding our splits. - r.Exec(t, "SET CLUSTER SETTING kv.range_merge.queue_enabled = false") - r.Exec(t, "SET DISTSQL = ON") // Force the query to be distributed. r.Exec( diff --git a/pkg/sql/distsqlplan/span_resolver_test.go b/pkg/sql/distsqlplan/span_resolver_test.go index 54611398fd84..d3be6f3e2099 100644 --- a/pkg/sql/distsqlplan/span_resolver_test.go +++ b/pkg/sql/distsqlplan/span_resolver_test.go @@ -331,11 +331,6 @@ func setupRanges( } } - // Prevent the merge queue from immediately discarding our splits. - if _, err := db.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { - t.Fatal(err) - } - tableDesc := sqlbase.GetTableDescriptor(cdb, "t", "test") // Split every SQL row to its own range. rowRanges := make([]roachpb.RangeDescriptor, len(values)) diff --git a/pkg/sql/distsqlrun/cluster_test.go b/pkg/sql/distsqlrun/cluster_test.go index cf27fc982cec..d100fc0c692d 100644 --- a/pkg/sql/distsqlrun/cluster_test.go +++ b/pkg/sql/distsqlrun/cluster_test.go @@ -563,8 +563,6 @@ func TestDistSQLReadsFillGatewayID(t *testing.T) { sqlutils.ToRowFn(sqlutils.RowIdxFn)) if _, err := db.Exec(` --- Prevent the merge queue from immediately discarding our splits. -SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE t SPLIT AT VALUES (1), (2), (3); ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 1), (ARRAY[1], 2), (ARRAY[3], 3); `); err != nil { diff --git a/pkg/sql/distsqlrun/tablereader_test.go b/pkg/sql/distsqlrun/tablereader_test.go index 7c0fae73e568..00fe2e6c67b0 100644 --- a/pkg/sql/distsqlrun/tablereader_test.go +++ b/pkg/sql/distsqlrun/tablereader_test.go @@ -197,8 +197,6 @@ func TestMisplannedRangesMetadata(t *testing.T) { sqlutils.ToRowFn(sqlutils.RowIdxFn)) _, err := db.Exec(` --- Prevent the merge queue from immediately discarding our splits. -SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE t SPLIT AT VALUES (1), (2), (3); ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 1), (ARRAY[1], 2), (ARRAY[3], 3); `) diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 122412829b3a..26a90bcb6a07 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -1122,6 +1122,7 @@ alter_ddl_stmt: // ALTER TABLE ... VALIDATE CONSTRAINT // ALTER TABLE ... SPLIT AT // ALTER TABLE ... SCATTER [ FROM ( ) TO ( ) ] +// ALTER TABLE ... INJECT STATISTICS ... (experimental) // // Column qualifiers: // [CONSTRAINT ] {NULL | NOT NULL | UNIQUE | PRIMARY KEY | CHECK () | DEFAULT } @@ -1493,6 +1494,7 @@ alter_table_cmd: // ALTER TABLE INJECT STATISTICS | INJECT STATISTICS a_expr { + /* SKIP DOC */ $$.val = &tree.AlterTableInjectStats{ Stats: $3.expr(), } @@ -1928,8 +1930,8 @@ create_ddl_stmt: | create_view_stmt // EXTEND WITH HELP: CREATE VIEW | create_sequence_stmt // EXTEND WITH HELP: CREATE SEQUENCE -// %Help: CREATE STATISTICS - create a new table statistic -// %Category: Misc +// %Help: CREATE STATISTICS - create a new table statistic (experimental) +// %Category: Experimental // %Text: // CREATE STATISTICS // ON [, ...] @@ -1937,6 +1939,7 @@ create_ddl_stmt: create_stats_stmt: CREATE STATISTICS statistics_name ON name_list FROM table_name { + /* SKIP DOC */ $$.val = &tree.CreateStats{ Name: tree.Name($3), ColumnNames: $5.nameList(), @@ -2814,10 +2817,10 @@ zone_value: // %Help: SHOW // %Category: Group // %Text: -// SHOW SESSION, SHOW CLUSTER SETTING, SHOW DATABASES, SHOW TABLES, SHOW COLUMNS, SHOW INDEXES, -// SHOW CONSTRAINTS, SHOW CREATE, SHOW USERS, -// SHOW TRANSACTION, SHOW BACKUP, SHOW JOBS, SHOW QUERIES, SHOW ROLES, SHOW SESSIONS, SHOW SYNTAX, -// SHOW TRACE +// SHOW BACKUP, SHOW CLUSTER SETTING, SHOW COLUMNS, SHOW CONSTRAINTS, +// SHOW CREATE, SHOW DATABASES, SHOW HISTOGRAM, SHOW INDEXES, SHOW JOBS, +// SHOW QUERIES, SHOW ROLES, SHOW SESSION, SHOW SESSIONS, SHOW STATISTICS, +// SHOW SYNTAX, SHOW TABLES, SHOW TRACE SHOW TRANSACTION, SHOW USERS show_stmt: show_backup_stmt // EXTEND WITH HELP: SHOW BACKUP | show_columns_stmt // EXTEND WITH HELP: SHOW COLUMNS @@ -2869,8 +2872,8 @@ session_var: | TIME ZONE { $$ = "timezone" } | TIME error // SHOW HELP: SHOW SESSION -// %Help: SHOW STATISTICS - display table statistics -// %Category: Misc +// %Help: SHOW STATISTICS - display table statistics (experimental) +// %Category: Experimental // %Text: SHOW STATISTICS [USING JSON] FOR TABLE // // Returns the available statistics for a table. @@ -2882,16 +2885,18 @@ session_var: show_stats_stmt: SHOW STATISTICS FOR TABLE table_name { + /* SKIP DOC */ $$.val = &tree.ShowTableStats{Table: $5.normalizableTableNameFromUnresolvedName() } } | SHOW STATISTICS USING JSON FOR TABLE table_name { + /* SKIP DOC */ $$.val = &tree.ShowTableStats{Table: $7.normalizableTableNameFromUnresolvedName(), UsingJSON: true} } | SHOW STATISTICS error // SHOW HELP: SHOW STATISTICS -// %Help: SHOW HISTOGRAM - display histogram -// %Category: Misc +// %Help: SHOW HISTOGRAM - display histogram (experimental) +// %Category: Experimental // %Text: SHOW HISTOGRAM // // Returns the data in the histogram with the @@ -2900,6 +2905,7 @@ show_stats_stmt: show_histogram_stmt: SHOW HISTOGRAM ICONST { + /* SKIP DOC */ id, err := $3.numVal().AsInt64() if err != nil { sqllex.Error(err.Error()) diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index 08c69583bd10..ae40e9f32317 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -233,11 +233,6 @@ func TestCancelDistSQLQuery(t *testing.T) { t.Fatal(err) } - // Prevent the merge queue from immediately discarding our splits. - if _, err := conn1.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { - t.Fatal(err) - } - if _, err := conn1.Exec("ALTER TABLE nums SPLIT AT VALUES (50)"); err != nil { t.Fatal(err) } diff --git a/pkg/sql/sqlbase/structured.go b/pkg/sql/sqlbase/structured.go index 5285335bcfaf..545db5304798 100644 --- a/pkg/sql/sqlbase/structured.go +++ b/pkg/sql/sqlbase/structured.go @@ -2075,6 +2075,31 @@ func (desc *TableDescriptor) FinalizeMutation() (MutationID, error) { return mutationID, nil } +// ColumnNeedsBackfill returns true if adding the given column requires a +// backfill (dropping a column always requires a backfill). +func ColumnNeedsBackfill(desc *ColumnDescriptor) bool { + return desc.DefaultExpr != nil || !desc.Nullable || desc.IsComputed() +} + +// HasColumnBackfillMutation returns whether the table has any queued column +// mutations that require a backfill. +func (desc *TableDescriptor) HasColumnBackfillMutation() bool { + for _, m := range desc.Mutations { + col := m.GetColumn() + if col == nil { + // Index backfills don't affect changefeeds. + continue + } + // It's unfortunate that there's no one method we can call to check if a + // mutation will be a backfill or not, but this logic was extracted from + // backfill.go. + if m.Direction == DescriptorMutation_DROP || ColumnNeedsBackfill(col) { + return true + } + } + return false +} + // Dropped returns true if the table is being dropped. func (desc *TableDescriptor) Dropped() bool { return desc.State == TableDescriptor_DROP diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index 8f01101e214a..e4844ea0c637 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -523,8 +523,6 @@ func TestKVTraceDistSQL(t *testing.T) { r.Exec(t, "CREATE DATABASE test") r.Exec(t, "CREATE TABLE test.a (a INT PRIMARY KEY, b INT)") r.Exec(t, "INSERT INTO test.a VALUES (1,1), (2,2)") - // Prevent the merge queue from immediately discarding our splits. - r.Exec(t, "SET CLUSTER SETTING kv.range_merge.queue_enabled = false") r.Exec(t, "ALTER TABLE a SPLIT AT VALUES(1)") r.Exec(t, "SET tracing = on,kv; SELECT count(*) FROM test.a; SET tracing = off") diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index ccaec508d557..7c1110c8be01 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1535,8 +1535,6 @@ func TestDistSQLRetryableError(t *testing.T) { // We're going to split one of the tables, but node 4 is unaware of this. _, err := db.Exec(fmt.Sprintf(` - -- Prevent the merge queue from immediately discarding our splits. - SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE "t" SPLIT AT VALUES (1), (2), (3); ALTER TABLE "t" EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], 1), (ARRAY[%d], 2), (ARRAY[%d], 3); `, diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 993a24d31bf8..5dcdf968f871 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -1578,7 +1578,6 @@ func TestStoreSplitTimestampCacheDifferentLeaseHolder(t *testing.T) { EvalKnobs: batcheval.TestingKnobs{ TestingEvalFilter: filter, }, - DisableMergeQueue: true, } tc := testcluster.StartTestCluster(t, 2, args) @@ -1760,7 +1759,6 @@ func TestStoreSplitOnRemovedReplica(t *testing.T) { args.ReplicationMode = base.ReplicationManual args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{ TestingRequestFilter: filter, - DisableMergeQueue: true, } tc := testcluster.StartTestCluster(t, 3, args) @@ -1850,7 +1848,6 @@ func TestStoreSplitFailsAfterMaxRetries(t *testing.T) { args.ReplicationMode = base.ReplicationManual args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{ TestingRequestFilter: filter, - DisableMergeQueue: true, } tc := testcluster.StartTestCluster(t, 1, args) diff --git a/pkg/storage/consistency_queue_test.go b/pkg/storage/consistency_queue_test.go index 08bd294c7207..a7c67b3c2f18 100644 --- a/pkg/storage/consistency_queue_test.go +++ b/pkg/storage/consistency_queue_test.go @@ -300,11 +300,6 @@ func TestConsistencyQueueRecomputeStats(t *testing.T) { ScanInterval: time.Second, ScanMinIdleTime: 0, ScanMaxIdleTime: 100 * time.Millisecond, - Knobs: base.TestingKnobs{ - Store: &storage.StoreTestingKnobs{ - DisableMergeQueue: true, - }, - }, } nodeZeroArgs := tsArgs nodeZeroArgs.StoreSpecs = []base.StoreSpec{{ diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index bb4b816505d8..c57158576ff7 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -92,6 +92,10 @@ type TestClusterInterface interface { // Target returns a roachpb.ReplicationTarget for the specified server. Target(serverIdx int) roachpb.ReplicationTarget + + // ReplicationMode returns the ReplicationMode that the test cluster was + // configured with. + ReplicationMode() base.TestClusterReplicationMode } // TestClusterFactory encompasses the actual implementation of the shim diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index f1e4bc4215c5..030301ae04c1 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -244,6 +244,7 @@ func (tc *TestCluster) doAddServer(t testing.TB, serverArgs base.TestServerArgs) stkCopy = *stk.(*storage.StoreTestingKnobs) } stkCopy.DisableSplitQueue = true + stkCopy.DisableMergeQueue = true stkCopy.DisableReplicateQueue = true serverArgs.Knobs.Store = &stkCopy } @@ -576,6 +577,11 @@ func (tc *TestCluster) WaitForNodeStatuses(t testing.TB) { }) } +// ReplicationMode implements TestClusterInterface. +func (tc *TestCluster) ReplicationMode() base.TestClusterReplicationMode { + return tc.replicationMode +} + type testClusterFactoryImpl struct{} // TestClusterFactory can be passed to serverutils.InitTestClusterFactory diff --git a/pkg/ui/src/views/login/loginPage.tsx b/pkg/ui/src/views/login/loginPage.tsx index 7e32c4ff9717..fa5ca5dc7dcf 100644 --- a/pkg/ui/src/views/login/loginPage.tsx +++ b/pkg/ui/src/views/login/loginPage.tsx @@ -129,9 +129,10 @@ class LoginPage extends React.Component

Log in to the Web UI

{this.renderError()} -
+