Skip to content

Commit

Permalink
Merge #66917 #67093
Browse files Browse the repository at this point in the history
66917: kvcoord: assert sanity when tracking in-flight write r=andreimatei a=andreimatei

Release note: None

67093: kv: remove spanset.GetDBEngine r=nvanbenschoten a=nvanbenschoten

This was originally introduced to work around limitations in the
`storage.Reader` interface, where only a `RocksDB` instance could
be passed to `engine.ExportToSst`. Since then, a lot has changed,
and this is no longer needed.

Removing this is important, as it appears to undermine #55461 and
make #66485 difficult.

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed Jul 1, 2021
3 parents c452a0c + 65b0954 + cd22662 commit db47a54
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 37 deletions.
5 changes: 2 additions & 3 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func declareKeysExport(
// evalExport dumps the requested keys into files of non-overlapping key ranges
// in a format suitable for bulk ingest.
func evalExport(
ctx context.Context, batch storage.Reader, cArgs batcheval.CommandArgs, resp roachpb.Response,
ctx context.Context, reader storage.Reader, cArgs batcheval.CommandArgs, resp roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.ExportRequest)
h := cArgs.Header
Expand Down Expand Up @@ -147,7 +147,6 @@ func evalExport(
return result.Result{}, errors.Errorf("unknown MVCC filter: %s", args.MVCCFilter)
}

e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
targetSize := uint64(args.TargetFileSize)
// TODO(adityamaru): Remove this once we are able to set tenant specific
// cluster settings. This takes the minimum of the system tenant's cluster
Expand All @@ -169,7 +168,7 @@ func evalExport(
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
summary, resume, err := e.ExportMVCCToSst(start, args.EndKey, args.StartTime,
summary, resume, err := reader.ExportMVCCToSst(start, args.EndKey, args.StartTime,
h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI, destFile)
if err != nil {
return result.Result{}, err
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,12 @@ func (tp *txnPipeliner) updateLockTracking(
// need to prove that these succeeded sometime before we commit.
header := req.Header()
tp.ifWrites.insert(header.Key, header.Sequence)
// The request is not expected to be a ranged one, as we're only
// tracking one key in the ifWrites. Ranged requests do not admit
// ba.AsyncConsensus.
if roachpb.IsRange(req) {
log.Fatalf(ctx, "unexpected range request with AsyncConsensus: %s", req)
}
} else {
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint. Locking read
Expand Down
16 changes: 4 additions & 12 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -219,27 +218,20 @@ func EvalAddSSTable(

func checkForKeyCollisions(
_ context.Context,
readWriter storage.ReadWriter,
reader storage.Reader,
mvccStartKey storage.MVCCKey,
mvccEndKey storage.MVCCKey,
data []byte,
) (enginepb.MVCCStats, error) {
// We could get a spansetBatch so fetch the underlying db engine as
// we need access to the underlying C.DBIterator later, and the
// dbIteratorGetter is not implemented by a spansetBatch.
dbEngine := spanset.GetDBEngine(readWriter, roachpb.Span{Key: mvccStartKey.Key, EndKey: mvccEndKey.Key})

emptyMVCCStats := enginepb.MVCCStats{}

// Create iterator over the existing data.
existingDataIter := dbEngine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: mvccEndKey.Key})
existingDataIter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: mvccEndKey.Key})
defer existingDataIter.Close()
existingDataIter.SeekGE(mvccStartKey)
if ok, err := existingDataIter.Valid(); err != nil {
return emptyMVCCStats, errors.Wrap(err, "checking for key collisions")
return enginepb.MVCCStats{}, errors.Wrap(err, "checking for key collisions")
} else if !ok {
// Target key range is empty, so it is safe to ingest.
return emptyMVCCStats, nil
return enginepb.MVCCStats{}, nil
}

return existingDataIter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key)
Expand Down
22 changes: 0 additions & 22 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,28 +489,6 @@ func (s spanSetReader) PinEngineStateForIterators() error {
return s.r.PinEngineStateForIterators()
}

// GetDBEngine recursively searches for the underlying rocksDB engine.
func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader {
switch v := reader.(type) {
case ReadWriter:
return GetDBEngine(getSpanReader(v, span), span)
case *spanSetBatch:
return GetDBEngine(getSpanReader(v.ReadWriter, span), span)
default:
return reader
}
}

// getSpanReader is a getter to access the storage.Reader field of the
// spansetReader.
func getSpanReader(r ReadWriter, span roachpb.Span) storage.Reader {
if err := r.spanSetReader.spans.CheckAllowed(SpanReadOnly, span); err != nil {
panic("Not in the span")
}

return r.spanSetReader.r
}

type spanSetWriter struct {
w storage.Writer
spans *SpanSet
Expand Down

0 comments on commit db47a54

Please sign in to comment.