importer: replace Clear/RevertRange calls with DeleteRange in IMPORT rollbacks

This patch replaces the ClearRange and RevertRange calls with MVCC DeleteRange
during IMPORT rollbacks.

In the predicate-based DeleteRange call, the client's table span is
partitioned so that the partitions can be processed in parallel. The new
bulkio.import.predicate_delete_range_parallelism setting defines the number of
workers that can issue DeleteRange requests concurrently, and the new
bulkio.import.predicate_delete_range_batch_size setting defines the number of
ranges of data to include in a single DeleteRange request.
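
For illustration, the new settings can be tuned at runtime like any other
cluster setting. A minimal Go sketch, assuming a local insecure single-node
cluster and any Postgres-wire driver; the connection string and values are
placeholders, not tuning recommendations:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumption: any Postgres-wire-compatible driver works
)

func main() {
	// Placeholder connection string for a local, insecure single-node cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Both settings are registered in pkg/sql/revert.go by this patch; the
	// values below are arbitrary examples.
	for _, stmt := range []string{
		`SET CLUSTER SETTING bulkio.import.predicate_delete_range_batch_size = 20`,
		`SET CLUSTER SETTING bulkio.import.predicate_delete_range_parallelism = 8`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}
```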

Release note: none
msbutler committed Aug 9, 2022
1 parent 9650b0d commit 377acfc
Showing 5 changed files with 251 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/settings/values.go
@@ -21,7 +21,7 @@ import (

// MaxSettings is the maximum number of settings that the system supports.
// Exported for tests.
const MaxSettings = 511
const MaxSettings = 1023

// Values is a container that stores values for all registered settings.
// Each setting is assigned a unique slot (up to MaxSettings).
66 changes: 47 additions & 19 deletions pkg/sql/importer/import_job.go
@@ -256,8 +256,10 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
// will write.
details.Walltime = p.ExecCfg().Clock.Now().WallTime

// Check if the tables being imported into are starting empty, in which
// case we can cheaply clear-range instead of revert-range to cleanup.
// Check if the tables being imported into are starting empty, in which case
// we can cheaply clear-range instead of revert-range to clean up (or, if the
// cluster has finalized to 22.1, use DeleteRange without predicate
// filtering).
for i := range details.Tables {
if !details.Tables[i].IsNew {
tblDesc := tabledesc.NewBuilder(details.Tables[i].Desc).BuildImmutableTable()
@@ -1461,6 +1463,11 @@ func (r *importResumer) dropTables(
return nil
}

// useDeleteRange indicates that the cluster has been finalized to 22.1 and
// can use the MVCC-compatible DeleteRange with range tombstones during an
// import rollback.
useDeleteRange := r.settings.Version.IsActive(ctx, clusterversion.MVCCRangeTombstones)

var tableWasEmpty bool
var intoTable catalog.TableDescriptor
for _, tbl := range details.Tables {
@@ -1497,25 +1504,46 @@
// it was rolled back to its pre-IMPORT state, and instead provide a manual
// admin knob (e.g. ALTER TABLE REVERT TO SYSTEM TIME) if anything goes wrong.
ts := hlc.Timestamp{WallTime: details.Walltime}.Prev()

// disallowShadowingBelow=writeTS used to write means no existing keys could
// have been covered by a key imported and the table was offline to other
// writes, so even if GC has run it would not have GC'ed any keys to which
// we need to revert, so we can safely ignore the target-time GC check.
const ignoreGC = true
if err := sql.RevertTables(ctx, txn.DB(), execCfg, []catalog.TableDescriptor{intoTable}, ts, ignoreGC,
sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back partially completed IMPORT")
if useDeleteRange {
predicates := roachpb.DeleteRangePredicates{StartTime: ts}
if err := sql.DeleteTableWithPredicate(
ctx,
execCfg.DB,
execCfg.Codec,
&execCfg.Settings.SV,
execCfg.DistSender,
intoTable,
predicates, sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back IMPORT INTO in non empty table via DeleteRange")
}
} else {
// disallowShadowingBelow=writeTS used to write means no existing keys could
// have been covered by a key imported and the table was offline to other
// writes, so even if GC has run it would not have GC'ed any keys to which
// we need to revert, so we can safely ignore the target-time GC check.
const ignoreGC = true
if err := sql.RevertTables(ctx, txn.DB(), execCfg, []catalog.TableDescriptor{intoTable}, ts, ignoreGC,
sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back partially completed IMPORT via RevertRange")
}
}
} else if tableWasEmpty {
// Set a DropTime on the table descriptor to differentiate it from an
// older-format (v1.1) descriptor. This enables ClearTableData to use a
// RangeClear for faster data removal, rather than removing by chunks.
intoTable.TableDesc().DropTime = int64(1)
if err := gcjob.ClearTableData(
ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, &execCfg.Settings.SV, intoTable,
); err != nil {
return errors.Wrapf(err, "clearing data for table %d", intoTable.GetID())
if useDeleteRange {
if err := gcjob.DeleteAllTableData(
ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, intoTable,
); err != nil {
return errors.Wrap(err, "rolling back IMPORT INTO in empty table via DeleteRange")
}
} else {
// Set a DropTime on the table descriptor to differentiate it from an
// older-format (v1.1) descriptor. This enables ClearTableData to use a
// RangeClear for faster data removal, rather than removing by chunks.
intoTable.TableDesc().DropTime = int64(1)
if err := gcjob.ClearTableData(
ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, &execCfg.Settings.SV, intoTable,
); err != nil {
return errors.Wrapf(err, "rolling back IMPORT INTO in empty table via ClearRange")
}
}
}

48 changes: 32 additions & 16 deletions pkg/sql/importer/import_stmt_test.go
@@ -3151,26 +3151,40 @@ func TestImportIntoCSV(t *testing.T) {

// Verify a failed IMPORT INTO won't prevent a subsequent IMPORT INTO.
t.Run("import-into-checkpoint-leftover", func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)
defer sqlDB.Exec(t, `DROP TABLE t`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
for _, emptyTable := range []bool{true, false} {
subtestName := "empty-table"
if !emptyTable {
subtestName = "non-empty-table"
}
t.Run(subtestName, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)
defer sqlDB.Exec(t, `DROP TABLE t`)

for i, v := range insert {
sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
}
if !emptyTable {
// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

// Hit a failure during import.
forceFailure = true
sqlDB.ExpectErr(
t, `testing injected failure`,
fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]),
)
forceFailure = false
for i, v := range insert {
sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
}
}
preImportData := sqlDB.QueryStr(t, `SELECT * FROM t`)

// Expect it to succeed on re-attempt.
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]))
// Hit a failure during import.
forceFailure = true
sqlDB.ExpectErr(
t, `testing injected failure`,
fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]),
)
forceFailure = false

sqlDB.CheckQueryResults(t, `SELECT * FROM t`, preImportData)

// Expect it to succeed on re-attempt.
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]))
})
}
})

// Verify that during IMPORT INTO the table is offline.
@@ -3558,10 +3572,12 @@ func TestImportIntoCSV(t *testing.T) {
fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]),
)

preCollisionData := sqlDB.QueryStr(t, `SELECT * FROM t`)
sqlDB.ExpectErr(
t, `ingested key collides with an existing one: /Table/\d+/1/0/0`,
fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.fileWithShadowKeys[0]),
)
sqlDB.CheckQueryResults(t, `SELECT * FROM t`, preCollisionData)
})

// Tests that IMPORT INTO invalidates FK and CHECK constraints.
145 changes: 145 additions & 0 deletions pkg/sql/revert.go
@@ -13,15 +13,34 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// TODO (msbutler): tune these
var rollbackBatchSize = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.import.predicate_delete_range_batch_size",
"the number of ranges to include in a single Predicate Based DeleteRange request",
10)

var predicateDeleteRangeNumWorkers = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.import.predicate_delete_range_parallelism",
"the number of workers used to issue Predicate Based DeleteRange request",
4)

// RevertTableDefaultBatchSize is the default batch size for reverting tables.
// This only needs to be small enough to keep raft/rocks happy -- there is no
// reply size to worry about.
@@ -101,3 +120,129 @@ func RevertTables(

return nil
}

// DeleteTableWithPredicate issues a series of point and range tombstones over a subset of keys in a
// table that match the passed-in predicate.
//
// This function will error, without a resume span, if it encounters an intent
// in the span. The caller should resolve these intents by retrying the
// function. To prevent errors, the caller should only pass a table that will
// not see new writes during this bulk delete operation (e.g. on a span that's
// part of an import rollback).
//
// NOTE: this function will issue tombstones on keys with versions that match
// the predicate, in contrast to RevertRange, which rolls back a table to a
// certain timestamp. For example, if a key gets a single update after the
// predicate.startTime, DeleteTableWithPredicate would delete that key, while
// RevertRange would revert that key to its state before the update.
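//
// A concrete sketch of that note, with hypothetical keys and timestamps
// t0 < t1 < t2: given predicates.StartTime = t1 and versions
//
//	a@t0, a@t2  ("a" was updated after t1)
//	b@t0        ("b" was not)
//
// DeleteTableWithPredicate writes a tombstone over "a", deleting it entirely,
// and leaves "b" untouched, while RevertRange to t1 would instead restore "a"
// to its value at t0.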
func DeleteTableWithPredicate(
ctx context.Context,
db *kv.DB,
codec keys.SQLCodec,
sv *settings.Values,
distSender *kvcoord.DistSender,
table catalog.TableDescriptor,
predicates roachpb.DeleteRangePredicates,
batchSize int64,
) error {

log.Infof(ctx, "deleting data for table %d with predicate %s", table.GetID(), predicates.String())
tableKey := roachpb.RKey(codec.TablePrefix(uint32(table.GetID())))
tableSpan := roachpb.RSpan{Key: tableKey, EndKey: tableKey.PrefixEnd()}

// To process the table in parallel, spin up a few workers and partition the
// inputted span such that each span partition contains a few ranges of data.
// The partitions are sent to the workers via the spansToDo channel.
//
// TODO (msbutler): tune these
rangesPerBatch := rollbackBatchSize.Get(sv)
numWorkers := int(predicateDeleteRangeNumWorkers.Get(sv))

spansToDo := make(chan *roachpb.Span, 1)

// Create a cancellable context to prevent the worker goroutines below from
// leaking once the parent goroutine returns.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

grp := ctxgroup.WithContext(ctx)
grp.GoCtx(func(ctx context.Context) error {
return ctxgroup.GroupWorkers(ctx, numWorkers, func(ctx context.Context, _ int) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case span, ok := <-spansToDo:
if !ok {
return nil
}
// If the kvserver returns a resume span, shadow the initial span with
// the resume span and rerun the request until no resume span is
// returned.
resumeCount := 1
for span != nil {
admissionHeader := roachpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
delRangeRequest := &roachpb.DeleteRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: span.Key,
EndKey: span.EndKey,
},
UseRangeTombstone: true,
Predicates: predicates,
}
log.VEventf(ctx, 2, "deleting range %s - %s; attempt %v", span.Key, span.EndKey, resumeCount)

rawResp, err := kv.SendWrappedWithAdmission(
ctx,
db.NonTransactionalSender(),
roachpb.Header{MaxSpanRequestKeys: batchSize},
admissionHeader,
delRangeRequest)

if err != nil {
return errors.Wrapf(err.GoError(), "delete range %s - %s", span.Key, span.EndKey)
}
span = nil
resp := rawResp.(*roachpb.DeleteRangeResponse)
if resp.ResumeSpan != nil {
if !resp.ResumeSpan.Valid() {
return errors.Errorf("invalid resume span: %s", resp.ResumeSpan)
}
span = resp.ResumeSpan
resumeCount++
}
}
}
}
})
})

var n int64
lastKey := tableSpan.Key
ri := kvcoord.MakeRangeIterator(distSender)
for ri.Seek(ctx, tableSpan.Key, kvcoord.Ascending); ; ri.Next(ctx) {
if !ri.Valid() {
return ri.Error()
}
if n++; n >= rangesPerBatch || !ri.NeedAnother(tableSpan) {
endKey := ri.Desc().EndKey
if tableSpan.EndKey.Less(endKey) {
endKey = tableSpan.EndKey
}
spansToDo <- &roachpb.Span{Key: lastKey.AsRawKey(), EndKey: endKey.AsRawKey()}
n = 0
lastKey = endKey
}

if !ri.NeedAnother(tableSpan) {
break
}
}
close(spansToDo)
return grp.Wait()
}
35 changes: 26 additions & 9 deletions pkg/sql/revert_test.go
@@ -34,7 +34,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestRevertTable(t *testing.T) {
func TestTableRollback(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -60,17 +60,20 @@ func TestRevertTable(t *testing.T) {
targetTime, err := hlc.ParseHLC(ts)
require.NoError(t, err)

const ignoreGC = false
beforeNumRows := db.QueryStr(t, `SELECT count(*) FROM test`)

t.Run("simple", func(t *testing.T) {
// Make some more edits: delete some rows and edit others, insert into some of
// the gaps made between previous rows, edit a large swath of rows and add a
// large swath of new rows as well.
db.Exec(t, `DELETE FROM test WHERE k % 5 = 2`)
db.Exec(t, `INSERT INTO test (k, rev) SELECT generate_series(10, $1, 10), 10`, numRows)
db.Exec(t, `INSERT INTO test (k, rev) SELECT generate_series($1+1, $1+500, 1), 500`, numRows)

t.Run("simple-revert", func(t *testing.T) {

const ignoreGC = false
db.Exec(t, `UPDATE test SET rev = 2 WHERE k % 4 = 0`)
db.Exec(t, `DELETE FROM test WHERE k % 5 = 2`)
db.Exec(t, `INSERT INTO test (k, rev) SELECT generate_series(10, $1, 10), 10`, numRows)
db.Exec(t, `UPDATE test SET rev = 4 WHERE k > 150 and k < 350`)
db.Exec(t, `INSERT INTO test (k, rev) SELECT generate_series($1+1, $1+500, 1), 500`, numRows)

var edited, aost int
db.QueryRow(t, `SELECT xor_agg(k # rev) FROM test`).Scan(&edited)
@@ -86,6 +89,20 @@ func TestRevertTable(t *testing.T) {
var reverted int
db.QueryRow(t, `SELECT xor_agg(k # rev) FROM test`).Scan(&reverted)
require.Equal(t, before, reverted, "expected reverted table after edits to match before")

db.CheckQueryResults(t, `SELECT count(*) FROM test`, beforeNumRows)
})

t.Run("simple-delete-range-predicate", func(t *testing.T) {

// Delete all keys with versions written after the targetTime.
desc := desctestutils.TestingGetPublicTableDescriptor(kv, keys.SystemSQLCodec, "test", "test")

predicates := roachpb.DeleteRangePredicates{StartTime: targetTime}
require.NoError(t, sql.DeleteTableWithPredicate(context.Background(), kv, execCfg.Codec,
&s.ClusterSettings().SV, execCfg.DistSender, desc, predicates, 10))

db.CheckQueryResults(t, `SELECT count(*) FROM test`, beforeNumRows)
})
}

Expand Down
