Skip to content

Commit

Permalink
sql: create jobs for truncated and dropped tables
Browse files Browse the repository at this point in the history
Creates a job for truncate and drop table statements, which is completed
when the GC TTL expires and the table data and ID is deleted.

Fixes cockroachdb#19004

Release note: None
  • Loading branch information
Erik Trinh committed Sep 10, 2018
1 parent 2ddf654 commit 46206fa
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/drop_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
// TODO(knz): dependent dropped views should be qualified here.
tbNameStrings = append(tbNameStrings, cascadedViews...)
} else {
cascadedViews, err := p.dropTableImpl(params, tbDesc)
cascadedViews, err := p.dropTableImpl(params, tbDesc, sqlbase.InvalidMutationID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (*dropSequenceNode) Close(context.Context) {}
func (p *planner) dropSequenceImpl(
ctx context.Context, seqDesc *sqlbase.TableDescriptor, behavior tree.DropBehavior,
) error {
return p.initiateDropTable(ctx, seqDesc, true /* drainName */)
return p.initiateDropTable(ctx, seqDesc, true /* drainName */, sqlbase.InvalidMutationID)
}

// sequenceDependency error returns an error if the given sequence cannot be dropped because
Expand Down
21 changes: 15 additions & 6 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,15 @@ func (n *dropTableNode) startExec(params runParams) error {
if droppedDesc == nil {
continue
}
droppedViews, err := params.p.dropTableImpl(params, droppedDesc)

mutationID := sqlbase.InvalidMutationID
mutationID, err := params.p.createSchemaChangeJob(ctx, droppedDesc,
tree.AsStringWithFlags(n.n, tree.FmtAlwaysQualifyTableNames))
if err != nil {
return err
}

droppedViews, err := params.p.dropTableImpl(params, droppedDesc, mutationID)
if err != nil {
return err
}
Expand All @@ -124,9 +132,10 @@ func (n *dropTableNode) startExec(params runParams) error {
TableName string
Statement string
User string
MutationID uint32
CascadeDroppedViews []string
}{toDel.tn.FQString(), n.n.String(),
params.SessionData().User, droppedViews},
params.SessionData().User, uint32(mutationID), droppedViews},
); err != nil {
return err
}
Expand Down Expand Up @@ -246,7 +255,7 @@ func (p *planner) removeInterleave(ctx context.Context, ref sqlbase.ForeignKeyRe
// on it if `cascade` is enabled). It returns a list of view names that were
// dropped due to `cascade` behavior.
func (p *planner) dropTableImpl(
params runParams, tableDesc *sqlbase.TableDescriptor,
params runParams, tableDesc *sqlbase.TableDescriptor, mutationID sqlbase.MutationID,
) ([]string, error) {
ctx := params.ctx

Expand Down Expand Up @@ -305,7 +314,7 @@ func (p *planner) dropTableImpl(
droppedViews = append(droppedViews, viewDesc.Name)
}

err := p.initiateDropTable(ctx, tableDesc, true /* drain name */)
err := p.initiateDropTable(ctx, tableDesc, true /* drain name */, mutationID)
return droppedViews, err
}

Expand All @@ -314,7 +323,7 @@ func (p *planner) dropTableImpl(
// TRUNCATE which directly deletes the old name to id map and doesn't need
// drain the old map.
func (p *planner) initiateDropTable(
ctx context.Context, tableDesc *sqlbase.TableDescriptor, drainName bool,
ctx context.Context, tableDesc *sqlbase.TableDescriptor, drainName bool, mutationID sqlbase.MutationID,
) error {
if tableDesc.Dropped() {
return fmt.Errorf("table %q is being dropped", tableDesc.Name)
Expand Down Expand Up @@ -377,7 +386,7 @@ func (p *planner) initiateDropTable(
// change manager, which is notified via a system config gossip.
// The schema change manager will properly schedule deletion of
// the underlying data when the GC deadline expires.
return p.writeDropTable(ctx, tableDesc)
return p.writeTableDesc(ctx, tableDesc, mutationID)
}

func (p *planner) removeFKBackReference(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (p *planner) dropViewImpl(
}
}

if err := p.initiateDropTable(ctx, viewDesc, true /* drainName */); err != nil {
if err := p.initiateDropTable(ctx, viewDesc, true /* drainName */, sqlbase.InvalidMutationID); err != nil {
return cascadeDroppedViews, err
}

Expand Down
78 changes: 54 additions & 24 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,35 +436,65 @@ func (sc *SchemaChanger) maybeAddDrop(
return false, nil
}

// This can happen if a change other than the drop originally
// scheduled the changer for this table. If that's the case,
// we still need to wait for the deadline to expire.
if table.DropTime != 0 {
var timeRemaining time.Duration
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
timeRemaining = 0
_, zoneCfg, _, err := GetZoneConfigInTxn(
ctx, txn, uint32(table.ID), &sqlbase.IndexDescriptor{}, "",
)
if err != nil {
return err
// Find our job.
foundJobID := false
if sc.mutationID != sqlbase.InvalidMutationID {
for _, g := range table.MutationJobs {
if g.MutationID == sc.mutationID {
job, err := sc.jobRegistry.LoadJob(ctx, g.JobID)
if err != nil {
return false, err
}
sc.job = job
foundJobID = true
break
}
deadline := table.DropTime + int64(zoneCfg.GC.TTLSeconds)*time.Second.Nanoseconds()
timeRemaining = timeutil.Since(timeutil.Unix(0, deadline))
return nil
}); err != nil {
}

if foundJobID {
if err := sc.job.Started(ctx); err != nil {
if log.V(2) {
log.Infof(ctx, "Failed to mark job %d as started: %v", *sc.job.ID(), err)
}
}
}

// This can happen if a change other than the drop originally
// scheduled the changer for this table. If that's the case,
// we still need to wait for the deadline to expire.
if table.DropTime != 0 {
var timeRemaining time.Duration
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
timeRemaining = 0
_, zoneCfg, _, err := GetZoneConfigInTxn(
ctx, txn, uint32(table.ID), &sqlbase.IndexDescriptor{}, "",
)
if err != nil {
return err
}
deadline := table.DropTime + int64(zoneCfg.GC.TTLSeconds)*time.Second.Nanoseconds()
timeRemaining = timeutil.Since(timeutil.Unix(0, deadline))
return nil
}); err != nil {
return false, err
}
if timeRemaining < 0 {
return false, errNotHitGCTTLDeadline
}
}
// Do all the hard work of deleting the table data and the table ID.
if err := sc.truncateTable(ctx, lease, table, evalCtx); err != nil {
return false, err
}
if timeRemaining < 0 {
return false, errNotHitGCTTLDeadline

if err := DropTableDesc(ctx, table, sc.db, false /* traceKV */); err != nil {
return true, err
} else if !foundJobID {
return true, nil
} else {
return true, sc.job.Succeeded(ctx, jobs.NoopFn)
}
}
// Do all the hard work of deleting the table data and the table ID.
if err := sc.truncateTable(ctx, lease, table, evalCtx); err != nil {
return false, err
}

return true, DropTableDesc(ctx, table, sc.db, false /* traceKV */)
}

if table.Adding() {
Expand Down
39 changes: 28 additions & 11 deletions pkg/sql/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func (p *planner) Truncate(ctx context.Context, n *tree.Truncate) (planNode, err
toTruncate := make(map[sqlbase.ID]string, len(n.Tables))
// toTraverse is the list of tables whose references need to be traversed
// while constructing the list of tables that should be truncated.
toTraverse := make([]sqlbase.TableDescriptor, 0, len(n.Tables))
toTraverse := make([]sqlbase.ID, 0, len(n.Tables))
tableDescs := make(map[sqlbase.ID]*sqlbase.TableDescriptor, len(n.Tables))
for _, name := range n.Tables {
tn, err := name.Normalize()
if err != nil {
Expand All @@ -58,15 +59,16 @@ func (p *planner) Truncate(ctx context.Context, n *tree.Truncate) (planNode, err
}

toTruncate[tableDesc.ID] = tn.FQString()
toTraverse = append(toTraverse, *tableDesc)
toTraverse = append(toTraverse, tableDesc.ID)
tableDescs[tableDesc.ID] = tableDesc
}

// Check that any referencing tables are contained in the set, or, if CASCADE
// requested, add them all to the set.
for len(toTraverse) > 0 {
// Pick last element.
idx := len(toTraverse) - 1
tableDesc := toTraverse[idx]
tableDesc := tableDescs[toTraverse[idx]]
toTraverse = toTraverse[:idx]

maybeEnqueue := func(ref sqlbase.ForeignKeyReference, msg string) error {
Expand All @@ -90,7 +92,8 @@ func (p *planner) Truncate(ctx context.Context, n *tree.Truncate) (planNode, err
return err
}
toTruncate[other.ID] = otherName
toTraverse = append(toTraverse, *other)
toTraverse = append(toTraverse, other.ID)
tableDescs[other.ID] = other
return nil
}

Expand Down Expand Up @@ -122,7 +125,20 @@ func (p *planner) Truncate(ctx context.Context, n *tree.Truncate) (planNode, err
}
traceKV := p.extendedEvalCtx.Tracing.KVTracingEnabled()
for id, name := range toTruncate {
if err := p.truncateTable(ctx, id, traceKV); err != nil {

tableDesc := tableDescs[id]
mutationID := sqlbase.InvalidMutationID
mutationID, err := p.createSchemaChangeJob(ctx, tableDesc,
tree.AsStringWithFlags(n, tree.FmtAlwaysQualifyTableNames))
if err != nil {
return nil, err
}

if err := p.writeTableDesc(ctx, tableDesc, mutationID); err != nil {
return nil, err
}

if err := p.truncateTable(ctx, id, traceKV, mutationID); err != nil {
return nil, err
}

Expand All @@ -134,10 +150,11 @@ func (p *planner) Truncate(ctx context.Context, n *tree.Truncate) (planNode, err
int32(id),
int32(p.extendedEvalCtx.NodeID),
struct {
TableName string
Statement string
User string
}{name, n.String(), p.SessionData().User},
TableName string
Statement string
User string
MutationID uint32
}{name, n.String(), p.SessionData().User, uint32(mutationID)},
); err != nil {
return nil, err
}
Expand All @@ -149,7 +166,7 @@ func (p *planner) Truncate(ctx context.Context, n *tree.Truncate) (planNode, err
// truncateTable truncates the data of a table in a single transaction. It
// drops the table and recreates it with a new ID. The dropped table is
// GC-ed later through an asynchronous schema change.
func (p *planner) truncateTable(ctx context.Context, id sqlbase.ID, traceKV bool) error {
func (p *planner) truncateTable(ctx context.Context, id sqlbase.ID, traceKV bool, mutationID sqlbase.MutationID) error {
// Read the table descriptor because it might have changed
// while another table in the truncation list was truncated.
tableDesc, err := sqlbase.GetTableDescFromID(ctx, p.txn, id)
Expand Down Expand Up @@ -187,7 +204,7 @@ func (p *planner) truncateTable(ctx context.Context, id sqlbase.ID, traceKV bool
}

// Drop table.
if err := p.initiateDropTable(ctx, tableDesc, false /* drainName */); err != nil {
if err := p.initiateDropTable(ctx, tableDesc, false /* drainName */, mutationID); err != nil {
return err
}

Expand Down

0 comments on commit 46206fa

Please sign in to comment.