diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 1ff021bbeba8..62741040081e 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -288,7 +288,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err var ttlSettings catpb.RowLevelTTL var pkColumns []string var pkTypes []*types.T - var pkDirs []descpb.IndexDescriptor_Direction var name string var rangeSpan roachpb.Span if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -318,7 +317,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err } pkTypes = append(pkTypes, col.GetType()) } - pkDirs = desc.GetPrimaryIndex().IndexDesc().KeyColumnDirections ttl := desc.GetRowLevelTTL() if ttl == nil { @@ -485,11 +483,11 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err } rangeSpan.Key = rangeDesc.EndKey.AsRawKey() var nextRange rangeToProcess - nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc) + nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, &alloc) if err != nil { return err } - nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc) + nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, &alloc) if err != nil { return err } @@ -723,33 +721,45 @@ func runTTLOnRange( // keyToDatums translates a RKey on a range for a table to the appropriate datums. func keyToDatums( - key roachpb.RKey, - codec keys.SQLCodec, - pkTypes []*types.T, - pkDirs []descpb.IndexDescriptor_Direction, - alloc *tree.DatumAlloc, + key roachpb.RKey, codec keys.SQLCodec, pkTypes []*types.T, alloc *tree.DatumAlloc, ) (tree.Datums, error) { + rKey := key.AsRawKey() + // If any of these errors, that means we reached an "empty" key, which // symbolizes the start or end of a range. 
- if _, _, err := codec.DecodeTablePrefix(key.AsRawKey()); err != nil { + if _, _, err := codec.DecodeTablePrefix(rKey); err != nil { return nil, nil //nolint:returnerrcheck } - if _, _, _, err := codec.DecodeIndexPrefix(key.AsRawKey()); err != nil { + if _, _, _, err := codec.DecodeIndexPrefix(rKey); err != nil { return nil, nil //nolint:returnerrcheck } - encDatums := make([]rowenc.EncDatum, len(pkTypes)) - if _, foundNull, err := rowenc.DecodeIndexKey( - codec, - pkTypes, - encDatums, - pkDirs, - key.AsRawKey(), - ); err != nil { + + // Decode the datums ourselves, instead of using rowenc.DecodeKeyVals. + // We cannot use rowenc.DecodeKeyVals because we may not have the entire PK + // as the key for the range (e.g. a PK (a, b) may only be split on (a)). + rKey, err := codec.StripTenantPrefix(key.AsRawKey()) + if err != nil { + return nil, err + } + rKey, _, _, err = rowenc.DecodePartialTableIDIndexID(rKey) + if err != nil { return nil, err - } else if foundNull { - return nil, nil } - datums := make(tree.Datums, len(pkTypes)) + encDatums := make([]rowenc.EncDatum, 0, len(pkTypes)) + for len(rKey) > 0 && len(encDatums) < len(pkTypes) { + i := len(encDatums) + // We currently assume all PRIMARY KEY columns are ascending, and block + // creation otherwise. 
+ enc := descpb.DatumEncoding_ASCENDING_KEY + var val rowenc.EncDatum + val, rKey, err = rowenc.EncDatumFromBuffer(pkTypes[i], enc, rKey) + if err != nil { + return nil, err + } + encDatums = append(encDatums, val) + } + + datums := make(tree.Datums, len(encDatums)) for i, encDatum := range encDatums { if err := encDatum.EnsureDecoded(pkTypes[i], alloc); err != nil { return nil, err diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go index 81779b1ea605..678fb71e8313 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go @@ -44,6 +44,10 @@ type selectQueryBuilder struct { // cachedArgs keeps a cache of args to use in the run query. // The cache is of form [cutoff, , ]. cachedArgs []interface{} + // pkColumnNamesSQL caches the column names of the PK. + pkColumnNamesSQL string + // endPKColumnNamesSQL caches the column names of the ending PK. + endPKColumnNamesSQL string } func makeSelectQueryBuilder( @@ -56,13 +60,13 @@ func makeSelectQueryBuilder( ) selectQueryBuilder { // We will have a maximum of 1 + len(pkColumns)*2 columns, where one // is reserved for AOST, and len(pkColumns) for both start and end key. 
- argsCache := make([]interface{}, 0, 1+len(pkColumns)*2) - argsCache = append(argsCache, cutoff) + cachedArgs := make([]interface{}, 0, 1+len(pkColumns)*2) + cachedArgs = append(cachedArgs, cutoff) for _, d := range endPK { - argsCache = append(argsCache, d) + cachedArgs = append(cachedArgs, d) } for _, d := range startPK { - argsCache = append(argsCache, d) + cachedArgs = append(cachedArgs, d) } return selectQueryBuilder{ @@ -73,21 +77,21 @@ func makeSelectQueryBuilder( aost: aost, selectBatchSize: selectBatchSize, - cachedArgs: argsCache, - isFirst: true, + cachedArgs: cachedArgs, + isFirst: true, + pkColumnNamesSQL: makeColumnNamesSQL(pkColumns), + endPKColumnNamesSQL: makeColumnNamesSQL(pkColumns[:len(endPK)]), } } func (b *selectQueryBuilder) buildQuery() string { - columnNamesSQL := makeColumnNamesSQL(b.pkColumns) - // Generate the end key clause for SELECT, which always stays the same. // Start from $2 as $1 is for the now clause. // The end key of a range is exclusive, so use <. var endFilterClause string if len(b.endPK) > 0 { - endFilterClause = fmt.Sprintf(" AND (%s) < (", columnNamesSQL) - for i := range b.pkColumns { + endFilterClause = fmt.Sprintf(" AND (%s) < (", b.endPKColumnNamesSQL) + for i := range b.endPK { if i > 0 { endFilterClause += ", " } @@ -99,7 +103,7 @@ func (b *selectQueryBuilder) buildQuery() string { var filterClause string if !b.isFirst { // After the first query, we always want (col1, ...) > (cursor_col_1, ...) - filterClause = fmt.Sprintf(" AND (%s) > (", columnNamesSQL) + filterClause = fmt.Sprintf(" AND (%s) > (", b.pkColumnNamesSQL) for i := range b.pkColumns { if i > 0 { filterClause += ", " @@ -111,8 +115,8 @@ func (b *selectQueryBuilder) buildQuery() string { filterClause += ")" } else if len(b.startPK) > 0 { // For the the first query, we want (col1, ...) >= (cursor_col_1, ...) 
- filterClause = fmt.Sprintf(" AND (%s) >= (", columnNamesSQL) - for i := range b.pkColumns { + filterClause = fmt.Sprintf(" AND (%s) >= (", makeColumnNamesSQL(b.pkColumns[:len(b.startPK)])) + for i := range b.startPK { if i > 0 { filterClause += ", " } @@ -129,7 +133,7 @@ AS OF SYSTEM TIME %[3]s WHERE crdb_internal_expiration <= $1%[4]s%[5]s ORDER BY %[1]s LIMIT %[6]d`, - columnNamesSQL, + b.pkColumnNamesSQL, b.tableID, b.aost.String(), filterClause, @@ -214,15 +218,15 @@ type deleteQueryBuilder struct { func makeDeleteQueryBuilder( tableID descpb.ID, cutoff time.Time, pkColumns []string, deleteBatchSize int, ) deleteQueryBuilder { - argsCache := make([]interface{}, 0, 1+len(pkColumns)*deleteBatchSize) - argsCache = append(argsCache, cutoff) + cachedArgs := make([]interface{}, 0, 1+len(pkColumns)*deleteBatchSize) + cachedArgs = append(cachedArgs, cutoff) return deleteQueryBuilder{ tableID: tableID, pkColumns: pkColumns, deleteBatchSize: deleteBatchSize, - cachedArgs: argsCache, + cachedArgs: cachedArgs, } } diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go index 04e30341135b..1bf0074a7741 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go @@ -152,6 +152,65 @@ LIMIT 2`, }, }, }, + { + desc: "one range, but a partial startPK and endPK split", + b: makeSelectQueryBuilder( + 1, + mockTime, + []string{"col1", "col2"}, + tree.Datums{tree.NewDInt(100)}, + tree.Datums{tree.NewDInt(181)}, + *mockTimestampTZ, + 2, + ), + iterations: []iteration{ + { + expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] +AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' +WHERE crdb_internal_expiration <= $1 AND (col1) >= ($3) AND (col1) < ($2) +ORDER BY col1, col2 +LIMIT 2`, + expectedArgs: []interface{}{ + mockTime, + tree.NewDInt(181), + tree.NewDInt(100), + }, + rows: []tree.Datums{ + {tree.NewDInt(100), tree.NewDInt(12)}, + {tree.NewDInt(105), tree.NewDInt(12)}, + 
}, + }, + { + expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] +AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' +WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2) +ORDER BY col1, col2 +LIMIT 2`, + expectedArgs: []interface{}{ + mockTime, + tree.NewDInt(181), + tree.NewDInt(105), tree.NewDInt(12), + }, + rows: []tree.Datums{ + {tree.NewDInt(112), tree.NewDInt(19)}, + {tree.NewDInt(180), tree.NewDInt(132)}, + }, + }, + { + expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] +AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' +WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2) +ORDER BY col1, col2 +LIMIT 2`, + expectedArgs: []interface{}{ + mockTime, + tree.NewDInt(181), + tree.NewDInt(180), tree.NewDInt(132), + }, + rows: []tree.Datums{}, + }, + }, + }, { desc: "first range", b: makeSelectQueryBuilder( diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go index ab7f18b21ac9..007a639e3495 100644 --- a/pkg/sql/ttl/ttljob/ttljob_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_test.go @@ -498,7 +498,10 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { for i := 0; i < tc.numSplits; i++ { var values []interface{} var placeholders []string - for idx := 0; idx < tbDesc.GetPrimaryIndex().NumKeyColumns(); idx++ { + + // Note we can split a PRIMARY KEY partially. + numKeyCols := 1 + rng.Intn(tbDesc.GetPrimaryIndex().NumKeyColumns()) + for idx := 0; idx < numKeyCols; idx++ { col, err := tbDesc.FindColumnWithID(tbDesc.GetPrimaryIndex().GetKeyColumnID(idx)) require.NoError(t, err) placeholders = append(placeholders, fmt.Sprintf("$%d", idx+1))