diff --git a/pkg/cmd/roachtest/tests/admission_control_latency.go b/pkg/cmd/roachtest/tests/admission_control_latency.go
index 7ab8f7ce39ff..9569920a9779 100644
--- a/pkg/cmd/roachtest/tests/admission_control_latency.go
+++ b/pkg/cmd/roachtest/tests/admission_control_latency.go
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -901,6 +902,47 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
 	t.L().Printf("validating stats after the perturbation")
 	failures = append(failures, isAcceptableChange(t.L(), baselineStats, afterStats, v.acceptableChange)...)
 	require.True(t, len(failures) == 0, strings.Join(failures, "\n"))
+	v.validateTokensReturned(ctx, t)
+}
+
+// validateTokensReturned ensures that all RAC tokens are returned to the pool
+// at the end of the test.
+func (v variations) validateTokensReturned(ctx context.Context, t test.Test) {
+	t.L().Printf("validating all tokens returned")
+	for _, node := range v.stableNodes() {
+		// Wait for the tokens to be returned to the pool. Normally this will
+		// pass immediately however it is possible that there is still some
+		// recovery so loop a few times.
+		testutils.SucceedsWithin(t, func() error {
+			db := v.Conn(ctx, t.L(), node)
+			defer db.Close()
+			for _, sType := range []string{"send", "eval"} {
+				for _, tType := range []string{"elastic", "regular"} {
+					statPrefix := fmt.Sprintf("kvflowcontrol.tokens.%s.%s", sType, tType)
+					query := fmt.Sprintf(`
+		SELECT d.value::INT8 AS deducted, r.value::INT8 AS returned
+		FROM
+			crdb_internal.node_metrics d,
+			crdb_internal.node_metrics r
+		WHERE
+			d.name='%s.deducted' AND
+			r.name='%s.returned'`,
+						statPrefix, statPrefix)
+					rows, err := db.QueryContext(ctx, query)
+					require.NoError(t, err)
+					require.True(t, rows.Next())
+					var deducted, returned int64
+					if err := rows.Scan(&deducted, &returned); err != nil {
+						return err
+					}
+					if deducted != returned {
+						return errors.Newf("tokens not returned for %s: deducted %d returned %d", statPrefix, deducted, returned)
+					}
+				}
+			}
+			return nil
+		}, 5*time.Second)
+	}
 }
 
 // trackedStat is a collection of the relevant values from the histogram. The
@@ -994,7 +1036,7 @@ func (v variations) waitForRebalanceToStop(ctx context.Context, t test.Test) {
 		Multiplier: 1,
 	}
 	for r := retry.StartWithCtx(ctx, opts); r.Next(); {
-		if row := db.QueryRow(q); row != nil {
+		if row := db.QueryRowContext(ctx, q); row != nil {
 			var secondsSinceLastEvent int
 			if err := row.Scan(&secondsSinceLastEvent); err != nil && !errors.Is(err, gosql.ErrNoRows) {
 				t.Fatal(err)
@@ -1021,7 +1063,7 @@ func (v variations) waitForIOOverloadToEnd(ctx context.Context, t test.Test) {
 		anyOverloaded := false
 		for _, nodeId := range v.targetNodes() {
 			db := v.Conn(ctx, t.L(), nodeId)
-			if row := db.QueryRow(q); row != nil {
+			if row := db.QueryRowContext(ctx, q); row != nil {
 				var overload float64
 				if err := row.Scan(&overload); err != nil && !errors.Is(err, gosql.ErrNoRows) {
 					db.Close()
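
For illustration, the deducted/returned comparison that validateTokensReturned performs can be reproduced outside the roachtest harness with a plain database/sql client. The sketch below is not part of the patch and is assumption-laden: it presumes a locally reachable node, the lib/pq driver, and a hypothetical helper named checkTokensReturned; it only mirrors the crdb_internal.node_metrics query issued above.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver
)

// checkTokensReturned is a hypothetical helper mirroring the roachtest query:
// for each send/eval x elastic/regular token pool it compares the deducted
// and returned counters exposed in crdb_internal.node_metrics.
func checkTokensReturned(db *sql.DB) error {
	for _, sType := range []string{"send", "eval"} {
		for _, tType := range []string{"elastic", "regular"} {
			prefix := fmt.Sprintf("kvflowcontrol.tokens.%s.%s", sType, tType)
			var deducted, returned int64
			err := db.QueryRow(`
SELECT d.value::INT8, r.value::INT8
FROM crdb_internal.node_metrics AS d, crdb_internal.node_metrics AS r
WHERE d.name = $1 AND r.name = $2`,
				prefix+".deducted", prefix+".returned",
			).Scan(&deducted, &returned)
			if err != nil {
				return err
			}
			if deducted != returned {
				return fmt.Errorf("%s: deducted %d, returned %d", prefix, deducted, returned)
			}
		}
	}
	return nil
}

func main() {
	// Placeholder connection string for a local single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := checkTokensReturned(db); err != nil {
		log.Fatal(err)
	}
	fmt.Println("all kvflowcontrol tokens returned")
}

The roachtest wraps the same comparison in testutils.SucceedsWithin because token return can briefly lag after the perturbation ends.
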
diff --git a/pkg/cmd/roachtest/tests/util.go b/pkg/cmd/roachtest/tests/util.go
index e7bb108becf9..24597c266216 100644
--- a/pkg/cmd/roachtest/tests/util.go
+++ b/pkg/cmd/roachtest/tests/util.go
@@ -151,7 +151,7 @@ func profileTopStatements(
 
 	// Enable continuous statement diagnostics rather than just the first one.
 	sql := "SET CLUSTER SETTING sql.stmt_diagnostics.collect_continuously.enabled=true"
-	if _, err := db.Exec(sql); err != nil {
+	if _, err := db.ExecContext(ctx, sql); err != nil {
 		return err
 	}
 
@@ -199,7 +199,7 @@ FROM (
 		dbName,
 		minNumExpectedStmts,
 	)
-	if _, err := db.Exec(sql); err != nil {
+	if _, err := db.ExecContext(ctx, sql); err != nil {
 		return err
 	}
 	return nil
@@ -217,7 +217,7 @@ func downloadProfiles(
 	query := "SELECT id, collected_at FROM system.statement_diagnostics"
 	db := cluster.Conn(ctx, logger, 1)
 	defer db.Close()
-	idRow, err := db.Query(query)
+	idRow, err := db.QueryContext(ctx, query)
 	if err != nil {
 		return err
 	}
@@ -236,7 +236,7 @@ func downloadProfiles(
 			return err
 		}
 		url := urlPrefix + diagID
-		resp, err := client.Get(context.Background(), url)
+		resp, err := client.Get(ctx, url)
 		if err != nil {
 			return err
 		}
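
The util.go changes above swap context-free calls (db.Exec, db.Query, context.Background()) for their context-aware equivalents so in-flight statements stop when the test's context is canceled or times out. Below is a minimal sketch of that behavioral difference, assuming a local cluster and the lib/pq driver; the connection string and the pg_sleep statement are placeholders chosen only to make the cancellation visible.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/lib/pq" // assumed Postgres-wire driver
)

func main() {
	// Placeholder connection string for a local single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Stand-in for a roachtest context whose timeout has fired.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// db.Query would leave the slow statement running after the caller gives
	// up; QueryContext returns (and cancels the statement) once ctx is done,
	// which is what the ...Context conversions above buy.
	rows, err := db.QueryContext(ctx, `SELECT pg_sleep(60)`)
	if err != nil {
		fmt.Println("query stopped:", err) // context deadline exceeded
		return
	}
	defer rows.Close()
}
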
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
index ed1e9ef4be18..bf00768862f1 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -394,6 +394,11 @@ type testingRCRange struct {
 	snapshots []testingTrackerSnapshot
 	raftLog   raft.MemoryStorage
 
+	// mu is ordered after RaftMu.
+	//
+	// This is because we hold RaftMu when calling into the RangeController,
+	// which in turn may call back out to the testingRCRange for state
+	// information, as it mocks the dependencies of the RangeController.
 	mu struct {
 		syncutil.Mutex
 		r testingRange
@@ -506,26 +511,37 @@ func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority
 }
 
 func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av AdmittedVector) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-
-	for _, replDesc := range sortReplicasLocked(r) {
-		replica := r.mu.r.replicaSet[replDesc.ReplicaID]
-		if replica.desc.StoreID == storeID {
-			for _, v := range av.Admitted {
-				// Ensure that Match doesn't lag behind the highest index in the
-				// AdmittedVector.
-				replica.info.Match = max(replica.info.Match, v)
+	var replicaID roachpb.ReplicaID
+	var found bool
+	func() {
+		// We need to ensure that r.mu isn't held before (and while) holding
+		// RaftMu, in order to order the locks correctly (RaftMu before
+		// testingRCRange.mu).
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		for _, replDesc := range sortReplicasLocked(r) {
+			replica := r.mu.r.replicaSet[replDesc.ReplicaID]
+			if replica.desc.StoreID == storeID {
+				for _, v := range av.Admitted {
+					// Ensure that Match doesn't lag behind the highest index in the
+					// AdmittedVector.
+					replica.info.Match = max(replica.info.Match, v)
+				}
+				replicaID = replica.desc.ReplicaID
+				r.mu.r.replicaSet[replicaID] = replica
+				found = true
+				break
 			}
-			r.mu.r.replicaSet[replica.desc.ReplicaID] = replica
-			func() {
-				r.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
-				defer r.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
-				r.rc.AdmitRaftMuLocked(ctx, replica.desc.ReplicaID, av)
-			}()
-			return
 		}
+	}()
+
+	if !found {
+		panic("replica not found")
 	}
+
+	r.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
+	defer r.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
+	r.rc.AdmitRaftMuLocked(ctx, replicaID, av)
 }
 
 type testingRange struct {
diff --git a/pkg/sql/logictest/testdata/logic_test/grant_on_all_tables_in_schema b/pkg/sql/logictest/testdata/logic_test/grant_on_all_tables_in_schema
index d59f386ea42e..01fba7ee9cd5 100644
--- a/pkg/sql/logictest/testdata/logic_test/grant_on_all_tables_in_schema
+++ b/pkg/sql/logictest/testdata/logic_test/grant_on_all_tables_in_schema
@@ -132,3 +132,12 @@ database_name  schema_name  table_name  grantee   privilege_type  is_grantable
 otherdb        public       tbl         admin     ALL             true
 otherdb        public       tbl         root      ALL             true
 otherdb        public       tbl         testuser  SELECT          false
+
+statement ok
+CREATE TABLE t131157 (c1 INT)
+
+statement ok
+GRANT ALL ON t131157 TO testuser
+
+statement error t131157 is not a sequence
+REVOKE CREATE ON SEQUENCE t131157 FROM testuser
diff --git a/pkg/sql/resolver.go b/pkg/sql/resolver.go
index de406e70529c..1af0e88da3b3 100644
--- a/pkg/sql/resolver.go
+++ b/pkg/sql/resolver.go
@@ -538,6 +538,8 @@ func (p *planner) getDescriptorsFromTargetListForPrivilegeChange(
 					objectType: privilege.Sequence,
 				},
 			)
+		} else if targets.Tables.SequenceOnly {
+			return nil, pgerror.Newf(pgcode.WrongObjectType, "%s is not a sequence", tableDesc.GetName())
 		} else {
 			descs = append(
 				descs,
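
The admit rewrite in range_controller_test.go above establishes a strict RaftMu-before-mu ordering by collecting what it needs under mu inside a closure, releasing mu, and only then acquiring RaftMu. The following self-contained sketch shows just that pattern; the registry type, its byStore map, and the integer IDs are illustrative stand-ins, not the RangeController API.

package main

import (
	"fmt"
	"sync"
)

// registry stands in for testingRCRange: raftMu is ordered before mu, so a
// goroutine must never acquire raftMu while already holding mu.
type registry struct {
	raftMu sync.Mutex // ordered before mu
	mu     struct {
		sync.Mutex
		byStore map[int]int // storeID -> replicaID, illustrative state
	}
}

// admit mirrors the shape of the fix: read state under mu inside a closure,
// drop mu, and only then take raftMu for the downstream call.
func (r *registry) admit(storeID int) {
	var replicaID int
	var found bool
	func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		replicaID, found = r.mu.byStore[storeID]
	}()
	if !found {
		panic("replica not found")
	}

	// raftMu is acquired with mu already released; a callee that needs mu
	// while raftMu is held therefore cannot deadlock against this path.
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	fmt.Printf("admitting via replica %d\n", replicaID)
}

func main() {
	r := &registry{}
	r.mu.byStore = map[int]int{1: 7}
	r.admit(1)
}
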
diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go
index 10026f753251..2600d2b0ff0c 100644
--- a/pkg/workload/tpcc/tpcc.go
+++ b/pkg/workload/tpcc/tpcc.go
@@ -130,6 +130,8 @@ type tpcc struct {
 	// context group for any background reset table operation to avoid goroutine leaks during long duration workloads
 	resetTableGrp      ctxgroup.Group
 	resetTableCancelFn context.CancelFunc
+
+	asOfSystemTime string
 }
 
 type waitSetter struct {
@@ -260,7 +262,8 @@ var tpccMeta = workload.Meta{
 			`deprecated-fk-indexes`: {RuntimeOnly: true},
 			`query-trace-file`:      {RuntimeOnly: true},
 			`fake-time`:             {RuntimeOnly: true},
-			"txn-preamble-file":     {RuntimeOnly: true},
+			`txn-preamble-file`:     {RuntimeOnly: true},
+			`aost`:                  {RuntimeOnly: true, CheckConsistencyOnly: true},
 		}
 
 		g.flags.IntVar(&g.warehouses, `warehouses`, 1, `Number of warehouses for loading`)
@@ -300,6 +303,10 @@ var tpccMeta = workload.Meta{
 		g.flags.StringVar(&g.queryTraceFile, `query-trace-file`, ``, `File to write the query traces to. Defaults to no output`)
 		// Support executing a query file before each transaction.
 		g.flags.StringVar(&g.txnPreambleFile, "txn-preamble-file", "", "queries that will be injected before each txn")
+		g.flags.StringVar(&g.asOfSystemTime, "aost", "",
+			"This is an optional parameter to specify AOST; used exclusively in conjunction with the TPC-C consistency "+
+				"check. Example values are (\"'-1m'\", \"'-1h'\")")
+
 		RandomSeed.AddFlag(&g.flags)
 		g.connFlags = workload.NewConnFlags(&g.flags)
 		// Hardcode this since it doesn't seem like anyone will want to change
@@ -604,7 +611,7 @@ func (w *tpcc) Hooks() workload.Hooks {
 				}
 
 				start := timeutil.Now()
-				err := check.Fn(db, "" /* asOfSystemTime */)
+				err := check.Fn(db, w.asOfSystemTime /* asOfSystemTime */)
 				log.Infof(ctx, `check %s took %s`, check.Name, timeutil.Since(start))
 				if err != nil {
 					return errors.Wrapf(err, `check failed: %s`, check.Name)
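
The new aost flag above is only threaded through to check.Fn; the sketch below illustrates how such a value is typically spliced into a consistency-check query so it reads a consistent historical snapshot instead of contending with the live run. runCheckQuery, its query, and the connection string are hypothetical simplifications, assuming the lib/pq driver and a database loaded with the TPC-C schema; this is not one of the real TPC-C consistency checks.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver
)

// runCheckQuery appends an optional asOfSystemTime value such as "'-1m'" to a
// check query. The single quotes are part of the flag value (see the example
// values above), so plain concatenation yields valid SQL:
//   SELECT count(*) FROM warehouse AS OF SYSTEM TIME '-1m'
func runCheckQuery(db *sql.DB, asOfSystemTime string) error {
	q := `SELECT count(*) FROM warehouse`
	if asOfSystemTime != "" {
		q += ` AS OF SYSTEM TIME ` + asOfSystemTime
	}
	var n int
	if err := db.QueryRow(q).Scan(&n); err != nil {
		return err
	}
	if n == 0 {
		return fmt.Errorf("check failed: no warehouses found")
	}
	return nil
}

func main() {
	// Placeholder connection string; assumes the tpcc database already exists.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/tpcc?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := runCheckQuery(db, `'-1m'`); err != nil {
		log.Fatal(err)
	}
	fmt.Println("check passed")
}
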