Merge #39619
39619: Revert "storage: build SSTs from KV_BATCH snapshot" r=nvanbenschoten a=tbg

This reverts commit b320ff5.

In the above commit, we started both ingesting multiple SSTs at the same
time and having those SSTs contain range deletion tombstones. Both are
firsts, and it turns out that there are some kinks to work out. The commit
causes quite a number of failures, so revert it to reduce churn while we
work them out.

See #39604.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
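For context on the reverted mechanism: the receiving store built SSTs containing range deletion tombstones and ingested them directly. Below is a minimal Go sketch of such an SST, modeled on the test helper that this revert removes from client_merge_test.go further down; the function name expectedSubsumedReplicaSST is illustrative and not part of the original commit.

import (
	"context"
	"math"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// expectedSubsumedReplicaSST builds an SST that clears a subsumed replica's
// range-ID local keys with a range deletion tombstone and then writes a
// RaftTombstone that prevents the replica from being recreated.
func expectedSubsumedReplicaSST(rangeID roachpb.RangeID) ([]byte, error) {
	sst, err := engine.MakeRocksDBSstFileWriter()
	if err != nil {
		return nil, err
	}
	defer sst.Close()

	// Range deletion tombstone over the replica's entire range-ID local span.
	r := rditer.MakeRangeIDLocalKeyRange(rangeID, false /* replicatedOnly */)
	if err := sst.ClearRange(r.Start, r.End); err != nil {
		return nil, err
	}

	// Point write: the replica tombstone.
	tombstoneKey := keys.RaftTombstoneKey(rangeID)
	tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32}
	if err := engine.MVCCBlindPutProto(
		context.TODO(), &sst, nil /* ms */, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil, /* txn */
	); err != nil {
		return nil, err
	}
	return sst.Finish()
}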
craig[bot] and tbg committed Aug 13, 2019
2 parents 74c2efe + 0f82aa5 commit 5e6c20d
Showing 15 changed files with 268 additions and 835 deletions.
1 change: 0 additions & 1 deletion docs/generated/settings/settings.html
@@ -51,7 +51,6 @@
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.snapshot_sst.sync_size</code></td><td>byte size</td><td><code>2.0 MiB</code></td><td>threshold after which snapshot SST writes must fsync</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.parallel_commits_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional commits will be parallelized with transactional writes</td></tr>
110 changes: 0 additions & 110 deletions pkg/storage/client_merge_test.go
@@ -18,7 +18,6 @@ import (
"math/rand"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -38,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
@@ -3048,105 +3046,10 @@ func (h *unreliableRaftHandler) HandleRaftResponse(
func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// We will be testing the SSTs written on store2's engine.
var eng engine.Engine
ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func(
inSnap storage.IncomingSnapshot,
snapType storage.SnapshotRequest_Type,
sstNames []string,
) error {
// Only verify snapshots of type RAFT and on the range under exercise
// (range 2). Note that the keys of range 2 aren't verified in this
// function. Unreplicated range-id local keys are not verified because
// there are too many keys and the other replicated keys are verified later
// on in the test. This function verifies that the subsumed replicas have
// been handled properly.
if snapType != storage.SnapshotRequest_RAFT || inSnap.State.Desc.RangeID != roachpb.RangeID(2) {
return nil
}
// The seven SSTs we are expecting to ingest are in the following order:
// 1. Replicated range-id local keys of the range in the snapshot.
// 2. Range-local keys of the range in the snapshot.
// 3. User keys of the range in the snapshot.
// 4. Unreplicated range-id local keys of the range in the snapshot.
// 5. SST to clear range-id local keys of the subsumed replica with
// RangeID 3.
// 6. SST to clear range-id local keys of the subsumed replica with
// RangeID 4.
// 7. SST to clear the user keys of the subsumed replicas.
//
// NOTE: There are no range-local keys in [d, /Max) in the store we're
// sending a snapshot to, so we aren't expecting an SST to clear those
// keys.
if len(sstNames) != 7 {
return errors.Errorf("expected to ingest 7 SSTs, got %d SSTs", len(sstNames))
}

// Only verify the SSTs of the subsumed replicas (the last three SSTs) by
// constructing the expected SSTs and ensuring that they are byte-by-byte
// equal. This verification ensures that the SSTs have the same tombstones
// and range deletion tombstones.
var expectedSSTs [][]byte
sstNames = sstNames[4:]

// Range-id local range of subsumed replicas.
for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} {
sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer sst.Close()
r := rditer.MakeRangeIDLocalKeyRange(rangeID, false)
if err := sst.ClearRange(r.Start, r.End); err != nil {
return err
}
tombstoneKey := keys.RaftTombstoneKey(rangeID)
tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32}
if err := engine.MVCCBlindPutProto(context.TODO(), &sst, nil, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil); err != nil {
return err
}
expectedSST, err := sst.Finish()
if err != nil {
return err
}
expectedSSTs = append(expectedSSTs, expectedSST)
}

// User key range of subsumed replicas.
sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer sst.Close()
desc := roachpb.RangeDescriptor{
StartKey: roachpb.RKey("d"),
EndKey: roachpb.RKeyMax,
}
r := rditer.MakeUserKeyRange(&desc)
if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil {
return err
}
expectedSST, err := sst.Finish()
if err != nil {
return err
}
expectedSSTs = append(expectedSSTs, expectedSST)

for i := range sstNames {
actualSST, err := eng.ReadFile(sstNames[i])
if err != nil {
return err
}
if !bytes.Equal(actualSST, expectedSSTs[i]) {
return errors.Errorf("contents of %s were unexpected", sstNames[i])
}
}
return nil
}
mtc := &multiTestContext{
storeConfig: &storeCfg,
// This test was written before the multiTestContext started creating many
@@ -3157,7 +3060,6 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)
eng = store2.Engine()
distSender := mtc.distSenders[0]

// Create three fully-caught-up, adjacent ranges on all three stores.
@@ -3172,18 +3074,6 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
mtc.waitForValues(key, []int64{1, 1, 1})
}

// Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with range
// ID 4 has tombstones. We will clear the uncontained key ranges of subsumed
// replicas, so when we receive a snapshot for [a, d), we expect to
// clear the keys in [d, /Max).
for i := 0; i < 10; i++ {
key := roachpb.Key("d" + strconv.Itoa(i))
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(key, []int64{1, 1, 1})
}

aRepl0 := store0.LookupReplica(roachpb.RKey("a"))

// Start dropping all Raft traffic to the first range on store1.
2 changes: 0 additions & 2 deletions pkg/storage/client_raft_test.go
@@ -45,7 +45,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -1073,7 +1072,6 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
RangeSize: 100,
State: storagepb.ReplicaState{Desc: rep.Desc()},
}
header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()
// Cause this stream to return an error as soon as we ask it for something.
// This injects an error into HandleSnapshotStream when we try to send the
// "snapshot accepted" message.
50 changes: 0 additions & 50 deletions pkg/storage/engine/engine.go
@@ -504,53 +504,3 @@ func WriteSyncNoop(ctx context.Context, eng Engine) error {
}
return nil
}

// ClearRangeWithHeuristic clears the keys from start (inclusive) to end
// (exclusive). Depending on the number of keys, it will either use ClearRange
// or ClearRangeIter.
func ClearRangeWithHeuristic(eng Reader, writer Writer, start, end MVCCKey) error {
iter := eng.NewIterator(IterOptions{UpperBound: end.Key})
defer iter.Close()

// It is expensive for there to be many range deletion tombstones in the same
// sstable because all of the tombstones in an sstable are loaded whenever the
// sstable is accessed. So we avoid using range deletion unless there is some
// minimum number of keys. The value here was pulled out of thin air. It might
// be better to make this dependent on the size of the data being deleted. Or
// perhaps we should fix RocksDB to handle large numbers of tombstones in an
// sstable better.
const clearRangeMinKeys = 64
// Peek into the range to see whether it's large enough to justify
// ClearRange. Note that the work done here is bounded by
// clearRangeMinKeys, so it will be fairly cheap even for large
// ranges.
//
// TODO(bdarnell): Move this into ClearIterRange so we don't have
// to do this scan twice.
count := 0
iter.Seek(start)
for {
valid, err := iter.Valid()
if err != nil {
return err
}
if !valid || !iter.Key().Less(end) {
break
}
count++
if count > clearRangeMinKeys {
break
}
iter.Next()
}
var err error
if count > clearRangeMinKeys {
err = writer.ClearRange(start, end)
} else {
err = writer.ClearIterRange(iter, start, end)
}
if err != nil {
return err
}
return nil
}
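
For reference, before this revert the snapshot and test code invoked the removed helper roughly as sketched below; eng, sst, and desc stand in for an open engine, an SST writer, and the subsumed range's descriptor, as in the test removed earlier in this diff.

// Let the heuristic decide between a single range deletion tombstone
// (ClearRange) and per-key deletes (ClearIterRange), based on how many
// keys eng currently holds in the span being cleared.
r := rditer.MakeUserKeyRange(&desc)
if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil {
	return err
}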
20 changes: 0 additions & 20 deletions pkg/storage/engine/mvcc.go
@@ -621,26 +621,6 @@ func MVCCPutProto(
return MVCCPut(ctx, engine, ms, key, timestamp, value, txn)
}

// MVCCBlindPutProto sets the given key to the protobuf-serialized byte string
// of msg and the provided timestamp. See MVCCBlindPut for a discussion on this
// fast-path and when it is appropriate to use.
func MVCCBlindPutProto(
ctx context.Context,
engine Writer,
ms *enginepb.MVCCStats,
key roachpb.Key,
timestamp hlc.Timestamp,
msg protoutil.Message,
txn *roachpb.Transaction,
) error {
value := roachpb.Value{}
if err := value.SetProto(msg); err != nil {
return err
}
value.InitChecksum(key)
return MVCCBlindPut(ctx, engine, ms, key, timestamp, value, txn)
}

type getBuffer struct {
meta enginepb.MVCCMetadata
value roachpb.Value
76 changes: 24 additions & 52 deletions pkg/storage/rditer/replica_data_iter.go
@@ -43,70 +43,42 @@ type ReplicaDataIterator struct {

// MakeAllKeyRanges returns all key ranges for the given Range.
func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
return []KeyRange{
MakeRangeIDLocalKeyRange(d.RangeID, false /* replicatedOnly */),
MakeRangeLocalKeyRange(d),
MakeUserKeyRange(d),
}
return makeReplicaKeyRanges(d, keys.MakeRangeIDPrefix)
}

// MakeReplicatedKeyRanges returns all key ranges that are fully Raft
// replicated for the given Range.
//
// NOTE: The logic for receiving snapshots relies on this function returning the
// ranges in the following sorted order:
//
// 1. Replicated range-id local key range
// 2. Range-local key range
// 3. User key range
// MakeReplicatedKeyRanges returns all key ranges that are fully Raft replicated
// for the given Range.
func MakeReplicatedKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
return []KeyRange{
MakeRangeIDLocalKeyRange(d.RangeID, true /* replicatedOnly */),
MakeRangeLocalKeyRange(d),
MakeUserKeyRange(d),
}
return makeReplicaKeyRanges(d, keys.MakeRangeIDReplicatedPrefix)
}

// MakeRangeIDLocalKeyRange returns the range-id local key range. If
// replicatedOnly is true, then it returns only the replicated keys; otherwise,
// it returns both the replicated and unreplicated keys.
func MakeRangeIDLocalKeyRange(rangeID roachpb.RangeID, replicatedOnly bool) KeyRange {
var prefixFn func(roachpb.RangeID) roachpb.Key
if replicatedOnly {
prefixFn = keys.MakeRangeIDReplicatedPrefix
} else {
prefixFn = keys.MakeRangeIDPrefix
}
sysRangeIDKey := prefixFn(rangeID)
return KeyRange{
Start: engine.MakeMVCCMetadataKey(sysRangeIDKey),
End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()),
}
}

// MakeRangeLocalKeyRange returns the range local key range. Range-local keys
// are replicated keys that do not belong to the range they would naturally
// sort into. For example, /Local/Range/Table/1 would sort into [/Min,
// /System), but it actually belongs to [/Table/1, /Table/2).
func MakeRangeLocalKeyRange(d *roachpb.RangeDescriptor) KeyRange {
return KeyRange{
Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)),
End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)),
}
}

// MakeUserKeyRange returns the user key range.
func MakeUserKeyRange(d *roachpb.RangeDescriptor) KeyRange {
// makeReplicaKeyRanges returns a slice of 3 key ranges. The last key range in
// the returned slice corresponds to the actual range data (i.e. not the range
// metadata).
func makeReplicaKeyRanges(
d *roachpb.RangeDescriptor, metaFunc func(roachpb.RangeID) roachpb.Key,
) []KeyRange {
// The first range in the keyspace starts at KeyMin, which includes the
// node-local space. We need the original StartKey to find the range
// metadata, but the actual data starts at LocalMax.
dataStartKey := d.StartKey.AsRawKey()
if d.StartKey.Equal(roachpb.RKeyMin) {
dataStartKey = keys.LocalMax
}
return KeyRange{
Start: engine.MakeMVCCMetadataKey(dataStartKey),
End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()),
sysRangeIDKey := metaFunc(d.RangeID)
return []KeyRange{
{
Start: engine.MakeMVCCMetadataKey(sysRangeIDKey),
End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()),
},
{
Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)),
End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)),
},
{
Start: engine.MakeMVCCMetadataKey(dataStartKey),
End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()),
},
}
}

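Before this revert, the snapshot receive path consumed the replicated key ranges in the documented order and emitted one SST per range. The sketch below is hypothetical (the helper name and the elided data-copy step are not part of this diff) but shows the shape of that technique:

// buildSnapshotSSTForRange produces one SST for a single replicated key
// range of an incoming snapshot. The SST begins with a range deletion
// tombstone over the span, so ingesting it replaces whatever the receiving
// store previously held there.
func buildSnapshotSSTForRange(kr rditer.KeyRange) ([]byte, error) {
	sst, err := engine.MakeRocksDBSstFileWriter()
	if err != nil {
		return nil, err
	}
	defer sst.Close()
	if err := sst.ClearRange(kr.Start, kr.End); err != nil {
		return nil, err
	}
	// ... write the snapshot's KV data for [kr.Start, kr.End) here ...
	return sst.Finish()
}

// Callers iterate the ranges in the order documented above, e.g.:
//   for _, kr := range rditer.MakeReplicatedKeyRanges(desc) { ... }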
5 changes: 2 additions & 3 deletions pkg/storage/replica_application_state_machine.go
@@ -550,10 +550,9 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat
if err != nil {
return wrapWithNonDeterministicFailure(err, "unable to get replica for merge")
}
const rangeIDLocalOnly = true
const mustClearRange = false
const destroyData = false
if err := rhsRepl.preDestroyRaftMuLocked(
ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange,
ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, destroyData,
); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge")
}
