diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
index 1911d38b9479..77f64883db34 100644
--- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
+++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -241,7 +241,6 @@ CREATE TABLE data2.foo (a int);
 		systemschema.TableStatisticsTable.GetName(),
 		systemschema.UITable.GetName(),
 		systemschema.UsersTable.GetName(),
-		systemschema.ZonesTable.GetName(),
 		systemschema.ScheduledJobsTable.GetName(),
 	}

@@ -317,6 +316,22 @@ CREATE TABLE data2.foo (a int);
 		}
 	})

+	t.Run("zone_configs", func(t *testing.T) {
+		// The restored zones should be a superset of the zones in the backed up
+		// cluster.
+		zoneIDsResult := sqlDB.QueryStr(t, `SELECT id FROM system.zones`)
+		var q strings.Builder
+		q.WriteString("SELECT * FROM system.zones WHERE id IN (")
+		for i, restoreZoneIDRow := range zoneIDsResult {
+			if i > 0 {
+				q.WriteString(", ")
+			}
+			q.WriteString(restoreZoneIDRow[0])
+		}
+		q.WriteString(")")
+		sqlDBRestore.CheckQueryResults(t, q.String(), sqlDB.QueryStr(t, q.String()))
+	})
+
 	t.Run("ensure that tables can be created at the excepted ID", func(t *testing.T) {
 		var maxID, dbID, tableID int
 		sqlDBRestore.QueryRow(t, "SELECT max(id) FROM system.namespace").Scan(&maxID)
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 2d5c37a1dc08..e2a6f6c453ec 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -2539,9 +2539,12 @@ func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context) error {
 	executor := r.execCfg.InternalExecutor
 	// After restoring the system tables, drop the temporary database holding the
 	// system tables.
+	gcTTLQuery := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds=1", restoreTempSystemDB)
+	if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, nil /* txn */, gcTTLQuery); err != nil {
+		log.Errorf(ctx, "failed to update the GC TTL of %q: %+v", restoreTempSystemDB, err)
+	}
 	dropTableQuery := fmt.Sprintf("DROP DATABASE %s CASCADE", restoreTempSystemDB)
-	_, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery)
-	if err != nil {
+	if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery); err != nil {
 		return errors.Wrap(err, "dropping temporary system db")
 	}
 	return nil
diff --git a/pkg/cli/debug_merge_logs.go b/pkg/cli/debug_merge_logs.go
index 52bb42bce647..38ef8ed1b62d 100644
--- a/pkg/cli/debug_merge_logs.go
+++ b/pkg/cli/debug_merge_logs.go
@@ -499,9 +499,8 @@ func (s *fileLogStream) open() bool {
 	if s.err = seekToFirstAfterFrom(s.f, s.from, s.editMode, s.format); s.err != nil {
 		return false
 	}
-	var err error
-	if s.d, err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); err != nil {
-		panic(err)
+	if s.d, s.err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); s.err != nil {
+		return false
 	}
 	return true
 }
diff --git a/pkg/cli/debug_merge_logs_test.go b/pkg/cli/debug_merge_logs_test.go
index a26b68ac1b0d..ff90513d595f 100644
--- a/pkg/cli/debug_merge_logs_test.go
+++ b/pkg/cli/debug_merge_logs_test.go
@@ -12,6 +12,7 @@ package cli

 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
 	"path/filepath"
 	"testing"
@@ -184,14 +185,22 @@ func getCases(format string) []testCase {
 	}
 }

-func (c testCase) run(t *testing.T) {
-	outBuf := bytes.Buffer{}
+func resetDebugMergeLogFlags(errorFn func(s string)) {
 	debugMergeLogsCmd.Flags().Visit(func(f *pflag.Flag) {
 		if err := f.Value.Set(f.DefValue); err != nil {
-			t.Fatalf("Failed to set flag to default: %v", err)
+			errorFn(fmt.Sprintf("Failed to set flag to default: %v", err))
 		}
 	})
+}
+
+func (c testCase) run(t *testing.T) {
+	resetDebugMergeLogFlags(func(s string) { t.Fatal(s) })
+	outBuf := bytes.Buffer{}
 	debugMergeLogsCmd.SetOut(&outBuf)
+	// Ensure that the original writer is restored when the test
+	// completes. Otherwise, subsequent tests may not see their output.
+	defer debugMergeLogsCmd.SetOut(nil)
+
 	if err := debugMergeLogsCmd.ParseFlags(c.flags); err != nil {
 		t.Fatalf("Failed to set flags: %v", err)
 	}
@@ -235,3 +244,16 @@ func TestCrdbV1DebugMergeLogs(t *testing.T) {
 		t.Run(c.name, c.run)
 	}
 }
+
+func Example_format_error() {
+	c := NewCLITest(TestCLIParams{NoServer: true})
+	defer c.Cleanup()
+
+	resetDebugMergeLogFlags(func(s string) { fmt.Fprintf(stderr, "ERROR: %v", s) })
+
+	c.RunWithArgs([]string{"debug", "merge-logs", "testdata/merge_logs_v1/missing_format/*"})
+
+	// Output:
+	// debug merge-logs testdata/merge_logs_v1/missing_format/*
+	// ERROR: decoding format: failed to extract log file format from the log
+}
diff --git a/pkg/cli/testdata/merge_logs_v1/missing_format/cockroach.test-0001.ubuntu.2018-11-30T22_06_47Z.004130.log b/pkg/cli/testdata/merge_logs_v1/missing_format/cockroach.test-0001.ubuntu.2018-11-30T22_06_47Z.004130.log
new file mode 100644
index 000000000000..47ec4e95eca4
--- /dev/null
+++ b/pkg/cli/testdata/merge_logs_v1/missing_format/cockroach.test-0001.ubuntu.2018-11-30T22_06_47Z.004130.log
@@ -0,0 +1,3 @@
+I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 [config] binary: CockroachDB CCL v20.1.17 (x86_64-apple-darwin14, built 2021/05/17 16:30:22,
+I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 [config] arguments: [./cockroach start]
+I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 line format: [IWEF]yymmdd hh:mm:ss.uuuuuu goid file:line msg utf8=✓
diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 8e17a018359e..e30278c48f30 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -39,7 +39,8 @@ type Limiters struct {
 	// rangefeeds in the "catch-up" state across the store. The "catch-up" state
 	// is a temporary state at the beginning of a rangefeed which is expensive
 	// because it uses an engine iterator.
-	ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
+	ConcurrentRangefeedIters         limit.ConcurrentRequestLimiter
+	ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
 }

 // EvalContext is the interface through which command evaluation accesses the
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 64c30928cb0a..1c40e0d5b58c 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -139,6 +139,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
 	settings.PositiveInt,
 )

+// concurrentscanInterleavedIntentsLimit is the number of concurrent
+// ScanInterleavedIntents requests that will be run on a store. Used as part
+// of pre-evaluation throttling.
+var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
+	"kv.migration.concurrent_scan_interleaved_intents",
+	"number of scan interleaved intents requests a store will handle concurrently before queueing",
+	1,
+	settings.PositiveInt,
+)
+
 // Minimum time interval between system config updates which will lead to
 // enqueuing replicas.
 var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
@@ -878,6 +888,11 @@ func NewStore(
 	s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
 		"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
 	)
+	s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
+		"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
+	concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
+		s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
+	})

 	// The snapshot storage is usually empty at this point since it is cleared
 	// after each snapshot application, except when the node crashed right before
diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go
index b8f149d4e934..6e787d183075 100644
--- a/pkg/kv/kvserver/store_send.go
+++ b/pkg/kv/kvserver/store_send.go
@@ -315,6 +315,19 @@ func (s *Store) maybeThrottleBatch(
 		}
 		return res, nil

+	case *roachpb.ScanInterleavedIntentsRequest:
+		before := timeutil.Now()
+		res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		waited := timeutil.Since(before)
+		if waited > time.Second {
+			log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
+		}
+		return res, nil
+
 	default:
 		return nil, nil
 	}
diff --git a/pkg/migration/migrationcluster/cluster.go b/pkg/migration/migrationcluster/cluster.go
index e4208d848020..5435fae9e4ce 100644
--- a/pkg/migration/migrationcluster/cluster.go
+++ b/pkg/migration/migrationcluster/cluster.go
@@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
 	return nil
 }

+// NumNodes is part of the migration.Cluster interface.
+func (c *Cluster) NumNodes(ctx context.Context) (int, error) {
+	ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
+	if err != nil {
+		return 0, err
+	}
+	return len(ns), nil
+}
+
 // ForEveryNode is part of the migration.Cluster interface.
 func (c *Cluster) ForEveryNode(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
diff --git a/pkg/migration/migrationcluster/tenant_cluster.go b/pkg/migration/migrationcluster/tenant_cluster.go
index 66ca0dd22d84..455ebea6b9c9 100644
--- a/pkg/migration/migrationcluster/tenant_cluster.go
+++ b/pkg/migration/migrationcluster/tenant_cluster.go
@@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster {
 	return &TenantCluster{db: db}
 }

+// NumNodes is part of the migration.Cluster interface.
+func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) {
+	return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes")
+}
+
 // ForEveryNode is part of the migration.Cluster interface.
 func (t *TenantCluster) ForEveryNode(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
diff --git a/pkg/migration/migrations/separated_intents.go b/pkg/migration/migrations/separated_intents.go
index dcff1c912702..a1aa3f87e3a8 100644
--- a/pkg/migration/migrations/separated_intents.go
+++ b/pkg/migration/migrations/separated_intents.go
@@ -36,16 +36,14 @@ import (
 	"github.com/cockroachdb/errors"
 )

-// The number of concurrent migrateLockTableRequests requests to run. This
-// is effectively a cluster-wide setting as the actual legwork of the migration
-// happens when the destination replica(s) are sending replies back to the
-// original node.
+// The number of concurrent lock table migration related requests (eg. barrier,
+// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. If
+// this cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests requests
+// will be executed at once.
 //
-// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit
-// as opposed to a cluster-wide limit. That way, we could limit
-// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire
-// cluster, avoiding the case where all 4 ranges at a time could have the same node
-// as their leaseholder.
+// Note that some of the requests (eg. ScanInterleavedIntents) do throttling on
+// the receiving end, to reduce the chances of a large number of requests
+// being directed at a few nodes.
 const concurrentMigrateLockTableRequests = 4

 // The maximum number of times to retry a migrateLockTableRequest before failing
@@ -88,7 +86,7 @@ type migrateLockTablePool struct {
 	db       *kv.DB
 	clock    *hlc.Clock
 	done     chan bool
-	status   [concurrentMigrateLockTableRequests]int64
+	status   []int64
 	finished uint64

 	mu struct {
@@ -335,7 +333,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) {
 		select {
 		case <-ticker.C:
 			var ranges strings.Builder
-			for i := 0; i < concurrentMigrateLockTableRequests; i++ {
+			for i := 0; i < len(m.status); i++ {
 				rangeID := atomic.LoadInt64(&m.status[i])
 				if rangeID == 0 {
 					continue
@@ -385,15 +383,18 @@ func runSeparatedIntentsMigration(
 	db *kv.DB,
 	ri rangeIterator,
 	ir intentResolver,
+	numNodes int,
 ) error {
+	concurrentRequests := concurrentMigrateLockTableRequests * numNodes
 	workerPool := migrateLockTablePool{
-		requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests),
+		requests: make(chan migrateLockTableRequest, concurrentRequests),
 		stopper:  stopper,
 		db:       db,
 		ir:       ir,
 		clock:    clock,
+		status:   make([]int64, concurrentRequests),
 	}
-	migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests)
+	migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests)
 	if err != nil {
 		return err
 	}
@@ -417,7 +418,11 @@ func separatedIntentsMigration(
 	})

 	ri := kvcoord.NewRangeIterator(deps.DistSender)
-	return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir)
+	numNodes, err := deps.Cluster.NumNodes(ctx)
+	if err != nil {
+		return err
+	}
+	return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes)
 }

 func postSeparatedIntentsMigration(
diff --git a/pkg/migration/migrations/separated_intents_test.go b/pkg/migration/migrations/separated_intents_test.go
index 3930d7a3dea2..d5c60779af42 100644
--- a/pkg/migration/migrations/separated_intents_test.go
+++ b/pkg/migration/migrations/separated_intents_test.go
@@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) {
 			errorPerNCalls, err = strconv.Atoi(d.Input)
 			require.NoError(t, err)
 		case "run-migration":
-			err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir)
+			err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1)
 			if err == nil {
 				return "ok"
 			}
diff --git a/pkg/migration/system_migration.go b/pkg/migration/system_migration.go
index aa7347cc7fb9..085ec36fe4b2 100644
--- a/pkg/migration/system_migration.go
+++ b/pkg/migration/system_migration.go
@@ -27,6 +27,11 @@ import (
 // Cluster abstracts a physical KV cluster and can be utilized by a long-running
 // migration.
 type Cluster interface {
+	// NumNodes returns the number of nodes in the cluster. This is merely a
+	// convenience method and is not meant to be used to infer cluster stability;
+	// for that, use UntilClusterStable.
+	NumNodes(ctx context.Context) (int, error)
+
 	// ForEveryNode is a short hand to execute the given closure (named by the
 	// informational parameter op) against every node in the cluster at a given
 	// point in time. Given it's possible for nodes to join or leave the cluster
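
Note on the store-side throttling in the store_send.go hunk above: ScanInterleavedIntents requests are admitted through s.limiters.ConcurrentScanInterleavedIntents, and a log line is emitted when a request waited more than a second before being admitted. The following self-contained Go sketch mirrors that shape without CockroachDB's internal limit package; the limiter type, the names, and the one-second threshold are illustrative stand-ins, not the actual API.

package main

import (
	"context"
	"fmt"
	"time"
)

// limiter is a tiny stand-in for a concurrent-request limiter: Begin blocks
// until a slot is free (or the context is done) and returns a release func.
type limiter struct{ slots chan struct{} }

func newLimiter(n int) *limiter { return &limiter{slots: make(chan struct{}, n)} }

func (l *limiter) Begin(ctx context.Context) (func(), error) {
	select {
	case l.slots <- struct{}{}:
		return func() { <-l.slots }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// maybeThrottle mirrors the shape of the store_send.go change: time the wait,
// acquire a slot, and log if the request was delayed for more than a second.
func maybeThrottle(ctx context.Context, l *limiter) (func(), error) {
	before := time.Now()
	release, err := l.Begin(ctx)
	if err != nil {
		return nil, err
	}
	if waited := time.Since(before); waited > time.Second {
		fmt.Printf("request was delayed by %v\n", waited)
	}
	return release, nil
}

func main() {
	l := newLimiter(1) // analogous to the setting's default of 1 per store
	release, err := maybeThrottle(context.Background(), l)
	if err != nil {
		panic(err)
	}
	defer release()
	fmt.Println("request admitted")
}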
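
Note on the pool sizing in the separated_intents.go hunks above: runSeparatedIntentsMigration now sizes its request channel and per-worker status slice as concurrentMigrateLockTableRequests * numNodes, with numNodes supplied by the new Cluster.NumNodes method. A minimal standalone sketch of that sizing pattern follows; it is a simplified analogue with hypothetical names, not the migrateLockTablePool implementation itself.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const requestsPerNode = 4 // plays the role of concurrentMigrateLockTableRequests

func run(numNodes int, work []int64) {
	concurrent := requestsPerNode * numNodes
	requests := make(chan int64, concurrent)
	status := make([]int64, concurrent) // one slot per worker, like migrateLockTablePool.status

	var wg sync.WaitGroup
	for i := 0; i < concurrent; i++ {
		wg.Add(1)
		go func(slot int) {
			defer wg.Done()
			for id := range requests {
				atomic.StoreInt64(&status[slot], id) // visible to a status logger
				// ... process the range / request here ...
				atomic.StoreInt64(&status[slot], 0)
			}
		}(i)
	}
	for _, id := range work {
		requests <- id
	}
	close(requests)
	wg.Wait()
	fmt.Printf("processed %d items with %d workers\n", len(work), concurrent)
}

func main() {
	run(3, []int64{1, 2, 3, 4, 5}) // 3 nodes -> 12 concurrent workers
}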
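
Note on the temp system DB cleanup in the restore_job.go hunk above: cleanupTempSystemTables now lowers gc.ttlseconds on the temporary database before dropping it, and a failure of the zone-config change is only logged while a failed DROP is returned to the caller. A hedged sketch of that sequencing using the standard database/sql package follows; the function name, connection wiring, and the example database name are placeholders, not the restore job's actual code path.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
)

// cleanupTempSystemDB mirrors the sequencing in restore_job.go: shorten the
// GC TTL first (best effort, only logged on failure), then drop the database
// (failure here is returned to the caller).
func cleanupTempSystemDB(ctx context.Context, db *sql.DB, tempDB string) error {
	gcTTLQuery := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds = 1", tempDB)
	if _, err := db.ExecContext(ctx, gcTTLQuery); err != nil {
		log.Printf("failed to update the GC TTL of %q: %v", tempDB, err)
	}
	dropQuery := fmt.Sprintf("DROP DATABASE %s CASCADE", tempDB)
	if _, err := db.ExecContext(ctx, dropQuery); err != nil {
		return fmt.Errorf("dropping temporary system db: %w", err)
	}
	return nil
}

func main() {
	// Wiring up a real *sql.DB is elided; any SQL driver pointed at a
	// CockroachDB cluster would do. Example call (database name is a
	// placeholder, not necessarily the value of restoreTempSystemDB):
	//   err := cleanupTempSystemDB(context.Background(), db, "temp_system_db")
	fmt.Println("see cleanupTempSystemDB above")
}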