Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

builtins: stream consistency checker output #87378

Merged
merged 3 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3018,7 +3018,7 @@ may increase either contention or retry errors, or both.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.assignment_cast"></a><code>crdb_internal.assignment_cast(val: anyelement, type: anyelement) &rarr; anyelement</code></td><td><span class="funcdesc"><p>This function is used internally to perform assignment casts during mutations.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.check_consistency"></a><code>crdb_internal.check_consistency(stats_only: <a href="bool.html">bool</a>, start_key: <a href="bytes.html">bytes</a>, end_key: <a href="bytes.html">bytes</a>) &rarr; tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail}</code></td><td><span class="funcdesc"><p>Runs a consistency check on ranges touching the specified key range. an empty start or end key is treated as the minimum and maximum possible, respectively. stats_only should only be set to false when targeting a small number of ranges to avoid overloading the cluster. Each returned row contains the range ID, the status (a roachpb.CheckConsistencyResponse_Status), and verbose detail.</p>
<tr><td><a name="crdb_internal.check_consistency"></a><code>crdb_internal.check_consistency(stats_only: <a href="bool.html">bool</a>, start_key: <a href="bytes.html">bytes</a>, end_key: <a href="bytes.html">bytes</a>) &rarr; tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail, interval AS duration}</code></td><td><span class="funcdesc"><p>Runs a consistency check on ranges touching the specified key range. an empty start or end key is treated as the minimum and maximum possible, respectively. stats_only should only be set to false when targeting a small number of ranges to avoid overloading the cluster. Each returned row contains the range ID, the status (a roachpb.CheckConsistencyResponse_Status), and verbose detail.</p>
<p>Example usage:
SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)</p>
</span></td><td>Volatile</td></tr>
Expand Down
22 changes: 13 additions & 9 deletions pkg/sql/logictest/testdata/logic_test/builtin_function_notenant
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ subtest check_consistency

# Sanity-check crdb_internal.check_consistency.

statement error start key must be >= "\\x02"
SELECT crdb_internal.check_consistency(true, '\x01', '\xffff')
statement error start key must be > "\\x02"
SELECT crdb_internal.check_consistency(true, '\x02', '\xffff')

statement error end key must be < "\\xff\\xff"
SELECT crdb_internal.check_consistency(true, '\x02', '\xffff00')
SELECT crdb_internal.check_consistency(true, '\x0200', '\xffff00')

statement error start key must be less than end key
SELECT crdb_internal.check_consistency(true, '\x02', '\x02')
SELECT crdb_internal.check_consistency(true, '\x03', '\x03')

statement error start key must be less than end key
SELECT crdb_internal.check_consistency(true, '\x03', '\x02')
SELECT crdb_internal.check_consistency(true, '\x04', '\x03')

query ITT
SELECT range_id, status, regexp_replace(detail, '[0-9]+', '', 'g') FROM crdb_internal.check_consistency(true, '\x02', '\xffff') WHERE range_id = 1
SELECT range_id, status, regexp_replace(detail, '[0-9]+', '', 'g') FROM crdb_internal.check_consistency(true, '\x03', '\xffff') WHERE range_id = 1
----
1 RANGE_CONSISTENT stats: {ContainsEstimates: LastUpdateNanos: IntentAge: GCBytesAge: LiveBytes: LiveCount: KeyBytes: KeyCount: ValBytes: ValCount: IntentBytes: IntentCount: SeparatedIntentCount: RangeKeyCount: RangeKeyBytes: RangeValCount: RangeValBytes: SysBytes: SysCount: AbortSpanBytes:}

Expand All @@ -32,14 +32,18 @@ SELECT count(*) > 5 FROM crdb_internal.check_consistency(true, '', '')
true

# Query that should touch only a single range.
#
# NB: the use of ScanMetaKVs causes issues here. Bounds [`k`, k.Next()]` don't work,
# with errors such as (here k=\xff):
# pq: failed to verify keys for Scan: end key /Meta2/"\xff\x00" must be greater than start /Meta2/"\xff\x00"
query B
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\x03', '\x0300')
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\xff', '\xffff')
----
true

