Skip to content

Commit

Permalink
builtins: refactor check_consistency for clarity
Browse files Browse the repository at this point in the history
This implements a suggestion made by Pavel[^1].

[^1]: #87378 (comment)

Release justification: None needed; 22.2 has been cut
Release note: None
  • Loading branch information
tbg committed Sep 8, 2022
1 parent ecf0b36 commit 9663668
Showing 1 changed file with 54 additions and 44 deletions.
98 changes: 54 additions & 44 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -1834,12 +1834,11 @@ type checkConsistencyGenerator struct {
mode roachpb.ChecksumMode

descs []roachpb.RangeDescriptor
// remainingRows is populated by Start(). Each Next() call peels of the first
// remainingRows is populated by Next() call peels of the first
// row and moves it to curRow. When empty, consumes from 'descs' to produce
// more rows.
remainingRows []roachpb.CheckConsistencyResponse_Result
curDuration time.Duration
curRow roachpb.CheckConsistencyResponse_Result
remainingRows []roachpb.CheckConsistencyResponse_Result
lastRefillDuration time.Duration
}

var _ eval.ValueGenerator = &checkConsistencyGenerator{}
Expand Down Expand Up @@ -1926,47 +1925,58 @@ func (c *checkConsistencyGenerator) Start(ctx context.Context, _ *kv.Txn) error
}
c.descs = append(c.descs, desc)
}
// Dummy element that the first call to .Next() can peel off.
c.remainingRows = make([]roachpb.CheckConsistencyResponse_Result, 1)
return nil
}

// Next is part of the tree.ValueGenerator interface.
func (c *checkConsistencyGenerator) Next(ctx context.Context) (bool, error) {
tBegin := timeutil.Now()
if len(c.remainingRows) == 0 {
if len(c.descs) == 0 {
return false, nil
}
// NB: peeling off the spans one by one allows this generator to produce
// rows in a streaming manner. If we called CheckConsistency(c.from, c.to)
// we would only get the result once all checks have completed and it will
// generally be a lot more brittle since an error will completely wipe out
// the result set.
desc := c.descs[0]
c.descs = c.descs[1:]
resp, err := c.consistencyChecker.CheckConsistency(
ctx, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey(), c.mode,
)
if err != nil {
// Emit result as a row, and keep going.
c.remainingRows = []roachpb.CheckConsistencyResponse_Result{
{
RangeID: desc.RangeID,
StartKey: desc.StartKey,
Status: roachpb.CheckConsistencyResponse_RANGE_INDETERMINATE,
Detail: err.Error(),
},
}
} else {
// NB: this could have more than one entry, if a range split in the
// meantime.
c.remainingRows = resp.Result
// maybeRefillRows checks whether c.remainingRows is empty and if so, consumes
// the first element of c.descs and runs a consistency check on it, appending to
// c.remainingRows with the result. If there are no descriptors or there are
// remaining rows, this is a no-op. Otherwise, this will always populate
// c.remainingRows with at least one element.
func (c *checkConsistencyGenerator) maybeRefillRows(ctx context.Context) {
if len(c.descs) == 0 || len(c.remainingRows) > 0 {
// Nothing to do.
return
}
defer func(tBegin time.Time) {
c.lastRefillDuration = timeutil.Since(tBegin)
}(timeutil.Now())
// NB: peeling off the spans one by one allows this generator to produce
// rows in a streaming manner. If we called CheckConsistency(c.from, c.to)
// we would only get the result once all checks have completed and it will
// generally be a lot more brittle since an error will completely wipe out
// the result set.
desc := c.descs[0]
c.descs = c.descs[1:]
resp, err := c.consistencyChecker.CheckConsistency(
ctx, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey(), c.mode,
)
if err != nil {
// Emit result as a row, and keep going.
c.remainingRows = []roachpb.CheckConsistencyResponse_Result{
{
RangeID: desc.RangeID,
StartKey: desc.StartKey,
Status: roachpb.CheckConsistencyResponse_RANGE_INDETERMINATE,
Detail: err.Error(),
},
}
return
}

c.curDuration = timeutil.Since(tBegin)
c.curRow = c.remainingRows[0]
// NB: this could have more than one entry, if a range split in the
// meantime.
c.remainingRows = resp.Result
}

// Next is part of the tree.ValueGenerator interface.
func (c *checkConsistencyGenerator) Next(ctx context.Context) (bool, error) {
// Invariant: len(c.remainingRows) > 0.
c.remainingRows = c.remainingRows[1:]
return true, nil
c.maybeRefillRows(ctx) // if necessary, try to repopulate remainingRows
return len(c.remainingRows) > 0, nil
}

// Values is part of the tree.ValueGenerator interface.
Expand All @@ -1977,12 +1987,12 @@ func (c *checkConsistencyGenerator) Values() (tree.Datums, error) {
},
}
return tree.Datums{
tree.NewDInt(tree.DInt(c.curRow.RangeID)),
tree.NewDBytes(tree.DBytes(c.curRow.StartKey)),
tree.NewDString(roachpb.Key(c.curRow.StartKey).String()),
tree.NewDString(c.curRow.Status.String()),
tree.NewDString(c.curRow.Detail),
tree.NewDInterval(duration.MakeDuration(c.curDuration.Nanoseconds(), 0 /* days */, 0 /* months */), intervalMeta),
tree.NewDInt(tree.DInt(c.remainingRows[0].RangeID)),
tree.NewDBytes(tree.DBytes(c.remainingRows[0].StartKey)),
tree.NewDString(roachpb.Key(c.remainingRows[0].StartKey).String()),
tree.NewDString(c.remainingRows[0].Status.String()),
tree.NewDString(c.remainingRows[0].Detail),
tree.NewDInterval(duration.MakeDuration(c.lastRefillDuration.Nanoseconds(), 0 /* days */, 0 /* months */), intervalMeta),
}, nil
}

Expand Down

0 comments on commit 9663668

Please sign in to comment.