Skip to content

Commit

Permalink
Merge #78163
Browse files Browse the repository at this point in the history
78163: ttljob: fix range decoding bug r=rafiss a=otan

Previously, when a range was split by a prefix of a PRIMARY KEY, the TTL
job would fail. This PR rectifies that.

Resolves #78162
Release note: None

Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Mar 23, 2022
2 parents 22e20cb + c059726 commit 83898e0
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 40 deletions.
54 changes: 32 additions & 22 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
var ttlSettings catpb.RowLevelTTL
var pkColumns []string
var pkTypes []*types.T
var pkDirs []descpb.IndexDescriptor_Direction
var name string
var rangeSpan roachpb.Span
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down Expand Up @@ -318,7 +317,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
}
pkTypes = append(pkTypes, col.GetType())
}
pkDirs = desc.GetPrimaryIndex().IndexDesc().KeyColumnDirections

ttl := desc.GetRowLevelTTL()
if ttl == nil {
Expand Down Expand Up @@ -485,11 +483,11 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
}
rangeSpan.Key = rangeDesc.EndKey.AsRawKey()
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, &alloc)
if err != nil {
return err
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, &alloc)
if err != nil {
return err
}
Expand Down Expand Up @@ -723,33 +721,45 @@ func runTTLOnRange(

// keyToDatums translates an RKey on a range for a table to the appropriate datums.
func keyToDatums(
key roachpb.RKey,
codec keys.SQLCodec,
pkTypes []*types.T,
pkDirs []descpb.IndexDescriptor_Direction,
alloc *tree.DatumAlloc,
key roachpb.RKey, codec keys.SQLCodec, pkTypes []*types.T, alloc *tree.DatumAlloc,
) (tree.Datums, error) {
rKey := key.AsRawKey()

// If any of these errors, that means we reached an "empty" key, which
// symbolizes the start or end of a range.
if _, _, err := codec.DecodeTablePrefix(key.AsRawKey()); err != nil {
if _, _, err := codec.DecodeTablePrefix(rKey); err != nil {
return nil, nil //nolint:returnerrcheck
}
if _, _, _, err := codec.DecodeIndexPrefix(key.AsRawKey()); err != nil {
if _, _, _, err := codec.DecodeIndexPrefix(rKey); err != nil {
return nil, nil //nolint:returnerrcheck
}
encDatums := make([]rowenc.EncDatum, len(pkTypes))
if _, foundNull, err := rowenc.DecodeIndexKey(
codec,
pkTypes,
encDatums,
pkDirs,
key.AsRawKey(),
); err != nil {

// Decode the datums ourselves, instead of using rowenc.DecodeKeyVals.
// We cannot use rowenc.DecodeKeyVals because we may not have the entire PK
// as the key for the range (e.g. a PK (a, b) may only be split on (a)).
rKey, err := codec.StripTenantPrefix(key.AsRawKey())
if err != nil {
return nil, err
}
rKey, _, _, err = rowenc.DecodePartialTableIDIndexID(key)
if err != nil {
return nil, err
} else if foundNull {
return nil, nil
}
datums := make(tree.Datums, len(pkTypes))
encDatums := make([]rowenc.EncDatum, 0, len(pkTypes))
for len(rKey) > 0 && len(encDatums) < len(pkTypes) {
i := len(encDatums)
// We currently assume all PRIMARY KEY columns are ascending, and block
// creation otherwise.
enc := descpb.DatumEncoding_ASCENDING_KEY
var val rowenc.EncDatum
val, rKey, err = rowenc.EncDatumFromBuffer(pkTypes[i], enc, rKey)
if err != nil {
return nil, err
}
encDatums = append(encDatums, val)
}

datums := make(tree.Datums, len(encDatums))
for i, encDatum := range encDatums {
if err := encDatum.EnsureDecoded(pkTypes[i], alloc); err != nil {
return nil, err
Expand Down
38 changes: 21 additions & 17 deletions pkg/sql/ttl/ttljob/ttljob_query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type selectQueryBuilder struct {
// cachedArgs keeps a cache of args to use in the run query.
// The cache is of form [cutoff, <endFilterClause...>, <startFilterClause..>].
cachedArgs []interface{}
// pkColumnNamesSQL caches the column names of the PK.
pkColumnNamesSQL string
// endPKColumnNamesSQL caches the column names of the ending PK.
endPKColumnNamesSQL string
}

func makeSelectQueryBuilder(
Expand All @@ -56,13 +60,13 @@ func makeSelectQueryBuilder(
) selectQueryBuilder {
// We will have a maximum of 1 + len(pkColumns)*2 columns, where one
// is reserved for AOST, and len(pkColumns) for both start and end key.
argsCache := make([]interface{}, 0, 1+len(pkColumns)*2)
argsCache = append(argsCache, cutoff)
cachedArgs := make([]interface{}, 0, 1+len(pkColumns)*2)
cachedArgs = append(cachedArgs, cutoff)
for _, d := range endPK {
argsCache = append(argsCache, d)
cachedArgs = append(cachedArgs, d)
}
for _, d := range startPK {
argsCache = append(argsCache, d)
cachedArgs = append(cachedArgs, d)
}

return selectQueryBuilder{
Expand All @@ -73,21 +77,21 @@ func makeSelectQueryBuilder(
aost: aost,
selectBatchSize: selectBatchSize,

cachedArgs: argsCache,
isFirst: true,
cachedArgs: cachedArgs,
isFirst: true,
pkColumnNamesSQL: makeColumnNamesSQL(pkColumns),
endPKColumnNamesSQL: makeColumnNamesSQL(pkColumns[:len(endPK)]),
}
}

func (b *selectQueryBuilder) buildQuery() string {
columnNamesSQL := makeColumnNamesSQL(b.pkColumns)

// Generate the end key clause for SELECT, which always stays the same.
// Start from $2 as $1 is for the now clause.
// The end key of a range is exclusive, so use <.
var endFilterClause string
if len(b.endPK) > 0 {
endFilterClause = fmt.Sprintf(" AND (%s) < (", columnNamesSQL)
for i := range b.pkColumns {
endFilterClause = fmt.Sprintf(" AND (%s) < (", b.endPKColumnNamesSQL)
for i := range b.endPK {
if i > 0 {
endFilterClause += ", "
}
Expand All @@ -99,7 +103,7 @@ func (b *selectQueryBuilder) buildQuery() string {
var filterClause string
if !b.isFirst {
// After the first query, we always want (col1, ...) > (cursor_col_1, ...)
filterClause = fmt.Sprintf(" AND (%s) > (", columnNamesSQL)
filterClause = fmt.Sprintf(" AND (%s) > (", b.pkColumnNamesSQL)
for i := range b.pkColumns {
if i > 0 {
filterClause += ", "
Expand All @@ -111,8 +115,8 @@ func (b *selectQueryBuilder) buildQuery() string {
filterClause += ")"
} else if len(b.startPK) > 0 {
// For the first query, we want (col1, ...) >= (cursor_col_1, ...)
filterClause = fmt.Sprintf(" AND (%s) >= (", columnNamesSQL)
for i := range b.pkColumns {
filterClause = fmt.Sprintf(" AND (%s) >= (", makeColumnNamesSQL(b.pkColumns[:len(b.startPK)]))
for i := range b.startPK {
if i > 0 {
filterClause += ", "
}
Expand All @@ -129,7 +133,7 @@ AS OF SYSTEM TIME %[3]s
WHERE crdb_internal_expiration <= $1%[4]s%[5]s
ORDER BY %[1]s
LIMIT %[6]d`,
columnNamesSQL,
b.pkColumnNamesSQL,
b.tableID,
b.aost.String(),
filterClause,
Expand Down Expand Up @@ -214,15 +218,15 @@ type deleteQueryBuilder struct {
func makeDeleteQueryBuilder(
tableID descpb.ID, cutoff time.Time, pkColumns []string, deleteBatchSize int,
) deleteQueryBuilder {
argsCache := make([]interface{}, 0, 1+len(pkColumns)*deleteBatchSize)
argsCache = append(argsCache, cutoff)
cachedArgs := make([]interface{}, 0, 1+len(pkColumns)*deleteBatchSize)
cachedArgs = append(cachedArgs, cutoff)

return deleteQueryBuilder{
tableID: tableID,
pkColumns: pkColumns,
deleteBatchSize: deleteBatchSize,

cachedArgs: argsCache,
cachedArgs: cachedArgs,
}
}

Expand Down
59 changes: 59 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,65 @@ LIMIT 2`,
},
},
},
{
desc: "one range, but a partial startPK and endPK split",
b: makeSelectQueryBuilder(
1,
mockTime,
[]string{"col1", "col2"},
tree.Datums{tree.NewDInt(100)},
tree.Datums{tree.NewDInt(181)},
*mockTimestampTZ,
2,
),
iterations: []iteration{
{
expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name]
AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00'
WHERE crdb_internal_expiration <= $1 AND (col1) >= ($3) AND (col1) < ($2)
ORDER BY col1, col2
LIMIT 2`,
expectedArgs: []interface{}{
mockTime,
tree.NewDInt(181),
tree.NewDInt(100),
},
rows: []tree.Datums{
{tree.NewDInt(100), tree.NewDInt(12)},
{tree.NewDInt(105), tree.NewDInt(12)},
},
},
{
expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name]
AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00'
WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2)
ORDER BY col1, col2
LIMIT 2`,
expectedArgs: []interface{}{
mockTime,
tree.NewDInt(181),
tree.NewDInt(105), tree.NewDInt(12),
},
rows: []tree.Datums{
{tree.NewDInt(112), tree.NewDInt(19)},
{tree.NewDInt(180), tree.NewDInt(132)},
},
},
{
expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name]
AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00'
WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2)
ORDER BY col1, col2
LIMIT 2`,
expectedArgs: []interface{}{
mockTime,
tree.NewDInt(181),
tree.NewDInt(180), tree.NewDInt(132),
},
rows: []tree.Datums{},
},
},
},
{
desc: "first range",
b: makeSelectQueryBuilder(
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/ttl/ttljob/ttljob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,10 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
for i := 0; i < tc.numSplits; i++ {
var values []interface{}
var placeholders []string
for idx := 0; idx < tbDesc.GetPrimaryIndex().NumKeyColumns(); idx++ {

// Note we can split a PRIMARY KEY partially.
numKeyCols := 1 + rng.Intn(tbDesc.GetPrimaryIndex().NumKeyColumns())
for idx := 0; idx < numKeyCols; idx++ {
col, err := tbDesc.FindColumnWithID(tbDesc.GetPrimaryIndex().GetKeyColumnID(idx))
require.NoError(t, err)
placeholders = append(placeholders, fmt.Sprintf("$%d", idx+1))
Expand Down

0 comments on commit 83898e0

Please sign in to comment.