Skip to content

Commit

Permalink
builtins: stream consistency checker output
Browse files Browse the repository at this point in the history
Also makes it resilient to per-Range errors, which now no longer
tank the entire operation.

```sql
-- avoid buffering in cli
\set display_format=csv;
-- avoid rows getting buffered at server
set avoid_buffering=true;
-- compute as fast as possible
SET CLUSTER SETTING server.consistency_check.max_rate = '1tb';

SELECT * FROM crdb_internal.check_consistency(false, '', '');
```

Release justification: improvement for a debugging-related feature
Release note: None
  • Loading branch information
tbg committed Sep 9, 2022
1 parent 203c078 commit 967b163
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 17 deletions.
22 changes: 13 additions & 9 deletions pkg/sql/logictest/testdata/logic_test/builtin_function_notenant
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ subtest check_consistency

# Sanity-check crdb_internal.check_consistency.

statement error start key must be >= "\\x02"
SELECT crdb_internal.check_consistency(true, '\x01', '\xffff')
statement error start key must be > "\\x02"
SELECT crdb_internal.check_consistency(true, '\x02', '\xffff')

statement error end key must be < "\\xff\\xff"
SELECT crdb_internal.check_consistency(true, '\x02', '\xffff00')
SELECT crdb_internal.check_consistency(true, '\x0200', '\xffff00')

statement error start key must be less than end key
SELECT crdb_internal.check_consistency(true, '\x02', '\x02')
SELECT crdb_internal.check_consistency(true, '\x03', '\x03')

statement error start key must be less than end key
SELECT crdb_internal.check_consistency(true, '\x03', '\x02')
SELECT crdb_internal.check_consistency(true, '\x04', '\x03')

query ITT
SELECT range_id, status, regexp_replace(detail, '[0-9]+', '', 'g') FROM crdb_internal.check_consistency(true, '\x02', '\xffff') WHERE range_id = 1
SELECT range_id, status, regexp_replace(detail, '[0-9]+', '', 'g') FROM crdb_internal.check_consistency(true, '\x03', '\xffff') WHERE range_id = 1
----
1 RANGE_CONSISTENT stats: {ContainsEstimates: LastUpdateNanos: IntentAge: GCBytesAge: LiveBytes: LiveCount: KeyBytes: KeyCount: ValBytes: ValCount: IntentBytes: IntentCount: SeparatedIntentCount: RangeKeyCount: RangeKeyBytes: RangeValCount: RangeValBytes: SysBytes: SysCount: AbortSpanBytes:}

Expand All @@ -32,14 +32,18 @@ SELECT count(*) > 5 FROM crdb_internal.check_consistency(true, '', '')
true

# Query that should touch only a single range.
#
# NB: the use of ScanMetaKVs causes issues here. Bounds [`k`, k.Next()]` don't work,
# with errors such as (here k=\xff):
# pq: failed to verify keys for Scan: end key /Meta2/"\xff\x00" must be greater than start /Meta2/"\xff\x00"
query B
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\x03', '\x0300')
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\xff', '\xffff')
----
true

# Ditto, but implicit start key \x02
# Ditto, but implicit start key \x03
query B
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '', '\x0200')
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '', '\x04')
----
true

Expand Down
69 changes: 61 additions & 8 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,11 +1827,15 @@ func (j *jsonRecordSetGenerator) Next(ctx context.Context) (bool, error) {
}

