Merge #56596 #57488
56596: kvserver: check that a snapshot is sane before sending it r=andreimatei a=andreimatei

The recipient checks that a snapshot includes it as a replica, and rejects
the snapshot with an assertion error if it doesn't. This patch adds a
less-panicked check on the sender side, since it turns out that the
situation is possible.

Release note: None
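To make the symmetry concrete, here is a self-contained Go sketch of the check on both sides; ReplicaDescriptor, RangeDescriptor, and checkSnapshot are simplified stand-ins invented for this example, not the kvserver API (the actual sender-side check is in the replica_command.go hunk further down).

package main

import (
	"errors"
	"fmt"
)

// Minimal stand-ins for the roachpb descriptor types.
type ReplicaDescriptor struct{ StoreID int }

type RangeDescriptor struct{ Replicas []ReplicaDescriptor }

// GetReplicaDescriptor reports whether the descriptor names a replica on the
// given store, mirroring the lookup the sender-side check performs.
func (d *RangeDescriptor) GetReplicaDescriptor(storeID int) (ReplicaDescriptor, bool) {
	for _, r := range d.Replicas {
		if r.StoreID == storeID {
			return r, true
		}
	}
	return ReplicaDescriptor{}, false
}

// checkSnapshot is the shape of both checks: the recipient already rejected
// snapshots that don't include it; after this patch the sender performs the
// same test and returns an ordinary error instead of sending a doomed snapshot.
func checkSnapshot(desc *RangeDescriptor, recipientStoreID int) error {
	if _, ok := desc.GetReplicaDescriptor(recipientStoreID); !ok {
		return errors.New("snapshot descriptor does not include the recipient as a replica")
	}
	return nil
}

func main() {
	// A stale descriptor that doesn't yet contain the newly-added store s3.
	desc := &RangeDescriptor{Replicas: []ReplicaDescriptor{{StoreID: 1}, {StoreID: 2}}}
	fmt.Println(checkSnapshot(desc, 3))
}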

57488: bulkio: Correctly export backup-specific metrics. r=miretskiy a=miretskiy

Fixes #57459

Fix the backup metrics struct to be compatible with the metrics registry,
which expects all metrics to be exported struct fields, and embedded
structs to implement the "metric.Struct" interface.

Release Note: Correctly export schedules_BACKUP_* metrics as well
as the backup RPO metric.
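A minimal sketch of the registry convention the message describes, assuming a reflection-based registry; Struct, Gauge, and register are simplified stand-ins defined here for illustration, not CockroachDB's metric package. It shows why an unexported field is invisible to such a registry and why a nested metrics struct is only picked up via the marker interface.

package main

import (
	"fmt"
	"reflect"
)

// Stand-ins for the real metric package's types.
type Struct interface{ MetricStruct() }

type Gauge struct{ Name string }

// register walks a metrics struct the way a reflection-based registry might:
// unexported fields cannot be read via reflection and are skipped, and nested
// metric structs are only recursed into when they implement Struct.
func register(v interface{}) {
	rv := reflect.Indirect(reflect.ValueOf(v))
	for i := 0; i < rv.NumField(); i++ {
		f := rv.Field(i)
		if !f.CanInterface() {
			fmt.Printf("skipped unexported field %q\n", rv.Type().Field(i).Name)
			continue
		}
		switch x := f.Interface().(type) {
		case *Gauge:
			fmt.Println("registered:", x.Name)
		case Struct:
			register(x) // recurse into an embedded metrics struct
		}
	}
}

// ExecutorMetrics mimics jobs.ExecutorMetrics: exported metric fields plus
// the Struct marker method.
type ExecutorMetrics struct{ NumStarted *Gauge }

func (ExecutorMetrics) MetricStruct() {}

// backupMetrics after the fix: an exported gauge field and a pointer-embedded
// struct that satisfies Struct, so both are visible to register.
type backupMetrics struct {
	*ExecutorMetrics
	RpoMetric *Gauge
}

func main() {
	register(backupMetrics{
		ExecutorMetrics: &ExecutorMetrics{NumStarted: &Gauge{Name: "schedules.BACKUP.started"}},
		RpoMetric:       &Gauge{Name: "schedules.BACKUP.last-completed-time"},
	})
}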

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
3 people committed Dec 3, 2020
3 parents e51e04c + d84890f + 373c841 commit 1421d00
Showing 3 changed files with 53 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/create_scheduled_backup_test.go
@@ -751,7 +751,7 @@ INSERT INTO t values (1), (10), (100);
   ex, _, err := jobs.GetScheduledJobExecutor(tree.ScheduledBackupExecutor.InternalName())
   require.NoError(t, err)
   require.NotNil(t, ex.Metrics())
-  return &ex.Metrics().(*backupMetrics).ExecutorMetrics
+  return ex.Metrics().(*backupMetrics).ExecutorMetrics
 }()

 t.Run("retry", func(t *testing.T) {
13 changes: 7 additions & 6 deletions pkg/ccl/backupccl/schedule_exec.go
@@ -33,8 +33,8 @@ type scheduledBackupExecutor struct {
 }

 type backupMetrics struct {
-  jobs.ExecutorMetrics
-  rpoMetric *metric.Gauge
+  *jobs.ExecutorMetrics
+  RpoMetric *metric.Gauge
 }

 var _ metric.Struct = &backupMetrics{}
@@ -187,7 +187,7 @@ func (e *scheduledBackupExecutor) backupSucceeded(
   // If this schedule is designated as maintaining the "LastBackup" metric used
   // for monitoring an RPO SLA, update that metric.
   if args.UpdatesLastBackupMetric {
-    e.metrics.rpoMetric.Update(details.(jobspb.BackupDetails).EndTime.GoTime().Unix())
+    e.metrics.RpoMetric.Update(details.(jobspb.BackupDetails).EndTime.GoTime().Unix())
   }

   if args.UnpauseOnSuccess == jobs.InvalidScheduleID {
@@ -256,12 +256,13 @@ func init() {
   jobs.RegisterScheduledJobExecutorFactory(
     tree.ScheduledBackupExecutor.InternalName(),
     func() (jobs.ScheduledJobExecutor, error) {
+      m := jobs.MakeExecutorMetrics(tree.ScheduledBackupExecutor.UserName())
       return &scheduledBackupExecutor{
         metrics: backupMetrics{
-          ExecutorMetrics: jobs.MakeExecutorMetrics(tree.ScheduledBackupExecutor.UserName()),
-          rpoMetric: metric.NewGauge(metric.Metadata{
+          ExecutorMetrics: &m,
+          RpoMetric: metric.NewGauge(metric.Metadata{
             Name:        "schedules.BACKUP.last-completed-time",
-            Help:        "The unix timestamp of the most recently completed backup by a scehedule specified as maintaining this metric",
+            Help:        "The unix timestamp of the most recently completed backup by a schedule specified as maintaining this metric",
             Measurement: "Jobs",
             Unit:        metric.Unit_TIMESTAMP_SEC,
           }),
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/replica_command.go
@@ -1311,6 +1311,34 @@ func (r *Replica) atomicReplicationChange(
   // this may want to detect that and retry, sending a snapshot and promoting
   // both sides.

+  // Wait for our replica to catch up with the descriptor change. The replica is
+  // expected to usually be already caught up because it's expected to usually
+  // be the leaseholder - but it doesn't have to be. Being caught up is
+  // important because we might need to send snapshots below to newly-added
+  // replicas, and those snapshots would be invalid if our stale descriptor
+  // doesn't contain the respective replicas.
+  // TODO(andrei): Find a better way to wait for replication. If we knew the
+  // LAI of the respective command, we could use waitForApplication().
+  descriptorOK := false
+  start := timeutil.Now()
+  retOpts := retry.Options{InitialBackoff: time.Second, MaxBackoff: time.Second, MaxRetries: 10}
+  for re := retry.StartWithCtx(ctx, retOpts); re.Next(); {
+    rDesc := r.Desc()
+    if rDesc.Generation >= desc.Generation {
+      descriptorOK = true
+      break
+    }
+    log.VEventf(ctx, 1, "stale descriptor detected; waiting to catch up to replication. want: %s, have: %s",
+      desc, rDesc)
+    if _, err := r.IsDestroyed(); err != nil {
+      return nil, errors.Wrapf(err, "replica destroyed while waiting for desc replication")
+    }
+  }
+  if !descriptorOK {
+    return nil, errors.Newf(
+      "waited for %s and replication hasn't caught up with descriptor update", timeutil.Since(start))
+  }
+
   iChgs := make([]internalReplicationChange, 0, len(chgs))

   for _, target := range chgs.VoterAdditions() {
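As a standalone illustration of the bounded wait added above (the real code uses CockroachDB's util/retry package for the backoff), here is a sketch using only the standard library, with an atomic counter standing in for the replica's descriptor generation; waitForGeneration is a name invented for this example.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// waitForGeneration polls until gen reaches want, mirroring the loop above:
// check first, then back off one second, giving up after ten attempts.
func waitForGeneration(ctx context.Context, gen *atomic.Int64, want int64) error {
	start := time.Now()
	for attempt := 0; attempt < 10; attempt++ {
		if gen.Load() >= want {
			return nil // caught up with the descriptor change
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // InitialBackoff == MaxBackoff == 1s
		}
	}
	return fmt.Errorf("waited %s and replication hasn't caught up", time.Since(start))
}

func main() {
	var gen atomic.Int64
	go func() { time.Sleep(2 * time.Second); gen.Store(5) }() // simulated apply
	fmt.Println(waitForGeneration(context.Background(), &gen, 5))
}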
@@ -1584,6 +1612,11 @@ func prepareChangeReplicasTrigger(
   return crt, nil
 }

+// execChangeReplicasTxn runs a txn updating a range descriptor. The txn commit
+// will carry a ChangeReplicasTrigger. Returns the updated descriptor. Note
+// that, if the current node is not the leaseholder for the respective range,
+// then upon return the node's replica of the range (if any) might not reflect
+// the updated descriptor until it applies the transaction.
 func execChangeReplicasTxn(
   ctx context.Context,
   store *Store,
@@ -1901,6 +1934,18 @@ func (r *Replica) sendSnapshot(
   defer snap.Close()
   log.Event(ctx, "generated snapshot")

+  // Check that the snapshot we generated has a descriptor that includes the
+  // recipient. If it doesn't, the recipient will reject it, so it's better to
+  // not send it in the first place. It's possible to hit this case if we're not
+  // the leaseholder and we haven't yet applied the configuration change that's
+  // adding the recipient to the range.
+  if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok {
+    return errors.Newf(
+      "attempting to send snapshot that does not contain the recipient as a replica; "+
+        "snapshot type: %s, recipient: s%d, desc: %s",
+      snapType, recipient, snap.State.Desc)
+  }
+
   sender, err := r.GetReplicaDescriptor()
   if err != nil {
     return errors.Wrapf(err, "%s: change replicas failed", r)
