From 9e720b628b8acb906756dde19721ecf8b5df91ce Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Mon, 30 Aug 2021 16:06:18 -0400 Subject: [PATCH 1/3] kvserver: Run one ScanInterleavedIntents per store concurrently Currently, lock table migration related requests (Barrier, ScanInterleavedIntents, PushTxn, ResolveIntent), are only run with a concurrency of 4 regardless of the size of the cluster. This change increases that to 4 * numNodes, and adds a new mechanism to limit the ScanInterleavedIntents on the receiver side to 1 per store. This throttle is applied before latches are grabbed. Only ScanInterleavedIntents needs to be rate-limited as it's by far the heaviest component in the separated intents migrations. Release justification: Category 2, low-risk update to new functionality Release note: None. --- pkg/kv/kvserver/batcheval/eval_context.go | 3 +- pkg/kv/kvserver/store.go | 15 +++++++++ pkg/kv/kvserver/store_send.go | 13 ++++++++ pkg/migration/migrationcluster/cluster.go | 9 +++++ .../migrationcluster/tenant_cluster.go | 5 +++ pkg/migration/migrations/separated_intents.go | 33 +++++++++++-------- .../migrations/separated_intents_test.go | 2 +- pkg/migration/system_migration.go | 5 +++ 8 files changed, 69 insertions(+), 16 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 8e17a018359e..e30278c48f30 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -39,7 +39,8 @@ type Limiters struct { // rangefeeds in the "catch-up" state across the store. The "catch-up" state // is a temporary state at the beginning of a rangefeed which is expensive // because it uses an engine iterator. - ConcurrentRangefeedIters limit.ConcurrentRequestLimiter + ConcurrentRangefeedIters limit.ConcurrentRequestLimiter + ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter } // EvalContext is the interface through which command evaluation accesses the diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 05928d5a4143..21707249512b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -138,6 +138,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting( settings.PositiveInt, ) +// concurrentscanInterleavedIntentsLimit is the number of concurrent +// ScanInterleavedIntents requests that will be run on a store. Used as part +// of pre-evaluation throttling. +var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting( + "kv.migration.concurrent_scan_interleaved_intents", + "number of scan interleaved intents requests a store will handle concurrently before queueing", + 1, + settings.PositiveInt, +) + // Minimum time interval between system config updates which will lead to // enqueuing replicas. var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting( @@ -860,6 +870,11 @@ func NewStore( s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter( "exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)), ) + s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter( + "scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV))) + concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { + s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV))) + }) // The snapshot storage is usually empty at this point since it is cleared // after each snapshot application, except when the node crashed right before diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index b8f149d4e934..6e787d183075 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -315,6 +315,19 @@ func (s *Store) maybeThrottleBatch( } return res, nil + case *roachpb.ScanInterleavedIntentsRequest: + before := timeutil.Now() + res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx) + if err != nil { + return nil, err + } + + waited := timeutil.Since(before) + if waited > time.Second { + log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited) + } + return res, nil + default: return nil, nil } diff --git a/pkg/migration/migrationcluster/cluster.go b/pkg/migration/migrationcluster/cluster.go index e4208d848020..5435fae9e4ce 100644 --- a/pkg/migration/migrationcluster/cluster.go +++ b/pkg/migration/migrationcluster/cluster.go @@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error return nil } +// NumNodes is part of the migration.Cluster interface. +func (c *Cluster) NumNodes(ctx context.Context) (int, error) { + ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness) + if err != nil { + return 0, err + } + return len(ns), nil +} + // ForEveryNode is part of the migration.Cluster interface. func (c *Cluster) ForEveryNode( ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error, diff --git a/pkg/migration/migrationcluster/tenant_cluster.go b/pkg/migration/migrationcluster/tenant_cluster.go index 66ca0dd22d84..455ebea6b9c9 100644 --- a/pkg/migration/migrationcluster/tenant_cluster.go +++ b/pkg/migration/migrationcluster/tenant_cluster.go @@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster { return &TenantCluster{db: db} } +// NumNodes is part of the migration.Cluster interface. +func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) { + return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes") +} + // ForEveryNode is part of the migration.Cluster interface. func (t *TenantCluster) ForEveryNode( ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error, diff --git a/pkg/migration/migrations/separated_intents.go b/pkg/migration/migrations/separated_intents.go index e2d61d88df2e..cc7f8395c8f4 100644 --- a/pkg/migration/migrations/separated_intents.go +++ b/pkg/migration/migrations/separated_intents.go @@ -35,16 +35,14 @@ import ( "github.com/cockroachdb/errors" ) -// The number of concurrent migrateLockTableRequests requests to run. This -// is effectively a cluster-wide setting as the actual legwork of the migration -// happens when the destination replica(s) are sending replies back to the -// original node. +// The number of concurrent lock table migration related requests (eg. barrier, +// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. If +// this cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests requests +// will be executed at once. // -// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit -// as opposed to a cluster-wide limit. That way, we could limit -// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire -// cluster, avoiding the case where all 4 ranges at a time could have the same node -// as their leaseholder. +// Note that some of the requests (eg. ScanInterleavedIntents) do throttling on +// the receiving end, to reduce the chances of a large number of requests +// being directed at a few nodes. const concurrentMigrateLockTableRequests = 4 // The maximum number of times to retry a migrateLockTableRequest before failing @@ -87,7 +85,7 @@ type migrateLockTablePool struct { db *kv.DB clock *hlc.Clock done chan bool - status [concurrentMigrateLockTableRequests]int64 + status []int64 finished uint64 mu struct { @@ -334,7 +332,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) { select { case <-ticker.C: var ranges strings.Builder - for i := 0; i < concurrentMigrateLockTableRequests; i++ { + for i := 0; i < len(m.status); i++ { rangeID := atomic.LoadInt64(&m.status[i]) if rangeID == 0 { continue @@ -384,15 +382,18 @@ func runSeparatedIntentsMigration( db *kv.DB, ri rangeIterator, ir intentResolver, + numNodes int, ) error { + concurrentRequests := concurrentMigrateLockTableRequests * numNodes workerPool := migrateLockTablePool{ - requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests), + requests: make(chan migrateLockTableRequest, concurrentRequests), stopper: stopper, db: db, ir: ir, clock: clock, + status: make([]int64, concurrentRequests), } - migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests) + migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests) if err != nil { return err } @@ -416,7 +417,11 @@ func separatedIntentsMigration( }) ri := kvcoord.NewRangeIterator(deps.DistSender) - return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir) + numNodes, err := deps.Cluster.NumNodes(ctx) + if err != nil { + return err + } + return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes) } func postSeparatedIntentsMigration( diff --git a/pkg/migration/migrations/separated_intents_test.go b/pkg/migration/migrations/separated_intents_test.go index 3930d7a3dea2..d5c60779af42 100644 --- a/pkg/migration/migrations/separated_intents_test.go +++ b/pkg/migration/migrations/separated_intents_test.go @@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) { errorPerNCalls, err = strconv.Atoi(d.Input) require.NoError(t, err) case "run-migration": - err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir) + err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1) if err == nil { return "ok" } diff --git a/pkg/migration/system_migration.go b/pkg/migration/system_migration.go index 4b0935c7352d..aa072c0a4018 100644 --- a/pkg/migration/system_migration.go +++ b/pkg/migration/system_migration.go @@ -26,6 +26,11 @@ import ( // Cluster abstracts a physical KV cluster and can be utilized by a long-running // migration. type Cluster interface { + // NumNodes returns the number of nodes in the cluster. This is merely a + // convenience method and is not meant to be used to infer cluster stability; + // for that, use UntilClusterStable. + NumNodes(ctx context.Context) (int, error) + // ForEveryNode is a short hand to execute the given closure (named by the // informational parameter op) against every node in the cluster at a given // point in time. Given it's possible for nodes to join or leave the cluster From 0f2de6385fd212445c04dd8fe65fbac3cfa7c4bc Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Thu, 22 Jul 2021 11:04:53 -0400 Subject: [PATCH 2/3] backupccl: set a low GC TTL on the temporary system tables Cluster restore creates a temporary system table to hold system table data before transactionally writing it to the real system tables. This table is only persisted during the lifetime of the restore and is dropped at the end of the restore. It should be GC'd quickly since it's of no use after the restore finishes. Release note (bug fix): Ensure that auxilary tables used during cluster restore are GC'd quickly afterwards. Release justification: bug fix and UX improvement. --- .../full_cluster_backup_restore_test.go | 17 ++++++++++++++++- pkg/ccl/backupccl/restore_job.go | 7 +++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index 89736c7a078e..149f8fc1154e 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -239,7 +239,6 @@ CREATE TABLE data2.foo (a int); systemschema.TableStatisticsTable.GetName(), systemschema.UITable.GetName(), systemschema.UsersTable.GetName(), - systemschema.ZonesTable.GetName(), systemschema.ScheduledJobsTable.GetName(), } @@ -315,6 +314,22 @@ CREATE TABLE data2.foo (a int); } }) + t.Run("zone_configs", func(t *testing.T) { + // The restored zones should be a superset of the zones in the backed up + // cluster. + zoneIDsResult := sqlDB.QueryStr(t, `SELECT id FROM system.zones`) + var q strings.Builder + q.WriteString("SELECT * FROM system.zones WHERE id IN (") + for i, restoreZoneIDRow := range zoneIDsResult { + if i > 0 { + q.WriteString(", ") + } + q.WriteString(restoreZoneIDRow[0]) + } + q.WriteString(")") + sqlDBRestore.CheckQueryResults(t, q.String(), sqlDB.QueryStr(t, q.String())) + }) + t.Run("ensure that tables can be created at the excepted ID", func(t *testing.T) { var maxID, dbID, tableID int sqlDBRestore.QueryRow(t, "SELECT max(id) FROM system.namespace").Scan(&maxID) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 20ebf102d78b..8ec40b75d4ab 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -2523,9 +2523,12 @@ func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context) error { executor := r.execCfg.InternalExecutor // After restoring the system tables, drop the temporary database holding the // system tables. + gcTTLQuery := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds=1", restoreTempSystemDB) + if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, nil /* txn */, gcTTLQuery); err != nil { + log.Errorf(ctx, "failed to update the GC TTL of %q: %+v", restoreTempSystemDB, err) + } dropTableQuery := fmt.Sprintf("DROP DATABASE %s CASCADE", restoreTempSystemDB) - _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery) - if err != nil { + if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery); err != nil { return errors.Wrap(err, "dropping temporary system db") } return nil From a5caea6ffa7aed5f3e02be0a6ca6de95bb935b6f Mon Sep 17 00:00:00 2001 From: Cameron Nunez Date: Mon, 16 Aug 2021 17:44:01 -0400 Subject: [PATCH 3/3] replace a panic with an error in merge-logs Release justification: bug fix Release note (bug fix): Users would receive a panic message when the log parser fails to extract log file formats. This has been replaced with a helpful error message. --- pkg/cli/debug_merge_logs.go | 5 ++-- pkg/cli/debug_merge_logs_test.go | 28 +++++++++++++++++-- ...001.ubuntu.2018-11-30T22_06_47Z.004130.log | 3 ++ 3 files changed, 30 insertions(+), 6 deletions(-) create mode 100644 pkg/cli/testdata/merge_logs_v1/missing_format/cockroach.test-0001.ubuntu.2018-11-30T22_06_47Z.004130.log diff --git a/pkg/cli/debug_merge_logs.go b/pkg/cli/debug_merge_logs.go index 52bb42bce647..38ef8ed1b62d 100644 --- a/pkg/cli/debug_merge_logs.go +++ b/pkg/cli/debug_merge_logs.go @@ -499,9 +499,8 @@ func (s *fileLogStream) open() bool { if s.err = seekToFirstAfterFrom(s.f, s.from, s.editMode, s.format); s.err != nil { return false } - var err error - if s.d, err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); err != nil { - panic(err) + if s.d, s.err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); s.err != nil { + return false } return true } diff --git a/pkg/cli/debug_merge_logs_test.go b/pkg/cli/debug_merge_logs_test.go index a26b68ac1b0d..ff90513d595f 100644 --- a/pkg/cli/debug_merge_logs_test.go +++ b/pkg/cli/debug_merge_logs_test.go @@ -12,6 +12,7 @@ package cli import ( "bytes" + "fmt" "io/ioutil" "path/filepath" "testing" @@ -184,14 +185,22 @@ func getCases(format string) []testCase { } } -func (c testCase) run(t *testing.T) { - outBuf := bytes.Buffer{} +func resetDebugMergeLogFlags(errorFn func(s string)) { debugMergeLogsCmd.Flags().Visit(func(f *pflag.Flag) { if err := f.Value.Set(f.DefValue); err != nil { - t.Fatalf("Failed to set flag to default: %v", err) + errorFn(fmt.Sprintf("Failed to set flag to default: %v", err)) } }) +} + +func (c testCase) run(t *testing.T) { + resetDebugMergeLogFlags(func(s string) { t.Fatal(s) }) + outBuf := bytes.Buffer{} debugMergeLogsCmd.SetOut(&outBuf) + // Ensure that the original writer is restored when the test + // completes. Otherwise, subsequent tests may not see their output. + defer debugMergeLogsCmd.SetOut(nil) + if err := debugMergeLogsCmd.ParseFlags(c.flags); err != nil { t.Fatalf("Failed to set flags: %v", err) } @@ -235,3 +244,16 @@ func TestCrdbV1DebugMergeLogs(t *testing.T) { t.Run(c.name, c.run) } } + +func Example_format_error() { + c := NewCLITest(TestCLIParams{NoServer: true}) + defer c.Cleanup() + + resetDebugMergeLogFlags(func(s string) { fmt.Fprintf(stderr, "ERROR: %v", s) }) + + c.RunWithArgs([]string{"debug", "merge-logs", "testdata/merge_logs_v1/missing_format/*"}) + + // Output: + // debug merge-logs testdata/merge_logs_v1/missing_format/* + // ERROR: decoding format: failed to extract log file format from the log +} diff --git a/pkg/cli/testdata/merge_logs_v1/missing_format/cockroach.test-0001.ubuntu.2018-11-30T22_06_47Z.004130.log b/pkg/cli/testdata/merge_logs_v1/missing_format/cockroach.test-0001.ubuntu.2018-11-30T22_06_47Z.004130.log new file mode 100644 index 000000000000..47ec4e95eca4 --- /dev/null +++ b/pkg/cli/testdata/merge_logs_v1/missing_format/cockroach.test-0001.ubuntu.2018-11-30T22_06_47Z.004130.log @@ -0,0 +1,3 @@ +I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 [config] binary: CockroachDB CCL v20.1.17 (x86_64-apple-darwin14, built 2021/05/17 16:30:22, +I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 [config] arguments: [./cockroach start] +I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 line format: [IWEF]yymmdd hh:mm:ss.uuuuuu goid file:line msg utf8=✓