Skip to content

Commit

Permalink
Merge #52235
Browse files Browse the repository at this point in the history
52235: sql: change table truncation to create new indexes rather than new ID r=rohany a=rohany

Fixes #52170.

This commit updates the implementation of table truncation to create a
new set of indexes for the table, rather than creating a new table ID
for the table's data.

Release note: None

Co-authored-by: Rohan Yadav <[email protected]>
  • Loading branch information
craig[bot] and rohany committed Aug 13, 2020
2 parents 04f3329 + d07b25d commit 673ca44
Show file tree
Hide file tree
Showing 22 changed files with 955 additions and 748 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,13 +1321,13 @@ func TestChangefeedTruncateRenameDrop(t *testing.T) {
assertPayloads(t, truncate, []string{`truncate: [1]->{"after": {"a": 1}}`})
assertPayloads(t, truncateCascade, []string{`truncate_cascade: [1]->{"after": {"b": 1}}`})
sqlDB.Exec(t, `TRUNCATE TABLE truncate CASCADE`)
if _, err := truncate.Next(); !testutils.IsError(err, `"truncate" was dropped or truncated`) {
t.Errorf(`expected ""truncate" was dropped or truncated" error got: %+v`, err)
if _, err := truncate.Next(); !testutils.IsError(err, `"truncate" was truncated`) {
t.Fatalf(`expected ""truncate" was truncated" error got: %+v`, err)
}
if _, err := truncateCascade.Next(); !testutils.IsError(
err, `"truncate_cascade" was dropped or truncated`,
err, `"truncate_cascade" was truncated`,
) {
t.Errorf(`expected ""truncate_cascade" was dropped or truncated" error got: %+v`, err)
t.Fatalf(`expected ""truncate_cascade" was truncated" error got: %+v`, err)
}

sqlDB.Exec(t, `CREATE TABLE rename (a INT PRIMARY KEY)`)
Expand Down
23 changes: 23 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
tableEventTypeAddColumnNoBackfill
tableEventTypeAddColumnWithBackfill
tableEventTypeDropColumn
tableEventTruncate
)

var (
Expand Down Expand Up @@ -55,6 +56,8 @@ func classifyTableEvent(e TableEvent) tableEventType {
return tableEventTypeAddColumnNoBackfill
case hasNewColumnDropBackfillMutation(e):
return tableEventTypeDropColumn
case tableTruncated(e):
return tableEventTruncate
default:
return tableEventTypeUnknown
}
Expand All @@ -66,6 +69,10 @@ type tableEventFilter map[tableEventType]bool

func (b tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (bool, error) {
et := classifyTableEvent(e)
// Truncation events are not ignored and return an error.
if et == tableEventTruncate {
return false, errors.Errorf(`"%s" was truncated`, e.Before.Name)
}
shouldFilter, ok := b[et]
if !ok {
return false, errors.AssertionFailedf("policy does not specify how to handle event type %v", et)
Expand Down Expand Up @@ -100,3 +107,19 @@ func newColumnNoBackfill(e TableEvent) (res bool) {
return len(e.Before.Columns) < len(e.After.Columns) &&
!e.Before.HasColumnBackfillMutation()
}

// pkChangeMutationExists reports whether desc carries at least one mutation
// in the ADD direction that has a primary key swap attached.
func pkChangeMutationExists(desc *sqlbase.ImmutableTableDescriptor) bool {
	for i := range desc.Mutations {
		mutation := desc.Mutations[i]
		// Only mutations in the ADD direction are of interest.
		if mutation.Direction != descpb.DescriptorMutation_ADD {
			continue
		}
		if mutation.GetPrimaryKeySwap() != nil {
			return true
		}
	}
	return false
}

// tableTruncated reports whether the event e looks like a TRUNCATE.
//
// TRUNCATE operates by creating a new set of indexes for the table,
// including a new primary index, so the primary index ID differs between
// e.Before and e.After. An ALTER PRIMARY KEY also changes the primary index,
// so events whose Before descriptor has a primary key swap mutation are
// excluded.
func tableTruncated(e TableEvent) bool {
	// Same primary index on both sides: nothing was truncated.
	if e.Before.PrimaryIndex.ID == e.After.PrimaryIndex.ID {
		return false
	}
	// The primary index changed; it is a truncation unless a primary key
	// change was in flight.
	return !pkChangeMutationExists(e.Before)
}
840 changes: 472 additions & 368 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ message SchemaChangeGCDetails {
// Indexes to GC.
repeated DroppedIndex indexes = 1 [(gogoproto.nullable) = false];

// The below two fields are used only in the case of TRUNCATE operating on
// tables with interleaved indexes. They are only set together.

// InterleavedTable is the table being truncated. In particular, it is the
// TableDescriptor before any of the truncate modifications have been applied.
sqlbase.TableDescriptor interleaved_table = 4;
// InterleavedIndexes is the set of interleaved indexes to truncate.
repeated sqlbase.IndexDescriptor interleaved_indexes = 5 [(gogoproto.nullable) = false];

// Entire tables to GC.
repeated DroppedID tables = 2 [(gogoproto.nullable) = false];

Expand Down
73 changes: 56 additions & 17 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,59 @@ func (sc *SchemaChanger) getTableVersion(
return tableDesc, nil
}

// TruncateInterleavedIndexes truncates the input set of indexes from the given
// table. It is used in the schema change GC job to delete interleaved index
// data as part of a TRUNCATE statement. Note that we cannot use
// SchemaChanger.truncateIndexes instead because that accesses the most recent
// version of the table when deleting. In this case, we need to use the version
// of the table before truncation, which is passed in.
//
// Each index is deleted chunk by chunk, with every chunk removed in its own
// transaction. The first error encountered is returned and aborts the
// remaining work.
func TruncateInterleavedIndexes(
	ctx context.Context,
	execCfg *ExecutorConfig,
	table *sqlbase.ImmutableTableDescriptor,
	indexes []descpb.IndexDescriptor,
) error {
	log.Infof(ctx, "truncating %d interleaved indexes", len(indexes))
	chunkSize := int64(indexTruncateChunkSize)
	alloc := &sqlbase.DatumAlloc{}
	codec, db := execCfg.Codec, execCfg.DB
	for _, desc := range indexes {
		// resume tracks where the next chunk of this index should start.
		var resume roachpb.Span
		for rowIdx, done := int64(0), false; !done; rowIdx += chunkSize {
			log.VEventf(ctx, 2, "truncate interleaved index (%d) at row: %d, span: %s", table.ID, rowIdx, resume)
			// Snapshot the starting span outside of the closure so that every
			// attempt of the transaction below starts from the same place,
			// even though the closure overwrites resume.
			resumeAt := resume
			// Make a new txn just to drop this chunk.
			if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
				rd := row.MakeDeleter(codec, table, nil)
				td := tableDeleter{rd: rd, alloc: alloc}
				if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
					return err
				}
				// Assign to the outer resume rather than declaring a new
				// variable with :=. A shadowed resume would never advance the
				// outer span, so every chunk would restart from the beginning
				// of the index.
				var err error
				resume, err = td.deleteIndex(
					ctx,
					&desc,
					resumeAt,
					chunkSize,
					false, /* traceKV */
				)
				done = resume.Key == nil
				return err
			}); err != nil {
				return err
			}
		}
		// All the data chunks have been removed. Now also remove the
		// zone configs for the dropped indexes, if any.
		// NOTE(review): this runs once per index but passes the full index
		// list each time, and it passes table.ParentID rather than table.ID —
		// confirm both are what RemoveIndexZoneConfigs expects.
		if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
			return RemoveIndexZoneConfigs(ctx, txn, execCfg, table.ParentID, indexes)
		}); err != nil {
			return err
		}
	}
	log.Infof(ctx, "finished truncating interleaved indexes")
	return nil
}

// truncateIndexes truncates the KV ranges corresponding to dropped indexes.
//
// The indexes are dropped chunk by chunk, each chunk being deleted in
Expand Down Expand Up @@ -661,17 +714,7 @@ func (sc *SchemaChanger) truncateIndexes(
if err != nil {
return err
}
rd, err := row.MakeDeleter(
ctx,
txn,
sc.execCfg.Codec,
tableDesc,
nil,
alloc,
)
if err != nil {
return err
}
rd := row.MakeDeleter(sc.execCfg.Codec, tableDesc, nil)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
return err
Expand Down Expand Up @@ -1817,16 +1860,12 @@ func indexTruncateInTxn(
alloc := &sqlbase.DatumAlloc{}
var sp roachpb.Span
for done := false; !done; done = sp.Key == nil {
rd, err := row.MakeDeleter(
ctx, txn, execCfg.Codec, tableDesc, nil, alloc,
)
if err != nil {
return err
}
rd := row.MakeDeleter(execCfg.Codec, tableDesc, nil)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, evalCtx); err != nil {
return err
}
var err error
sp, err = td.deleteIndex(
ctx, idx, sp, indexTruncateChunkSize, traceKV,
)
Expand Down
22 changes: 21 additions & 1 deletion pkg/sql/descriptor_mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,14 @@ CREATE INDEX allidx ON t.test (k, v);
func(t *testing.T) {

// Init table to start state.
mTest.Exec(t, `TRUNCATE TABLE t.test`)
if _, err := sqlDB.Exec(`
DROP TABLE t.test;
CREATE TABLE t.test (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR, i VARCHAR DEFAULT 'i', FAMILY (k), FAMILY (v), FAMILY (i));
CREATE INDEX allidx ON t.test (k, v);
`); err != nil {
t.Fatal(err)
}

// read table descriptor
mTest.tableDesc = catalogkv.TestingGetMutableExistingTableDescriptor(
kvDB, keys.SystemSQLCodec, "t", "test")
Expand Down Expand Up @@ -524,6 +531,12 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR, INDEX foo (v));
if _, err := sqlDB.Exec(`TRUNCATE TABLE t.test`); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`
DROP TABLE t.test;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR, INDEX foo (v));
`); err != nil {
t.Fatal(err)
}
// read table descriptor
mTest.tableDesc = catalogkv.TestingGetMutableExistingTableDescriptor(
kvDB, keys.SystemSQLCodec, "t", "test")
Expand Down Expand Up @@ -687,6 +700,13 @@ CREATE INDEX allidx ON t.test (k, v);
continue
}
// Init table to start state.
if _, err := sqlDB.Exec(`
DROP TABLE t.test;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR, i CHAR, INDEX foo (i, v), FAMILY (k), FAMILY (v), FAMILY (i));
CREATE INDEX allidx ON t.test (k, v);
`); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`TRUNCATE TABLE t.test`); err != nil {
t.Fatal(err)
}
Expand Down
57 changes: 34 additions & 23 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,37 @@ func (p *planner) dropTableImpl(
return droppedViews, err
}

