Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: change table truncation to create new indexes rather than new ID #52235

Merged
merged 1 commit into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,13 +1321,13 @@ func TestChangefeedTruncateRenameDrop(t *testing.T) {
assertPayloads(t, truncate, []string{`truncate: [1]->{"after": {"a": 1}}`})
assertPayloads(t, truncateCascade, []string{`truncate_cascade: [1]->{"after": {"b": 1}}`})
sqlDB.Exec(t, `TRUNCATE TABLE truncate CASCADE`)
if _, err := truncate.Next(); !testutils.IsError(err, `"truncate" was dropped or truncated`) {
t.Errorf(`expected ""truncate" was dropped or truncated" error got: %+v`, err)
if _, err := truncate.Next(); !testutils.IsError(err, `"truncate" was truncated`) {
t.Fatalf(`expected ""truncate" was truncated" error got: %+v`, err)
}
if _, err := truncateCascade.Next(); !testutils.IsError(
err, `"truncate_cascade" was dropped or truncated`,
err, `"truncate_cascade" was truncated`,
) {
t.Errorf(`expected ""truncate_cascade" was dropped or truncated" error got: %+v`, err)
t.Fatalf(`expected ""truncate_cascade" was truncated" error got: %+v`, err)
}

sqlDB.Exec(t, `CREATE TABLE rename (a INT PRIMARY KEY)`)
Expand Down
23 changes: 23 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
tableEventTypeAddColumnNoBackfill
tableEventTypeAddColumnWithBackfill
tableEventTypeDropColumn
tableEventTruncate
)

var (
Expand Down Expand Up @@ -55,6 +56,8 @@ func classifyTableEvent(e TableEvent) tableEventType {
return tableEventTypeAddColumnNoBackfill
case hasNewColumnDropBackfillMutation(e):
return tableEventTypeDropColumn
case tableTruncated(e):
return tableEventTruncate
default:
return tableEventTypeUnknown
}
Expand All @@ -66,6 +69,10 @@ type tableEventFilter map[tableEventType]bool

func (b tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (bool, error) {
et := classifyTableEvent(e)
// Truncation events are not ignored and return an error.
if et == tableEventTruncate {
return false, errors.Errorf(`"%s" was truncated`, e.Before.Name)
}
shouldFilter, ok := b[et]
if !ok {
return false, errors.AssertionFailedf("policy does not specify how to handle event type %v", et)
Expand Down Expand Up @@ -100,3 +107,19 @@ func newColumnNoBackfill(e TableEvent) (res bool) {
return len(e.Before.Columns) < len(e.After.Columns) &&
!e.Before.HasColumnBackfillMutation()
}

func pkChangeMutationExists(desc *sqlbase.ImmutableTableDescriptor) bool {
for _, m := range desc.Mutations {
if m.Direction == descpb.DescriptorMutation_ADD && m.GetPrimaryKeySwap() != nil {
return true
}
}
return false
}

func tableTruncated(e TableEvent) bool {
// A table was truncated if the primary index has changed, but an ALTER
// PRIMARY KEY statement was not performed. TRUNCATE operates by creating
// a new set of indexes for the table, including a new primary index.
return e.Before.PrimaryIndex.ID != e.After.PrimaryIndex.ID && !pkChangeMutationExists(e.Before)
}
840 changes: 472 additions & 368 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ message SchemaChangeGCDetails {
// Indexes to GC.
repeated DroppedIndex indexes = 1 [(gogoproto.nullable) = false];

// The below two fields are used only in the case of TRUNCATE operating on
// tables with interleaved indexes. They are only set together.

// InterleavedTable is the table being truncated. In particular, it is the
// TableDescriptor before any of the truncate modifications have been applied.
sqlbase.TableDescriptor interleaved_table = 4;
// InterleavedIndexes is the set of interleaved indexes to truncate.
repeated sqlbase.IndexDescriptor interleaved_indexes = 5 [(gogoproto.nullable) = false];

// Entire tables to GC.
repeated DroppedID tables = 2 [(gogoproto.nullable) = false];

Expand Down
73 changes: 56 additions & 17 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,59 @@ func (sc *SchemaChanger) getTableVersion(
return tableDesc, nil
}

// TruncateInterleavedIndexes truncates the input set of indexes from the given
// table. It is used in the schema change GC job to delete interleaved index
// data as part of a TRUNCATE statement. Note that we cannot use
// SchemaChanger.truncateIndexes instead because that accesses the most recent
// version of the table when deleting. In this case, we need to use the version
// of the table before truncation, which is passed in.
func TruncateInterleavedIndexes(
ctx context.Context,
execCfg *ExecutorConfig,
table *sqlbase.ImmutableTableDescriptor,
indexes []descpb.IndexDescriptor,
) error {
log.Infof(ctx, "truncating %d interleaved indexes", len(indexes))
chunkSize := int64(indexTruncateChunkSize)
alloc := &sqlbase.DatumAlloc{}
codec, db := execCfg.Codec, execCfg.DB
for _, desc := range indexes {
var resume roachpb.Span
for rowIdx, done := int64(0), false; !done; rowIdx += chunkSize {
log.VEventf(ctx, 2, "truncate interleaved index (%d) at row: %d, span: %s", table.ID, rowIdx, resume)
resumeAt := resume
// Make a new txn just to drop this chunk.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
rd := row.MakeDeleter(codec, table, nil)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
return err
}
resume, err := td.deleteIndex(
ctx,
&desc,
resumeAt,
chunkSize,
false, /* traceKV */
)
done = resume.Key == nil
return err
}); err != nil {
return err
}
}
// All the data chunks have been removed. Now also removed the
// zone configs for the dropped indexes, if any.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return RemoveIndexZoneConfigs(ctx, txn, execCfg, table.ParentID, indexes)
}); err != nil {
return err
}
}
log.Infof(ctx, "finished truncating interleaved indexes")
return nil
}

// truncateIndexes truncate the KV ranges corresponding to dropped indexes.
//
// The indexes are dropped chunk by chunk, each chunk being deleted in
Expand Down Expand Up @@ -660,17 +713,7 @@ func (sc *SchemaChanger) truncateIndexes(
if err != nil {
return err
}
rd, err := row.MakeDeleter(
ctx,
txn,
sc.execCfg.Codec,
tableDesc,
nil,
alloc,
)
if err != nil {
return err
}
rd := row.MakeDeleter(sc.execCfg.Codec, tableDesc, nil)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
return err
Expand Down Expand Up @@ -1759,16 +1802,12 @@ func indexTruncateInTxn(
alloc := &sqlbase.DatumAlloc{}
var sp roachpb.Span
for done := false; !done; done = sp.Key == nil {
rd, err := row.MakeDeleter(
ctx, txn, execCfg.Codec, tableDesc, nil, alloc,
)
if err != nil {
return err
}
rd := row.MakeDeleter(execCfg.Codec, tableDesc, nil)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, evalCtx); err != nil {
return err
}
var err error
sp, err = td.deleteIndex(
ctx, idx, sp, indexTruncateChunkSize, traceKV,
)
Expand Down
22 changes: 21 additions & 1 deletion pkg/sql/descriptor_mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,14 @@ CREATE INDEX allidx ON t.test (k, v);
func(t *testing.T) {

// Init table to start state.
mTest.Exec(t, `TRUNCATE TABLE t.test`)
if _, err := sqlDB.Exec(`
DROP TABLE t.test;
CREATE TABLE t.test (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR, i VARCHAR DEFAULT 'i', FAMILY (k), FAMILY (v), FAMILY (i));
CREATE INDEX allidx ON t.test (k, v);
`); err != nil {
t.Fatal(err)
}

// read table descriptor
mTest.tableDesc = catalogkv.TestingGetMutableExistingTableDescriptor(
kvDB, keys.SystemSQLCodec, "t", "test")
Expand Down Expand Up @@ -524,6 +531,12 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR, INDEX foo (v));
if _, err := sqlDB.Exec(`TRUNCATE TABLE t.test`); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`
DROP TABLE t.test;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR, INDEX foo (v));
`); err != nil {
t.Fatal(err)
}
// read table descriptor
mTest.tableDesc = catalogkv.TestingGetMutableExistingTableDescriptor(
kvDB, keys.SystemSQLCodec, "t", "test")
Expand Down Expand Up @@ -687,6 +700,13 @@ CREATE INDEX allidx ON t.test (k, v);
continue
}
// Init table to start state.
if _, err := sqlDB.Exec(`
DROP TABLE t.test;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR, i CHAR, INDEX foo (i, v), FAMILY (k), FAMILY (v), FAMILY (i));
CREATE INDEX allidx ON t.test (k, v);
`); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`TRUNCATE TABLE t.test`); err != nil {
t.Fatal(err)
}
Expand Down
57 changes: 34 additions & 23 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,37 @@ func (p *planner) dropTableImpl(
return droppedViews, err
}

