From 1147fa327afeeac977a27a6ab5a18959635f7169 Mon Sep 17 00:00:00 2001 From: Jesse Seldess Date: Fri, 31 Aug 2018 10:24:19 -0400 Subject: [PATCH 1/9] Fix replace and link in table_ref diagram Release note: None --- docs/generated/sql/bnf/table_ref.bnf | 4 ++-- pkg/cmd/docgen/diagrams.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/generated/sql/bnf/table_ref.bnf b/docs/generated/sql/bnf/table_ref.bnf index a226ade41c9d..8c41cc8e66ab 100644 --- a/docs/generated/sql/bnf/table_ref.bnf +++ b/docs/generated/sql/bnf/table_ref.bnf @@ -1,7 +1,7 @@ table_ref ::= - table_name opt_index_flags ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) + table_name ( '@' scan_parameters | ) ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) | '(' select_stmt ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) | joined_table | '(' joined_table ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) - | func_table ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) + | func_application ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) | '[' explainable_stmt ']' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | ) diff --git a/pkg/cmd/docgen/diagrams.go b/pkg/cmd/docgen/diagrams.go index 17d71638f265..150d4bbed3c5 100644 --- a/pkg/cmd/docgen/diagrams.go +++ b/pkg/cmd/docgen/diagrams.go @@ -888,17 +888,17 @@ var specs = []stmtSpec{ name: "table_ref", inline: []string{"opt_ordinality", "opt_alias_clause", "opt_expr_list", "opt_column_list", "name_list", "alias_clause"}, replace: map[string]string{ - "select_with_parens": "'(' select_stmt ')'", - "opt_index_hints": "( '@' scan_parameters | )", - "relation_expr": "table_name", - "func_name '(' ( expr_list | ) ')'": "func_application", + "select_with_parens": "'(' select_stmt ')'", + "opt_index_flags": "( '@' scan_parameters | )", + "relation_expr": "table_name", + "func_table": "func_application", // "| func_name '(' ( expr_list | ) ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | )": "", "| special_function ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | )": "", "| '(' joined_table ')' ( 'WITH' 'ORDINALITY' | ) ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) )": "| '(' joined_table ')' ( 'WITH' 'ORDINALITY' | ) ( ( 'AS' table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) | table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) ) | )", }, unlink: []string{"index_name"}, relink: map[string]string{ - 
"scan_parameters": "opt_index_hints", + "scan_parameters": "opt_index_flags", }, nosplit: true, }, From dc68d7fb4a8d0d04683718a960897e9724886e52 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Mon, 3 Sep 2018 15:10:52 -0400 Subject: [PATCH 2/9] github-pull-request-make: don't add packages with empty names When deleting a go test it was possible to add a package with an empty name. This in turn would cause CI failure as the top-level cockroach package contains no Go sources. Release note: None --- pkg/cmd/github-pull-request-make/main.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/cmd/github-pull-request-make/main.go b/pkg/cmd/github-pull-request-make/main.go index ab426699870c..c6a1921d388b 100644 --- a/pkg/cmd/github-pull-request-make/main.go +++ b/pkg/cmd/github-pull-request-make/main.go @@ -112,17 +112,19 @@ func pkgsFromDiff(r io.Reader) (map[string]pkg, error) { curBenchmarkName = string(currentGoBenchmarkRE.ReplaceAll(line, []byte(replacement))) curTestName = "" case bytes.HasPrefix(line, []byte{'-'}) && bytes.Contains(line, []byte(".Skip")): - switch { - case len(curTestName) > 0: - if !(curPkgName == "build" && curTestName == "TestStyle") { + if curPkgName != "" { + switch { + case len(curTestName) > 0: + if !(curPkgName == "build" && curTestName == "TestStyle") { + curPkg := pkgs[curPkgName] + curPkg.tests = append(curPkg.tests, curTestName) + pkgs[curPkgName] = curPkg + } + case len(curBenchmarkName) > 0: curPkg := pkgs[curPkgName] - curPkg.tests = append(curPkg.tests, curTestName) + curPkg.benchmarks = append(curPkg.benchmarks, curBenchmarkName) pkgs[curPkgName] = curPkg } - case len(curBenchmarkName) > 0: - curPkg := pkgs[curPkgName] - curPkg.benchmarks = append(curPkg.benchmarks, curBenchmarkName) - pkgs[curPkgName] = curPkg } } } From 9dd2f225504104358a10750640cdd78a9fa45523 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Mon, 3 Sep 2018 10:30:18 -0400 Subject: [PATCH 3/9] roachtest: add acceptance/decommission Move the decommission acceptance test to a new acceptance/decommission roachtest. Fixes #29151 Release note: None --- pkg/acceptance/decommission_test.go | 528 ---------------------------- pkg/acceptance/init_test.go | 7 +- pkg/acceptance/util_cluster.go | 68 +--- pkg/cmd/roachtest/acceptance.go | 1 + pkg/cmd/roachtest/decommission.go | 434 ++++++++++++++++++++++- 5 files changed, 438 insertions(+), 600 deletions(-) delete mode 100644 pkg/acceptance/decommission_test.go diff --git a/pkg/acceptance/decommission_test.go b/pkg/acceptance/decommission_test.go deleted file mode 100644 index 4605f3ec09dc..000000000000 --- a/pkg/acceptance/decommission_test.go +++ /dev/null @@ -1,528 +0,0 @@ -// Copyright 2015 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. 
- -package acceptance - -import ( - "context" - "reflect" - "regexp" - "strconv" - "strings" - "testing" - "time" - - gosql "database/sql" - - "github.com/cockroachdb/cockroach/pkg/acceptance/cluster" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" - "github.com/cockroachdb/cockroach/pkg/util/httputil" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/kr/pretty" - "github.com/pkg/errors" -) - -// TestDecommission starts up an >3 node cluster and decomissions and -// recommissions nodes in various ways. -func TestDecommission(t *testing.T) { - RunLocal(t, func(t *testing.T) { - s := log.Scope(t) - defer s.Close(t) - - runTestWithCluster(t, testDecommissionInner) - }) -} - -func decommission( - ctx context.Context, - c cluster.Cluster, - runNode int, - targetNodes []roachpb.NodeID, - verbs ...string, -) (string, string, error) { - args := append([]string{"node"}, verbs...) - for _, target := range targetNodes { - args = append(args, strconv.Itoa(int(target))) - } - o, e, err := c.ExecCLI(ctx, runNode, args) - return o, e, err -} - -func matchCSV(csvStr string, matchColRow [][]string) (err error) { - defer func() { - if err != nil { - err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s", csvStr, pretty.Sprint(matchColRow), err) - } - }() - - reader := csv.NewReader(strings.NewReader(csvStr)) - reader.FieldsPerRecord = -1 - records, err := reader.ReadAll() - if err != nil { - return err - } - - lr, lm := len(records), len(matchColRow) - if lr < lm { - return errors.Errorf("csv has %d rows, but expected at least %d", lr, lm) - } - - // Compare only the last len(matchColRow) records. That is, if we want to - // match 4 rows and we have 100 records, we only really compare - // records[96:], that is, the last four rows. - records = records[lr-lm:] - - for i := range records { - if lr, lm := len(records[i]), len(matchColRow[i]); lr != lm { - return errors.Errorf("row #%d: csv has %d columns, but expected %d", i+1, lr, lm) - } - for j := range records[i] { - pat, str := matchColRow[i][j], records[i][j] - re := regexp.MustCompile(pat) - if !re.MatchString(str) { - err = errors.Errorf("%v\nrow #%d, col #%d: found %q which does not match %q", err, i+1, j+1, str, pat) - } - } - } - return err -} - -func testDecommissionInner( - ctx context.Context, t *testing.T, c cluster.Cluster, cfg cluster.TestConfig, -) { - if c.NumNodes() < 4 { - // TODO(tschottdorf): or we invent a way to change the ZoneConfig in - // this test and test less ambitiously (or split up the tests). - t.Skip("need at least four nodes") - } - - withDB := func(n int, stmt string) { - db, err := gosql.Open("postgres", c.PGUrl(ctx, n)) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := db.Close(); err != nil { - t.Error(err) - } - }() - if _, err := db.ExecContext(ctx, stmt); err != nil { - t.Fatal(err) - } - } - - withDB(1, "SET CLUSTER SETTING server.remote_debugging.mode = 'any';") - - // Get the ids for each node. 
- idMap := make(map[int]roachpb.NodeID) - for i := 0; i < c.NumNodes(); i++ { - var details serverpb.DetailsResponse - if err := httputil.GetJSON(cluster.HTTPClient, c.URL(ctx, i)+"/_status/details/local", &details); err != nil { - t.Fatal(err) - } - idMap[i] = details.NodeID - } - - decommissionHeader := []string{"id", "is_live", "replicas", "is_decommissioning", "is_draining"} - decommissionFooter := []string{"No more data reported on target nodes. Please verify cluster health before removing the nodes."} - waitLiveDeprecated := "--wait=live is deprecated and is treated as --wait=all" - - statusHeader := []string{"id", "address", "build", "started_at", "updated_at", "is_live"} - - log.Info(ctx, "decommissioning first node from the second, polling the status manually") - retryOpts := retry.Options{ - InitialBackoff: time.Second, - MaxBackoff: 5 * time.Second, - Multiplier: 1, - MaxRetries: 20, - } - for r := retry.Start(retryOpts); r.Next(); { - o, _, err := decommission(ctx, c, 1, []roachpb.NodeID{idMap[0]}, "decommission", "--wait", "none", "--format", "csv") - if err != nil { - t.Fatal(err) - } - - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(idMap[0])), "true", "0", "true", "false"}, - decommissionFooter, - } - log.Infof(ctx, o) - - if err := matchCSV(o, exp); err != nil { - continue - } - break - } - - // Check that even though the node is decommissioned, we still see it (since - // it remains live) in `node ls`. - { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "ls", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - exp := [][]string{ - {"id"}, - {"1"}, - {"2"}, - {"3"}, - {"4"}, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - // Ditto `node status`. - { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "status", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - exp := [][]string{ - statusHeader, - {`1`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - - log.Info(ctx, "recommissioning first node (from third node)") - { - o, _, err := decommission(ctx, c, 2, []roachpb.NodeID{idMap[0]}, "recommission") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - } - - log.Info(ctx, "decommissioning second node from third, using --wait=all") - { - target := idMap[1] - o, _, err := decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "all", "--format", "csv") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(target)), "true", "0", "true", "false"}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - - log.Info(ctx, "recommissioning second node from itself") - { - o, _, err := decommission(ctx, c, 1, []roachpb.NodeID{idMap[1]}, "recommission") - if err != nil { - t.Fatalf("could no recommission: %s\n%s", err, o) - } - log.Infof(ctx, o) - } - - log.Info(ctx, "decommissioning third node via `quit --decommission`") - { - // This should not take longer than five minutes, and if it does, it's - // likely stuck forever and we want to see the output. 
- timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - o, e, err := c.ExecCLI(timeoutCtx, 2, []string{"quit", "--decommission"}) - if err != nil { - if timeoutCtx.Err() != nil { - t.Fatalf("quit --decommission failed: %s\nstdout:\n%s\nstderr:\n%s", err, o, e) - } - // TODO(tschottdorf): grep the process output for the string announcing success? - log.Warningf(ctx, "ignoring error on quit --decommission: %s", err) - } else { - log.Infof(ctx, o, e) - } - - // Kill the node to generate the expected event (Kill is idempotent, so this works). - if err := c.Kill(ctx, 2); err != nil { - log.Warning(ctx, err) - } - } - - // Now that the third node is down and decommissioned, decommissioning it - // again should be a no-op. We do it from node one but as always it doesn't - // matter. - log.Info(ctx, "checking that other nodes see node three as successfully decommissioned") - { - target := idMap[2] - o, _, err := decommission(ctx, c, 1, []roachpb.NodeID{target}, "decommission", "--format", "csv") // wait=all is implied - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - - exp := [][]string{ - decommissionHeader, - // Expect the same as usual, except this time the node should be draining - // because it shut down cleanly (thanks to `quit --decommission`). - {strconv.Itoa(int(target)), "true", "0", "true", "true"}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - - // Bring the node back up. It's still decommissioned, so it won't be of much use. - if err := c.Restart(ctx, 2); err != nil { - t.Fatal(err) - } - - // Recommission. Welcome back! - o, _, err = decommission(ctx, c, 1, []roachpb.NodeID{target}, "recommission") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - } - - // Kill the first node and verify that we can decommission it while it's down, - // bringing it back up to verify that its replicas still get removed. - log.Info(ctx, "intentionally killing first node") - if err := c.Kill(ctx, 0); err != nil { - t.Fatal(err) - } - log.Info(ctx, "decommission first node, starting with it down but restarting it for verification") - { - target := idMap[0] - o, e, err := decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "live") - if err != nil { - t.Fatal(err) - } - log.Infof(ctx, o) - if strings.Split(e, "\n")[1] != waitLiveDeprecated { - t.Fatal("missing deprecate message for --wait=live") - } - if err := c.Restart(ctx, 0); err != nil { - t.Fatal(err) - } - // Run a second time to wait until the replicas have all been GC'ed. - // Note that we specify "all" because even though the first node is - // now running, it may not be live by the time the command runs. - o, _, err = decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "all", "--format", "csv") - if err != nil { - t.Fatal(err) - } - - log.Info(ctx, o) - - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(target)), "true", "0", "true", "false"}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - } - - // Now we want to test decommissioning a truly dead node. Make sure we don't - // waste too much time waiting for the node to be recognized as dead. Note that - // we don't want to set this number too low or everything will seem dead to the - // allocator at all times, so nothing will ever happen. 
- withDB(1, "SET CLUSTER SETTING server.time_until_store_dead = '1m15s'") - - log.Info(ctx, "intentionally killing first node") - if err := c.Kill(ctx, 0); err != nil { - t.Fatal(err) - } - // It is being decommissioned in absentia, meaning that its replicas are - // being removed due to deadness. We can't see that reflected in the output - // since the current mechanism gets its replica counts from what the node - // reports about itself, so our assertion here is somewhat weak. - log.Info(ctx, "decommission first node in absentia using --wait=live") - { - target := idMap[0] - o, e, err := decommission(ctx, c, 2, []roachpb.NodeID{target}, "decommission", "--wait", "live", "--format", "csv") - if err != nil { - t.Fatal(err) - } - - log.Infof(ctx, o) - - // Note we don't check precisely zero replicas (which the node would write - // itself, but it's dead). We do check that the node isn't live, though, which - // is essentially what `--wait=live` waits for. - // Note that the target node may still be "live" when it's marked as - // decommissioned, as its replica count may drop to zero faster than - // liveness times out. - exp := [][]string{ - decommissionHeader, - {strconv.Itoa(int(target)), `true|false`, "0", `true`, `false`}, - decommissionFooter, - } - if err := matchCSV(o, exp); err != nil { - t.Fatal(err) - } - if strings.Split(e, "\n")[1] != waitLiveDeprecated { - t.Fatal("missing deprecate message for --wait=live") - } - } - - // Check that (at least after a bit) the node disappears from `node ls` - // because it is decommissioned and not live. - for { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "ls", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - - log.Info(ctx, o) - - exp := [][]string{ - {"id"}, - {"2"}, - {"3"}, - {"4"}, - } - - if err := matchCSV(o, exp); err != nil { - time.Sleep(time.Second) - continue - } - break - } - for { - o, _, err := c.ExecCLI(ctx, 2, []string{"node", "status", "--format", "csv"}) - if err != nil { - t.Fatal(err) - } - - log.Info(ctx, o) - - exp := [][]string{ - statusHeader, - {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, - {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, - } - if err := matchCSV(o, exp); err != nil { - time.Sleep(time.Second) - continue - } - break - } - - var rows *gosql.Rows - if err := retry.ForDuration(time.Minute, func() error { - // Verify the event log has recorded exactly one decommissioned or - // recommissioned event for each commissioning operation. - // - // Spurious errors appear to be possible since we might be trying to - // send RPCs to the (relatively recently) down node: - // - // pq: rpc error: code = Unavailable desc = grpc: the connection is - // unavailable - // - // Seen in https://teamcity.cockroachdb.com/viewLog.html?buildId=344802. 
- db, err := gosql.Open("postgres", c.PGUrl(ctx, 1)) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := db.Close(); err != nil { - t.Error(err) - } - }() - - rows, err = db.Query(` - SELECT "eventType", "targetID" FROM system.eventlog - WHERE "eventType" IN ($1, $2) ORDER BY timestamp`, - sql.EventLogNodeDecommissioned, sql.EventLogNodeRecommissioned, - ) - if err != nil { - log.Warning(ctx, errors.Wrap(err, "retrying after")) - return err - } - return nil - }); err != nil { - t.Fatal(err) - } - - matrix, err := sqlutils.RowsToStrMatrix(rows) - if err != nil { - t.Fatal(err) - } - expMatrix := [][]string{ - {string(sql.EventLogNodeDecommissioned), idMap[0].String()}, - {string(sql.EventLogNodeRecommissioned), idMap[0].String()}, - {string(sql.EventLogNodeDecommissioned), idMap[1].String()}, - {string(sql.EventLogNodeRecommissioned), idMap[1].String()}, - {string(sql.EventLogNodeDecommissioned), idMap[2].String()}, - {string(sql.EventLogNodeRecommissioned), idMap[2].String()}, - {string(sql.EventLogNodeDecommissioned), idMap[0].String()}, - } - - if !reflect.DeepEqual(matrix, expMatrix) { - t.Fatalf("unexpected diff(matrix, expMatrix):\n%s", pretty.Diff(matrix, expMatrix)) - } - - // Last, verify that the operator can't shoot themselves in the foot by - // accidentally decommissioning all nodes. - var allNodeIDs []roachpb.NodeID - for _, nodeID := range idMap { - allNodeIDs = append(allNodeIDs, nodeID) - } - - // Specify wait=none because the command would block forever (the replicas have - // nowhere to go). - if _, _, err := decommission( - ctx, c, 1, allNodeIDs, "decommission", "--wait", "none", - ); err != nil { - t.Fatal(err) - } - - // Check that we can still do stuff. Creating a database should be good enough. - db, err := gosql.Open("postgres", c.PGUrl(ctx, 1)) - if err != nil { - t.Fatal(err) - } - defer func() { _ = db.Close() }() - - if _, err := db.Exec(`CREATE DATABASE still_working;`); err != nil { - t.Fatal(err) - } - - // Recommission all nodes. - if _, _, err := decommission( - ctx, c, 1, allNodeIDs, "recommission", - ); err != nil { - t.Fatal(err) - } - - // To verify that all nodes are actually accepting replicas again, decommission - // the first nodes (blocking until it's done). This proves that the other nodes - // absorb the first one's replicas. - if _, _, err := decommission( - ctx, c, 1, []roachpb.NodeID{idMap[0]}, "decommission", - ); err != nil { - t.Fatal(err) - } -} diff --git a/pkg/acceptance/init_test.go b/pkg/acceptance/init_test.go index 9896354a1541..c5f9bc3999cf 100644 --- a/pkg/acceptance/init_test.go +++ b/pkg/acceptance/init_test.go @@ -39,8 +39,7 @@ func TestInitModeBootstrapNodeZero(t *testing.T) { s := log.Scope(t) defer s.Close(t) - // TODO(tschottdorf): give LocalCluster support for the init modes and we should be able - // to switch this to RunLocal. Ditto below. + // TODO(peter): Move this test to a roachtest. RunDocker(t, func(t *testing.T) { runTestWithCluster(t, testInitModeInner, useInitMode(cluster.INIT_BOOTSTRAP_NODE_ZERO)) }) @@ -50,7 +49,7 @@ func TestInitModeCommand(t *testing.T) { s := log.Scope(t) defer s.Close(t) - // TODO(tschottdorf): see above. + // TODO(peter): Move this test to a roachtest. RunDocker(t, func(t *testing.T) { runTestWithCluster(t, testInitModeInner, useInitMode(cluster.INIT_COMMAND)) }) @@ -75,7 +74,7 @@ func TestInitModeNone(t *testing.T) { s := log.Scope(t) defer s.Close(t) - // TODO(tschottdorf): see above. + // TODO(peter): Move this test to a roachtest. 
RunDocker(t, func(t *testing.T) { runTestWithCluster(t, testInitModeNoneInner, useInitMode(cluster.INIT_NONE)) }) diff --git a/pkg/acceptance/util_cluster.go b/pkg/acceptance/util_cluster.go index 49b2f2d8a1d1..62898bfd88b8 100644 --- a/pkg/acceptance/util_cluster.go +++ b/pkg/acceptance/util_cluster.go @@ -16,8 +16,6 @@ package acceptance import ( "context" - "io/ioutil" - "os" "path/filepath" "regexp" "strings" @@ -25,23 +23,15 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/acceptance/cluster" - "github.com/cockroachdb/cockroach/pkg/acceptance/localcluster" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/binfetcher" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) const ( - localTest = "runMode=local" dockerTest = "runMode=docker" ) -// RunLocal runs the given acceptance test using a bare cluster. -func RunLocal(t *testing.T, testee func(t *testing.T)) { - t.Run(localTest, testee) -} - // RunDocker runs the given acceptance test using a Docker cluster. func RunDocker(t *testing.T, testee func(t *testing.T)) { t.Run(dockerTest, testee) @@ -87,15 +77,13 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c parts := strings.Split(t.Name(), "/") if len(parts) < 2 { - t.Fatal("must invoke RunLocal or RunDocker") + t.Fatal("must invoke RunDocker") } var runMode string for _, part := range parts[1:] { part = reStripTestEnumeration.ReplaceAllLiteralString(part, "") switch part { - case localTest: - fallthrough case dockerTest: if runMode != "" { t.Fatalf("test has more than one run mode: %s and %s", runMode, part) @@ -105,44 +93,6 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c } switch runMode { - case localTest: - pwd, err := os.Getwd() - if err != nil { - t.Fatal(err) - } - dataDir, err := ioutil.TempDir(pwd, ".localcluster") - if err != nil { - t.Fatal(err) - } - - logDir := *flagLogDir - if logDir != "" { - logDir = filepath.Join(logDir, filepath.Clean(t.Name())) - } - - perNodeCfg := localcluster.MakePerNodeFixedPortsCfg(len(cfg.Nodes)) - for i := 0; i < len(cfg.Nodes); i++ { - // TODO(tschottdorf): handle Nodes[i].Stores properly. - if cfg.Nodes[i].Version != "" { - nCfg := perNodeCfg[i] - nCfg.Binary = GetBinary(ctx, t, cfg.Nodes[i].Version) - perNodeCfg[i] = nCfg - } - } - clusterCfg := localcluster.ClusterConfig{ - Ephemeral: true, - DataDir: dataDir, - LogDir: logDir, - NumNodes: len(cfg.Nodes), - PerNodeCfg: perNodeCfg, - NoWait: cfg.NoWait, - } - - l := localcluster.New(clusterCfg) - - l.Start(ctx) - c = &localcluster.LocalCluster{Cluster: l} - case dockerTest: logDir := *flagLogDir if logDir != "" { @@ -153,7 +103,7 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c c = l default: - t.Fatalf("unable to run in mode %q, use either RunLocal or RunDocker", runMode) + t.Fatalf("unable to run in mode %q, use RunDocker", runMode) } // Don't wait for replication unless requested (usually it is). @@ -229,17 +179,3 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c completed = true return c } - -// GetBinary retrieves a binary for the specified version and returns it. 
-func GetBinary(ctx context.Context, t *testing.T, version string) string { - t.Helper() - bin, err := binfetcher.Download(ctx, binfetcher.Options{ - Binary: "cockroach", - Dir: ".localcluster_cache", - Version: version, - }) - if err != nil { - t.Fatalf("unable to set up binary for v%s: %s", version, err) - } - return bin -} diff --git a/pkg/cmd/roachtest/acceptance.go b/pkg/cmd/roachtest/acceptance.go index f53188fb0c52..bd7966ab9045 100644 --- a/pkg/cmd/roachtest/acceptance.go +++ b/pkg/cmd/roachtest/acceptance.go @@ -38,6 +38,7 @@ func registerAcceptance(r *registry) { {"bank/node-restart", runBankNodeRestart}, {"build-info", runBuildInfo}, {"cli/node-status", runCLINodeStatus}, + {"decommission", runDecommissionAcceptance}, {"event-log", runEventLog}, {"gossip/peerings", runGossipPeerings}, {"gossip/restart", runGossipRestart}, diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index 1c2484ca4090..ddeb8868dbdc 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -17,12 +17,21 @@ package main import ( "context" + "encoding/csv" "fmt" + "reflect" + "regexp" "strconv" + "strings" "time" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + + "github.com/kr/pretty" _ "github.com/lib/pq" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) @@ -120,7 +129,7 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { if err != nil { t.Fatal(err) } - c.l.Printf(fmt.Sprintf("run: %s\n", stmt)) + c.l.Printf("run: %s\n", stmt) } var m *errgroup.Group // see comment in version.go @@ -229,9 +238,430 @@ func registerDecommission(r *registry) { Run: func(ctx context.Context, t *test, c *cluster) { if local { duration = 3 * time.Minute - fmt.Printf("running with duration=%s in local mode\n", duration) + c.l.Printf("running with duration=%s in local mode\n", duration) } runDecommission(t, c, numNodes, duration) }, }) } + +func runDecommissionAcceptance(ctx context.Context, t *test, c *cluster) { + args := startArgs("--sequential", "--env=COCKROACH_SCAN_MAX_IDLE_TIME=5ms") + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, args) + + execCLI := func( + ctx context.Context, + runNode int, + extraArgs ...string, + ) (string, error) { + args := []string{cockroach} + args = append(args, extraArgs...) + args = append(args, "--insecure") + args = append(args, fmt.Sprintf("--port={pgport:%d}", runNode)) + buf, err := c.RunWithBuffer(ctx, c.l, c.Node(runNode), args...) + c.l.Printf("%s\n", buf) + return string(buf), err + } + + decommission := func( + ctx context.Context, + runNode int, + targetNodes nodeListOption, + verbs ...string, + ) (string, error) { + args := []string{"node"} + args = append(args, verbs...) + for _, target := range targetNodes { + args = append(args, strconv.Itoa(target)) + } + return execCLI(ctx, runNode, args...) 
+ } + + matchCSV := func(csvStr string, matchColRow [][]string) (err error) { + defer func() { + if err != nil { + err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s", + csvStr, pretty.Sprint(matchColRow), err) + } + }() + + reader := csv.NewReader(strings.NewReader(csvStr)) + reader.FieldsPerRecord = -1 + records, err := reader.ReadAll() + if err != nil { + return err + } + + lr, lm := len(records), len(matchColRow) + if lr < lm { + return errors.Errorf("csv has %d rows, but expected at least %d", lr, lm) + } + + // Compare only the last len(matchColRow) records. That is, if we want to + // match 4 rows and we have 100 records, we only really compare + // records[96:], that is, the last four rows. + records = records[lr-lm:] + + for i := range records { + if lr, lm := len(records[i]), len(matchColRow[i]); lr != lm { + return errors.Errorf("row #%d: csv has %d columns, but expected %d", i+1, lr, lm) + } + for j := range records[i] { + pat, str := matchColRow[i][j], records[i][j] + re := regexp.MustCompile(pat) + if !re.MatchString(str) { + err = errors.Errorf("%v\nrow #%d, col #%d: found %q which does not match %q", + err, i+1, j+1, str, pat) + } + } + } + return err + } + + decommissionHeader := []string{ + "id", "is_live", "replicas", "is_decommissioning", "is_draining", + } + decommissionFooter := []string{ + "No more data reported on target nodes. " + + "Please verify cluster health before removing the nodes.", + } + statusHeader := []string{ + "id", "address", "build", "started_at", "updated_at", "is_live", + } + waitLiveDeprecated := "--wait=live is deprecated and is treated as --wait=all" + + c.l.Printf("decommissioning first node from the second, polling the status manually\n") + retryOpts := retry.Options{ + InitialBackoff: time.Second, + MaxBackoff: 5 * time.Second, + Multiplier: 1, + MaxRetries: 20, + } + for r := retry.Start(retryOpts); r.Next(); { + o, err := decommission(ctx, 2, c.Node(1), + "decommission", "--wait", "none", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + {"1", "true", "0", "true", "false"}, + decommissionFooter, + } + + if err := matchCSV(o, exp); err != nil { + continue + } + break + } + + // Check that even though the node is decommissioned, we still see it (since + // it remains live) in `node ls`. + { + o, err := execCLI(ctx, 2, "node", "ls", "--format", "csv") + if err != nil { + t.Fatalf("node-ls failed: %v", err) + } + exp := [][]string{ + {"id"}, + {"1"}, + {"2"}, + {"3"}, + {"4"}, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + // Ditto `node status`. 
+ { + o, err := execCLI(ctx, 2, "node", "status", "--format", "csv") + if err != nil { + t.Fatalf("node-status failed: %v", err) + } + exp := [][]string{ + statusHeader, + {`1`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + + c.l.Printf("recommissioning first node (from third node)\n") + if _, err := decommission(ctx, 3, c.Node(1), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + + c.l.Printf("decommissioning second node from third, using --wait=all\n") + { + o, err := decommission(ctx, 3, c.Node(2), + "decommission", "--wait", "all", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + {"2", "true", "0", "true", "false"}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + + c.l.Printf("recommissioning second node from itself\n") + if _, err := decommission(ctx, 2, c.Node(2), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + + c.l.Printf("decommissioning third node via `quit --decommission`\n") + func() { + // This should not take longer than five minutes, and if it does, it's + // likely stuck forever and we want to see the output. + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + if _, err := execCLI(timeoutCtx, 3, "quit", "--decommission"); err != nil { + if timeoutCtx.Err() != nil { + t.Fatalf("quit --decommission failed: %s", err) + } + // TODO(tschottdorf): grep the process output for the string announcing success? + c.l.Errorf("WARNING: ignoring error on quit --decommission: %s\n", err) + } + }() + + // Now that the third node is down and decommissioned, decommissioning it + // again should be a no-op. We do it from node one but as always it doesn't + // matter. + c.l.Printf("checking that other nodes see node three as successfully decommissioned\n") + { + o, err := decommission(ctx, 2, c.Node(3), + "decommission", "--format", "csv") // wait=all is implied + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + // Expect the same as usual, except this time the node should be draining + // because it shut down cleanly (thanks to `quit --decommission`). + {"3", "true", "0", "true", "true"}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + + // Bring the node back up. It's still decommissioned, so it won't be of much use. + c.Stop(ctx, c.Node(3)) + c.Start(ctx, c.Node(3), args) + + // Recommission. Welcome back! + if _, err = decommission(ctx, 2, c.Node(3), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + } + + // Kill the first node and verify that we can decommission it while it's down, + // bringing it back up to verify that its replicas still get removed. 
+ c.l.Printf("intentionally killing first node\n") + c.Stop(ctx, c.Node(1)) + c.l.Printf("decommission first node, starting with it down but restarting it for verification\n") + { + o, err := decommission(ctx, 2, c.Node(1), + "decommission", "--wait", "live") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + if strings.Split(o, "\n")[1] != waitLiveDeprecated { + t.Fatal("missing deprecate message for --wait=live") + } + c.Start(ctx, c.Node(1), args) + // Run a second time to wait until the replicas have all been GC'ed. + // Note that we specify "all" because even though the first node is + // now running, it may not be live by the time the command runs. + o, err = decommission(ctx, 2, c.Node(1), + "decommission", "--wait", "all", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + exp := [][]string{ + decommissionHeader, + {"1", "true|false", "0", "true", "false"}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + } + + // Now we want to test decommissioning a truly dead node. Make sure we don't + // waste too much time waiting for the node to be recognized as dead. Note that + // we don't want to set this number too low or everything will seem dead to the + // allocator at all times, so nothing will ever happen. + func() { + db := c.Conn(ctx, 2) + defer db.Close() + const stmt = "SET CLUSTER SETTING server.time_until_store_dead = '1m15s'" + if _, err := db.ExecContext(ctx, stmt); err != nil { + t.Fatal(err) + } + }() + + c.l.Printf("intentionally killing first node\n") + c.Stop(ctx, c.Node(1)) + // It is being decommissioned in absentia, meaning that its replicas are + // being removed due to deadness. We can't see that reflected in the output + // since the current mechanism gets its replica counts from what the node + // reports about itself, so our assertion here is somewhat weak. + c.l.Printf("decommission first node in absentia using --wait=live\n") + { + o, err := decommission(ctx, 3, c.Node(1), + "decommission", "--wait", "live", "--format", "csv") + if err != nil { + t.Fatalf("decommission failed: %v", err) + } + + // Note we don't check precisely zero replicas (which the node would write + // itself, but it's dead). We do check that the node isn't live, though, which + // is essentially what `--wait=live` waits for. + // Note that the target node may still be "live" when it's marked as + // decommissioned, as its replica count may drop to zero faster than + // liveness times out. + exp := [][]string{ + decommissionHeader, + {"1", `true|false`, "0", `true`, `false`}, + decommissionFooter, + } + if err := matchCSV(o, exp); err != nil { + t.Fatal(err) + } + if strings.Split(o, "\n")[1] != waitLiveDeprecated { + t.Fatal("missing deprecate message for --wait=live") + } + } + + // Check that (at least after a bit) the node disappears from `node ls` + // because it is decommissioned and not live. 
+ for { + o, err := execCLI(ctx, 2, "node", "ls", "--format", "csv") + if err != nil { + t.Fatalf("node-ls failed: %v", err) + } + + exp := [][]string{ + {"id"}, + {"2"}, + {"3"}, + {"4"}, + } + + if err := matchCSV(o, exp); err != nil { + time.Sleep(time.Second) + continue + } + break + } + for { + o, err := execCLI(ctx, 2, "node", "status", "--format", "csv") + if err != nil { + t.Fatalf("node-status failed: %v", err) + } + + exp := [][]string{ + statusHeader, + {`2`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`3`, `.*`, `.*`, `.*`, `.*`, `.*`}, + {`4`, `.*`, `.*`, `.*`, `.*`, `.*`}, + } + if err := matchCSV(o, exp); err != nil { + time.Sleep(time.Second) + continue + } + break + } + + if err := retry.ForDuration(time.Minute, func() error { + // Verify the event log has recorded exactly one decommissioned or + // recommissioned event for each commissioning operation. + // + // Spurious errors appear to be possible since we might be trying to + // send RPCs to the (relatively recently) down node: + // + // pq: rpc error: code = Unavailable desc = grpc: the connection is + // unavailable + // + // Seen in https://teamcity.cockroachdb.com/viewLog.html?buildId=344802. + db := c.Conn(ctx, 2) + defer db.Close() + + rows, err := db.Query(` +SELECT "eventType", "targetID" FROM system.eventlog +WHERE "eventType" IN ($1, $2) ORDER BY timestamp`, + "node_decommissioned", "node_recommissioned", + ) + if err != nil { + c.l.Printf("retrying: %v\n", err) + return err + } + defer rows.Close() + + matrix, err := sqlutils.RowsToStrMatrix(rows) + if err != nil { + return err + } + + expMatrix := [][]string{ + {"node_decommissioned", "1"}, + {"node_recommissioned", "1"}, + {"node_decommissioned", "2"}, + {"node_recommissioned", "2"}, + {"node_decommissioned", "3"}, + {"node_recommissioned", "3"}, + {"node_decommissioned", "1"}, + } + + if !reflect.DeepEqual(matrix, expMatrix) { + t.Fatalf("unexpected diff(matrix, expMatrix):\n%s", pretty.Diff(matrix, expMatrix)) + } + return nil + }); err != nil { + t.Fatal(err) + } + + // Last, verify that the operator can't shoot themselves in the foot by + // accidentally decommissioning all nodes. + // + // Specify wait=none because the command would block forever (the replicas have + // nowhere to go). + if _, err := decommission(ctx, 2, c.All(), "decommission", "--wait", "none"); err != nil { + t.Fatalf("decommission failed: %v", err) + } + + // Check that we can still do stuff. Creating a database should be good enough. + db := c.Conn(ctx, 2) + defer db.Close() + + if _, err := db.Exec(`CREATE DATABASE still_working;`); err != nil { + t.Fatal(err) + } + + // Recommission all nodes. + if _, err := decommission(ctx, 2, c.All(), "recommission"); err != nil { + t.Fatalf("recommission failed: %v", err) + } + + // To verify that all nodes are actually accepting replicas again, decommission + // the first nodes (blocking until it's done). This proves that the other nodes + // absorb the first one's replicas. + if _, err := decommission(ctx, 2, c.Node(1), "decommission"); err != nil { + t.Fatalf("decommission failed: %v", err) + } +} From f468147ebe0a0110ff1b2b370b632a378a1cb9a0 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 4 Sep 2018 14:01:06 -0400 Subject: [PATCH 4/9] stats: document stats-related commands as experimental Update the documentation inside `sql.y` to designate the stats-related statements as experimental. 
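For context, the statements being marked experimental here are the table-statistics
commands whose help text is updated in sql.y below. A minimal usage sketch, not part
of the patch itself (the statistic name, column, table, and histogram ID are
placeholders):

    CREATE STATISTICS my_stat ON col1 FROM my_table;
    SHOW STATISTICS FOR TABLE my_table;
    SHOW STATISTICS USING JSON FOR TABLE my_table;
    SHOW HISTOGRAM 123;

ALTER TABLE ... INJECT STATISTICS, also marked experimental below, takes an
expression argument per the grammar.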
Release note: None --- docs/generated/sql/bnf/alter_table.bnf | 4 ++-- docs/generated/sql/bnf/show_var.bnf | 2 -- docs/generated/sql/bnf/stmt_block.bnf | 18 ------------------ pkg/sql/parser/sql.y | 26 ++++++++++++++++---------- 4 files changed, 18 insertions(+), 32 deletions(-) diff --git a/docs/generated/sql/bnf/alter_table.bnf b/docs/generated/sql/bnf/alter_table.bnf index 7f6bc5c4de66..3ea94d15bb4d 100644 --- a/docs/generated/sql/bnf/alter_table.bnf +++ b/docs/generated/sql/bnf/alter_table.bnf @@ -1,3 +1,3 @@ alter_onetable_stmt ::= - 'ALTER' 'TABLE' table_name ( ( ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) )* ) - | 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name ( ( ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 
'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by | 'INJECT' 'STATISTICS' a_expr ) ) )* ) + 'ALTER' 'TABLE' table_name ( ( ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) )* ) + | 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name ( ( ( 'ADD' ( column_name typename col_qual_list 
) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) ( ( ',' ( 'ADD' ( column_name typename col_qual_list ) | 'ADD' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' ( column_name typename col_qual_list ) | 'ADD' 'COLUMN' 'IF' 'NOT' 'EXISTS' ( column_name typename col_qual_list ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DEFAULT' a_expr | 'DROP' 'DEFAULT' ) | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'NOT' 'NULL' | 'ALTER' ( 'COLUMN' | ) column_name 'DROP' 'STORED' | 'DROP' ( 'COLUMN' | ) 'IF' 'EXISTS' column_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' ( 'COLUMN' | ) column_name ( 'CASCADE' | 'RESTRICT' | ) | 'ALTER' ( 'COLUMN' | ) column_name ( 'SET' 'DATA' | ) 'TYPE' typename ( 'COLLATE' collation_name | ) ( 'USING' a_expr | ) | 'ADD' ( 'CONSTRAINT' constraint_name constraint_elem | constraint_elem ) | 'VALIDATE' 'CONSTRAINT' constraint_name | 'DROP' 'CONSTRAINT' 'IF' 'EXISTS' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'DROP' 'CONSTRAINT' constraint_name ( 'CASCADE' | 'RESTRICT' | ) | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by ) ) )* ) diff --git a/docs/generated/sql/bnf/show_var.bnf b/docs/generated/sql/bnf/show_var.bnf index 3e5a38b4bb0c..93082503a4eb 100644 --- a/docs/generated/sql/bnf/show_var.bnf +++ b/docs/generated/sql/bnf/show_var.bnf @@ -6,7 +6,6 @@ show_stmt ::= | show_csettings_stmt | show_databases_stmt | show_grants_stmt - | show_histogram_stmt | show_indexes_stmt | show_jobs_stmt | show_queries_stmt @@ -15,7 +14,6 @@ show_stmt ::= | show_schemas_stmt | show_session_stmt | show_sessions_stmt - | show_stats_stmt | show_tables_stmt | show_trace_stmt | show_users_stmt diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index f272faa75577..6dd82283c4dd 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -58,7 +58,6 @@ create_stmt ::= create_user_stmt | create_role_stmt | create_ddl_stmt - | create_stats_stmt deallocate_stmt ::= 'DEALLOCATE' name @@ -157,7 +156,6 @@ show_stmt ::= | show_csettings_stmt | show_databases_stmt | show_grants_stmt - | show_histogram_stmt | show_indexes_stmt | show_jobs_stmt | show_queries_stmt @@ -166,7 +164,6 @@ show_stmt ::= | show_schemas_stmt | show_session_stmt | show_sessions_stmt - | show_stats_stmt | show_tables_stmt | show_trace_stmt | show_users_stmt @@ -262,9 +259,6 @@ create_ddl_stmt ::= | create_view_stmt | create_sequence_stmt -create_stats_stmt ::= - 'CREATE' 'STATISTICS' statistics_name 'ON' name_list 'FROM' table_name - name ::= 'identifier' | unreserved_keyword @@ -322,7 +316,6 @@ 
explainable_stmt ::= preparable_stmt | alter_ddl_stmt | create_ddl_stmt - | create_stats_stmt | drop_ddl_stmt | execute_stmt @@ -462,9 +455,6 @@ show_databases_stmt ::= show_grants_stmt ::= 'SHOW' 'GRANTS' opt_on_targets_roles for_grantee_clause -show_histogram_stmt ::= - 'SHOW' 'HISTOGRAM' 'ICONST' - show_indexes_stmt ::= 'SHOW' 'INDEX' 'FROM' table_name | 'SHOW' 'INDEXES' 'FROM' table_name @@ -498,10 +488,6 @@ show_sessions_stmt ::= | 'SHOW' 'CLUSTER' 'SESSIONS' | 'SHOW' 'LOCAL' 'SESSIONS' -show_stats_stmt ::= - 'SHOW' 'STATISTICS' 'FOR' 'TABLE' table_name - | 'SHOW' 'STATISTICS' 'USING' 'JSON' 'FOR' 'TABLE' table_name - show_tables_stmt ::= 'SHOW' 'TABLES' 'FROM' name '.' name | 'SHOW' 'TABLES' 'FROM' name @@ -895,9 +881,6 @@ create_sequence_stmt ::= 'CREATE' 'SEQUENCE' sequence_name opt_sequence_option_list | 'CREATE' 'SEQUENCE' 'IF' 'NOT' 'EXISTS' sequence_name opt_sequence_option_list -statistics_name ::= - name - with_clause ::= 'WITH' cte_list @@ -1755,7 +1738,6 @@ alter_table_cmd ::= | 'DROP' 'CONSTRAINT' constraint_name opt_drop_behavior | 'EXPERIMENTAL_AUDIT' 'SET' audit_mode | partition_by - | 'INJECT' 'STATISTICS' a_expr alter_index_cmd ::= partition_by diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 122412829b3a..26a90bcb6a07 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -1122,6 +1122,7 @@ alter_ddl_stmt: // ALTER TABLE ... VALIDATE CONSTRAINT // ALTER TABLE ... SPLIT AT // ALTER TABLE ... SCATTER [ FROM ( ) TO ( ) ] +// ALTER TABLE ... INJECT STATISTICS ... (experimental) // // Column qualifiers: // [CONSTRAINT ] {NULL | NOT NULL | UNIQUE | PRIMARY KEY | CHECK () | DEFAULT } @@ -1493,6 +1494,7 @@ alter_table_cmd: // ALTER TABLE INJECT STATISTICS | INJECT STATISTICS a_expr { + /* SKIP DOC */ $$.val = &tree.AlterTableInjectStats{ Stats: $3.expr(), } @@ -1928,8 +1930,8 @@ create_ddl_stmt: | create_view_stmt // EXTEND WITH HELP: CREATE VIEW | create_sequence_stmt // EXTEND WITH HELP: CREATE SEQUENCE -// %Help: CREATE STATISTICS - create a new table statistic -// %Category: Misc +// %Help: CREATE STATISTICS - create a new table statistic (experimental) +// %Category: Experimental // %Text: // CREATE STATISTICS // ON [, ...] @@ -1937,6 +1939,7 @@ create_ddl_stmt: create_stats_stmt: CREATE STATISTICS statistics_name ON name_list FROM table_name { + /* SKIP DOC */ $$.val = &tree.CreateStats{ Name: tree.Name($3), ColumnNames: $5.nameList(), @@ -2814,10 +2817,10 @@ zone_value: // %Help: SHOW // %Category: Group // %Text: -// SHOW SESSION, SHOW CLUSTER SETTING, SHOW DATABASES, SHOW TABLES, SHOW COLUMNS, SHOW INDEXES, -// SHOW CONSTRAINTS, SHOW CREATE, SHOW USERS, -// SHOW TRANSACTION, SHOW BACKUP, SHOW JOBS, SHOW QUERIES, SHOW ROLES, SHOW SESSIONS, SHOW SYNTAX, -// SHOW TRACE +// SHOW BACKUP, SHOW CLUSTER SETTING, SHOW COLUMNS, SHOW CONSTRAINTS, +// SHOW CREATE, SHOW DATABASES, SHOW HISTOGRAM, SHOW INDEXES, SHOW JOBS, +// SHOW QUERIES, SHOW ROLES, SHOW SESSION, SHOW SESSIONS, SHOW STATISTICS, +// SHOW SYNTAX, SHOW TABLES, SHOW TRACE SHOW TRANSACTION, SHOW USERS show_stmt: show_backup_stmt // EXTEND WITH HELP: SHOW BACKUP | show_columns_stmt // EXTEND WITH HELP: SHOW COLUMNS @@ -2869,8 +2872,8 @@ session_var: | TIME ZONE { $$ = "timezone" } | TIME error // SHOW HELP: SHOW SESSION -// %Help: SHOW STATISTICS - display table statistics -// %Category: Misc +// %Help: SHOW STATISTICS - display table statistics (experimental) +// %Category: Experimental // %Text: SHOW STATISTICS [USING JSON] FOR TABLE // // Returns the available statistics for a table. 
@@ -2882,16 +2885,18 @@ session_var: show_stats_stmt: SHOW STATISTICS FOR TABLE table_name { + /* SKIP DOC */ $$.val = &tree.ShowTableStats{Table: $5.normalizableTableNameFromUnresolvedName() } } | SHOW STATISTICS USING JSON FOR TABLE table_name { + /* SKIP DOC */ $$.val = &tree.ShowTableStats{Table: $7.normalizableTableNameFromUnresolvedName(), UsingJSON: true} } | SHOW STATISTICS error // SHOW HELP: SHOW STATISTICS -// %Help: SHOW HISTOGRAM - display histogram -// %Category: Misc +// %Help: SHOW HISTOGRAM - display histogram (experimental) +// %Category: Experimental // %Text: SHOW HISTOGRAM // // Returns the data in the histogram with the @@ -2900,6 +2905,7 @@ show_stats_stmt: show_histogram_stmt: SHOW HISTOGRAM ICONST { + /* SKIP DOC */ id, err := $3.numVal().AsInt64() if err != nil { sqllex.Error(err.Error()) From fc4a473b7e6e1f0a0b223f52dd6fc3e4970ab36b Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 4 Sep 2018 21:10:41 +0200 Subject: [PATCH 5/9] roachtest: skip (intentionally) failing Kafka chaos test This test has no chance of passing until Kafka chaos is actually supported (see #28636). Touches #29196. Release note: None --- pkg/cmd/roachtest/cdc.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 28e387e217cb..022b0428bc4f 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -282,6 +282,7 @@ func registerCDC(r *registry) { }, }) r.Add(testSpec{ + Skip: "https://github.com/cockroachdb/cockroach/issues/29196", Name: "cdc/w=100/nodes=3/init=false/chaos=true", MinVersion: "2.1.0", Nodes: nodes(4, cpu(16)), From e2e2e99a7298f07686d74c1b176f89e17dab8529 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Tue, 4 Sep 2018 15:10:36 -0400 Subject: [PATCH 6/9] testcluster: make manual replication mode disable the merge queue TestClusters have a manual replication mode for use in tests that need to precisely control replication on a cluster. Teach that mode to disable the merge queue in addition to the split and replicate queues. This decreases the number of tests that need to directly disable the merge queue. Release note: None --- pkg/sql/distsql_physical_planner_test.go | 17 ++++------------- pkg/sql/distsqlplan/span_resolver_test.go | 5 ----- pkg/sql/distsqlrun/cluster_test.go | 2 -- pkg/sql/distsqlrun/tablereader_test.go | 2 -- pkg/sql/run_control_test.go | 5 ----- pkg/sql/trace_test.go | 2 -- pkg/sql/txn_restart_test.go | 2 -- pkg/storage/client_split_test.go | 3 --- pkg/storage/consistency_queue_test.go | 5 ----- pkg/testutils/serverutils/test_cluster_shim.go | 4 ++++ pkg/testutils/testcluster/testcluster.go | 6 ++++++ 11 files changed, 14 insertions(+), 39 deletions(-) diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 5daf1426fa7f..746fa708b862 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -66,13 +66,12 @@ func SplitTable( targetNodeIdx int, vals ...interface{}, ) { - pik, err := sqlbase.TestingMakePrimaryIndexKey(desc, vals...) - if err != nil { - t.Fatal(err) + if tc.ReplicationMode() != base.ReplicationManual { + t.Fatal("SplitTable called on a test cluster that was not in manual replication mode") } - // Prevent the merge queue from immediately discarding our split. - if _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { + pik, err := sqlbase.TestingMakePrimaryIndexKey(desc, vals...) 
+ if err != nil { t.Fatal(err) } @@ -328,8 +327,6 @@ func TestDistSQLRangeCachesIntegrationTest(t *testing.T) { // We're going to split one of the tables, but node 4 is unaware of this. _, err = db0.Exec(fmt.Sprintf(` - -- Prevent the merge queue from immediately discarding our splits. - SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE "right" SPLIT AT VALUES (1), (2), (3); ALTER TABLE "right" EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], 1), (ARRAY[%d], 2), (ARRAY[%d], 3); `, @@ -411,9 +408,6 @@ func TestDistSQLDeadHosts(t *testing.T) { r.Exec(t, "CREATE TABLE t (x INT PRIMARY KEY, xsquared INT)") - // Prevent the merge queue from immediately discarding our splits. - r.Exec(t, "SET CLUSTER SETTING kv.range_merge.queue_enabled = false") - for i := 0; i < numNodes; i++ { r.Exec(t, fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES (%d)", n*i/5)) } @@ -501,9 +495,6 @@ func TestDistSQLDrainingHosts(t *testing.T) { r := sqlutils.MakeSQLRunner(tc.ServerConn(0)) r.DB.SetMaxOpenConns(1) - // Prevent the merge queue from immediately discarding our splits. - r.Exec(t, "SET CLUSTER SETTING kv.range_merge.queue_enabled = false") - r.Exec(t, "SET DISTSQL = ON") // Force the query to be distributed. r.Exec( diff --git a/pkg/sql/distsqlplan/span_resolver_test.go b/pkg/sql/distsqlplan/span_resolver_test.go index 54611398fd84..d3be6f3e2099 100644 --- a/pkg/sql/distsqlplan/span_resolver_test.go +++ b/pkg/sql/distsqlplan/span_resolver_test.go @@ -331,11 +331,6 @@ func setupRanges( } } - // Prevent the merge queue from immediately discarding our splits. - if _, err := db.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { - t.Fatal(err) - } - tableDesc := sqlbase.GetTableDescriptor(cdb, "t", "test") // Split every SQL row to its own range. rowRanges := make([]roachpb.RangeDescriptor, len(values)) diff --git a/pkg/sql/distsqlrun/cluster_test.go b/pkg/sql/distsqlrun/cluster_test.go index cf27fc982cec..d100fc0c692d 100644 --- a/pkg/sql/distsqlrun/cluster_test.go +++ b/pkg/sql/distsqlrun/cluster_test.go @@ -563,8 +563,6 @@ func TestDistSQLReadsFillGatewayID(t *testing.T) { sqlutils.ToRowFn(sqlutils.RowIdxFn)) if _, err := db.Exec(` --- Prevent the merge queue from immediately discarding our splits. -SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE t SPLIT AT VALUES (1), (2), (3); ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 1), (ARRAY[1], 2), (ARRAY[3], 3); `); err != nil { diff --git a/pkg/sql/distsqlrun/tablereader_test.go b/pkg/sql/distsqlrun/tablereader_test.go index 7c0fae73e568..00fe2e6c67b0 100644 --- a/pkg/sql/distsqlrun/tablereader_test.go +++ b/pkg/sql/distsqlrun/tablereader_test.go @@ -197,8 +197,6 @@ func TestMisplannedRangesMetadata(t *testing.T) { sqlutils.ToRowFn(sqlutils.RowIdxFn)) _, err := db.Exec(` --- Prevent the merge queue from immediately discarding our splits. -SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE t SPLIT AT VALUES (1), (2), (3); ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 1), (ARRAY[1], 2), (ARRAY[3], 3); `) diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index 08c69583bd10..ae40e9f32317 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -233,11 +233,6 @@ func TestCancelDistSQLQuery(t *testing.T) { t.Fatal(err) } - // Prevent the merge queue from immediately discarding our splits. 
- if _, err := conn1.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { - t.Fatal(err) - } - if _, err := conn1.Exec("ALTER TABLE nums SPLIT AT VALUES (50)"); err != nil { t.Fatal(err) } diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index 8f01101e214a..e4844ea0c637 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -523,8 +523,6 @@ func TestKVTraceDistSQL(t *testing.T) { r.Exec(t, "CREATE DATABASE test") r.Exec(t, "CREATE TABLE test.a (a INT PRIMARY KEY, b INT)") r.Exec(t, "INSERT INTO test.a VALUES (1,1), (2,2)") - // Prevent the merge queue from immediately discarding our splits. - r.Exec(t, "SET CLUSTER SETTING kv.range_merge.queue_enabled = false") r.Exec(t, "ALTER TABLE a SPLIT AT VALUES(1)") r.Exec(t, "SET tracing = on,kv; SELECT count(*) FROM test.a; SET tracing = off") diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index ccaec508d557..7c1110c8be01 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1535,8 +1535,6 @@ func TestDistSQLRetryableError(t *testing.T) { // We're going to split one of the tables, but node 4 is unaware of this. _, err := db.Exec(fmt.Sprintf(` - -- Prevent the merge queue from immediately discarding our splits. - SET CLUSTER SETTING kv.range_merge.queue_enabled = false; ALTER TABLE "t" SPLIT AT VALUES (1), (2), (3); ALTER TABLE "t" EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], 1), (ARRAY[%d], 2), (ARRAY[%d], 3); `, diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 993a24d31bf8..5dcdf968f871 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -1578,7 +1578,6 @@ func TestStoreSplitTimestampCacheDifferentLeaseHolder(t *testing.T) { EvalKnobs: batcheval.TestingKnobs{ TestingEvalFilter: filter, }, - DisableMergeQueue: true, } tc := testcluster.StartTestCluster(t, 2, args) @@ -1760,7 +1759,6 @@ func TestStoreSplitOnRemovedReplica(t *testing.T) { args.ReplicationMode = base.ReplicationManual args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{ TestingRequestFilter: filter, - DisableMergeQueue: true, } tc := testcluster.StartTestCluster(t, 3, args) @@ -1850,7 +1848,6 @@ func TestStoreSplitFailsAfterMaxRetries(t *testing.T) { args.ReplicationMode = base.ReplicationManual args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{ TestingRequestFilter: filter, - DisableMergeQueue: true, } tc := testcluster.StartTestCluster(t, 1, args) diff --git a/pkg/storage/consistency_queue_test.go b/pkg/storage/consistency_queue_test.go index 08bd294c7207..a7c67b3c2f18 100644 --- a/pkg/storage/consistency_queue_test.go +++ b/pkg/storage/consistency_queue_test.go @@ -300,11 +300,6 @@ func TestConsistencyQueueRecomputeStats(t *testing.T) { ScanInterval: time.Second, ScanMinIdleTime: 0, ScanMaxIdleTime: 100 * time.Millisecond, - Knobs: base.TestingKnobs{ - Store: &storage.StoreTestingKnobs{ - DisableMergeQueue: true, - }, - }, } nodeZeroArgs := tsArgs nodeZeroArgs.StoreSpecs = []base.StoreSpec{{ diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index bb4b816505d8..c57158576ff7 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -92,6 +92,10 @@ type TestClusterInterface interface { // Target returns a roachpb.ReplicationTarget for the specified server. 
Target(serverIdx int) roachpb.ReplicationTarget + + // ReplicationMode returns the ReplicationMode that the test cluster was + // configured with. + ReplicationMode() base.TestClusterReplicationMode } // TestClusterFactory encompasses the actual implementation of the shim diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index f1e4bc4215c5..030301ae04c1 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -244,6 +244,7 @@ func (tc *TestCluster) doAddServer(t testing.TB, serverArgs base.TestServerArgs) stkCopy = *stk.(*storage.StoreTestingKnobs) } stkCopy.DisableSplitQueue = true + stkCopy.DisableMergeQueue = true stkCopy.DisableReplicateQueue = true serverArgs.Knobs.Store = &stkCopy } @@ -576,6 +577,11 @@ func (tc *TestCluster) WaitForNodeStatuses(t testing.TB) { }) } +// ReplicationMode implements TestClusterInterface. +func (tc *TestCluster) ReplicationMode() base.TestClusterReplicationMode { + return tc.replicationMode +} + type testClusterFactoryImpl struct{} // TestClusterFactory can be passed to serverutils.InitTestClusterFactory From 86a54d734cb2bf3eb0da8c51a44e2b7d8d7451fa Mon Sep 17 00:00:00 2001 From: Pete Vilter Date: Tue, 4 Sep 2018 15:37:31 -0400 Subject: [PATCH 7/9] ui: add attributes to login form so LastPass will autofill it LastPass wasn't confident enough to autofill and autologin without these attributes. Release note (admin ui change): Add attributes to the login form to allow LastPass to properly recognize it. --- pkg/ui/src/views/login/loginPage.tsx | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/ui/src/views/login/loginPage.tsx b/pkg/ui/src/views/login/loginPage.tsx index 7e32c4ff9717..fa5ca5dc7dcf 100644 --- a/pkg/ui/src/views/login/loginPage.tsx +++ b/pkg/ui/src/views/login/loginPage.tsx @@ -129,9 +129,10 @@ class LoginPage extends React.Component

Log in to the Web UI

{this.renderError()} -
+
Date: Thu, 30 Aug 2018 11:30:38 -0700
Subject: [PATCH 8/9] changefeedccl: introduce table history invariant checker

tableHistory tracks that some invariants hold over a set of tables as
time advances.

Internally, two timestamps are tracked. The high-water is the highest
timestamp such that every version of a TableDescriptor has met a
provided invariant (via `validateFn`). An error timestamp is also kept,
which is the lowest timestamp where at least one table doesn't meet the
invariant.

The `WaitForTS` method allows a user to block until some given timestamp
is at or below the high-water or at or above the error timestamp. In the
latter case, it returns the error.

Release note: None
---
 pkg/ccl/changefeedccl/table_history.go      | 300 ++++++++++++++++++++
 pkg/ccl/changefeedccl/table_history_test.go | 144 ++++++++++
 2 files changed, 444 insertions(+)
 create mode 100644 pkg/ccl/changefeedccl/table_history.go
 create mode 100644 pkg/ccl/changefeedccl/table_history_test.go

diff --git a/pkg/ccl/changefeedccl/table_history.go b/pkg/ccl/changefeedccl/table_history.go
new file mode 100644
index 000000000000..69dbc734c20e
--- /dev/null
+++ b/pkg/ccl/changefeedccl/table_history.go
@@ -0,0 +1,300 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+	"context"
+	"sort"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+	"github.com/cockroachdb/cockroach/pkg/util/encoding"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+
+	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
+	"github.com/cockroachdb/cockroach/pkg/internal/client"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/pkg/errors"
+)
+
+type tableHistoryWaiter struct {
+	ts    hlc.Timestamp
+	errCh chan error
+}
+
+// tableHistory tracks that some invariants hold over a set of tables as time
+// advances.
+//
+// Internally, two timestamps are tracked. The high-water is the highest
+// timestamp such that every version of a TableDescriptor has met a provided
+// invariant (via `validateFn`). An error timestamp is also kept, which is the
+// lowest timestamp where at least one table doesn't meet the invariant.
+//
+// The `WaitForTS` method allows a user to block until some given timestamp is
+// at or below the high-water or at or above the error timestamp. In the
+// latter case, it returns the error.
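+//
+// A typical wiring, sketched here after the helpers_test.go changes made
+// later in this series (variable names are placeholders):
+//
+//	m := makeTableHistory(validateFn, initialHighWater)
+//	u := &tableHistoryUpdater{settings: st, db: db, targets: targets, m: m}
+//	go func() { _ = u.PollTableDescs(ctx) }()
+//	// Before relying on a descriptor read at ts:
+//	if err := m.WaitForTS(ctx, ts); err != nil {
+//		// At least one table failed validateFn at or before ts.
+//	}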
+type tableHistory struct { + validateFn func(*sqlbase.TableDescriptor) error + + mu struct { + syncutil.Mutex + + // the highest known valid timestamp + highWater hlc.Timestamp + + // the lowest known invalid timestamp + errTS hlc.Timestamp + + // the error associated with errTS + err error + + // callers waiting on a timestamp to be resolved as valid or invalid + waiters []tableHistoryWaiter + } +} + +// makeTableHistory creates tableHistory with the given initial high-water and +// invariant check function. It is expected that `validateFn` is deterministic. +func makeTableHistory( + validateFn func(*sqlbase.TableDescriptor) error, initialHighWater hlc.Timestamp, +) *tableHistory { + m := &tableHistory{validateFn: validateFn} + m.mu.highWater = initialHighWater + return m +} + +// HighWater returns the current high-water timestamp. +func (m *tableHistory) HighWater() hlc.Timestamp { + m.mu.Lock() + highWater := m.mu.highWater + m.mu.Unlock() + return highWater +} + +// WaitForTS blocks until the given timestamp is less than or equal to the +// high-water or error timestamp. In the latter case, the error is returned. +// +// If called twice with the same timestamp, two different errors may be returned +// (since the error timestamp can recede). However, the return for a given +// timestamp will never switch from nil to an error or vice-versa (assuming that +// `validateFn` is deterministic and the ingested descriptors are read +// transactionally). +func (m *tableHistory) WaitForTS(ctx context.Context, ts hlc.Timestamp) error { + var errCh chan error + + m.mu.Lock() + highWater := m.mu.highWater + var err error + if m.mu.errTS != (hlc.Timestamp{}) && !ts.Less(m.mu.errTS) { + err = m.mu.err + } + fastPath := err != nil || !highWater.Less(ts) + if !fastPath { + errCh = make(chan error, 1) + m.mu.waiters = append(m.mu.waiters, tableHistoryWaiter{ts: ts, errCh: errCh}) + } + m.mu.Unlock() + if fastPath { + if log.V(1) { + log.Infof(ctx, "fastpath for %s: %v", ts, err) + } + return err + } + + if log.V(1) { + log.Infof(ctx, "waiting for %s highwater", ts) + } + start := timeutil.Now() + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + if log.V(1) { + log.Infof(ctx, "waited %s for %s highwater: %v", timeutil.Since(start), ts, err) + } + return err + } +} + +// IngestDescriptors checks the given descriptors against the invariant check +// function and adjusts the high-water or error timestamp appropriately. It is +// required that the descriptors represent a transactional kv read between the +// two given timestamps. +func (m *tableHistory) IngestDescriptors( + startTS, endTS hlc.Timestamp, descs []*sqlbase.TableDescriptor, +) error { + sort.Slice(descs, func(i, j int) bool { + return descs[i].ModificationTime.Less(descs[j].ModificationTime) + }) + var validateErr error + for _, desc := range descs { + if err := m.validateFn(desc); validateErr == nil { + validateErr = err + } + } + return m.adjustTimestamps(startTS, endTS, validateErr) +} + +// adjustTimestamps adjusts the high-water or error timestamp appropriately. 
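+// A non-nil validateErr pulls the error timestamp back to endTS (if endTS is
+// earlier than the current error timestamp) and fails any waiters at or past
+// it. Otherwise, a gap between the current high-water and startTS is an
+// error; if there is no gap, the high-water advances to endTS and any waiters
+// at or below the new high-water are released.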
+func (m *tableHistory) adjustTimestamps(startTS, endTS hlc.Timestamp, validateErr error) error { + m.mu.Lock() + defer m.mu.Unlock() + + if validateErr != nil { + // don't care about startTS in the invalid case + if m.mu.errTS == (hlc.Timestamp{}) || endTS.Less(m.mu.errTS) { + m.mu.errTS = endTS + m.mu.err = validateErr + newWaiters := make([]tableHistoryWaiter, 0, len(m.mu.waiters)) + for _, w := range m.mu.waiters { + if w.ts.Less(m.mu.errTS) { + newWaiters = append(newWaiters, w) + continue + } + w.errCh <- validateErr + } + m.mu.waiters = newWaiters + } + return validateErr + } + + if m.mu.highWater.Less(startTS) { + return errors.Errorf(`gap between %s and %s`, m.mu.highWater, startTS) + } + if m.mu.highWater.Less(endTS) { + m.mu.highWater = endTS + newWaiters := make([]tableHistoryWaiter, 0, len(m.mu.waiters)) + for _, w := range m.mu.waiters { + if m.mu.highWater.Less(w.ts) { + newWaiters = append(newWaiters, w) + continue + } + w.errCh <- nil + } + m.mu.waiters = newWaiters + } + return nil +} + +type tableHistoryUpdater struct { + settings *cluster.Settings + db *client.DB + targets jobspb.ChangefeedTargets + m *tableHistory +} + +func (u *tableHistoryUpdater) PollTableDescs(ctx context.Context) error { + // TODO(dan): Replace this with a RangeFeed once it stabilizes. + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(changefeedPollInterval.Get(&u.settings.SV)): + } + + startTS, endTS := u.m.HighWater(), u.db.Clock().Now() + if !startTS.Less(endTS) { + continue + } + descs, err := fetchTableDescriptorVersions(ctx, u.db, startTS, endTS, u.targets) + if err != nil { + return err + } + if err := u.m.IngestDescriptors(startTS, endTS, descs); err != nil { + return err + } + } +} + +func fetchTableDescriptorVersions( + ctx context.Context, + db *client.DB, + startTS, endTS hlc.Timestamp, + targets jobspb.ChangefeedTargets, +) ([]*sqlbase.TableDescriptor, error) { + if log.V(2) { + log.Infof(ctx, `fetching table descs [%s,%s)`, startTS, endTS) + } + start := timeutil.Now() + span := roachpb.Span{Key: keys.MakeTablePrefix(keys.DescriptorTableID)} + span.EndKey = span.Key.PrefixEnd() + header := roachpb.Header{Timestamp: endTS} + req := &roachpb.ExportRequest{ + RequestHeader: roachpb.RequestHeaderFromSpan(span), + StartTime: startTS, + MVCCFilter: roachpb.MVCCFilter_All, + ReturnSST: true, + } + res, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req) + if log.V(2) { + log.Infof(ctx, `fetched table descs [%s,%s) took %s`, startTS, endTS, timeutil.Since(start)) + } + if pErr != nil { + return nil, errors.Wrapf( + pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey) + } + + var tableDescs []*sqlbase.TableDescriptor + for _, file := range res.(*roachpb.ExportResponse).Files { + if err := func() error { + it, err := engineccl.NewMemSSTIterator(file.SST, false /* verify */) + if err != nil { + return err + } + defer it.Close() + for it.Seek(engine.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + return nil + } + remaining, _, _, err := sqlbase.DecodeTableIDIndexID(it.UnsafeKey().Key) + if err != nil { + return err + } + _, tableID, err := encoding.DecodeUvarintAscending(remaining) + if err != nil { + return err + } + // WIP: I think targets currently doesn't contain interleaved + // parents if they are not watched by the changefeed, but this + // seems wrong. + origName, ok := targets[sqlbase.ID(tableID)] + if !ok { + // Uninteresting table. 
+ continue + } + unsafeValue := it.UnsafeValue() + if unsafeValue == nil { + return errors.Errorf(`"%s" was dropped or truncated`, origName) + } + value := roachpb.Value{RawBytes: unsafeValue} + var desc sqlbase.Descriptor + if err := value.GetProto(&desc); err != nil { + return err + } + if tableDesc := desc.GetTable(); tableDesc != nil { + // WIP + log.Infof(ctx, "%s %d %s", desc.GetName(), tableDesc.Version, it.UnsafeKey().Timestamp) + tableDescs = append(tableDescs, tableDesc) + } + } + }(); err != nil { + return nil, err + } + } + return tableDescs, nil +} diff --git a/pkg/ccl/changefeedccl/table_history_test.go b/pkg/ccl/changefeedccl/table_history_test.go new file mode 100644 index 000000000000..d6b769c40966 --- /dev/null +++ b/pkg/ccl/changefeedccl/table_history_test.go @@ -0,0 +1,144 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestTableHistory(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + ts := func(wt int64) hlc.Timestamp { return hlc.Timestamp{WallTime: wt} } + + validateFn := func(desc *sqlbase.TableDescriptor) error { + if desc.Name != `` { + return errors.New(desc.Name) + } + return nil + } + requireChannelEmpty := func(t *testing.T, ch chan error) { + t.Helper() + select { + case err := <-ch: + t.Fatalf(`expected empty channel got %v`, err) + default: + } + } + + m := makeTableHistory(validateFn, ts(0)) + + require.Equal(t, ts(0), m.HighWater()) + + // advance + require.NoError(t, m.IngestDescriptors(ts(0), ts(1), nil)) + require.Equal(t, ts(1), m.HighWater()) + require.NoError(t, m.IngestDescriptors(ts(1), ts(2), nil)) + require.Equal(t, ts(2), m.HighWater()) + + // no-ops + require.NoError(t, m.IngestDescriptors(ts(0), ts(1), nil)) + require.Equal(t, ts(2), m.HighWater()) + require.NoError(t, m.IngestDescriptors(ts(1), ts(2), nil)) + require.Equal(t, ts(2), m.HighWater()) + + // overlap + require.NoError(t, m.IngestDescriptors(ts(1), ts(3), nil)) + require.Equal(t, ts(3), m.HighWater()) + + // gap + require.EqualError(t, m.IngestDescriptors(ts(4), ts(5), nil), + `gap between 0.000000003,0 and 0.000000004,0`) + require.Equal(t, ts(3), m.HighWater()) + + // validates + require.NoError(t, m.IngestDescriptors(ts(3), ts(4), []*sqlbase.TableDescriptor{ + {ID: 0}, + })) + require.Equal(t, ts(4), m.HighWater()) + + // high-water already high enough. fast-path + require.NoError(t, m.WaitForTS(ctx, ts(3))) + require.NoError(t, m.WaitForTS(ctx, ts(4))) + + // high-water not there yet. 
blocks + errCh6 := make(chan error, 1) + errCh7 := make(chan error, 1) + go func() { errCh7 <- m.WaitForTS(ctx, ts(7)) }() + go func() { errCh6 <- m.WaitForTS(ctx, ts(6)) }() + requireChannelEmpty(t, errCh6) + requireChannelEmpty(t, errCh7) + + // high-water advances, but not enough + require.NoError(t, m.IngestDescriptors(ts(4), ts(5), nil)) + requireChannelEmpty(t, errCh6) + requireChannelEmpty(t, errCh7) + + // high-water advances, unblocks only errCh6 + require.NoError(t, m.IngestDescriptors(ts(5), ts(6), nil)) + require.NoError(t, <-errCh6) + requireChannelEmpty(t, errCh7) + + // high-water advances again, unblocks errCh7 + require.NoError(t, m.IngestDescriptors(ts(6), ts(7), nil)) + require.NoError(t, <-errCh7) + + // validate ctx cancellation + errCh8 := make(chan error, 1) + ctxTS8, cancelTS8 := context.WithCancel(ctx) + go func() { errCh8 <- m.WaitForTS(ctxTS8, ts(8)) }() + requireChannelEmpty(t, errCh8) + cancelTS8() + require.EqualError(t, <-errCh8, `context canceled`) + + // does not validate, high-water does not change + require.EqualError(t, m.IngestDescriptors(ts(7), ts(10), []*sqlbase.TableDescriptor{ + {ID: 0, Name: `whoops!`}, + }), `whoops!`) + require.Equal(t, ts(7), m.HighWater()) + + // ts 10 has errored, so validate can return its error without blocking + require.EqualError(t, m.WaitForTS(ctx, ts(10)), `whoops!`) + + // ts 8 and 9 are still unknown + errCh8 = make(chan error, 1) + errCh9 := make(chan error, 1) + go func() { errCh8 <- m.WaitForTS(ctx, ts(8)) }() + go func() { errCh9 <- m.WaitForTS(ctx, ts(9)) }() + requireChannelEmpty(t, errCh8) + requireChannelEmpty(t, errCh9) + + // turns out ts 10 is not a tight bound. ts 9 also has an error + require.EqualError(t, m.IngestDescriptors(ts(7), ts(9), []*sqlbase.TableDescriptor{ + {ID: 0, Name: `oh no!`}, + }), `oh no!`) + require.Equal(t, ts(7), m.HighWater()) + require.EqualError(t, <-errCh9, `oh no!`) + + // ts 8 is still unknown + requireChannelEmpty(t, errCh8) + + // always return the earlist error seen (so waiting for ts 10 immediately + // returns the 9 error now, it returned the ts 10 error above) + require.EqualError(t, m.WaitForTS(ctx, ts(9)), `oh no!`) + + // something earlier than ts 10 can still be okay + require.NoError(t, m.IngestDescriptors(ts(7), ts(8), nil)) + require.Equal(t, ts(8), m.HighWater()) + require.NoError(t, <-errCh8) +} From fd1c2352fa4a8c639b5d98ef743cb417b1f3eb53 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Thu, 30 Aug 2018 11:32:39 -0700 Subject: [PATCH 9/9] changefeedccl: error when a watched table backfills When a table is currently being backfilled for a schema change (e.g. adding a column with a default value), it's unclear what the expectation is for any rows that are changed during the backfill. Our current invariant is that rows are emitted with an updated timestamp and a later SELECT ... AS OF SYSTEM TIME for that row would exactly match the emitted data. During the backfill, there is nothing we can emit that would definitely meet that invariant (because the backfill can be aborted and rolled back). In the meantime, this commit makes sure that we error whenever a backfill happens, even if it's fast enough that we never get it from leasing. This also paves the way for switching to RangeFeed, which doesn't have the convenient `fetchSpansForTargets` hook that the ExportRequest based poller was (ab)using. 
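As an illustration of that invariant (not code from this patch), a consumer
could re-read each emitted row at its attached timestamp and compare; the
table `foo`, its columns, and the helper below are hypothetical:

package changefeedexample

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

// checkEmittedRow re-reads a row at the timestamp the changefeed attached to
// it and verifies the historical read matches the emitted payload.
func checkEmittedRow(db *sql.DB, updated string, a int, emittedB string) error {
	// The emitted timestamp is interpolated directly; it comes from the
	// changefeed, not from user input.
	q := fmt.Sprintf(`SELECT b FROM foo AS OF SYSTEM TIME '%s' WHERE a = $1`, updated)
	var b string
	if err := db.QueryRow(q, a).Scan(&b); err != nil {
		return err
	}
	if b != emittedB {
		return fmt.Errorf("emitted b=%q but historical read returned b=%q", emittedB, b)
	}
	return nil
}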
Closes #28643 Release note (bug fix): CHANGEFEEDs now error when a watched table backfills (instead of undefined behavior) --- pkg/ccl/changefeedccl/changefeed.go | 5 +- .../changefeedccl/changefeed_processors.go | 66 +++-- pkg/ccl/changefeedccl/changefeed_stmt.go | 27 +- pkg/ccl/changefeedccl/changefeed_test.go | 248 ++++++++++++++++-- pkg/ccl/changefeedccl/helpers_test.go | 10 +- pkg/ccl/changefeedccl/poller.go | 20 +- pkg/ccl/changefeedccl/rowfetcher_cache.go | 40 +-- pkg/sql/backfill.go | 8 +- pkg/sql/sqlbase/structured.go | 25 ++ 9 files changed, 361 insertions(+), 88 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 9b2f4470ce7c..841ab8216b2c 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -69,11 +69,12 @@ type emitEntry struct { // returns a closure that may be repeatedly called to advance the changefeed. // The returned closure is not threadsafe. func kvsToRows( - leaseManager *sql.LeaseManager, + leaseMgr *sql.LeaseManager, + tableHist *tableHistory, details jobspb.ChangefeedDetails, inputFn func(context.Context) (bufferEntry, error), ) func(context.Context) ([]emitEntry, error) { - rfCache := newRowFetcherCache(leaseManager) + rfCache := newRowFetcherCache(leaseMgr, tableHist) var kvs sqlbase.SpanKVFetcher appendEmitEntryForKV := func( diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index db9f10363b6b..9a347cbfb78e 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -34,11 +34,18 @@ type changeAggregator struct { // cancel shuts down the processor, both the `Next()` flow and the poller. cancel func() + // errCh contains the return values of poller and tableHistUpdater. + errCh chan error // poller runs in the background and puts kv changes and resolved spans into // a buffer, which is used by `Next()`. poller *poller - // pollerErrCh is written once with the poller error (or nil). - pollerErrCh chan error + // pollerDoneCh is closed when the poller exits. + pollerDoneCh chan struct{} + // tableHistUpdater runs in the background and continually advances the + // high-water of a tableHistory. + tableHistUpdater *tableHistoryUpdater + // tableHistUpdaterDoneCh is closed when the tableHistUpdater exits. + tableHistUpdaterDoneCh chan struct{} // sink is the Sink to write rows to. Resolved timestamps are never written // by changeAggregator. @@ -116,7 +123,25 @@ func newChangeAggregatorProcessor( ca.poller = makePoller( flowCtx.Settings, flowCtx.ClientDB, flowCtx.ClientDB.Clock(), flowCtx.Gossip, spans, spec.Feed, initialHighWater, buf) - rowsFn := kvsToRows(flowCtx.LeaseManager.(*sql.LeaseManager), spec.Feed, buf.Get) + + leaseMgr := flowCtx.LeaseManager.(*sql.LeaseManager) + tableHist := makeTableHistory(func(desc *sqlbase.TableDescriptor) error { + // NB: Each new `tableDesc.Version` is initially written with an mvcc + // timestamp equal to its `ModificationTime`. It might later update that + // `Version` with backfill progress, but we only validate a table + // descriptor through its `ModificationTime` before using it, so this + // validation function can't depend on anything that changes after a new + // `Version` of a table desc is written. 
+ return validateChangefeedTable(spec.Feed.Targets, desc) + }, initialHighWater) + ca.tableHistUpdater = &tableHistoryUpdater{ + settings: flowCtx.Settings, + db: flowCtx.ClientDB, + targets: spec.Feed.Targets, + m: tableHist, + } + rowsFn := kvsToRows(leaseMgr, tableHist, spec.Feed, buf.Get) + ca.tickFn = emitEntries(spec.Feed, ca.sink, rowsFn) return ca, nil @@ -130,13 +155,25 @@ func (ca *changeAggregator) OutputTypes() []sqlbase.ColumnType { func (ca *changeAggregator) Start(ctx context.Context) context.Context { ctx, ca.cancel = context.WithCancel(ctx) - ca.pollerErrCh = make(chan error, 1) + // Give errCh enough buffer for both of these, but only the first one is + // ever used. + ca.errCh = make(chan error, 2) + ca.pollerDoneCh = make(chan struct{}) go func(ctx context.Context) { + defer close(ca.pollerDoneCh) err := ca.poller.Run(ctx) // Trying to call MoveToDraining here is racy (`MoveToDraining called in // state stateTrailingMeta`), so return the error via a channel. - ca.pollerErrCh <- err - close(ca.pollerErrCh) + ca.errCh <- err + ca.cancel() + }(ctx) + ca.tableHistUpdaterDoneCh = make(chan struct{}) + go func(ctx context.Context) { + defer close(ca.tableHistUpdaterDoneCh) + err := ca.tableHistUpdater.PollTableDescs(ctx) + // Trying to call MoveToDraining here is racy (`MoveToDraining called in + // state stateTrailingMeta`), so return the error via a channel. + ca.errCh <- err ca.cancel() }(ctx) @@ -145,11 +182,9 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { } func (ca *changeAggregator) close() { - // Wait for the poller to finish shutting down. If the poller errored first, - // then Next will have passed its error to MoveToDraining. Otherwise, the - // error will be related to the forced shutdown of the poller (probably - // context canceled) and we don't care what it is, so throw it away. - <-ca.pollerErrCh + // Wait for the poller and tableHistUpdater to finish shutting down. + <-ca.pollerDoneCh + <-ca.tableHistUpdaterDoneCh if err := ca.sink.Close(); err != nil { log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) } @@ -170,12 +205,13 @@ func (ca *changeAggregator) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerMet if err := ca.tick(); err != nil { select { - // If the poller errored first, that's the interesting one, so - // overwrite `err`. - case err = <-ca.pollerErrCh: + // If the poller or tableHistUpdater errored first, that's the + // interesting one, so overwrite `err`. + case err = <-ca.errCh: default: } - // Shut down the poller if it wasn't already. + // Shut down the poller and tableHistUpdater if they weren't + // already. 
ca.cancel() ca.MoveToDraining(err) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index aaeab223b834..4e2d3d0c61d7 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -166,12 +166,12 @@ func changefeedPlanHook( targets := make(jobspb.ChangefeedTargets, len(targetDescs)) for _, desc := range targetDescs { if tableDesc := desc.GetTable(); tableDesc != nil { - if err := validateChangefeedTable(tableDesc); err != nil { - return err - } targets[tableDesc.ID] = jobspb.ChangefeedTarget{ StatementTimeName: tableDesc.Name, } + if err := validateChangefeedTable(targets, tableDesc); err != nil { + return err + } } } @@ -275,7 +275,14 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails return details, nil } -func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error { +func validateChangefeedTable( + targets jobspb.ChangefeedTargets, tableDesc *sqlbase.TableDescriptor, +) error { + t, ok := targets[tableDesc.ID] + if !ok { + return errors.Errorf(`unwatched table: %s`, tableDesc.Name) + } + // Technically, the only non-user table known not to work is system.jobs // (which creates a cycle since the resolved timestamp high-water mark is // saved in it), but there are subtle differences in the way many of them @@ -298,6 +305,18 @@ func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error { `CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`, tableDesc.Name, len(tableDesc.Families)) } + + if tableDesc.State == sqlbase.TableDescriptor_DROP { + return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) + } + if tableDesc.Name != t.StatementTimeName { + return errors.Errorf(`"%s" was renamed to "%s"`, t.StatementTimeName, tableDesc.Name) + } + + if tableDesc.HasColumnBackfillMutation() { + return errors.Errorf(`CHANGEFEEDs cannot operate on tables being backfilled`) + } + return nil } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index f9bb52916c2e..170a007d56ac 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -218,37 +218,220 @@ func TestChangefeedSchemaChange(t *testing.T) { defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`) - sqlDB.Exec(t, `CREATE DATABASE d`) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) - - var start string - sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&start) - sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (0, '0')`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (1)`) - sqlDB.Exec(t, `ALTER TABLE foo ALTER COLUMN b SET DEFAULT 'after'`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (2)`) - sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (3)`) - sqlDB.Exec(t, `INSERT INTO foo (a, c) VALUES (4, 14)`) - rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, start) - defer closeFeedRowsHack(t, sqlDB, rows) - assertPayloads(t, rows, []string{ - `foo: [0]->{"a": 0, "b": "0"}`, - `foo: [1]->{"a": 1, "b": "before"}`, - `foo: [2]->{"a": 2, "b": "after"}`, - `foo: [3]->{"a": 3, "b": "after", "c": null}`, - `foo: [4]->{"a": 4, "b": "after", "c": 14}`, + + t.Run(`historical`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE historical (a INT PRIMARY KEY, b STRING DEFAULT 
'before')`) + var start string + sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&start) + sqlDB.Exec(t, `INSERT INTO historical (a, b) VALUES (0, '0')`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (1)`) + sqlDB.Exec(t, `ALTER TABLE historical ALTER COLUMN b SET DEFAULT 'after'`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (2)`) + sqlDB.Exec(t, `ALTER TABLE historical ADD COLUMN c INT`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (3)`) + sqlDB.Exec(t, `INSERT INTO historical (a, c) VALUES (4, 14)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR historical WITH cursor=$1`, start) + defer closeFeedRowsHack(t, sqlDB, rows) + assertPayloads(t, rows, []string{ + `historical: [0]->{"a": 0, "b": "0"}`, + `historical: [1]->{"a": 1, "b": "before"}`, + `historical: [2]->{"a": 2, "b": "after"}`, + `historical: [3]->{"a": 3, "b": "after", "c": null}`, + `historical: [4]->{"a": 4, "b": "after", "c": 14}`, + }) }) - sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT`) - sqlDB.Exec(t, `INSERT INTO foo (a, d) VALUES (5, 15)`) - assertPayloads(t, rows, []string{ - `foo: [5]->{"a": 5, "b": "after", "c": null, "d": 15}`, + t.Run(`add column`, func(t *testing.T) { + // NB: the default is a nullable column + sqlDB.Exec(t, `CREATE TABLE add_column (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO add_column VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column ADD COLUMN b STRING`) + sqlDB.Exec(t, `INSERT INTO add_column VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_column: [1]->{"a": 1}`, + `add_column: [2]->{"a": 2, "b": "2"}`, + }) + }) + + t.Run(`add column not null`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_notnull (a INT PRIMARY KEY)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_notnull WITH resolved`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_notnull ADD COLUMN b STRING NOT NULL`) + sqlDB.Exec(t, `INSERT INTO add_column_notnull VALUES (2, '2')`) + skipResolvedTimestamps(t, rows) + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add column with default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_def (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_def`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) + sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_column_def: [1]->{"a": 1}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add column computed`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) + sqlDB.Exec(t, `INSERT INTO add_column_comp VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_comp`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_comp ADD COLUMN c INT AS (a + 10) STORED`) + sqlDB.Exec(t, `INSERT INTO add_column_comp (a) VALUES (2)`) + assertPayloads(t, rows, []string{ 
+ `add_column_comp: [1]->{"a": 1, "b": 6}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`rename column`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE rename_column (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO rename_column VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR rename_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE rename_column RENAME COLUMN b TO c`) + sqlDB.Exec(t, `INSERT INTO rename_column VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `rename_column: [1]->{"a": 1, "b": "1"}`, + `rename_column: [2]->{"a": 2, "c": "2"}`, + }) + }) + + t.Run(`drop column`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`) + sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2)`) + assertPayloads(t, rows, []string{ + `drop_column: [1]->{"a": 1, "b": "1"}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_default (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO add_default (a, b) VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_default ALTER COLUMN b SET DEFAULT 'd'`) + sqlDB.Exec(t, `INSERT INTO add_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `add_default: [1]->{"a": 1, "b": "1"}`, + `add_default: [2]->{"a": 2, "b": "d"}`, + }) + }) + + t.Run(`alter default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE alter_default (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) + sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR alter_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE alter_default ALTER COLUMN b SET DEFAULT 'after'`) + sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `alter_default: [1]->{"a": 1, "b": "before"}`, + `alter_default: [2]->{"a": 2, "b": "after"}`, + }) }) - // TODO(dan): Test a schema change that uses a backfill once we figure out - // the user facing semantics of that. 
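+
+	// Of the subtests here, adding a nullable column with no default (`add
+	// column` above) needs no backfill and keeps emitting, while NOT NULL,
+	// DEFAULT, and computed columns, as well as dropped columns, do need one
+	// (see ColumnNeedsBackfill and HasColumnBackfillMutation) and so expect
+	// the "cannot operate on tables being backfilled" error.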
+ t.Run(`drop default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_default (a INT PRIMARY KEY, b STRING DEFAULT 'd')`) + sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_default ALTER COLUMN b DROP DEFAULT`) + sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `drop_default: [1]->{"a": 1, "b": "d"}`, + `drop_default: [2]->{"a": 2, "b": null}`, + }) + }) + + t.Run(`drop not null`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_notnull (a INT PRIMARY KEY, b STRING NOT NULL)`) + sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_notnull`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_notnull ALTER b DROP NOT NULL`) + sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (2, NULL)`) + assertPayloads(t, rows, []string{ + `drop_notnull: [1]->{"a": 1, "b": "1"}`, + `drop_notnull: [2]->{"a": 2, "b": null}`, + }) + }) + + t.Run(`checks`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE checks (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR checks`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE checks ADD CONSTRAINT c CHECK (a < 5) NOT VALID`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (2)`) + sqlDB.Exec(t, `ALTER TABLE checks VALIDATE CONSTRAINT c`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (3)`) + sqlDB.Exec(t, `ALTER TABLE checks DROP CONSTRAINT c`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (6)`) + assertPayloads(t, rows, []string{ + `checks: [1]->{"a": 1}`, + `checks: [2]->{"a": 2}`, + `checks: [3]->{"a": 3}`, + `checks: [6]->{"a": 6}`, + }) + }) + + t.Run(`add index`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_index (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO add_index VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_index`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `CREATE INDEX b_idx ON add_index (b)`) + sqlDB.Exec(t, `SELECT * FROM add_index@b_idx`) + sqlDB.Exec(t, `INSERT INTO add_index VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_index: [1]->{"a": 1, "b": "1"}`, + `add_index: [2]->{"a": 2, "b": "2"}`, + }) + }) + + t.Run(`unique`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE "unique" (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO "unique" VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR "unique"`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE "unique" ADD CONSTRAINT u UNIQUE (b)`) + sqlDB.Exec(t, `INSERT INTO "unique" VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `unique: [1]->{"a": 1, "b": "1"}`, + `unique: [2]->{"a": 2, "b": "2"}`, + }) + }) } func TestChangefeedInterleaved(t *testing.T) { @@ -689,6 +872,19 @@ func assertPayloads(t *testing.T, rows *gosql.Rows, expected []string) { } } +func skipResolvedTimestamps(t *testing.T, rows *gosql.Rows) { + for rows.Next() { + var table gosql.NullString + var key, value []byte + if err := rows.Scan(&table, &key, &value); err != nil { + t.Fatal(err) + } + if table.Valid { + t.Errorf(`unexpected row %s: %s->%s`, table.String, key, value) + } + } +} + func expectResolvedTimestampGreaterThan(t testing.TB, rows *gosql.Rows, ts string) { t.Helper() for { diff --git a/pkg/ccl/changefeedccl/helpers_test.go 
b/pkg/ccl/changefeedccl/helpers_test.go index 7b9d74aec819..0dcb73553a9a 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -108,11 +108,19 @@ func createBenchmarkChangefeed( poller := makePoller( s.ClusterSettings(), s.DB(), feedClock, s.Gossip(), spans, details, initialHighWater, buf) - rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), details, buf.Get) + th := makeTableHistory(func(*sqlbase.TableDescriptor) error { return nil }, initialHighWater) + thUpdater := &tableHistoryUpdater{ + settings: s.ClusterSettings(), + db: s.DB(), + targets: details.Targets, + m: th, + } + rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get) tickFn := emitEntries(details, sink, rowsFn) ctx, cancel := context.WithCancel(ctx) go func() { _ = poller.Run(ctx) }() + go func() { _ = thUpdater.PollTableDescs(ctx) }() errCh := make(chan error, 1) var wg sync.WaitGroup diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index bcfff30d9081..2bae6dbe5172 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -81,22 +81,9 @@ func fetchSpansForTargets( spans = nil txn.SetFixedTimestamp(ctx, ts) // Note that all targets are currently guaranteed to be tables. - for tableID, t := range targets { + for tableID := range targets { tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID) if err != nil { - if errors.Cause(err) == sqlbase.ErrDescriptorNotFound { - return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) - } - return err - } - if tableDesc.State == sqlbase.TableDescriptor_DROP { - return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) - } - if tableDesc.Name != t.StatementTimeName { - return errors.Errorf( - `"%s" was renamed to "%s"`, t.StatementTimeName, tableDesc.Name) - } - if err := validateChangefeedTable(tableDesc); err != nil { return err } spans = append(spans, tableDesc.PrimaryIndexSpan()) @@ -137,11 +124,6 @@ func (p *poller) Run(ctx context.Context) error { log.VEventf(ctx, 1, `changefeed poll [%s,%s): %s`, p.highWater, nextHighWater, time.Duration(nextHighWater.WallTime-p.highWater.WallTime)) - _, err := fetchSpansForTargets(ctx, p.db, p.details.Targets, nextHighWater) - if err != nil { - return err - } - var ranges []roachpb.RangeDescriptor if err := p.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { var err error diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 4dffd98511bc..f56b7f3ab61a 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -25,41 +25,42 @@ import ( // StartScanFrom can be used to turn that key (or all the keys making up the // column families of one row) into a row. 
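+//
+// The cache consults a tableHistory before handing out a descriptor:
+// TableDescForKey waits until the descriptor's ModificationTime has been
+// validated, so a descriptor is never used at a timestamp that the invariant
+// checker has not yet cleared (or has already flagged with an error).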
type rowFetcherCache struct { - leaseMgr *sql.LeaseManager - fetchers map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher + leaseMgr *sql.LeaseManager + tableHist *tableHistory + fetchers map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher a sqlbase.DatumAlloc } -func newRowFetcherCache(leaseMgr *sql.LeaseManager) *rowFetcherCache { +func newRowFetcherCache(leaseMgr *sql.LeaseManager, tableHist *tableHistory) *rowFetcherCache { return &rowFetcherCache{ - leaseMgr: leaseMgr, - fetchers: make(map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher), + leaseMgr: leaseMgr, + tableHist: tableHist, + fetchers: make(map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher), } } func (c *rowFetcherCache) TableDescForKey( ctx context.Context, key roachpb.Key, ts hlc.Timestamp, ) (*sqlbase.TableDescriptor, error) { - var skippedCols int - for { + var tableDesc *sqlbase.TableDescriptor + for skippedCols := 0; ; { remaining, tableID, _, err := sqlbase.DecodeTableIDIndexID(key) if err != nil { return nil, err } - // TODO(dan): We don't really need a lease, this is just a convenient way to - // get the right descriptor for a timestamp, so release it immediately after - // we acquire it. Avoid the lease entirely. - tableDesc, _, err := c.leaseMgr.Acquire(ctx, ts, tableID) + // No caching of these are attempted, since the lease manager does its + // own caching. + tableDesc, _, err = c.leaseMgr.Acquire(ctx, ts, tableID) if err != nil { return nil, err } + // Immediately release the lease, since we only need it for the exact + // timestamp requested. if err := c.leaseMgr.Release(tableDesc); err != nil { return nil, err } - if err := validateChangefeedTable(tableDesc); err != nil { - return nil, err - } + // Skip over the column data. for ; skippedCols < len(tableDesc.PrimaryIndex.ColumnIDs); skippedCols++ { l, err := encoding.PeekLength(remaining) @@ -71,10 +72,19 @@ func (c *rowFetcherCache) TableDescForKey( var interleaved bool remaining, interleaved = encoding.DecodeIfInterleavedSentinel(remaining) if !interleaved { - return tableDesc, nil + break } key = remaining } + + // Leasing invariant: each new `tableDesc.Version` of a descriptor is + // initially written with an mvcc timestamp equal to its modification time. + // It might be updated later with backfill progress, but (critically) the + // `validateFn` we passed to `tableHist` doesn't care about this. + if err := c.tableHist.WaitForTS(ctx, tableDesc.ModificationTime); err != nil { + return nil, err + } + return tableDesc, nil } func (c *rowFetcherCache) RowFetcherForTableDesc( diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index fafc2844eefd..46ec3f02511d 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -144,7 +144,7 @@ func (sc *SchemaChanger) runBackfill( case sqlbase.DescriptorMutation_ADD: switch t := m.Descriptor_.(type) { case *sqlbase.DescriptorMutation_Column: - if columnNeedsBackfill(m.GetColumn()) { + if sqlbase.ColumnNeedsBackfill(m.GetColumn()) { needColumnBackfill = true } case *sqlbase.DescriptorMutation_Index: @@ -582,10 +582,6 @@ func (sc *SchemaChanger) truncateAndBackfillColumns( backfill.ColumnMutationFilter) } -func columnNeedsBackfill(desc *sqlbase.ColumnDescriptor) bool { - return desc.DefaultExpr != nil || !desc.Nullable || desc.IsComputed() -} - // runSchemaChangesInTxn runs all the schema changes immediately in a // transaction. This is called when a CREATE TABLE is followed by // schema changes in the same transaction. 
The CREATE TABLE is @@ -626,7 +622,7 @@ func runSchemaChangesInTxn( case sqlbase.DescriptorMutation_ADD: switch m.Descriptor_.(type) { case *sqlbase.DescriptorMutation_Column: - if doneColumnBackfill || !columnNeedsBackfill(m.GetColumn()) { + if doneColumnBackfill || !sqlbase.ColumnNeedsBackfill(m.GetColumn()) { break } if err := columnBackfillInTxn(ctx, txn, tc, evalCtx, tableDesc, traceKV); err != nil { diff --git a/pkg/sql/sqlbase/structured.go b/pkg/sql/sqlbase/structured.go index 5285335bcfaf..545db5304798 100644 --- a/pkg/sql/sqlbase/structured.go +++ b/pkg/sql/sqlbase/structured.go @@ -2075,6 +2075,31 @@ func (desc *TableDescriptor) FinalizeMutation() (MutationID, error) { return mutationID, nil } +// ColumnNeedsBackfill returns true if adding the given column requires a +// backfill (dropping a column always requires a backfill). +func ColumnNeedsBackfill(desc *ColumnDescriptor) bool { + return desc.DefaultExpr != nil || !desc.Nullable || desc.IsComputed() +} + +// HasColumnBackfillMutation returns whether the table has any queued column +// mutations that require a backfill. +func (desc *TableDescriptor) HasColumnBackfillMutation() bool { + for _, m := range desc.Mutations { + col := m.GetColumn() + if col == nil { + // Index backfills don't affect changefeeds. + continue + } + // It's unfortunate that there's no one method we can call to check if a + // mutation will be a backfill or not, but this logic was extracted from + // backfill.go. + if m.Direction == DescriptorMutation_DROP || ColumnNeedsBackfill(col) { + return true + } + } + return false +} + // Dropped returns true if the table is being dropped. func (desc *TableDescriptor) Dropped() bool { return desc.State == TableDescriptor_DROP