Skip to content

Commit

Permalink
import,backup,*: switch to AddSSTableRequest.DisallowShadowingBelow
Browse files Browse the repository at this point in the history
The new parameter DisallowShadowingBelow replaces DisallowShadowing for all current users.
The previous flag allowed keys that exactly matched -- including their timestamp --
with exactly the same value to exist without triggering an error to allow for retried
AddSSTables or job resumption. However, going forward, adding the same SSTable repeatedly
will no longer add the same key, as the added sstable's keys will have timestamps that
now reflect their addition time. Thus the prior API would no longer work, as keys would
never match. This new API allows overwriting keys if the value matches and the timestamp
being overwritten is not older than the passed value. The fact that the value must match
exactly means this version provides the same preservation of uniqueness as the exact time
match, but now allows for retries.

Release note: none.
  • Loading branch information
dt committed Dec 9, 2021
1 parent e89328d commit 2318073
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 77 deletions.
11 changes: 10 additions & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -375,8 +376,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
iter := sst.iter
defer sst.cleanup()

// "disallowing" shadowing of anything older than logical=1 is i.e. allow all
// shadowing. We must allow shadowing in case the RESTORE has to retry any
// ingestions, but setting a (permissive) disallow like this serves to force
// evaluation of AddSSTable to check for overlapping keys. That in turn will
// result in it maintaining exact MVCC stats rather than estimates. Of course
// this comes at the cost of said overlap check, but in the common case of
// non-overlapping ingestion into empty spans, that is just one seek.
disallowShadowingBelow := hlc.Timestamp{Logical: 1}
batcher, err := bulk.MakeSSTBatcher(ctx, db, evalCtx.Settings,
func() int64 { return rd.flushBytes })
func() int64 { return rd.flushBytes }, disallowShadowingBelow)
if err != nil {
return summary, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/importccl/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,10 +1425,10 @@ func (r *importResumer) dropTables(
// admin knob (e.g. ALTER TABLE REVERT TO SYSTEM TIME) if anything goes wrong.
ts := hlc.Timestamp{WallTime: details.Walltime}.Prev()

// disallowShadowing means no existing keys could have been covered by a key
// imported and the table was offline to other writes, so even if GC has run
// it would not have GC'ed any keys to which we need to revert, so we can
// safely ignore the target-time GC check.
// disallowShadowingBelow=writeTS used to write means no existing keys could
// have been covered by a key imported and the table was offline to other
// writes, so even if GC has run it would not have GC'ed any keys to which
// we need to revert, so we can safely ignore the target-time GC check.
const ignoreGC = true
if err := sql.RevertTables(ctx, txn.DB(), execCfg, revert, ts, ignoreGC, sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back partially completed IMPORT")
Expand Down
28 changes: 14 additions & 14 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,13 @@ func ingestKvs(
minBufferSize, maxBufferSize, stepSize := importBufferConfigSizes(flowCtx.Cfg.Settings,
true /* isPKAdder */)
pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, kvserverbase.BulkAdderOptions{
Name: "pkAdder",
DisallowShadowing: true,
SkipDuplicates: true,
MinBufferSize: minBufferSize,
MaxBufferSize: maxBufferSize,
StepBufferSize: stepSize,
SSTSize: flushSize,
Name: "pkAdder",
DisallowShadowingBelow: writeTS,
SkipDuplicates: true,
MinBufferSize: minBufferSize,
MaxBufferSize: maxBufferSize,
StepBufferSize: stepSize,
SSTSize: flushSize,
})
if err != nil {
return nil, err
Expand All @@ -345,13 +345,13 @@ func ingestKvs(
minBufferSize, maxBufferSize, stepSize = importBufferConfigSizes(flowCtx.Cfg.Settings,
false /* isPKAdder */)
indexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, kvserverbase.BulkAdderOptions{
Name: "indexAdder",
DisallowShadowing: true,
SkipDuplicates: true,
MinBufferSize: minBufferSize,
MaxBufferSize: maxBufferSize,
StepBufferSize: stepSize,
SSTSize: flushSize,
Name: "indexAdder",
DisallowShadowingBelow: writeTS,
SkipDuplicates: true,
MinBufferSize: minBufferSize,
MaxBufferSize: maxBufferSize,
StepBufferSize: stepSize,
SSTSize: flushSize,
})
if err != nil {
return nil, err
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ func MakeBulkAdder(
b := &BufferingAdder{
name: opts.Name,
sink: SSTBatcher{
db: db,
maxSize: opts.SSTSize,
rc: rangeCache,
settings: settings,
skipDuplicates: opts.SkipDuplicates,
disallowShadowing: opts.DisallowShadowing,
splitAfter: opts.SplitAndScatterAfter,
batchTS: opts.BatchTimestamp,
db: db,
maxSize: opts.SSTSize,
rc: rangeCache,
settings: settings,
skipDuplicates: opts.SkipDuplicates,
disallowShadowingBelow: opts.DisallowShadowingBelow,
splitAfter: opts.SplitAndScatterAfter,
batchTS: opts.BatchTimestamp,
},
timestamp: timestamp,
curBufferSize: opts.MinBufferSize,
Expand Down
61 changes: 34 additions & 27 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ type SSTBatcher struct {
maxSize func() int64
splitAfter func() int64

// allows ingestion of keys where the MVCC.Key would shadow an existing row.
disallowShadowing bool
// disallowShadowingBelow is described on roachpb.AddSSTableRequest.
disallowShadowingBelow hlc.Timestamp

// skips duplicate keys (iff they are buffered together). This is true when
// used to backfill an inverted index. An array in JSONB with multiple values
// which are the same, will all correspond to the same kv in the inverted
Expand All @@ -77,9 +78,9 @@ type SSTBatcher struct {
// maintain uniform behavior, duplicates in the same batch with equal values
// will not raise a DuplicateKeyError.
skipDuplicates bool
// ingestAll can only be set when disallowShadowing and skipDuplicates are
// false. It will never return a duplicateKey error and continue ingesting all
// data provided to it.
// ingestAll can only be set when disallowShadowingBelow is empty and
// skipDuplicates is false. It will never return a duplicateKey error and
// continue ingesting all data provided to it.
ingestAll bool

// batchTS is the timestamp that will be set on batch requests used to send
Expand Down Expand Up @@ -112,17 +113,23 @@ type SSTBatcher struct {
batchEndValue []byte
flushKeyChecked bool
flushKey roachpb.Key
// stores on-the-fly stats for the SST if disallowShadowing is true.
// stores on-the-fly stats for the SST if disallowShadowingBelow is set.
ms enginepb.MVCCStats
// rows written in the current batch.
rowCounter storage.RowCounter
}

// MakeSSTBatcher makes a ready-to-use SSTBatcher.
func MakeSSTBatcher(
ctx context.Context, db SSTSender, settings *cluster.Settings, flushBytes func() int64,
ctx context.Context,
db SSTSender,
settings *cluster.Settings,
flushBytes func() int64,
disallowShadowingBelow hlc.Timestamp,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, disallowShadowing: true}
b := &SSTBatcher{
db: db, settings: settings, maxSize: flushBytes, disallowShadowingBelow: disallowShadowingBelow,
}
err := b.Reset(ctx)
return b, err
}
Expand Down Expand Up @@ -191,7 +198,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
// guaranteed to ingest unique keys. This saves us an extra iteration in
// AddSSTable which has been identified as a significant performance
// regression for IMPORT.
if b.disallowShadowing {
if !b.disallowShadowingBelow.IsEmpty() {
b.updateMVCCStats(key, value)
}

Expand Down Expand Up @@ -320,7 +327,7 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke
}

beforeSend := timeutil.Now()
files, err := AddSSTable(ctx, b.db, start, end, b.sstFile.Data(), b.disallowShadowing, b.ms, b.settings, b.batchTS)
files, err := AddSSTable(ctx, b.db, start, end, b.sstFile.Data(), b.disallowShadowingBelow, b.ms, b.settings, b.batchTS)
if err != nil {
return err
}
Expand Down Expand Up @@ -397,10 +404,10 @@ type SSTSender interface {
}

type sstSpan struct {
start, end roachpb.Key
sstBytes []byte
disallowShadowing bool
stats enginepb.MVCCStats
start, end roachpb.Key
sstBytes []byte
disallowShadowingBelow hlc.Timestamp
stats enginepb.MVCCStats
}

// AddSSTable retries db.AddSSTable if retryable errors occur, including if the
Expand All @@ -411,7 +418,7 @@ func AddSSTable(
db SSTSender,
start, end roachpb.Key,
sstBytes []byte,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
ms enginepb.MVCCStats,
settings *cluster.Settings,
batchTs hlc.Timestamp,
Expand All @@ -434,7 +441,7 @@ func AddSSTable(
stats = ms
}

work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes, disallowShadowing: disallowShadowing, stats: stats}}
work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes, disallowShadowingBelow: disallowShadowingBelow, stats: stats}}
const maxAddSSTableRetries = 10
for len(work) > 0 {
item := work[0]
Expand All @@ -461,7 +468,7 @@ func AddSSTable(
}
// This will fail if the range has split but we'll check for that below.
err = db.AddSSTable(ctx, item.start, item.end, item.sstBytes, false, /* disallowConflicts */
item.disallowShadowing, hlc.Timestamp{} /* disallowShadowingBelow */, &item.stats,
!item.disallowShadowingBelow.IsEmpty(), item.disallowShadowingBelow, &item.stats,
ingestAsWriteBatch, batchTs, false /* writeAtBatchTs */)
if err == nil {
log.VEventf(ctx, 3, "adding %s AddSSTable [%s,%s) took %v", sz(len(item.sstBytes)), item.start, item.end, timeutil.Since(before))
Expand All @@ -477,7 +484,7 @@ func AddSSTable(
}
split := mr.Desc.EndKey.AsRawKey()
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter, settings)
left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowingBelow, iter, settings)
if err != nil {
return err
}
Expand Down Expand Up @@ -520,7 +527,7 @@ func createSplitSSTable(
ctx context.Context,
db SSTSender,
start, splitKey roachpb.Key,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
iter storage.SimpleMVCCIterator,
settings *cluster.Settings,
) (*sstSpan, *sstSpan, error) {
Expand Down Expand Up @@ -549,10 +556,10 @@ func createSplitSSTable(
return nil, nil, err
}
left = &sstSpan{
start: first,
end: last.PrefixEnd(),
sstBytes: sstFile.Data(),
disallowShadowing: disallowShadowing,
start: first,
end: last.PrefixEnd(),
sstBytes: sstFile.Data(),
disallowShadowingBelow: disallowShadowingBelow,
}
*sstFile = storage.MemFile{}
w = storage.MakeIngestionSSTWriter(sstFile)
Expand All @@ -578,10 +585,10 @@ func createSplitSSTable(
return nil, nil, err
}
right = &sstSpan{
start: first,
end: last.PrefixEnd(),
sstBytes: sstFile.Data(),
disallowShadowing: disallowShadowing,
start: first,
end: last.PrefixEnd(),
sstBytes: sstFile.Data(),
disallowShadowingBelow: disallowShadowingBelow,
}
return left, right, nil
}
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestAddBigSpanningSSTWithSplits(t *testing.T) {

t.Logf("Adding %dkb sst spanning %d splits from %v to %v", len(sst)/kb, len(splits), start, end)
if _, err := bulk.AddSSTable(
ctx, mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, cluster.MakeTestingClusterSettings(), hlc.Timestamp{},
ctx, mock, start, end, sst, hlc.Timestamp{}, enginepb.MVCCStats{}, cluster.MakeTestingClusterSettings(), hlc.Timestamp{},
); err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,12 @@ func TestEvalAddSSTable(t *testing.T) {
},

// DisallowShadowingBelow
"DisallowShadowingBelow cannot be used with DisallowShadowing": {
"DisallowShadowingBelow can be used with DisallowShadowing": {
noShadow: true,
noShadowBelow: 5,
sst: []mvccKV{{"a", 1, "sst"}},
expectErr: `cannot set both DisallowShadowing and DisallowShadowingBelow`,
data: []mvccKV{{"a", 5, "123"}},
sst: []mvccKV{{"a", 6, "123"}},
expect: []mvccKV{{"a", 6, "123"}, {"a", 5, "123"}},
},
"DisallowShadowingBelow errors above existing": {
noShadowBelow: 5,
Expand Down
10 changes: 7 additions & 3 deletions pkg/kv/kvserver/kvserverbase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ type BulkAdderOptions struct {
// detection does not apply.
SkipDuplicates bool

// DisallowShadowing controls whether shadowing of existing keys is permitted
// when the SSTables produced by this adder are ingested.
DisallowShadowing bool
// DisallowShadowingBelow controls whether shadowing of existing keys is
// permitted when the SSTables produced by this adder are ingested. See the
// comment on roachpb.AddSSTableRequest for more details. Note that if this is
// set to a non-empty timestamp, the older flag DisallowShadowing will be set
// on all requests as well, so callers should expect older nodes to handle any
// requests accordingly or check the MVCCAddSSTable version gate.
DisallowShadowingBelow hlc.Timestamp

// BatchTimestamp is the timestamp to use on AddSSTable requests (which can be
// different from the timestamp used to construct the adder which is what is
Expand Down
3 changes: 2 additions & 1 deletion pkg/roachpb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,8 @@ message AddSSTableRequest {
// resumption or retry. The equal value requirement is to avoid unique
// constraint violations.
//
// This parameter cannot be used together with DisallowShadowing.
// If this parameter is used, the value of DisallowShadowing is ignored, so
// callers may pass both for forward and backwards compatibility.
//
// Added in 22.1, so check the MVCCAddSSTable version gate before using.
util.hlc.Timestamp disallow_shadowing_below = 8 [(gogoproto.nullable) = false];
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/rowexec/bulk_row_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,12 @@ func (sp *bulkRowWriter) ingestLoop(ctx context.Context, kvCh chan row.KVBatch)
ctx, sp.flowCtx.Cfg.DB, writeTS, kvserverbase.BulkAdderOptions{
MinBufferSize: bufferSize,
// We disallow shadowing here to ensure that we report errors when builds
// of unique indexes fail when there are duplicate values.
DisallowShadowing: true,
// of unique indexes fail when there are duplicate values. Note that while
// the timestamp passed does allow shadowing other writes by the same job,
// the check for allowed shadowing also requires the values match, so a
// conflicting unique index entry would still be rejected as its value
// would point to a different owning row.
DisallowShadowingBelow: writeTS,
},
)
if err != nil {
Expand Down
20 changes: 8 additions & 12 deletions pkg/storage/sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (
// out if it finds any conflicts. This includes intents and existing keys with a
// timestamp at or above the SST key timestamp.
//
// If disallowShadowing is true, it also errors for any existing live key at the
// SST key timestamp, and ignores entries that exactly match an existing entry
// (key/value/timestamp), for backwards compatibility.
//
// If disallowShadowingBelow is non-empty, it also errors for any existing live
// key at the SST key timestamp, but allows shadowing an existing key if its
// timestamp is above the given timestamp and the values are equal. See comment
// on AddSSTableRequest.DisallowShadowingBelow for details.
//
// If disallowShadowing is true, it also errors for any existing live key at the
// SST key timestamp, and ignores entries that exactly match an existing entry
// (key/value/timestamp), for backwards compatibility. If disallowShadowingBelow
// is non-empty, disallowShadowing is ignored.
//
// The given SST and reader cannot contain intents or inline values (i.e. zero
// timestamps), nor tombstones (i.e. empty values), but this is only checked for
// keys that exist in both sides, for performance.
Expand All @@ -53,11 +54,6 @@ func CheckSSTConflicts(
var statsDiff enginepb.MVCCStats
var intents []roachpb.Intent

if disallowShadowing && !disallowShadowingBelow.IsEmpty() {
return enginepb.MVCCStats{}, errors.New(
"cannot set both DisallowShadowing and DisallowShadowingBelow")
}

// Fast path: there are no keys in the reader between the sstable's start and
// end keys. We use a non-prefix iterator for this search, and reopen a prefix
// one if there are engine keys in the span.
Expand Down Expand Up @@ -168,8 +164,8 @@ func CheckSSTConflicts(
//
// * disallowShadowing: any matching key.
// * disallowShadowingBelow: any matching key at or above the given timestamp.
allowIdempotent := disallowShadowing ||
(!disallowShadowingBelow.IsEmpty() && disallowShadowingBelow.LessEq(extKey.Timestamp))
allowIdempotent := (!disallowShadowingBelow.IsEmpty() && disallowShadowingBelow.LessEq(extKey.Timestamp)) ||
(disallowShadowingBelow.IsEmpty() && disallowShadowing)
if allowIdempotent && sstKey.Timestamp.Equal(extKey.Timestamp) &&
bytes.Equal(extValue, sstValue) {
// This SST entry will effectively be a noop, but its stats have already
Expand Down Expand Up @@ -209,7 +205,7 @@ func CheckSSTConflicts(
// a WriteTooOldError -- that error implies that the client should
// retry at a higher timestamp, but we already know that such a retry
// would fail (because it will shadow an existing key).
if len(extValue) > 0 && (disallowShadowing || !disallowShadowingBelow.IsEmpty()) {
if len(extValue) > 0 && (!disallowShadowingBelow.IsEmpty() || disallowShadowing) {
allowShadow := !disallowShadowingBelow.IsEmpty() &&
disallowShadowingBelow.LessEq(extKey.Timestamp) && bytes.Equal(extValue, sstValue)
if !allowShadow {
Expand Down

0 comments on commit 2318073

Please sign in to comment.