Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: ensure each DistSQL processor only deletes its own data #86057

Merged
merged 1 commit into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,26 @@ message RowLevelTTLDetails {
}

message RowLevelTTLProgress {
int64 row_count = 1;
// JobRowCount is the number of deleted rows for the entire TTL job.
int64 job_row_count = 1;
// ProcessorProgresses is the progress per DistSQL processor.
repeated RowLevelTTLProcessorProgress processor_progresses = 2 [(gogoproto.nullable)=false];
}

message RowLevelTTLProcessorProgress {

// ProcessorID is the ID of the DistSQL processor.
int32 processor_id = 1 [(gogoproto.customname) = "ProcessorID"];

// SQLInstanceID is the instance ID of the DistSQL processor.
int32 sql_instance_id = 2 [
(gogoproto.customname) = "SQLInstanceID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID",
(gogoproto.nullable) = false
];

// ProcessorRowCount is the row count of the DistSQL processor.
int64 processor_row_count = 3;
}

message SchemaTelemetryDetails {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/server/telemetry",
Expand Down Expand Up @@ -68,6 +67,7 @@ go_test(
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down
86 changes: 25 additions & 61 deletions pkg/sql/ttl/ttljob/ttljob_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -151,51 +150,18 @@ func (t *ttlProcessor) work(ctx context.Context) error {

// Iterate over every range to feed work for the goroutine processors.
var alloc tree.DatumAlloc
ri := kvcoord.MakeRangeIterator(serverCfg.DistSender)
for _, span := range ttlSpec.Spans {
rangeSpan := span
ri.Seek(ctx, roachpb.RKey(span.Key), kvcoord.Ascending)
for done := false; ri.Valid() && !done; ri.Next(ctx) {
// Send range info to each goroutine worker.
rangeDesc := ri.Desc()
var nextRange rangeToProcess
// A single range can contain multiple tables or indexes.
// If this is the case, the rangeDesc.StartKey would be less than span.Key
// or the rangeDesc.EndKey would be greater than the span.EndKey, meaning
// the range contains the start or the end of the range respectively.
// Trying to decode keys outside the PK range will lead to a decoding error.
// As such, only populate nextRange.startPK and nextRange.endPK if this is the case
// (by default, a 0 element startPK or endPK means the beginning or end).
if rangeDesc.StartKey.AsRawKey().Compare(span.Key) > 0 {
var err error
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, codec, pkTypes, &alloc)
if err != nil {
return errors.Wrapf(
err,
"error decoding starting PRIMARY KEY for range ID %d (start key %x, table start key %x)",
rangeDesc.RangeID,
rangeDesc.StartKey.AsRawKey(),
span.Key,
)
}
}
if rangeDesc.EndKey.AsRawKey().Compare(span.EndKey) < 0 {
rangeSpan.Key = rangeDesc.EndKey.AsRawKey()
var err error
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, codec, pkTypes, &alloc)
if err != nil {
return errors.Wrapf(
err,
"error decoding ending PRIMARY KEY for range ID %d (end key %x, table end key %x)",
rangeDesc.RangeID,
rangeDesc.EndKey.AsRawKey(),
span.EndKey,
)
}
} else {
done = true
}
rangeChan <- nextRange
startPK, err := keyToDatums(roachpb.RKey(span.Key), codec, pkTypes, &alloc)
if err != nil {
return err
}
endPK, err := keyToDatums(roachpb.RKey(span.EndKey), codec, pkTypes, &alloc)
if err != nil {
return err
}
rangeChan <- rangeToProcess{
startPK: startPK,
endPK: endPK,
}
}
return nil
Expand All @@ -217,14 +183,20 @@ func (t *ttlProcessor) work(ctx context.Context) error {
func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
existingRowCount := rowLevelTTL.RowCount
rowLevelTTL.RowCount += processorRowCount
rowLevelTTL.JobRowCount += processorRowCount
processorID := t.ProcessorID
sqlInstanceID := flowCtx.NodeID.SQLInstanceID()
rowLevelTTL.ProcessorProgresses = append(rowLevelTTL.ProcessorProgresses, jobspb.RowLevelTTLProcessorProgress{
ProcessorID: processorID,
SQLInstanceID: sqlInstanceID,
ProcessorRowCount: processorRowCount,
})
ju.UpdateProgress(progress)
log.VInfof(
ctx,
2, /* level */
"TTL processorRowCount updated jobID=%d processorID=%d tableID=%d existingRowCount=%d processorRowCount=%d progress=%s",
jobID, t.ProcessorID, details.TableID, existingRowCount, processorRowCount, progress,
"TTL processorRowCount updated jobID=%d processorID=%d sqlInstanceID=%d tableID=%d jobRowCount=%d processorRowCount=%d",
jobID, processorID, sqlInstanceID, details.TableID, rowLevelTTL.JobRowCount, processorRowCount,
)
return nil
},
Expand Down Expand Up @@ -369,27 +341,19 @@ func (t *ttlProcessor) runTTLOnRange(
func keyToDatums(
key roachpb.RKey, codec keys.SQLCodec, pkTypes []*types.T, alloc *tree.DatumAlloc,
) (tree.Datums, error) {
rKey := key.AsRawKey()

// If any of these errors, that means we reached an "empty" key, which
// symbolizes the start or end of a range.
if _, _, err := codec.DecodeTablePrefix(rKey); err != nil {
return nil, nil //nolint:returnerrcheck
}
if _, _, _, err := codec.DecodeIndexPrefix(rKey); err != nil {
return nil, nil //nolint:returnerrcheck
}
rKey := key.AsRawKey()

// Decode the datums ourselves, instead of using rowenc.DecodeKeyVals.
// We cannot use rowenc.DecodeKeyVals because we may not have the entire PK
// as the key for the range (e.g. a PK (a, b) may only be split on (a)).
rKey, err := codec.StripTenantPrefix(key.AsRawKey())
rKey, err := codec.StripTenantPrefix(rKey)
if err != nil {
return nil, errors.Wrapf(err, "error decoding tenant prefix of %x", key)
}
rKey, _, _, err = rowenc.DecodePartialTableIDIndexID(key)
rKey, _, _, err = rowenc.DecodePartialTableIDIndexID(rKey)
if err != nil {
return nil, errors.Wrapf(err, "error decoding table/index ID of %x", key)
return nil, errors.Wrapf(err, "error decoding table/index ID of key=%x", key)
}
encDatums := make([]rowenc.EncDatum, 0, len(pkTypes))
for len(rKey) > 0 && len(encDatums) < len(pkTypes) {
Expand Down
Loading