// unsplitRangesForTable unsplits any manually split ranges within the table
// span. It is a no-op unless the planner is running for the system tenant.
func (p *planner) unsplitRangesForTable(
	ctx context.Context, tableDesc *sqlbase.MutableTableDescriptor,
) error {
	// Secondary tenants aren't allowed to scan the meta ranges directly, so
	// only the system tenant does any work here.
	if !p.ExecCfg().Codec.ForSystemTenant() {
		return nil
	}

	tableSpan := tableDesc.TableSpan(p.ExecCfg().Codec)
	metaKVs, err := ScanMetaKVs(ctx, p.txn, tableSpan)
	if err != nil {
		return err
	}

	for _, metaKV := range metaKVs {
		var rangeDesc roachpb.RangeDescriptor
		if err := metaKV.ValueProto(&rangeDesc); err != nil {
			return err
		}
		// Ranges without a sticky bit have nothing to unsplit.
		if (rangeDesc.GetStickyBit() == hlc.Timestamp{}) {
			continue
		}
		unsplitErr := p.ExecCfg().DB.AdminUnsplit(ctx, rangeDesc.StartKey)
		if unsplitErr == nil {
			continue
		}
		// Swallow "key is not the start of a range" errors because it would
		// mean that the sticky bit was removed and merged concurrently. DROP
		// TABLE should not fail because of this.
		if strings.Contains(unsplitErr.Error(), "is not the start of a range") {
			continue
		}
		return unsplitErr
	}
	return nil
}

