Skip to content

Commit

Permalink
sql: create jobs for truncated and dropped tables
Browse files Browse the repository at this point in the history
Creates a job for statements involving dropping or truncated tables, including
DROP DATABASE. The job is completed when the GC TTL expires and both table
data and ID is deleted for each of the tables involved.

Fixes cockroachdb#19004

Release note: None
  • Loading branch information
Erik Trinh committed Sep 17, 2018
1 parent 2ddf654 commit 02d7b64
Show file tree
Hide file tree
Showing 13 changed files with 961 additions and 343 deletions.
539 changes: 406 additions & 133 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ message ResumeSpanList {
repeated roachpb.Span resume_spans = 1 [(gogoproto.nullable) = false];
}

message DroppedTableDetails {
enum Status {
DRAINING_NAME = 0 [(gogoproto.enumvalue_customname) = "StatusDrainingName"];
WAIT_FOR_GC_INTERVAL = 1 [(gogoproto.enumvalue_customname) = "StatusWaitForGCInterval"];
ROCKSDB_COMPACTION = 2 [(gogoproto.enumvalue_customname) = "StatusRocksDBCompaction"];
DONE = 10 [(gogoproto.enumvalue_customname) = "StatusDone"];
}
string name = 1;
uint32 ID = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sqlbase.ID"];
Status status = 3;
}

