67936: backupccl: set a low GC TTL on the temporary system tables r=pbardea a=pbardea

Cluster restore creates a temporary system table to hold system table
data before transactionally writing it to the real system tables. This
table is only persisted during the lifetime of the restore and is
dropped at the end of the restore. It should be GC'd quickly since it's
of no use after the restore finishes.

Release note (bug fix): Ensure that auxiliary tables used during cluster
restore are GC'd quickly afterwards.
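
As a rough illustration of the cleanup pattern described above (a minimal sketch, not the restore code itself): lower the GC TTL on the temporary database, then drop it, so its data is garbage-collected soon after the restore finishes. The database name crdb_temp_system, the connection string, and the lib/pq driver are assumptions for illustration only.

// A minimal sketch, not the restore code itself: lower the GC TTL on the
// temporary database before dropping it so its data is garbage-collected
// soon after the restore finishes. The database name, connection string,
// and driver choice are assumptions for illustration.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	const tempDB = "crdb_temp_system" // hypothetical temporary database name

	// Ask for a 1-second GC TTL so the dropped data does not linger for the
	// default TTL.
	if _, err := db.Exec(fmt.Sprintf(
		"ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds = 1", tempDB)); err != nil {
		log.Printf("failed to lower GC TTL on %s: %v", tempDB, err)
	}

	// Drop the database; with the low TTL, GC reclaims its data quickly.
	if _, err := db.Exec(fmt.Sprintf("DROP DATABASE %s CASCADE", tempDB)); err != nil {
		log.Fatal(err)
	}
}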

69018: log: replace a panic with an error in debug merge-logs r=ajwerner,knz,otan a=cameronnunez

Related issue: [#68278](#68278)

Release justification: bug fix

Release note (bug fix): Previously, users would receive a panic message when the
log parser failed to extract a log file's format. This has been replaced with a
helpful error message.
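
A minimal sketch of the error-handling pattern this fix adopts, using illustrative types rather than the actual CLI code: record the decoder-construction error on the stream and return false instead of panicking.

// Illustrative sketch: report decoder-construction failure via an error
// field instead of panicking. Types and names here are hypothetical.
package main

import (
	"errors"
	"fmt"
)

type decoder struct{}

// newDecoder stands in for the real entry-decoder constructor.
func newDecoder(format string) (*decoder, error) {
	if format == "" {
		return nil, errors.New("failed to extract log file format from the log")
	}
	return &decoder{}, nil
}

type logStream struct {
	d   *decoder
	err error
}

// open records the error and reports failure rather than panicking.
func (s *logStream) open(format string) bool {
	if s.d, s.err = newDecoder(format); s.err != nil {
		return false
	}
	return true
}

func main() {
	s := &logStream{}
	if !s.open("") {
		fmt.Println("ERROR:", s.err) // surfaced as a message, not a panic
	}
}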

69607: kvserver: Run one ScanInterleavedIntents per store concurrently r=dt a=itsbilal

Currently, lock table migration-related requests (Barrier,
ScanInterleavedIntents, PushTxn, ResolveIntent) are run with a
concurrency of only 4, regardless of the size of the cluster.

This change increases that to 4 * numNodes, and adds a new
mechanism to limit ScanInterleavedIntents on the receiver
side to one concurrent request per store. This throttle is applied
before latches are grabbed. Only ScanInterleavedIntents needs to be
rate-limited, as it is by far the heaviest component of
the separated intents migration.

Release justification: Category 2, low-risk update to new functionality
Release note: None.
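
A minimal sketch of the receiver-side throttle described above, under the assumption that it behaves like a single-slot semaphore acquired before any latches are taken; requestLimiter here is a toy stand-in, not kvserver's limit.ConcurrentRequestLimiter.

// Toy per-store throttle: at most one in-flight request at a time,
// acquired before any latches would be taken. Illustrative only.
package main

import (
	"context"
	"fmt"
	"time"
)

// requestLimiter admits at most cap(l) requests at a time.
type requestLimiter chan struct{}

func newRequestLimiter(n int) requestLimiter { return make(requestLimiter, n) }

// begin blocks until a slot is free (or the context is canceled) and
// returns a release function.
func (l requestLimiter) begin(ctx context.Context) (func(), error) {
	select {
	case l <- struct{}{}:
		return func() { <-l }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	perStore := newRequestLimiter(1) // one ScanInterleavedIntents per store

	handle := func(id int) {
		before := time.Now()
		release, err := perStore.begin(context.Background())
		if err != nil {
			fmt.Println("throttle error:", err)
			return
		}
		defer release()
		// Mirror the production behavior of logging unusually long waits.
		if waited := time.Since(before); waited > time.Second {
			fmt.Printf("request %d was delayed by %v\n", id, waited)
		}
		time.Sleep(50 * time.Millisecond) // simulate the intent scan
	}

	done := make(chan struct{})
	for i := 0; i < 4; i++ {
		go func(i int) { handle(i); done <- struct{}{} }(i)
	}
	for i := 0; i < 4; i++ {
		<-done
	}
}

Acquiring the slot before latches keeps a queued scan from blocking unrelated traffic on the range while it waits.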

Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Cameron Nunez <[email protected]>
Co-authored-by: Bilal Akhtar <[email protected]>
4 people committed Sep 2, 2021
4 parents a46e3a2 + 0f2de63 + a5caea6 + 9e720b6 commit 0dc7f4b
Showing 13 changed files with 120 additions and 25 deletions.
17 changes: 16 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -241,7 +241,6 @@ CREATE TABLE data2.foo (a int);
systemschema.TableStatisticsTable.GetName(),
systemschema.UITable.GetName(),
systemschema.UsersTable.GetName(),
systemschema.ZonesTable.GetName(),
systemschema.ScheduledJobsTable.GetName(),
}

@@ -317,6 +316,22 @@ CREATE TABLE data2.foo (a int);
}
})

t.Run("zone_configs", func(t *testing.T) {
// The restored zones should be a superset of the zones in the backed up
// cluster.
zoneIDsResult := sqlDB.QueryStr(t, `SELECT id FROM system.zones`)
var q strings.Builder
q.WriteString("SELECT * FROM system.zones WHERE id IN (")
for i, restoreZoneIDRow := range zoneIDsResult {
if i > 0 {
q.WriteString(", ")
}
q.WriteString(restoreZoneIDRow[0])
}
q.WriteString(")")
sqlDBRestore.CheckQueryResults(t, q.String(), sqlDB.QueryStr(t, q.String()))
})

t.Run("ensure that tables can be created at the excepted ID", func(t *testing.T) {
var maxID, dbID, tableID int
sqlDBRestore.QueryRow(t, "SELECT max(id) FROM system.namespace").Scan(&maxID)
7 changes: 5 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
@@ -2539,9 +2539,12 @@ func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context) error {
executor := r.execCfg.InternalExecutor
// After restoring the system tables, drop the temporary database holding the
// system tables.
gcTTLQuery := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds=1", restoreTempSystemDB)
if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, nil /* txn */, gcTTLQuery); err != nil {
log.Errorf(ctx, "failed to update the GC TTL of %q: %+v", restoreTempSystemDB, err)
}
dropTableQuery := fmt.Sprintf("DROP DATABASE %s CASCADE", restoreTempSystemDB)
_, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery)
if err != nil {
if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery); err != nil {
return errors.Wrap(err, "dropping temporary system db")
}
return nil
5 changes: 2 additions & 3 deletions pkg/cli/debug_merge_logs.go
@@ -499,9 +499,8 @@ func (s *fileLogStream) open() bool {
if s.err = seekToFirstAfterFrom(s.f, s.from, s.editMode, s.format); s.err != nil {
return false
}
var err error
if s.d, err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); err != nil {
panic(err)
if s.d, s.err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); s.err != nil {
return false
}
return true
}
28 changes: 25 additions & 3 deletions pkg/cli/debug_merge_logs_test.go
@@ -12,6 +12,7 @@ package cli

import (
"bytes"
"fmt"
"io/ioutil"
"path/filepath"
"testing"
@@ -184,14 +185,22 @@ func getCases(format string) []testCase {
}
}

func (c testCase) run(t *testing.T) {
outBuf := bytes.Buffer{}
func resetDebugMergeLogFlags(errorFn func(s string)) {
debugMergeLogsCmd.Flags().Visit(func(f *pflag.Flag) {
if err := f.Value.Set(f.DefValue); err != nil {
t.Fatalf("Failed to set flag to default: %v", err)
errorFn(fmt.Sprintf("Failed to set flag to default: %v", err))
}
})
}

func (c testCase) run(t *testing.T) {
resetDebugMergeLogFlags(func(s string) { t.Fatal(s) })
outBuf := bytes.Buffer{}
debugMergeLogsCmd.SetOut(&outBuf)
// Ensure that the original writer is restored when the test
// completes. Otherwise, subsequent tests may not see their output.
defer debugMergeLogsCmd.SetOut(nil)

if err := debugMergeLogsCmd.ParseFlags(c.flags); err != nil {
t.Fatalf("Failed to set flags: %v", err)
}
@@ -235,3 +244,16 @@ func TestCrdbV1DebugMergeLogs(t *testing.T) {
t.Run(c.name, c.run)
}
}

func Example_format_error() {
	c := NewCLITest(TestCLIParams{NoServer: true})
	defer c.Cleanup()

	resetDebugMergeLogFlags(func(s string) { fmt.Fprintf(stderr, "ERROR: %v", s) })

	c.RunWithArgs([]string{"debug", "merge-logs", "testdata/merge_logs_v1/missing_format/*"})

	// Output:
	// debug merge-logs testdata/merge_logs_v1/missing_format/*
	// ERROR: decoding format: failed to extract log file format from the log
}
@@ -0,0 +1,3 @@
I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 [config] binary: CockroachDB CCL v20.1.17 (x86_64-apple-darwin14, built 2021/05/17 16:30:22,
I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 [config] arguments: [./cockroach start]
I210801 21:05:59.364923 1 util/log/sync_buffer.go:70 line format: [IWEF]yymmdd hh:mm:ss.uuuuuu goid file:line msg utf8=✓
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/eval_context.go
@@ -39,7 +39,8 @@ type Limiters struct {
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
// because it uses an engine iterator.
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
}

// EvalContext is the interface through which command evaluation accesses the
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -139,6 +139,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

// concurrentscanInterleavedIntentsLimit is the number of concurrent
// ScanInterleavedIntents requests that will be run on a store. Used as part
// of pre-evaluation throttling.
var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
"kv.migration.concurrent_scan_interleaved_intents",
"number of scan interleaved intents requests a store will handle concurrently before queueing",
1,
settings.PositiveInt,
)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
@@ -878,6 +888,11 @@ func NewStore(
s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
)
	s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
		"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
	concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
		s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
	})

// The snapshot storage is usually empty at this point since it is cleared
// after each snapshot application, except when the node crashed right before
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/store_send.go
@@ -315,6 +315,19 @@ func (s *Store) maybeThrottleBatch(
}
return res, nil

	case *roachpb.ScanInterleavedIntentsRequest:
		before := timeutil.Now()
		res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
		if err != nil {
			return nil, err
		}

		waited := timeutil.Since(before)
		if waited > time.Second {
			log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
		}
		return res, nil

	default:
		return nil, nil
	}
9 changes: 9 additions & 0 deletions pkg/migration/migrationcluster/cluster.go
@@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
return nil
}

// NumNodes is part of the migration.Cluster interface.
func (c *Cluster) NumNodes(ctx context.Context) (int, error) {
	ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
	if err != nil {
		return 0, err
	}
	return len(ns), nil
}

// ForEveryNode is part of the migration.Cluster interface.
func (c *Cluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
5 changes: 5 additions & 0 deletions pkg/migration/migrationcluster/tenant_cluster.go
@@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster {
return &TenantCluster{db: db}
}

// NumNodes is part of the migration.Cluster interface.
func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) {
	return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes")
}

// ForEveryNode is part of the migration.Cluster interface.
func (t *TenantCluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
33 changes: 19 additions & 14 deletions pkg/migration/migrations/separated_intents.go
@@ -36,16 +36,14 @@ import (
"github.com/cockroachdb/errors"
)

// The number of concurrent migrateLockTableRequests requests to run. This
// is effectively a cluster-wide setting as the actual legwork of the migration
// happens when the destination replica(s) are sending replies back to the
// original node.
// The number of concurrent lock table migration related requests (eg. barrier,
// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. If
// this cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests requests
// will be executed at once.
//
// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit
// as opposed to a cluster-wide limit. That way, we could limit
// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire
// cluster, avoiding the case where all 4 ranges at a time could have the same node
// as their leaseholder.
// Note that some of the requests (eg. ScanInterleavedIntents) do throttling on
// the receiving end, to reduce the chances of a large number of requests
// being directed at a few nodes.
const concurrentMigrateLockTableRequests = 4

// The maximum number of times to retry a migrateLockTableRequest before failing
@@ -88,7 +86,7 @@ type migrateLockTablePool struct {
db *kv.DB
clock *hlc.Clock
done chan bool
status [concurrentMigrateLockTableRequests]int64
status []int64
finished uint64

mu struct {
@@ -335,7 +333,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) {
select {
case <-ticker.C:
var ranges strings.Builder
for i := 0; i < concurrentMigrateLockTableRequests; i++ {
for i := 0; i < len(m.status); i++ {
rangeID := atomic.LoadInt64(&m.status[i])
if rangeID == 0 {
continue
@@ -385,15 +383,18 @@ func runSeparatedIntentsMigration(
db *kv.DB,
ri rangeIterator,
ir intentResolver,
numNodes int,
) error {
concurrentRequests := concurrentMigrateLockTableRequests * numNodes
workerPool := migrateLockTablePool{
requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests),
requests: make(chan migrateLockTableRequest, concurrentRequests),
stopper: stopper,
db: db,
ir: ir,
clock: clock,
status: make([]int64, concurrentRequests),
}
migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests)
migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests)
if err != nil {
return err
}
@@ -417,7 +418,11 @@ func separatedIntentsMigration(
})
ri := kvcoord.NewRangeIterator(deps.DistSender)

return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir)
numNodes, err := deps.Cluster.NumNodes(ctx)
if err != nil {
return err
}
return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes)
}

func postSeparatedIntentsMigration(
2 changes: 1 addition & 1 deletion pkg/migration/migrations/separated_intents_test.go
@@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) {
errorPerNCalls, err = strconv.Atoi(d.Input)
require.NoError(t, err)
case "run-migration":
err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir)
err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1)
if err == nil {
return "ok"
}
5 changes: 5 additions & 0 deletions pkg/migration/system_migration.go
@@ -27,6 +27,11 @@ import (
// Cluster abstracts a physical KV cluster and can be utilized by a long-running
// migration.
type Cluster interface {
// NumNodes returns the number of nodes in the cluster. This is merely a
// convenience method and is not meant to be used to infer cluster stability;
// for that, use UntilClusterStable.
NumNodes(ctx context.Context) (int, error)

// ForEveryNode is a short hand to execute the given closure (named by the
// informational parameter op) against every node in the cluster at a given
// point in time. Given it's possible for nodes to join or leave the cluster
