Skip to content

Commit

Permalink
sql: create jobs for truncated and dropped tables
Browse files Browse the repository at this point in the history
Creates a job for statements involving dropping or truncated tables, including
DROP DATABASE. The job is completed when the GC TTL expires and both table
data and ID is deleted for each of the tables involved.

Fixes cockroachdb#19004

Release note: None
  • Loading branch information
Erik Trinh committed Sep 11, 2018
1 parent 2ddf654 commit 77666bd
Show file tree
Hide file tree
Showing 12 changed files with 796 additions and 195 deletions.
471 changes: 338 additions & 133 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ message ResumeSpanList {
repeated roachpb.Span resume_spans = 1 [(gogoproto.nullable) = false];
}

message DroppedTableDetails {
string name = 1;
uint32 ID = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sqlbase.ID"];
}

message SchemaChangeDetails {
util.hlc.Timestamp read_as_of = 1 [(gogoproto.nullable) = false];
// A schema change can involve running multiple processors backfilling
Expand All @@ -121,6 +126,7 @@ message SchemaChangeDetails {
// be processed. The index represents the index of a mutation in a
// mutation list containing mutations for the same mutationID.
repeated ResumeSpanList resume_span_list = 2 [(gogoproto.nullable) = false];
repeated DroppedTableDetails remaining_dropped_tables = 3 [(gogoproto.nullable) = false];
}

message SchemaChangeProgress {
Expand Down
29 changes: 27 additions & 2 deletions pkg/sql/drop_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -119,17 +120,41 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
ctx := params.ctx
p := params.p
tbNameStrings := make([]string, 0, len(n.td))
droppedTableDetails := make([]jobspb.DroppedTableDetails, 0, len(n.td))
tableDescs := make([]*sqlbase.TableDescriptor, 0, len(n.td))

for _, toDel := range n.td {
droppedTableDetails = append(droppedTableDetails, jobspb.DroppedTableDetails{
Name: toDel.tn.FQString(),
ID: toDel.desc.ID,
})
tableDescs = append(tableDescs, toDel.desc)
}

job, err := p.createSchemaChangesJob(ctx, tableDescs, droppedTableDetails, tree.AsStringWithFlags(n.n,
tree.FmtAlwaysQualifyTableNames))
if err != nil {
return err
}

if err := job.WithTxn(params.p.txn).Started(ctx); err != nil {
if log.V(2) {
log.Infof(ctx, "Failed to mark job %d as started: %v", *job.ID(), err)
}
}

for _, toDel := range n.td {
tbDesc := toDel.desc
mutationID := tbDesc.MutationJobs[len(tbDesc.MutationJobs)-1].MutationID
if tbDesc.IsView() {
cascadedViews, err := p.dropViewImpl(ctx, tbDesc, tree.DropCascade)
cascadedViews, err := p.dropViewImpl(ctx, tbDesc, tree.DropCascade, mutationID)
if err != nil {
return err
}
// TODO(knz): dependent dropped views should be qualified here.
tbNameStrings = append(tbNameStrings, cascadedViews...)
} else {
cascadedViews, err := p.dropTableImpl(params, tbDesc)
cascadedViews, err := p.dropTableImpl(params, tbDesc, mutationID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (*dropSequenceNode) Close(context.Context) {}
func (p *planner) dropSequenceImpl(
ctx context.Context, seqDesc *sqlbase.TableDescriptor, behavior tree.DropBehavior,
) error {
return p.initiateDropTable(ctx, seqDesc, true /* drainName */)
return p.initiateDropTable(ctx, seqDesc, true /* drainName */, sqlbase.InvalidMutationID)
}

// sequenceDependency error returns an error if the given sequence cannot be dropped because
Expand Down
35 changes: 28 additions & 7 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -107,7 +109,22 @@ func (n *dropTableNode) startExec(params runParams) error {
if droppedDesc == nil {
continue
}
droppedViews, err := params.p.dropTableImpl(params, droppedDesc)

droppedDetails := jobspb.DroppedTableDetails{Name: toDel.tn.FQString(), ID: toDel.desc.ID}
job, err := params.p.createSchemaChangesJob(ctx, sqlbase.TableDescriptors{droppedDesc}, []jobspb.DroppedTableDetails{droppedDetails}, tree.AsStringWithFlags(n.n,
tree.FmtAlwaysQualifyTableNames))
if err != nil {
return err
}

if err := job.WithTxn(params.p.txn).Started(ctx); err != nil {
if log.V(2) {
log.Infof(ctx, "Failed to mark job %d as started: %v", *job.ID(), err)
}
}

mutationID := droppedDesc.MutationJobs[len(droppedDesc.MutationJobs)-1].MutationID
droppedViews, err := params.p.dropTableImpl(params, droppedDesc, mutationID)
if err != nil {
return err
}
Expand All @@ -124,9 +141,10 @@ func (n *dropTableNode) startExec(params runParams) error {
TableName string
Statement string
User string
MutationID uint32
CascadeDroppedViews []string
}{toDel.tn.FQString(), n.n.String(),
params.SessionData().User, droppedViews},
params.SessionData().User, uint32(mutationID), droppedViews},
); err != nil {
return err
}
Expand Down Expand Up @@ -246,7 +264,7 @@ func (p *planner) removeInterleave(ctx context.Context, ref sqlbase.ForeignKeyRe
// on it if `cascade` is enabled). It returns a list of view names that were
// dropped due to `cascade` behavior.
func (p *planner) dropTableImpl(
params runParams, tableDesc *sqlbase.TableDescriptor,
params runParams, tableDesc *sqlbase.TableDescriptor, mutationID sqlbase.MutationID,
) ([]string, error) {
ctx := params.ctx

Expand Down Expand Up @@ -297,15 +315,15 @@ func (p *planner) dropTableImpl(
if viewDesc.Dropped() {
continue
}
cascadedViews, err := p.dropViewImpl(ctx, viewDesc, tree.DropCascade)
cascadedViews, err := p.dropViewImpl(ctx, viewDesc, tree.DropCascade, sqlbase.InvalidMutationID)
if err != nil {
return droppedViews, err
}
droppedViews = append(droppedViews, cascadedViews...)
droppedViews = append(droppedViews, viewDesc.Name)
}

err := p.initiateDropTable(ctx, tableDesc, true /* drain name */)
err := p.initiateDropTable(ctx, tableDesc, true /* drain name */, mutationID)
return droppedViews, err
}

Expand All @@ -314,7 +332,10 @@ func (p *planner) dropTableImpl(
// TRUNCATE which directly deletes the old name to id map and doesn't need
// drain the old map.
func (p *planner) initiateDropTable(
ctx context.Context, tableDesc *sqlbase.TableDescriptor, drainName bool,
ctx context.Context,
tableDesc *sqlbase.TableDescriptor,
drainName bool,
mutationID sqlbase.MutationID,
) error {
if tableDesc.Dropped() {
return fmt.Errorf("table %q is being dropped", tableDesc.Name)
Expand Down Expand Up @@ -377,7 +398,7 @@ func (p *planner) initiateDropTable(
// change manager, which is notified via a system config gossip.
// The schema change manager will properly schedule deletion of
// the underlying data when the GC deadline expires.
return p.writeDropTable(ctx, tableDesc)
return p.writeTableDesc(ctx, tableDesc, mutationID)
}

func (p *planner) removeFKBackReference(
Expand Down
191 changes: 191 additions & 0 deletions pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/sqlmigrations"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -970,3 +974,190 @@ CREATE VIEW test.acol(a) AS SELECT a FROM test.t;
return err
})
}

// Tests that a job is created for a dropped table and succeeds when the data is deleted.
func TestDropTableJob(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
AsyncExecQuickly: true,
},
}
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())
ctx := context.TODO()

const numRows = 2*sql.TableTruncateChunkSize + 1

if err := tests.CreateKVTable(sqlDB, "kv", numRows); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "kv")
nameKey := sqlbase.MakeNameMetadataKey(keys.MinNonPredefinedUserDescID, "kv")
gr, err := kvDB.Get(ctx, nameKey)

if err != nil {
t.Fatal(err)
}

if !gr.Exists() {
t.Fatalf("Name entry %q does not exist", nameKey)
}

if _, err := sqlDB.Exec(`DROP TABLE t.kv`); err != nil {
t.Fatal(err)
}

// Data hasn't been GC-ed.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP TABLE t.public.kv",
DescriptorIDs: sqlbase.IDs{
tableDesc.ID,
},
}); err != nil {
t.Fatal(err)
}

// The closure pushes a zone config reducing the TTL to 0 for descriptor.
cfg := config.DefaultZoneConfig()
cfg.GC.TTLSeconds = 0
buf, err := protoutil.Marshal(&cfg)
if err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, tableDesc.ID, buf); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
return descExists(sqlDB, false, tableDesc.ID)
})