message SchemaChangeDetails {
util.hlc.Timestamp read_as_of = 1 [(gogoproto.nullable) = false];
// A schema change can involve running multiple processors backfilling
Expand All @@ -121,6 +133,7 @@ message SchemaChangeDetails {
// be processed. The index represents the index of a mutation in a
// mutation list containing mutations for the same mutationID.
repeated ResumeSpanList resume_span_list = 2 [(gogoproto.nullable) = false];
repeated DroppedTableDetails dropped_tables = 3 [(gogoproto.nullable) = false];
}

message SchemaChangeProgress {
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/drop_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -119,6 +120,26 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
ctx := params.ctx
p := params.p
tbNameStrings := make([]string, 0, len(n.td))
droppedTableDetails := make([]jobspb.DroppedTableDetails, 0, len(n.td))
tableDescs := make([]*sqlbase.TableDescriptor, 0, len(n.td))

for _, toDel := range n.td {
if toDel.desc.IsView() {
continue
}
droppedTableDetails = append(droppedTableDetails, jobspb.DroppedTableDetails{
Name: toDel.tn.FQString(),
ID: toDel.desc.ID,
})
tableDescs = append(tableDescs, toDel.desc)
}

_, err := p.createDropTablesJob(ctx, tableDescs, droppedTableDetails, tree.AsStringWithFlags(n.n,
tree.FmtAlwaysQualifyTableNames))
if err != nil {
return err
}

for _, toDel := range n.td {
tbDesc := toDel.desc
if tbDesc.IsView() {
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
Expand Down Expand Up @@ -107,6 +108,13 @@ func (n *dropTableNode) startExec(params runParams) error {
if droppedDesc == nil {
continue
}

droppedDetails := jobspb.DroppedTableDetails{Name: toDel.tn.FQString(), ID: toDel.desc.ID}
if _, err := params.p.createDropTablesJob(ctx, sqlbase.TableDescriptors{droppedDesc}, []jobspb.DroppedTableDetails{droppedDetails}, tree.AsStringWithFlags(n.n,
tree.FmtAlwaysQualifyTableNames)); err != nil {
return err
}

droppedViews, err := params.p.dropTableImpl(params, droppedDesc)
if err != nil {
return err
Expand Down
124 changes: 120 additions & 4 deletions pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/sqlmigrations"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -199,6 +203,18 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
if err := zoneExists(sqlDB, &cfg, tbDesc.ID); err != nil {
t.Fatal(err)
}

// Job still running, waiting for GC.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tbDesc.ID,
},
}); err != nil {
t.Fatal(err)
}
}

// Test that a dropped database's data gets deleted properly.
Expand All @@ -220,6 +236,8 @@ func TestDropDatabaseDeleteData(t *testing.T) {
CREATE DATABASE t;
CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR, FAMILY (k), FAMILY (v));
INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
CREATE TABLE t.kv2 (k CHAR PRIMARY KEY, v CHAR, FAMILY (k), FAMILY (v));
INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a');
`); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -253,8 +271,24 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
}
tbDesc := desc.GetTable()

tb2NameKey := sqlbase.MakeNameMetadataKey(dbDesc.ID, "kv2")
gr2, err := kvDB.Get(ctx, tb2NameKey)
if err != nil {
t.Fatal(err)
}
if !gr2.Exists() {
t.Fatalf(`table "kv2" does not exist`)
}
tb2DescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(gr2.ValueInt()))
if err := kvDB.GetProto(ctx, tb2DescKey, desc); err != nil {
t.Fatal(err)
}
tb2Desc := desc.GetTable()

tableSpan := tbDesc.TableSpan()
table2Span := tb2Desc.TableSpan()
tests.CheckKeyCount(t, kvDB, tableSpan, 6)
tests.CheckKeyCount(t, kvDB, table2Span, 6)

if _, err := sqlDB.Exec(`DROP DATABASE t RESTRICT`); !testutils.IsError(err,
`database "t" is not empty`) {
Expand All @@ -266,6 +300,18 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
}

tests.CheckKeyCount(t, kvDB, tableSpan, 6)
tests.CheckKeyCount(t, kvDB, table2Span, 6)

sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tbDesc.ID, tb2Desc.ID,
},
}); err != nil {
t.Fatal(err)
}

// Push a new zone config for both the table and database with TTL=0
// so the data is deleted immediately.
Expand All @@ -278,9 +324,6 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, tbDesc.ID, buf); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, dbDesc.ID, buf); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
if err := descExists(sqlDB, false, tbDesc.ID); err != nil {
Expand All @@ -290,8 +333,47 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
return zoneExists(sqlDB, nil, tbDesc.ID)
})

// Data is deleted.
// Table 1 data is deleted.
tests.CheckKeyCount(t, kvDB, tableSpan, 0)
tests.CheckKeyCount(t, kvDB, table2Span, 6)

if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tbDesc.ID, tb2Desc.ID,
},
}); err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, tb2Desc.ID, buf); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, dbDesc.ID, buf); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
if err := descExists(sqlDB, false, tb2Desc.ID); err != nil {
return err
}

return zoneExists(sqlDB, nil, tb2Desc.ID)
})

// Table 2 data is deleted.
tests.CheckKeyCount(t, kvDB, table2Span, 0)

if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: "DROP DATABASE t CASCADE",
DescriptorIDs: sqlbase.IDs{
tbDesc.ID, tb2Desc.ID,
},
}); err != nil {
t.Fatal(err)
}
}

// Tests that SHOW TABLES works correctly when a database is recreated
Expand Down Expand Up @@ -544,6 +626,18 @@ func TestDropTable(t *testing.T) {
t.Fatalf("table namekey still exists")
}

// Job still running, waiting for GC.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: `DROP TABLE t.public.kv`,
DescriptorIDs: sqlbase.IDs{
tableDesc.ID,
},
}); err != nil {
t.Fatal(err)
}

// Can create a table with the same name.
if err := tests.CreateKVTable(sqlDB, "kv", numRows); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -605,12 +699,23 @@ func TestDropTableDeleteData(t *testing.T) {
}

// Data hasn't been GC-ed.
sqlRun := sqlutils.MakeSQLRunner(sqlDB)
for i := 0; i < numTables; i++ {
if err := descExists(sqlDB, true, descs[i].ID); err != nil {
t.Fatal(err)
}
tableSpan := descs[i].TableSpan()
tests.CheckKeyCount(t, kvDB, tableSpan, numKeys)

if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1, jobspb.TypeSchemaChange, jobs.StatusRunning, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()),
DescriptorIDs: sqlbase.IDs{
descs[i].ID,
},
}); err != nil {
t.Fatal(err)
}
}

// The closure pushes a zone config reducing the TTL to 0 for descriptor i.
Expand All @@ -636,6 +741,17 @@ func TestDropTableDeleteData(t *testing.T) {
})
tableSpan := descs[i].TableSpan()
tests.CheckKeyCount(t, kvDB, tableSpan, 0)

// Ensure that the job is marked as succeeded.
if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()),
DescriptorIDs: sqlbase.IDs{
descs[i].ID,
},
}); err != nil {
t.Fatal(err)
}
}

// Push a new zone config for a few tables with TTL=0 so the data
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/drop_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (n *dropViewNode) startExec(params runParams) error {
if droppedDesc == nil {
continue
}

cascadeDroppedViews, err := params.p.dropViewImpl(ctx, droppedDesc, n.n.DropBehavior)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 02d7b64

Please sign in to comment.