Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: fork-lift proposer-evaluated KV #10327

Merged
merged 1 commit into from
Nov 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pkg/kv/txn_correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,12 +750,18 @@ func (hv *historyVerifier) runHistory(
var wg sync.WaitGroup
wg.Add(len(txnMap))
retryErrs := make(chan *retryError, len(txnMap))
errs := make(chan error, 1) // only populated while buffer available

for i, txnCmds := range txnMap {
go func(i int, txnCmds []*cmd) {
if err := hv.runTxn(i, priorities[i], isolations[i], txnCmds, db, t); err != nil {
if re, ok := err.(*retryError); !ok {
t.Errorf("(%s): unexpected failure: %s", cmds, err)
reportErr := errors.Wrapf(err, "(%s): unexpected failure", cmds)
select {
case errs <- reportErr:
default:
t.Error(reportErr)
}
} else {
retryErrs <- re
}
Expand All @@ -765,7 +771,13 @@ func (hv *historyVerifier) runHistory(
}
wg.Wait()

// If we received a retry error, propagate the first one now.
// For serious errors, report the first one.
select {
case err := <-errs:
return err
default:
}
// In the absence of serious errors, report the first retry error, if any.
select {
case re := <-retryErrs:
return re
Expand Down
23 changes: 16 additions & 7 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,11 @@ func TestRestoreReplicas(t *testing.T) {
}
}

// TODO(bdarnell): more aggressive testing here; especially with
// proposer-evaluated KV, what this test does is much less as it doesn't
// exercise the path in which the replica change fails at *apply* time (we only
// test the failfast path), in which case the replica change isn't even
// proposed.
func TestFailedReplicaChange(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -939,18 +944,17 @@ func TestStoreRangeCorruptionChangeReplicas(t *testing.T) {
syncutil.Mutex
store *storage.Store
}
sc.TestingKnobs.TestingCommandFilter = func(filterArgs storagebase.FilterArgs) *roachpb.Error {
sc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
corrupt.Lock()
defer corrupt.Unlock()

if corrupt.store == nil || filterArgs.Sid != corrupt.store.StoreID() {
if corrupt.store == nil || filterArgs.StoreID != corrupt.store.StoreID() {
return nil
}

if filterArgs.Req.Header().Key.Equal(roachpb.Key("boom")) {
return roachpb.NewError(storage.NewReplicaCorruptionError(errors.New("test")))
}
return nil
return roachpb.NewError(
storage.NewReplicaCorruptionError(errors.New("boom")),
)
}

// Don't timeout raft leader.
Expand Down Expand Up @@ -998,7 +1002,7 @@ func TestStoreRangeCorruptionChangeReplicas(t *testing.T) {
return err
})

args := putArgs(roachpb.Key("boom"), []byte("value"))
args := putArgs(roachpb.Key("any write"), []byte("should mark as corrupted"))
if _, err := client.SendWrapped(context.Background(), rg1(store0), &args); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1173,6 +1177,11 @@ func TestStoreRangeDownReplicate(t *testing.T) {

// TestChangeReplicasDescriptorInvariant tests that a replica change aborts if
// another change has been made to the RangeDescriptor since it was initiated.
//
// TODO(tschottdorf): If this test is flaky because the snapshot count does not
// increase, it's likely because with proposer-evaluated KV, less gets proposed
// and so sometimes Raft discards the preemptive snapshot (though we count that
// case in stats already) or doesn't produce a Ready.
func TestChangeReplicasDescriptorInvariant(t *testing.T) {
defer leaktest.AfterTest(t)()
mtc := startMultiTestContext(t, 3)
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,9 @@ func TestSplitSnapshotRace_SnapshotWins(t *testing.T) {
// non-atomically with respect to the reads (and in particular their update of
// the timestamp cache), then some of them may not be reflected in the
// timestamp cache of the new range, in which case this test would fail.
//
// TODO(tschottdorf): hacks around #10084, see usage of
// ProposerEvaluatedKVEnabled() within.
func TestStoreSplitTimestampCacheReadRace(t *testing.T) {
defer leaktest.AfterTest(t)()
splitKey := roachpb.Key("a")
Expand All @@ -1091,6 +1094,11 @@ func TestStoreSplitTimestampCacheReadRace(t *testing.T) {
}

getContinues := make(chan struct{})
if storage.ProposerEvaluatedKVEnabled() {
// TODO(tschottdorf): because of command queue hack (would deadlock
// otherwise); see #10084.
close(getContinues)
}
var getStarted sync.WaitGroup
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableSplitQueue = true
Expand All @@ -1101,7 +1109,9 @@ func TestStoreSplitTimestampCacheReadRace(t *testing.T) {
if st == nil || !st.LeftDesc.EndKey.Equal(splitKey) {
return nil
}
close(getContinues)
if !storage.ProposerEvaluatedKVEnabled() {
close(getContinues)
}
} else if filterArgs.Req.Method() == roachpb.Get &&
bytes.HasPrefix(filterArgs.Req.Header().Key, splitKey.Next()) {
getStarted.Done()
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,7 @@ func GetGCQueueTxnCleanupThreshold() time.Duration {
// StopHeartbeat stops the node-liveness heartbeat loop by closing the
// stopHeartbeat channel, which the heartbeat goroutine presumably selects
// on as its shutdown signal. Exported from this _test helpers file for use
// by tests in other packages.
//
// NOTE(review): closing an already-closed channel panics, so this appears
// to assume it is called at most once per NodeLiveness — confirm at callers.
func (nl *NodeLiveness) StopHeartbeat() {
	close(nl.stopHeartbeat)
}

// ProposerEvaluatedKVEnabled reports the value of the package-level
// propEvalKV flag, i.e. whether proposer-evaluated KV is enabled for this
// build/run. Exported from this _test helpers file so that tests outside
// the storage package (e.g. client_split_test.go in this change) can
// branch on the setting.
func ProposerEvaluatedKVEnabled() bool {
	return propEvalKV
}
Loading