Skip to content

Commit

Permalink
ttljob: fix a range edge case
Browse files Browse the repository at this point in the history
Release note (bug fix): Fix a bug in row-level TTL where the last range
key of a table may overlap with a separate table or index, resulting in
a "error decoding X bytes" error message when performing row-level TTL.
  • Loading branch information
otan committed May 13, 2022
1 parent b78f991 commit 036f3f5
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
45 changes: 35 additions & 10 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,11 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err

var initialVersion descpb.DescriptorVersion

// TODO(#75428): feature flag check, ttl pause check.
var ttlSettings catpb.RowLevelTTL
var pkColumns []string
var pkTypes []*types.T
var relationName string
var rangeSpan roachpb.Span
var rangeSpan, entirePKSpan roachpb.Span
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := descsCol.GetImmutableTableByID(
ctx,
Expand Down Expand Up @@ -333,7 +332,8 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
}

relationName = tn.FQString()
rangeSpan = desc.TableSpan(p.ExecCfg().Codec)
entirePKSpan = desc.IndexSpan(p.ExecCfg().Codec, desc.GetPrimaryIndex().GetID())
rangeSpan = entirePKSpan
ttlSettings = *ttl
return nil
}); err != nil {
Expand Down Expand Up @@ -459,15 +459,40 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
if err := r.ValueProto(&rangeDesc); err != nil {
return err
}
rangeSpan.Key = rangeDesc.EndKey.AsRawKey()
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, &alloc)
if err != nil {
return errors.Wrapf(err, "error decoding starting PRIMARY KEY for range ID %d", rangeDesc.RangeID)
// A single range can contain multiple tables or indexes.
// If this is the case, the rangeDesc.StartKey would be less than entirePKSpan.Key
// or the rangeDesc.EndKey would be greater than the entirePKSpan.EndKey, meaning
// the range contains the start or the end of the range respectively.
// Trying to decode keys outside the PK range will lead to a decoding error.
// As such, only populate nextRange.startPK and nextRange.endPK if this is the case
// (by default, a 0 element startPK or endPK means the beginning or end).
if rangeDesc.StartKey.AsRawKey().Compare(entirePKSpan.Key) > 0 {
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, &alloc)
if err != nil {
return errors.Wrapf(
err,
"error decoding starting PRIMARY KEY for range ID %d (start key %x, table start key %x)",
rangeDesc.RangeID,
rangeDesc.StartKey.AsRawKey(),
entirePKSpan.Key,
)
}
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, &alloc)
if err != nil {
return errors.Wrapf(err, "error decoding ending PRIMARY KEY for range ID %d", rangeDesc.RangeID)
if rangeDesc.EndKey.AsRawKey().Compare(entirePKSpan.EndKey) < 0 {
rangeSpan.Key = rangeDesc.EndKey.AsRawKey()
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, &alloc)
if err != nil {
return errors.Wrapf(
err,
"error decoding ending PRIMARY KEY for range ID %d (end key %x, table end key %x)",
rangeDesc.RangeID,
rangeDesc.EndKey.AsRawKey(),
entirePKSpan.EndKey,
)
}
} else {
done = true
}
ch <- nextRange
}
Expand Down
34 changes: 33 additions & 1 deletion pkg/sql/ttl/ttljob/ttljob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
type testCase struct {
desc string
createTable string
preSetup []string
postSetup []string
numExpiredRows int
numNonExpiredRows int
numSplits int
Expand All @@ -316,6 +318,23 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
numExpiredRows: 1001,
numNonExpiredRows: 5,
},
{
desc: "one column pk, table ranges overlap",
createTable: `CREATE TABLE tbl (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
text TEXT
) WITH (ttl_expire_after = '30 days')`,
preSetup: []string{
`CREATE TABLE tbm (id INT PRIMARY KEY)`,
`ALTER TABLE tbm SPLIT AT VALUES (1)`,
},
postSetup: []string{
`CREATE TABLE tbl2 (id INT PRIMARY KEY)`,
`ALTER TABLE tbl2 SPLIT AT VALUES (1)`,
},
numExpiredRows: 1001,
numNonExpiredRows: 5,
},
{
desc: "one column pk with statistics",
createTable: `CREATE TABLE tbl (
Expand Down Expand Up @@ -388,9 +407,12 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
other_col INT,
"quote-kw-col" TIMESTAMPTZ,
text TEXT,
INDEX (text),
INDEX text_idx (text),
PRIMARY KEY (id, other_col, "quote-kw-col")
) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = 50, ttl_delete_batch_size = 10, ttl_range_concurrency = 3)`,
postSetup: []string{
`ALTER INDEX tbl@text_idx SPLIT AT VALUES ('bob')`,
},
numExpiredRows: 1001,
numNonExpiredRows: 5,
numSplits: 10,
Expand Down Expand Up @@ -440,6 +462,11 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
rangeBatchSize := 1 + rng.Intn(3)
t.Logf("range batch size: %d", rangeBatchSize)

for _, stmt := range tc.preSetup {
t.Logf("running pre statement: %s", stmt)
th.sqlDB.Exec(t, stmt)
}

th.sqlDB.Exec(t, tc.createTable)
th.sqlDB.Exec(t, `SET CLUSTER SETTING sql.ttl.range_batch_size = $1`, rangeBatchSize)

Expand Down Expand Up @@ -529,6 +556,11 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
addRow(timeutil.Now().Add(time.Hour * 24 * 30))
}

for _, stmt := range tc.postSetup {
t.Logf("running post statement: %s", stmt)
th.sqlDB.Exec(t, stmt)
}

// Force the schedule to execute.
th.env.SetTime(timeutil.Now().Add(time.Hour * 24))
require.NoError(t, th.executeSchedules())
Expand Down

0 comments on commit 036f3f5

Please sign in to comment.