Skip to content

Commit

Permalink
Merge pull request #7598 from tschottdorf/hard-state
Browse files Browse the repository at this point in the history
storage: always write a HardState
  • Loading branch information
tbg authored Jul 12, 2016
2 parents ec0ec2d + 7f2218c commit fb6cc2e
Show file tree
Hide file tree
Showing 11 changed files with 488 additions and 129 deletions.
67 changes: 67 additions & 0 deletions storage/client_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tamir Duberstein ([email protected])

package storage_test

import (
"math/rand"
"testing"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util/randutil"
"github.com/cockroachdb/cockroach/util/tracing"
)

func BenchmarkReplicaSnapshot(b *testing.B) {
defer tracing.Disable()()
defer config.TestingDisableTableSplits()()
store, stopper, _ := createTestStore(b)
// We want to manually control the size of the raft log.
store.DisableRaftLogQueue(true)
defer stopper.Stop()

const rangeID = 1
const keySize = 1 << 7 // 128 B
const valSize = 1 << 10 // 1 KiB
const snapSize = 1 << 25 // 32 MiB

rep, err := store.GetReplica(rangeID)
if err != nil {
b.Fatal(err)
}

src := rand.New(rand.NewSource(0))
for i := 0; i < snapSize/(keySize+valSize); i++ {
key := keys.MakeRowSentinelKey(randutil.RandBytes(src, keySize))
val := randutil.RandBytes(src, valSize)
pArgs := putArgs(key, val)
if _, pErr := client.SendWrappedWith(rep, nil, roachpb.Header{
RangeID: rangeID,
}, &pArgs); pErr != nil {
b.Fatal(pErr)
}
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := rep.GetSnapshot(); err != nil {
b.Fatal(err)
}
}
}
69 changes: 69 additions & 0 deletions storage/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tobias Schottdorf ([email protected])

package storage

import (
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util/log"
"github.com/pkg/errors"
)

// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica always has
// a complete Raft state on disk. Prior versions may not have that, which
// causes issues due to the fact that we used to synthesize a TruncatedState
// and do so no more. To make up for that, write a missing TruncatedState here.
// That key is in the replicated state, but since during a cluster upgrade, all
// nodes do it, it's fine (and we never CPut on that key, so anything in the
// Raft pipeline will simply overwrite it).
//
// Migration(tschottdorf): See #6991. It's possible that the HardState is
// missing after a snapshot was applied (so there is a TruncatedState). In this
// case, synthesize a HardState (simply setting everything that was in the
// snapshot to committed). Having lost the original HardState can theoretically
// mean that the replica was further ahead or had voted, and so there's no
// guarantee that this will be correct. But it will be correct in the majority
// of cases, and some state *has* to be recovered.
func migrate7310And6991(batch engine.ReadWriter, desc roachpb.RangeDescriptor) error {
state, err := loadState(batch, &desc)
if err != nil {
return errors.Wrap(err, "could not migrate TruncatedState: %s")
}
if (*state.TruncatedState == roachpb.RaftTruncatedState{}) {
state.TruncatedState.Term = raftInitialLogTerm
state.TruncatedState.Index = raftInitialLogIndex
state.RaftAppliedIndex = raftInitialLogIndex
if _, err := saveState(batch, state); err != nil {
return errors.Wrapf(err, "could not migrate TruncatedState to %+v", &state.TruncatedState)
}
log.Warningf("migration: synthesized TruncatedState for %+v", desc)
}

hs, err := loadHardState(batch, desc.RangeID)
if err != nil {
return errors.Wrap(err, "unable to load HardState")
}
// Only update the HardState when there is a nontrivial Commit field. We
// don't have a snapshot here, so we could wind up lowering the commit
// index (which would error out and fatal us).
if hs.Commit == 0 {
if err := synthesizeHardState(batch, state, hs); err != nil {
return errors.Wrap(err, "could not migrate HardState")
}
}
return nil
}
72 changes: 72 additions & 0 deletions storage/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tobias Schottdorf ([email protected])

package storage

import (
"reflect"
"testing"

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/coreos/etcd/raft/raftpb"
)

func TestMigrate7310And6991(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop()
eng := engine.NewInMem(roachpb.Attributes{}, 1<<10, stopper)

desc := *testRangeDescriptor()

if err := migrate7310And6991(eng, desc); err != nil {
t.Fatal(err)
}

ts, err := loadTruncatedState(eng, desc.RangeID)
if err != nil {
t.Fatal(err)
}

hs, err := loadHardState(eng, desc.RangeID)
if err != nil {
t.Fatal(err)
}

rApplied, lApplied, err := loadAppliedIndex(eng, desc.RangeID)
if err != nil {
t.Fatal(err)
}

expTS := roachpb.RaftTruncatedState{Term: raftInitialLogTerm, Index: raftInitialLogIndex}
if expTS != ts {
t.Errorf("expected %+v, got %+v", &expTS, &ts)
}

expHS := raftpb.HardState{Term: raftInitialLogTerm, Commit: raftInitialLogIndex}
if !reflect.DeepEqual(expHS, hs) {
t.Errorf("expected %+v, got %+v", &expHS, &hs)
}

expRApplied, expLApplied := uint64(raftInitialLogIndex), uint64(0)
if expRApplied != rApplied || expLApplied != lApplied {
t.Errorf("expected (raftApplied,leaseApplied)=(%d,%d), got (%d,%d)",
expRApplied, expLApplied, rApplied, lApplied)
}
}
6 changes: 3 additions & 3 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ func (r *Replica) Desc() *roachpb.RangeDescriptor {
}

// setDesc atomically sets the range's descriptor. This method calls
// processRangeDescriptorUpdate() to make the range manager handle the
// descriptor update.
// processRangeDescriptorUpdate() to make the Store handle the descriptor
// update.
func (r *Replica) setDesc(desc *roachpb.RangeDescriptor) error {
r.setDescWithoutProcessUpdate(desc)
if r.store == nil {
Expand Down Expand Up @@ -1459,7 +1459,7 @@ func (r *Replica) handleRaftReady() error {

if !raft.IsEmptySnap(rd.Snapshot) {
var err error
lastIndex, err = r.applySnapshot(rd.Snapshot, normalSnapshot)
lastIndex, err = r.applySnapshot(rd.Snapshot, rd.HardState)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (r *Replica) EndTransaction(
if err := r.runCommitTrigger(ctx, batch.(engine.Batch), ms, args, reply.Txn); err != nil {
// TODO(tschottdorf): should an error here always amount to a
// ReplicaCorruptionError?
log.Errorf("Range %d transaction commit trigger fail: %s", r.RangeID, err)
log.Error(errors.Wrapf(err, "range %d commit trigger", r.RangeID))
return reply, nil, err
}
}
Expand Down
Loading

0 comments on commit fb6cc2e

Please sign in to comment.