Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
74479: backupccl: replace memory accumulator with bound account r=dt a=adityamaru

This change replaces the memory accumulator with a raw bound
account. The memory accumulator provides a threadsafe wrapper
around the bound account, and some redundant resource pooling
semantics. Both of these are currently not needed by the sstSink
that is being memory monitored. This change deletes the memory
accumulator.

Release note: None

74546: kvserver: allow VOTER_INCOMING to receive the lease r=shralex a=shralex

Previously, VOTER_INCOMING replicas joining the cluster weren't allowed
to receive the lease, even though there is no actual problem with doing so.
This change removes the restriction, as a step towards #74077.

Release note: None

74549: kvserver: determine if leaseholder is removed using proposed replica … r=shralex a=shralex

…descriptor

Previously, the determination of whether a leaseholder is being removed was made
by looking at the proposed changes. As a step towards #74077,
we'd like to instead look at the replica descriptor that the reconfiguration change would result in.
This prepares the ground for making a distinction between incoming and outgoing replicas in the next PR.
This PR should not cause any change in behavior.

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: shralex <[email protected]>
  • Loading branch information
3 people committed Jan 6, 2022
4 parents 7d7ab0d + 21118a0 + 22b4fb5 + 9714e8c commit 2ffb8b2
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 105 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"create_scheduled_backup.go",
"key_rewriter.go",
"manifest_handling.go",
"memory_utils.go",
"restoration_data.go",
"restore_data_processor.go",
"restore_job.go",
Expand Down
44 changes: 26 additions & 18 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -123,8 +124,8 @@ type backupDataProcessor struct {
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
backupErr error

// Memory accumulator that reserves the memory usage of the backup processor.
backupMem *memoryAccumulator
// BoundAccount that reserves the memory usage of the backup processor.
memAcc *mon.BoundAccount
}

var _ execinfra.Processor = &backupDataProcessor{}
Expand All @@ -143,12 +144,13 @@ func newBackupDataProcessor(
memMonitor = knobs.BackupMemMonitor
}
}
ba := memMonitor.MakeBoundAccount()
bp := &backupDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
backupMem: newMemoryAccumulator(memMonitor),
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
memAcc: &ba,
}
if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
Expand Down Expand Up @@ -177,7 +179,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
TaskName: "backup-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem)
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc)
cancel()
close(bp.progCh)
}); err != nil {
Expand Down Expand Up @@ -213,7 +215,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.ProcessorBase.InternalClose()
bp.backupMem.close(bp.Ctx)
bp.memAcc.Close(bp.Ctx)
}

// ConsumerClosed is part of the RowSource interface. We have to override the
Expand Down Expand Up @@ -245,7 +247,7 @@ func runBackupProcessor(
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.BackupDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupMem *memoryAccumulator,
memAcc *mon.BoundAccount,
) error {
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings
Expand Down Expand Up @@ -544,7 +546,7 @@ func runBackupProcessor(
return err
}

sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem)
sink, err := makeSSTSink(ctx, sinkConf, storage, memAcc)
if err != nil {
return err
}
Expand Down Expand Up @@ -602,18 +604,24 @@ type sstSink struct {
spanGrows int
}

backupMem *memoryAccumulator
memAcc struct {
ba *mon.BoundAccount
reservedBytes int64
}
}