// unsplitRangesForTable unsplit any manually split ranges within the table span.
func (p *planner) unsplitRangesForTable(
ctx context.Context, tableDesc *sqlbase.MutableTableDescriptor,
) error {
// Gate this on being the system tenant because secondary tenants aren't
// allowed to scan the meta ranges directly.
if p.ExecCfg().Codec.ForSystemTenant() {
span := tableDesc.TableSpan(p.ExecCfg().Codec)
ranges, err := ScanMetaKVs(ctx, p.txn, span)
if err != nil {
return err
}
for _, r := range ranges {
var desc roachpb.RangeDescriptor
if err := r.ValueProto(&desc); err != nil {
return err
}
if (desc.GetStickyBit() != hlc.Timestamp{}) {
// Swallow "key is not the start of a range" errors because it would mean
// that the sticky bit was removed and merged concurrently. DROP TABLE
// should not fail because of this.
if err := p.ExecCfg().DB.AdminUnsplit(ctx, desc.StartKey); err != nil &&
!strings.Contains(err.Error(), "is not the start of a range") {
return err
}
}
}
}
return nil
}

// drainName when set implies that the name needs to go through the draining
// names process. This parameter is always passed in as true except from
// TRUNCATE which directly deletes the old name to id map and doesn't need
Expand Down Expand Up @@ -369,29 +400,9 @@ func (p *planner) initiateDropTable(
}

