From 22b4fb519592bd5f51f2e67d1c48d5c5c11880cd Mon Sep 17 00:00:00 2001 From: shralex Date: Thu, 6 Jan 2022 12:03:44 -0800 Subject: [PATCH 1/3] kvserver: allow VOTER_INCOMING to receive the lease Previously, VOTER_INCOMING replicas joining the cluster weren't allowed to receive the lease, even though there is no actual problem with doing so. This change removes the restriction, as a step towards https://github.com/cockroachdb/cockroach/pull/74077 Release note: None --- pkg/kv/kvserver/batcheval/cmd_lease_test.go | 2 +- pkg/kv/kvserver/replica_learner_test.go | 12 ++++++------ pkg/roachpb/metadata_replicas.go | 15 +++++++-------- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_test.go b/pkg/kv/kvserver/batcheval/cmd_lease_test.go index d5cd5efa023e..601e4d4bd70f 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_test.go @@ -270,7 +270,7 @@ func TestCheckCanReceiveLease(t *testing.T) { eligible bool }{ {leaseholderType: roachpb.VOTER_FULL, eligible: true}, - {leaseholderType: roachpb.VOTER_INCOMING, eligible: false}, + {leaseholderType: roachpb.VOTER_INCOMING, eligible: true}, {leaseholderType: roachpb.VOTER_OUTGOING, eligible: false}, {leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, eligible: false}, {leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, eligible: false}, diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 202ed815b552..2700c7fc2749 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -839,8 +839,8 @@ func TestLearnerNoAcceptLease(t *testing.T) { } } -// TestJointConfigLease verifies that incoming and outgoing voters can't have the -// lease transferred to them. +// TestJointConfigLease verifies that incoming voters can have the +// lease transferred to them, and outgoing voters cannot. 
func TestJointConfigLease(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -859,14 +859,14 @@ func TestJointConfigLease(t *testing.T) { require.True(t, desc.Replicas().InAtomicReplicationChange(), desc) err := tc.TransferRangeLease(desc, tc.Target(1)) - exp := `replica cannot hold lease` - require.True(t, testutils.IsError(err, exp), err) + require.NoError(t, err) // NB: we don't have to transition out of the previous joint config first // because this is done automatically by ChangeReplicas before it does what // it's asked to do. - desc = tc.RemoveVotersOrFatal(t, k, tc.Target(1)) - err = tc.TransferRangeLease(desc, tc.Target(1)) + desc = tc.RemoveVotersOrFatal(t, k, tc.Target(0)) + err = tc.TransferRangeLease(desc, tc.Target(0)) + exp := `replica cannot hold lease` require.True(t, testutils.IsError(err, exp), err) } diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 2262f32ff955..1c99da46f4ac 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -524,14 +524,13 @@ func CheckCanReceiveLease(wouldbeLeaseholder ReplicaDescriptor, rngDesc *RangeDe repDesc, ok := rngDesc.GetReplicaDescriptorByID(wouldbeLeaseholder.ReplicaID) if !ok { return errReplicaNotFound - } else if t := repDesc.GetType(); t != VOTER_FULL { - // NB: there's no harm in transferring the lease to a VOTER_INCOMING, - // but we disallow it anyway. On the other hand, transferring to - // VOTER_OUTGOING would be a pretty bad idea since those voters are - // dropped when transitioning out of the joint config, which then - // amounts to removing the leaseholder without any safety precautions. - // This would either wedge the range or allow illegal reads to be - // served. + } else if !repDesc.IsVoterNewConfig() { + // NB: there's no harm in transferring the lease to a VOTER_INCOMING. 
+ // On the other hand, transferring to VOTER_OUTGOING would be a pretty bad + // idea since those voters are dropped when transitioning out of the joint + // config, which then amounts to removing the leaseholder without any + // safety precautions. This would either wedge the range or allow illegal + // reads to be served. // // Since the leaseholder can't remove itself and is a VOTER_FULL, we // also know that in any configuration there's at least one VOTER_FULL. From 9714e8c17dc7d3b17deccdc7a0ed1c5e89fadaf9 Mon Sep 17 00:00:00 2001 From: shralex Date: Thu, 6 Jan 2022 12:12:30 -0800 Subject: [PATCH 2/3] kvserver: determine if leaseholder is removed using proposed replica descriptor Previously, the determination of whether a leaseholder is being removed was made by looking at the proposed changes. As a step towards https://github.com/cockroachdb/cockroach/pull/74077 we'd like to instead look at the replica descriptor that the reconfiguration change would result in. This prepares the ground for making a distinction between incoming and outgoing replicas in the next PR. This PR should not cause any change in behavior. Release note: None --- pkg/kv/kvserver/replica_raft.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 1eae27056cff..7f6166730630 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -323,10 +323,11 @@ func (r *Replica) propose( // the replica remains in the descriptor, but as VOTER_{OUTGOING,DEMOTING}. // We want to block it from getting into that state in the first place, // since there's no stopping the actual removal/demotion once it's there. - // The Removed() field has contains these replicas when this first - // transition is initiated, so its use here is copacetic. + // IsVoterNewConfig checks that the leaseholder is a voter in the + // proposed configuration. 
replID := r.ReplicaID() - for _, rDesc := range crt.Removed() { + rDesc, ok := p.command.ReplicatedEvalResult.State.Desc.GetReplicaDescriptorByID(replID) + for !ok || !rDesc.IsVoterNewConfig() { if rDesc.ReplicaID == replID { err := errors.Mark(errors.Newf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt), errMarkInvalidReplicationChange) From 21118a09355e1b3c95c0d0e1f20bffc8647c0cbd Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Wed, 5 Jan 2022 17:25:11 -0500 Subject: [PATCH 3/3] backupccl: replace memory accumulator with bound account This change replaces the memory accumulator with a raw bound account. The memory accumulator provides a threadsafe wrapper around the bound account, and some redundant resource pooling semantics. Both of these are currently not needed by the sstSink that is being memory monitored. This change deletes the memory accumulator. Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 1 - pkg/ccl/backupccl/backup_processor.go | 44 ++++++++++------- pkg/ccl/backupccl/memory_utils.go | 68 --------------------------- 3 files changed, 26 insertions(+), 87 deletions(-) delete mode 100644 pkg/ccl/backupccl/memory_utils.go diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index c0231ff59796..aaa5c3ad52c8 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -16,7 +16,6 @@ go_library( "create_scheduled_backup.go", "key_rewriter.go", "manifest_handling.go", - "memory_utils.go", "restoration_data.go", "restore_data_processor.go", "restore_job.go", diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 4729bb23d118..9829ec198313 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + 
"github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -123,8 +124,8 @@ type backupDataProcessor struct { progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress backupErr error - // Memory accumulator that reserves the memory usage of the backup processor. - backupMem *memoryAccumulator + // BoundAccount that reserves the memory usage of the backup processor. + memAcc *mon.BoundAccount } var _ execinfra.Processor = &backupDataProcessor{} @@ -143,12 +144,13 @@ func newBackupDataProcessor( memMonitor = knobs.BackupMemMonitor } } + ba := memMonitor.MakeBoundAccount() bp := &backupDataProcessor{ - flowCtx: flowCtx, - spec: spec, - output: output, - progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress), - backupMem: newMemoryAccumulator(memMonitor), + flowCtx: flowCtx, + spec: spec, + output: output, + progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress), + memAcc: &ba, } if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */ execinfra.ProcStateOpts{ @@ -177,7 +179,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { TaskName: "backup-worker", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { - bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem) + bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc) cancel() close(bp.progCh) }); err != nil { @@ -213,7 +215,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() bp.ProcessorBase.InternalClose() - bp.backupMem.close(bp.Ctx) + bp.memAcc.Close(bp.Ctx) } // ConsumerClosed is part of the RowSource interface. 
We have to override the @@ -245,7 +247,7 @@ func runBackupProcessor( flowCtx *execinfra.FlowCtx, spec *execinfrapb.BackupDataSpec, progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, - backupMem *memoryAccumulator, + memAcc *mon.BoundAccount, ) error { backupProcessorSpan := tracing.SpanFromContext(ctx) clusterSettings := flowCtx.Cfg.Settings @@ -544,7 +546,7 @@ func runBackupProcessor( return err } - sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem) + sink, err := makeSSTSink(ctx, sinkConf, storage, memAcc) if err != nil { return err } @@ -602,18 +604,24 @@ type sstSink struct { spanGrows int } - backupMem *memoryAccumulator + memAcc struct { + ba *mon.BoundAccount + reservedBytes int64 + } } func makeSSTSink( - ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator, + ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *mon.BoundAccount, ) (*sstSink, error) { - s := &sstSink{conf: conf, dest: dest, backupMem: backupMem} + s := &sstSink{conf: conf, dest: dest} + s.memAcc.ba = backupMem // Reserve memory for the file buffer. - if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil { + bufSize := smallFileBuffer.Get(s.conf.settings) + if err := s.memAcc.ba.Grow(ctx, bufSize); err != nil { return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue") } + s.memAcc.reservedBytes += bufSize return s, nil } @@ -626,9 +634,9 @@ func (s *sstSink) Close() error { s.cancel() } - // Release the memory reserved for the file buffer back to the memory - // accumulator. - s.backupMem.release(smallFileBuffer.Get(s.conf.settings)) + // Release the memory reserved for the file buffer. 
+ s.memAcc.ba.Shrink(s.ctx, s.memAcc.reservedBytes) + s.memAcc.reservedBytes = 0 if s.out != nil { return s.out.Close() } diff --git a/pkg/ccl/backupccl/memory_utils.go b/pkg/ccl/backupccl/memory_utils.go deleted file mode 100644 index 276fdda34ce4..000000000000 --- a/pkg/ccl/backupccl/memory_utils.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package backupccl - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/util/mon" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" -) - -// memoryAccumulator is a thin wrapper around a BoundAccount that only releases memory -// from the bound account when it is closed, otherwise it accumulates and -// re-uses resources. -// This is useful when resources, once accumulated should not be returned as -// they may be needed later to make progress. -// It is safe for concurrent use. -type memoryAccumulator struct { - syncutil.Mutex - ba mon.BoundAccount - reserved int64 -} - -// newMemoryAccumulator creates a new accumulator backed by a bound account created -// from the given memory monitor. -func newMemoryAccumulator(mm *mon.BytesMonitor) *memoryAccumulator { - return &memoryAccumulator{ba: mm.MakeBoundAccount()} -} - -// request checks that the given number of bytes is available, requesting some -// from the backing monitor if necessary. 
-func (acc *memoryAccumulator) request(ctx context.Context, requested int64) error { - acc.Lock() - defer acc.Unlock() - - if acc.reserved >= requested { - acc.reserved -= requested - return nil - } - - requested -= acc.reserved - acc.reserved = 0 - - return acc.ba.Grow(ctx, requested) -} - -// release releases a number of bytes back into the internal reserved pool. -func (acc *memoryAccumulator) release(released int64) { - acc.Lock() - defer acc.Unlock() - - acc.reserved += released -} - -// close returns all accumulated memory to the backing monitor. -func (acc *memoryAccumulator) close(ctx context.Context) { - acc.Lock() - defer acc.Unlock() - - acc.reserved = 0 - acc.ba.Close(ctx) -}