if err := jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: "DROP TABLE t.public.kv",
DescriptorIDs: sqlbase.IDs{
tableDesc.ID,
},
}); err != nil {
t.Fatal(err)
}
}

// Tests that a job is created for a dropped database and succeeds when the data is deleted.
func TestDropDatabaseJob(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
AsyncExecQuickly: true,
},
}
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())
ctx := context.TODO()

// Fix the column families so the key counts below don't change if the
// family heuristics are updated.
if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR, FAMILY (k), FAMILY (v));
INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
CREATE TABLE t.kv2 (k CHAR PRIMARY KEY, v CHAR, FAMILY (k), FAMILY (v));
INSERT INTO t.kv2 VALUES ('a', 'b'), ('c', 'd'), ('d', 'a');
`); err != nil {
t.Fatal(err)
}

dbNameKey := sqlbase.MakeNameMetadataKey(keys.RootNamespaceID, "t")
r, err := kvDB.Get(ctx, dbNameKey)
if err != nil {
t.Fatal(err)
}
if !r.Exists() {
t.Fatalf(`database "t" does not exist`)
}
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "kv")
tableDesc2 := sqlbase.GetTableDescriptor(kvDB, "t", "kv2")

nameKey := sqlbase.MakeNameMetadataKey(keys.MinNonPredefinedUserDescID, "kv")
nameKey2 := sqlbase.MakeNameMetadataKey(keys.MinNonPredefinedUserDescID, "kv2")
gr, err := kvDB.Get(ctx, nameKey)

if err != nil {
t.Fatal(err)
}

gr2, err := kvDB.Get(ctx, nameKey2)

if err != nil {
t.Fatal(err)
}

if !gr.Exists() || !gr2.Exists() {
t.Fatalf("Name entry %q or %q does not exist", nameKey, nameKey2)
}

if _, err := sqlDB.Exec(`DROP DATABASE t CASCADE`); err != nil {
t.Fatal(err)
}

// Data hasn't been GC-ed.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tableDesc.ID, tableDesc2.ID,
},
}); err != nil {
t.Fatal(err)
}

// The closure pushes a zone config reducing the TTL to 0 for descriptor 1.
cfg := config.DefaultZoneConfig()
cfg.GC.TTLSeconds = 0
buf, err := protoutil.Marshal(&cfg)
if err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, tableDesc.ID, buf); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
return descExists(sqlDB, false, tableDesc.ID)
})

if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tableDesc.ID, tableDesc2.ID,
},
}); err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, tableDesc2.ID, buf); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
return descExists(sqlDB, false, tableDesc2.ID)
})

if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tableDesc.ID, tableDesc2.ID,
},
}); err != nil {
t.Fatal(err)
}
}
Loading

0 comments on commit 77666bd

Please sign in to comment.