func makeSSTSink(
ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator,
ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *mon.BoundAccount,
) (*sstSink, error) {
s := &sstSink{conf: conf, dest: dest, backupMem: backupMem}
s := &sstSink{conf: conf, dest: dest}
s.memAcc.ba = backupMem

// Reserve memory for the file buffer.
if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil {
bufSize := smallFileBuffer.Get(s.conf.settings)
if err := s.memAcc.ba.Grow(ctx, bufSize); err != nil {
return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue")
}
s.memAcc.reservedBytes += bufSize
return s, nil
}

Expand All @@ -626,9 +634,9 @@ func (s *sstSink) Close() error {
s.cancel()
}

// Release the memory reserved for the file buffer back to the memory
// accumulator.
s.backupMem.release(smallFileBuffer.Get(s.conf.settings))
// Release the memory reserved for the file buffer.
s.memAcc.ba.Shrink(s.ctx, s.memAcc.reservedBytes)
s.memAcc.reservedBytes = 0
if s.out != nil {
return s.out.Close()
}
Expand Down
68 changes: 0 additions & 68 deletions pkg/ccl/backupccl/memory_utils.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestCheckCanReceiveLease(t *testing.T) {
eligible bool
}{
{leaseholderType: roachpb.VOTER_FULL, eligible: true},
{leaseholderType: roachpb.VOTER_INCOMING, eligible: false},
{leaseholderType: roachpb.VOTER_INCOMING, eligible: true},
{leaseholderType: roachpb.VOTER_OUTGOING, eligible: false},
{leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, eligible: false},
{leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, eligible: false},
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,8 +839,8 @@ func TestLearnerNoAcceptLease(t *testing.T) {
}
}

// TestJointConfigLease verifies that incoming and outgoing voters can't have the
// lease transferred to them.
// TestJointConfigLease verifies that incoming voters can have the
// lease transferred to them, and outgoing voters cannot.
func TestJointConfigLease(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -859,14 +859,14 @@ func TestJointConfigLease(t *testing.T) {
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)

err := tc.TransferRangeLease(desc, tc.Target(1))
exp := `replica cannot hold lease`
require.True(t, testutils.IsError(err, exp), err)
require.NoError(t, err)

// NB: we don't have to transition out of the previous joint config first
// because this is done automatically by ChangeReplicas before it does what
// it's asked to do.
desc = tc.RemoveVotersOrFatal(t, k, tc.Target(1))
err = tc.TransferRangeLease(desc, tc.Target(1))
desc = tc.RemoveVotersOrFatal(t, k, tc.Target(0))
err = tc.TransferRangeLease(desc, tc.Target(0))
exp := `replica cannot hold lease`
require.True(t, testutils.IsError(err, exp), err)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,11 @@ func (r *Replica) propose(
// the replica remains in the descriptor, but as VOTER_{OUTGOING,DEMOTING}.
// We want to block it from getting into that state in the first place,
// since there's no stopping the actual removal/demotion once it's there.
// The Removed() field has contains these replicas when this first
// transition is initiated, so its use here is copacetic.
// IsVoterNewConfig checks that the leaseholder is a voter in the
// proposed configuration.
replID := r.ReplicaID()
for _, rDesc := range crt.Removed() {
rDesc, ok := p.command.ReplicatedEvalResult.State.Desc.GetReplicaDescriptorByID(replID)
for !ok || !rDesc.IsVoterNewConfig() {
if rDesc.ReplicaID == replID {
err := errors.Mark(errors.Newf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt),
errMarkInvalidReplicationChange)
Expand Down
15 changes: 7 additions & 8 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,14 +524,13 @@ func CheckCanReceiveLease(wouldbeLeaseholder ReplicaDescriptor, rngDesc *RangeDe
repDesc, ok := rngDesc.GetReplicaDescriptorByID(wouldbeLeaseholder.ReplicaID)
if !ok {
return errReplicaNotFound
} else if t := repDesc.GetType(); t != VOTER_FULL {
// NB: there's no harm in transferring the lease to a VOTER_INCOMING,
// but we disallow it anyway. On the other hand, transferring to
// VOTER_OUTGOING would be a pretty bad idea since those voters are
// dropped when transitioning out of the joint config, which then
// amounts to removing the leaseholder without any safety precautions.
// This would either wedge the range or allow illegal reads to be
// served.
} else if !repDesc.IsVoterNewConfig() {
// NB: there's no harm in transferring the lease to a VOTER_INCOMING.
// On the other hand, transferring to VOTER_OUTGOING would be a pretty bad
// idea since those voters are dropped when transitioning out of the joint
// config, which then amounts to removing the leaseholder without any
// safety precautions. This would either wedge the range or allow illegal
// reads to be served.
//
// Since the leaseholder can't remove itself and is a VOTER_FULL, we
// also know that in any configuration there's at least one VOTER_FULL.
Expand Down

0 comments on commit 2ffb8b2

Please sign in to comment.