type checkConsistencyGenerator struct {
txn *kv.Txn // to load range descriptors
consistencyChecker eval.ConsistencyCheckRunner
from, to roachpb.Key
mode roachpb.ChecksumMode

descs []roachpb.RangeDescriptor
// remainingRows is populated by Start(). Each Next() call peels of the first
// row and moves it to curRow.
// row and moves it to curRow. When empty, consumes from 'descs' to produce
// more rows.
remainingRows []roachpb.CheckConsistencyResponse_Result
curRow roachpb.CheckConsistencyResponse_Result
}
Expand All @@ -1850,14 +1854,18 @@ func makeCheckConsistencyGenerator(
keyTo := roachpb.Key(*args[2].(*tree.DBytes))

if len(keyFrom) == 0 {
keyFrom = keys.LocalMax
// NB: you'd expect LocalMax here but when we go and call ScanMetaKVs, it
// would interpret LocalMax as Meta1Prefix and translate that to KeyMin,
// then fail on the scan. That method should really handle this better
// but also we should use IterateRangeDescriptors instead.
keyFrom = keys.Meta2Prefix
}
if len(keyTo) == 0 {
keyTo = roachpb.KeyMax
}

if bytes.Compare(keyFrom, keys.LocalMax) < 0 {
return nil, errors.Errorf("start key must be >= %q", []byte(keys.LocalMax))
if bytes.Compare(keyFrom, keys.LocalMax) <= 0 {
return nil, errors.Errorf("start key must be > %q", []byte(keys.LocalMax))
}
if bytes.Compare(keyTo, roachpb.KeyMax) > 0 {
return nil, errors.Errorf("end key must be < %q", []byte(roachpb.KeyMax))
Expand All @@ -1872,6 +1880,7 @@ func makeCheckConsistencyGenerator(
}

return &checkConsistencyGenerator{
txn: ctx.Txn,
consistencyChecker: ctx.ConsistencyChecker,
from: keyFrom,
to: keyTo,
Expand All @@ -1891,18 +1900,62 @@ func (*checkConsistencyGenerator) ResolvedType() *types.T {

// Start is part of the tree.ValueGenerator interface.
func (c *checkConsistencyGenerator) Start(ctx context.Context, _ *kv.Txn) error {
resp, err := c.consistencyChecker.CheckConsistency(ctx, c.from, c.to, c.mode)
span := roachpb.Span{Key: c.from, EndKey: c.to}
// NB: should use IterateRangeDescriptors here which is in the 'upgrade'
// package to avoid pulling all into memory. That needs a refactor, though.
// kvprober also has some code to iterate in batches.
descs, err := kvclient.ScanMetaKVs(ctx, c.txn, span)
if err != nil {
return err
}
c.remainingRows = resp.Result
for _, v := range descs {
var desc roachpb.RangeDescriptor
if err := v.ValueProto(&desc); err != nil {
return err
}
if len(desc.StartKey) == 0 {
desc.StartKey = keys.MustAddr(keys.LocalMax)
// Elide potential second copy we might be getting for r1
// if meta1 and meta2 haven't split.
// This too should no longer be necessary with IterateRangeDescriptors.
if len(c.descs) == 1 {
continue
}
}
c.descs = append(c.descs, desc)
}
return nil
}

// Next is part of the tree.ValueGenerator interface.
func (c *checkConsistencyGenerator) Next(_ context.Context) (bool, error) {
func (c *checkConsistencyGenerator) Next(ctx context.Context) (bool, error) {
if len(c.remainingRows) == 0 {
return false, nil
if len(c.descs) == 0 {
return false, nil
}
// NB: peeling off the spans one by one allows this generator to produce
// rows in a streaming manner. If we called CheckConsistency(c.from, c.to)
// we would only get the result once all checks have completed and it will
// generally be a lot more brittle since an error will completely wipe out
// the result set.
desc := c.descs[0]
c.descs = c.descs[1:]
resp, err := c.consistencyChecker.CheckConsistency(
ctx, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey(), c.mode,
)
if err != nil {
// Emit result as a row, and keep going.
c.remainingRows = []roachpb.CheckConsistencyResponse_Result{
{
RangeID: desc.RangeID,
StartKey: desc.StartKey,
Status: roachpb.CheckConsistencyResponse_RANGE_INDETERMINATE,
Detail: err.Error(),
},
}
} else {
c.remainingRows = resp.Result
}
}
c.curRow = c.remainingRows[0]
c.remainingRows = c.remainingRows[1:]
Expand Down

0 comments on commit 967b163

Please sign in to comment.