// Unsplit all manually split ranges in the table so they can be
// automatically merged by the merge queue. Gate this on being the
// system tenant because secondary tenants aren't allowed to scan
// the meta ranges directly.
if p.ExecCfg().Codec.ForSystemTenant() {
span := tableDesc.TableSpan(p.ExecCfg().Codec)
ranges, err := ScanMetaKVs(ctx, p.txn, span)
if err != nil {
return err
}
for _, r := range ranges {
var desc roachpb.RangeDescriptor
if err := r.ValueProto(&desc); err != nil {
return err
}
if (desc.GetStickyBit() != hlc.Timestamp{}) {
// Swallow "key is not the start of a range" errors because it would mean
// that the sticky bit was removed and merged concurrently. DROP TABLE
// should not fail because of this.
if err := p.ExecCfg().DB.AdminUnsplit(ctx, desc.StartKey); err != nil && !strings.Contains(err.Error(), "is not the start of a range") {
return err
}
}
}
// automatically merged by the merge queue.
if err := p.unsplitRangesForTable(ctx, tableDesc); err != nil {
return err
}

tableDesc.State = descpb.TableDescriptor_DROP
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -96,6 +97,25 @@ func (r schemaChangeGCResumer) Resume(
if err != nil {
return err
}

// If there are any interleaved indexes to drop as part of a table TRUNCATE
// operation, then drop the indexes before waiting on the GC timer.
if len(details.InterleavedIndexes) > 0 {
// Before deleting any indexes, ensure that old versions of the table
// descriptor are no longer in use.
if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, details.InterleavedTable.ID); err != nil {
return err
}
if err := sql.TruncateInterleavedIndexes(
ctx,
execCfg,
sqlbase.NewImmutableTableDescriptor(*details.InterleavedTable),
details.InterleavedIndexes,
); err != nil {
return err
}
}

zoneCfgFilter, gossipUpdateC := setupConfigWatcher(execCfg)
tableDropTimes, indexDropTimes := getDropTimes(details)

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/gcjob/index_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func gcIndexes(
log.Infof(ctx, "GC is being considered on table %d for indexes indexes: %+v", parentID, droppedIndexes)
}

// Before deleting any indexes, ensure that old versions of the table descriptor
// are no longer in use. This is necessary in the case of truncate, where we
// schedule a GC Job in the transaction that commits the truncation.
if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID); err != nil {
return false, err
}

var parentTable *sqlbase.ImmutableTableDescriptor
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
parentTable, err = catalogkv.MustGetTableDescByID(ctx, txn, execCfg.Codec, parentID)
Expand Down
Loading