Skip to content

Commit

Permalink
sql: create jobs for truncated and dropped tables
Browse files Browse the repository at this point in the history
Creates a job for statements involving dropping or truncated tables, including
DROP DATABASE. The job is completed when the GC TTL expires and both table
data and ID is deleted for each of the tables involved.

Fixes cockroachdb#19004

Release note: None
  • Loading branch information
Erik Trinh committed Sep 12, 2018
1 parent 2ddf654 commit 421d519
Show file tree
Hide file tree
Showing 13 changed files with 836 additions and 337 deletions.
471 changes: 338 additions & 133 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ message ResumeSpanList {
repeated roachpb.Span resume_spans = 1 [(gogoproto.nullable) = false];
}

message DroppedTableDetails {
string name = 1;
uint32 ID = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sqlbase.ID"];
}

message SchemaChangeDetails {
util.hlc.Timestamp read_as_of = 1 [(gogoproto.nullable) = false];
// A schema change can involve running multiple processors backfilling
Expand All @@ -121,6 +126,7 @@ message SchemaChangeDetails {
// be processed. The index represents the index of a mutation in a
// mutation list containing mutations for the same mutationID.
repeated ResumeSpanList resume_span_list = 2 [(gogoproto.nullable) = false];
repeated DroppedTableDetails remaining_dropped_tables = 3 [(gogoproto.nullable) = false];
}

message SchemaChangeProgress {
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/drop_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -119,6 +120,29 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
ctx := params.ctx
p := params.p
tbNameStrings := make([]string, 0, len(n.td))
droppedTableDetails := make([]jobspb.DroppedTableDetails, 0, len(n.td))
tableDescs := make([]*sqlbase.TableDescriptor, 0, len(n.td))

for _, toDel := range n.td {
droppedTableDetails = append(droppedTableDetails, jobspb.DroppedTableDetails{
Name: toDel.tn.FQString(),
ID: toDel.desc.ID,
})
tableDescs = append(tableDescs, toDel.desc)
}

job, err := p.createDropTablesJob(ctx, tableDescs, droppedTableDetails, tree.AsStringWithFlags(n.n,
tree.FmtAlwaysQualifyTableNames))
if err != nil {
return err
}

if err := job.WithTxn(params.p.txn).Started(ctx); err != nil {
if log.V(2) {
log.Infof(ctx, "Failed to mark job %d as started: %v", *job.ID(), err)
}
}

for _, toDel := range n.td {
tbDesc := toDel.desc
if tbDesc.IsView() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -107,6 +109,20 @@ func (n *dropTableNode) startExec(params runParams) error {
if droppedDesc == nil {
continue
}

droppedDetails := jobspb.DroppedTableDetails{Name: toDel.tn.FQString(), ID: toDel.desc.ID}
job, err := params.p.createDropTablesJob(ctx, sqlbase.TableDescriptors{droppedDesc}, []jobspb.DroppedTableDetails{droppedDetails}, tree.AsStringWithFlags(n.n,
tree.FmtAlwaysQualifyTableNames))
if err != nil {
return err
}

if err := job.WithTxn(params.p.txn).Started(ctx); err != nil {
if log.V(2) {
log.Infof(ctx, "Failed to mark job %d as started: %v", *job.ID(), err)
}
}

droppedViews, err := params.p.dropTableImpl(params, droppedDesc)
if err != nil {
return err
Expand Down
71 changes: 71 additions & 0 deletions pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/sqlmigrations"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -199,6 +203,18 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
if err := zoneExists(sqlDB, &cfg, tbDesc.ID); err != nil {
t.Fatal(err)
}

// Job still running, waiting for GC.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tbDesc.ID,
},
}); err != nil {
t.Fatal(err)
}
}

// Test that a dropped database's data gets deleted properly.
Expand Down Expand Up @@ -267,6 +283,17 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');

tests.CheckKeyCount(t, kvDB, tableSpan, 6)

sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tbDesc.ID,
},
}); err != nil {
t.Fatal(err)
}

// Push a new zone config for both the table and database with TTL=0
// so the data is deleted immediately.
cfg := config.DefaultZoneConfig()
Expand All @@ -292,6 +319,16 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');

// Data is deleted.
tests.CheckKeyCount(t, kvDB, tableSpan, 0)

if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tbDesc.ID,
},
}); err != nil {
t.Fatal(err)
}
}

// Tests that SHOW TABLES works correctly when a database is recreated
Expand Down Expand Up @@ -544,6 +581,18 @@ func TestDropTable(t *testing.T) {
t.Fatalf("table namekey still exists")
}

// Job still running, waiting for GC.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: `DROP TABLE t.public.kv`,
DescriptorIDs: sqlbase.IDs{
tableDesc.ID,
},
}); err != nil {
t.Fatal(err)
}

// Can create a table with the same name.
if err := tests.CreateKVTable(sqlDB, "kv", numRows); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -605,12 +654,23 @@ func TestDropTableDeleteData(t *testing.T) {
}

// Data hasn't been GC-ed.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
for i := 0; i < numTables; i++ {
if err := descExists(sqlDB, true, descs[i].ID); err != nil {
t.Fatal(err)
}
tableSpan := descs[i].TableSpan()
tests.CheckKeyCount(t, kvDB, tableSpan, numKeys)

if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()),
DescriptorIDs: sqlbase.IDs{
descs[i].ID,
},
}); err != nil {
t.Fatal(err)
}
}

// The closure pushes a zone config reducing the TTL to 0 for descriptor i.
Expand All @@ -636,6 +696,17 @@ func TestDropTableDeleteData(t *testing.T) {
})
tableSpan := descs[i].TableSpan()
tests.CheckKeyCount(t, kvDB, tableSpan, 0)

// Ensure that the job is marked as succeeded.
if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()),
DescriptorIDs: sqlbase.IDs{
descs[i].ID,
},
}); err != nil {
t.Fatal(err)
}
}

// Push a new zone config for a few tables with TTL=0 so the data
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/drop_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -83,6 +84,20 @@ func (n *dropViewNode) startExec(params runParams) error {
if droppedDesc == nil {
continue
}

droppedDetails := jobspb.DroppedTableDetails{Name: toDel.tn.FQString(), ID: toDel.desc.ID}
job, err := params.p.createDropTablesJob(ctx, sqlbase.TableDescriptors{droppedDesc}, []jobspb.DroppedTableDetails{droppedDetails}, tree.AsStringWithFlags(n.n,
tree.FmtAlwaysQualifyTableNames))
if err != nil {
return err
}

if err := job.WithTxn(params.p.txn).Started(ctx); err != nil {
if log.V(2) {
log.Infof(ctx, "Failed to mark job %d as started: %v", *job.ID(), err)
}
}

cascadeDroppedViews, err := params.p.dropViewImpl(ctx, droppedDesc, n.n.DropBehavior)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 421d519

Please sign in to comment.