# Ditto, but implicit start key \x02
# Ditto, but implicit start key \x03
query B
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '', '\x0200')
SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '', '\x04')
----
true

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sem/builtins/fixed_oids.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ var builtinOidsBySignature = map[string]oid.Oid{
`crdb_internal.active_version() -> jsonb`: 1296,
`crdb_internal.approximate_timestamp(timestamp: decimal) -> timestamp`: 1298,
`crdb_internal.assignment_cast(val: anyelement, type: anyelement) -> anyelement`: 1341,
`crdb_internal.check_consistency(stats_only: bool, start_key: bytes, end_key: bytes) -> tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail}`: 347,
`crdb_internal.check_consistency(stats_only: bool, start_key: bytes, end_key: bytes) -> tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail, interval AS duration}`: 347,
`crdb_internal.check_password_hash_format(password: bytes) -> string`: 1376,
`crdb_internal.cluster_id() -> uuid`: 1299,
`crdb_internal.cluster_name() -> string`: 1301,
Expand Down
121 changes: 101 additions & 20 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -1827,13 +1828,26 @@ func (j *jsonRecordSetGenerator) Next(ctx context.Context) (bool, error) {
}

type checkConsistencyGenerator struct {
txn *kv.Txn // to load range descriptors
consistencyChecker eval.ConsistencyCheckRunner
from, to roachpb.Key
mode roachpb.ChecksumMode
// remainingRows is populated by Start(). Each Next() call peels of the first
// row and moves it to curRow.
remainingRows []roachpb.CheckConsistencyResponse_Result
curRow roachpb.CheckConsistencyResponse_Result

// The descriptors for which we haven't yet emitted rows. Rows are consumed
// from this field and produce one (or more, in the case of splits not reflected
// in the descriptor) rows in `next`.
descs []roachpb.RangeDescriptor
// The current row, emitted by Values().
cur roachpb.CheckConsistencyResponse_Result
// The time it took to produce the current row, i.e. how long it took to run
// the consistency check that produced the row. When a consistency check
// produces more than one row (i.e. after a split), all of the duration will
// be attributed to the first row.
dur time.Duration
// next are the potentially prefetched subsequent rows. This is usually empty
// (as one consistency check produces one result which immediately moves to
// `cur`) except when a descriptor we use doesn't reflect subsequent splits.
next []roachpb.CheckConsistencyResponse_Result
}

var _ eval.ValueGenerator = &checkConsistencyGenerator{}
Expand All @@ -1850,14 +1864,18 @@ func makeCheckConsistencyGenerator(
keyTo := roachpb.Key(*args[2].(*tree.DBytes))

if len(keyFrom) == 0 {
keyFrom = keys.LocalMax
// NB: you'd expect LocalMax here but when we go and call ScanMetaKVs, it
// would interpret LocalMax as Meta1Prefix and translate that to KeyMin,
// then fail on the scan. That method should really handle this better
// but also we should use IterateRangeDescriptors instead.
keyFrom = keys.Meta2Prefix
}
if len(keyTo) == 0 {
keyTo = roachpb.KeyMax
}

if bytes.Compare(keyFrom, keys.LocalMax) < 0 {
return nil, errors.Errorf("start key must be >= %q", []byte(keys.LocalMax))
if bytes.Compare(keyFrom, keys.LocalMax) <= 0 {
return nil, errors.Errorf("start key must be > %q", []byte(keys.LocalMax))
}
if bytes.Compare(keyTo, roachpb.KeyMax) > 0 {
return nil, errors.Errorf("end key must be < %q", []byte(roachpb.KeyMax))
Expand All @@ -1872,6 +1890,7 @@ func makeCheckConsistencyGenerator(
}

return &checkConsistencyGenerator{
txn: ctx.Txn,
consistencyChecker: ctx.ConsistencyChecker,
from: keyFrom,
to: keyTo,
Expand All @@ -1880,8 +1899,8 @@ func makeCheckConsistencyGenerator(
}

var checkConsistencyGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Bytes, types.String, types.String, types.String},
[]string{"range_id", "start_key", "start_key_pretty", "status", "detail"},
[]*types.T{types.Int, types.Bytes, types.String, types.String, types.String, types.Interval},
[]string{"range_id", "start_key", "start_key_pretty", "status", "detail", "duration"},
)

// ResolvedType is part of the tree.ValueGenerator interface.
Expand All @@ -1891,32 +1910,94 @@ func (*checkConsistencyGenerator) ResolvedType() *types.T {

// Start is part of the tree.ValueGenerator interface.
func (c *checkConsistencyGenerator) Start(ctx context.Context, _ *kv.Txn) error {
resp, err := c.consistencyChecker.CheckConsistency(ctx, c.from, c.to, c.mode)
span := roachpb.Span{Key: c.from, EndKey: c.to}
// NB: should use IterateRangeDescriptors here which is in the 'upgrade'
// package to avoid pulling all into memory. That needs a refactor, though.
// kvprober also has some code to iterate in batches.
descs, err := kvclient.ScanMetaKVs(ctx, c.txn, span)
if err != nil {
return err
}
c.remainingRows = resp.Result
for _, v := range descs {
var desc roachpb.RangeDescriptor
if err := v.ValueProto(&desc); err != nil {
return err
}
if len(desc.StartKey) == 0 {
desc.StartKey = keys.MustAddr(keys.LocalMax)
// Elide potential second copy we might be getting for r1
// if meta1 and meta2 haven't split.
// This too should no longer be necessary with IterateRangeDescriptors.
if len(c.descs) == 1 {
continue
}
}
c.descs = append(c.descs, desc)
}
return nil
}

// maybeRefillRows checks whether c.next is empty and if so, consumes the first
// element of c.descs for a consistency check. This populates c.next with at
// least one result (even on error). Returns the duration of the consistency
// check, if any, and zero otherwise.
func (c *checkConsistencyGenerator) maybeRefillRows(ctx context.Context) time.Duration {
if len(c.next) > 0 || len(c.descs) == 0 {
// We have a row to produce or no more ranges to check, so we're done
// for now or for good, respectively.
return 0
}
tBegin := timeutil.Now()
// NB: peeling off the spans one by one allows this generator to produce
// rows in a streaming manner. If we called CheckConsistency(c.from, c.to)
// we would only get the result once all checks have completed and it will
// generally be a lot more brittle since an error will completely wipe out
// the result set.
desc := c.descs[0]
c.descs = c.descs[1:]
resp, err := c.consistencyChecker.CheckConsistency(
ctx, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey(), c.mode,
)
if err != nil {
resp = &roachpb.CheckConsistencyResponse{Result: []roachpb.CheckConsistencyResponse_Result{{
RangeID: desc.RangeID,
StartKey: desc.StartKey,
Status: roachpb.CheckConsistencyResponse_RANGE_INDETERMINATE,
Detail: err.Error(),
}}}
}

// NB: this could have more than one entry, if a range split in the
// meantime.
c.next = resp.Result
return timeutil.Since(tBegin)
}

// Next is part of the tree.ValueGenerator interface.
func (c *checkConsistencyGenerator) Next(_ context.Context) (bool, error) {
if len(c.remainingRows) == 0 {
func (c *checkConsistencyGenerator) Next(ctx context.Context) (bool, error) {
tbg marked this conversation as resolved.
Show resolved Hide resolved
dur := c.maybeRefillRows(ctx)
if len(c.next) == 0 {
return false, nil
}
c.curRow = c.remainingRows[0]
c.remainingRows = c.remainingRows[1:]
c.dur, c.cur, c.next = dur, c.next[0], c.next[1:]
return true, nil
}

// Values is part of the tree.ValueGenerator interface.
func (c *checkConsistencyGenerator) Values() (tree.Datums, error) {
row := c.cur
intervalMeta := types.IntervalTypeMetadata{
DurationField: types.IntervalDurationField{
DurationType: types.IntervalDurationType_MILLISECOND,
},
}
return tree.Datums{
tree.NewDInt(tree.DInt(c.curRow.RangeID)),
tree.NewDBytes(tree.DBytes(c.curRow.StartKey)),
tree.NewDString(roachpb.Key(c.curRow.StartKey).String()),
tree.NewDString(c.curRow.Status.String()),
tree.NewDString(c.curRow.Detail),
tree.NewDInt(tree.DInt(row.RangeID)),
tree.NewDBytes(tree.DBytes(row.StartKey)),
tree.NewDString(roachpb.Key(row.StartKey).String()),
tree.NewDString(row.Status.String()),
tree.NewDString(row.Detail),
tree.NewDInterval(duration.MakeDuration(c.dur.Nanoseconds(), 0 /* days */, 0 /* months */), intervalMeta),
}, nil
}

Expand Down