// drainName when set implies that the name needs to go through the draining
// names process. This parameter is always passed in as true except from
// TRUNCATE which directly deletes the old name to id map and doesn't need
Expand Down Expand Up @@ -369,29 +400,9 @@ func (p *planner) initiateDropTable(
}

// Unsplit all manually split ranges in the table so they can be
// automatically merged by the merge queue. Gate this on being the
// system tenant because secondary tenants aren't allowed to scan
// the meta ranges directly.
if p.ExecCfg().Codec.ForSystemTenant() {
span := tableDesc.TableSpan(p.ExecCfg().Codec)
ranges, err := ScanMetaKVs(ctx, p.txn, span)
if err != nil {
return err
}
for _, r := range ranges {
var desc roachpb.RangeDescriptor
if err := r.ValueProto(&desc); err != nil {
return err
}
if (desc.GetStickyBit() != hlc.Timestamp{}) {
// Swallow "key is not the start of a range" errors because it would mean
// that the sticky bit was removed and merged concurrently. DROP TABLE
// should not fail because of this.
if err := p.ExecCfg().DB.AdminUnsplit(ctx, desc.StartKey); err != nil && !strings.Contains(err.Error(), "is not the start of a range") {
return err
}
}
}
// automatically merged by the merge queue.
if err := p.unsplitRangesForTable(ctx, tableDesc); err != nil {
return err
}

tableDesc.State = descpb.TableDescriptor_DROP
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -96,6 +97,25 @@ func (r schemaChangeGCResumer) Resume(
if err != nil {
return err
}

// If there are any interleaved indexes to drop as part of a table TRUNCATE
// operation, then drop the indexes before waiting on the GC timer.
if len(details.InterleavedIndexes) > 0 {
// Before deleting any indexes, ensure that old versions of the table
// descriptor are no longer in use.
if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, details.InterleavedTable.ID); err != nil {
return err
}
if err := sql.TruncateInterleavedIndexes(
ctx,
execCfg,
sqlbase.NewImmutableTableDescriptor(*details.InterleavedTable),
details.InterleavedIndexes,
); err != nil {
return err
}
}

zoneCfgFilter, gossipUpdateC := setupConfigWatcher(execCfg)
tableDropTimes, indexDropTimes := getDropTimes(details)

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/gcjob/index_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func gcIndexes(
log.Infof(ctx, "GC is being considered on table %d for indexes indexes: %+v", parentID, droppedIndexes)
}

// Before deleting any indexes, ensure that old versions of the table descriptor
// are no longer in use. This is necessary in the case of truncate, where we
// schedule a GC Job in the transaction that commits the truncation.
if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID); err != nil {
return false, err
}

var parentTable *sqlbase.ImmutableTableDescriptor
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
parentTable, err = catalogkv.MustGetTableDescByID(ctx, txn, execCfg.Codec, parentID)
Expand Down
Loading

0 comments on commit 673ca44

Please sign in to comment.