Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: always write a HardState #7598

Merged
merged 8 commits into from
Jul 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions storage/client_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tamir Duberstein ([email protected])

package storage_test

import (
"math/rand"
"testing"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util/randutil"
"github.com/cockroachdb/cockroach/util/tracing"
)

// BenchmarkReplicaSnapshot times Replica.GetSnapshot on range 1 of a test
// store after that range has been filled with roughly snapSize bytes of
// pseudo-random key/value data. The data is generated from a fixed seed so
// every run writes the same keys and values.
func BenchmarkReplicaSnapshot(b *testing.B) {
	defer tracing.Disable()()
	defer config.TestingDisableTableSplits()()
	store, stopper, _ := createTestStore(b)
	// The raft log queue is switched off so that the benchmark, not the
	// queue, determines the size of the raft log.
	store.DisableRaftLogQueue(true)
	defer stopper.Stop()

	const (
		rangeID  = 1
		keySize  = 1 << 7  // 128 B
		valSize  = 1 << 10 // 1 KiB
		snapSize = 1 << 25 // 32 MiB

		// numEntries is how many key/value pairs are written so that the
		// payload adds up to approximately snapSize.
		numEntries = snapSize / (keySize + valSize)
	)

	replica, err := store.GetReplica(rangeID)
	if err != nil {
		b.Fatal(err)
	}

	// Populate the range before the timed section starts.
	rng := rand.New(rand.NewSource(0))
	header := roachpb.Header{RangeID: rangeID}
	for written := 0; written < numEntries; written++ {
		key := keys.MakeRowSentinelKey(randutil.RandBytes(rng, keySize))
		put := putArgs(key, randutil.RandBytes(rng, valSize))
		_, pErr := client.SendWrappedWith(replica, nil, header, &put)
		if pErr != nil {
			b.Fatal(pErr)
		}
	}

	// Only snapshot generation is measured.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		if _, snapErr := replica.GetSnapshot(); snapErr != nil {
			b.Fatal(snapErr)
		}
	}
}
69 changes: 69 additions & 0 deletions storage/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tobias Schottdorf ([email protected])

package storage

import (
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util/log"
"github.com/pkg/errors"
)

// migrate7310And6991 repairs the on-disk Raft state of the Replica described
// by desc, reading from and writing to batch. It performs two independent
// migrations and returns a wrapped error if loading or saving any of the
// involved state fails.
//
// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica always has
// a complete Raft state on disk. Prior versions may not have that, which
// causes issues due to the fact that we used to synthesize a TruncatedState
// and do so no more. To make up for that, write a missing TruncatedState here.
// That key is in the replicated state, but since during a cluster upgrade, all
// nodes do it, it's fine (and we never CPut on that key, so anything in the
// Raft pipeline will simply overwrite it).
//
// MIGRATION(tschottdorf): See #6991. It's possible that the HardState is
// missing after a snapshot was applied (so there is a TruncatedState). In this
// case, synthesize a HardState (simply setting everything that was in the
// snapshot to committed). Having lost the original HardState can theoretically
// mean that the replica was further ahead or had voted, and so there's no
// guarantee that this will be correct. But it will be correct in the majority
// of cases, and some state *has* to be recovered.
func migrate7310And6991(batch engine.ReadWriter, desc roachpb.RangeDescriptor) error {
	state, err := loadState(batch, &desc)
	if err != nil {
		// errors.Wrap does not interpret format verbs, so the message must
		// not contain any.
		return errors.Wrap(err, "could not migrate TruncatedState")
	}
	// An all-zero TruncatedState means none was ever written; replace it with
	// the initial log position and persist the updated state.
	if (*state.TruncatedState == roachpb.RaftTruncatedState{}) {
		state.TruncatedState.Term = raftInitialLogTerm
		state.TruncatedState.Index = raftInitialLogIndex
		state.RaftAppliedIndex = raftInitialLogIndex
		if _, err := saveState(batch, state); err != nil {
			// state.TruncatedState is already a pointer; taking its address
			// again would print a memory address instead of the struct.
			return errors.Wrapf(err, "could not migrate TruncatedState to %+v", state.TruncatedState)
		}
		log.Warningf("migration: synthesized TruncatedState for %+v", desc)
	}

	hs, err := loadHardState(batch, desc.RangeID)
	if err != nil {
		return errors.Wrap(err, "unable to load HardState")
	}
	// Only synthesize a HardState when the stored one has a trivial (zero)
	// Commit field. We don't have a snapshot here, so overwriting a nontrivial
	// Commit could wind up lowering the commit index (which would error out
	// and fatal us).
	if hs.Commit == 0 {
		if err := synthesizeHardState(batch, state, hs); err != nil {
			return errors.Wrap(err, "could not migrate HardState")
		}
	}
	return nil
}
72 changes: 72 additions & 0 deletions storage/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tobias Schottdorf ([email protected])

package storage

import (
"reflect"
"testing"

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/coreos/etcd/raft/raftpb"
)

// TestMigrate7310And6991 runs the migration against an empty in-memory engine
// and checks that afterwards the TruncatedState, the HardState and the applied
// indexes all reflect the initial Raft log position.
func TestMigrate7310And6991(t *testing.T) {
	defer leaktest.AfterTest(t)()
	stopper := stop.NewStopper()
	defer stopper.Stop()
	eng := engine.NewInMem(roachpb.Attributes{}, 1<<10, stopper)

	desc := *testRangeDescriptor()

	if migrateErr := migrate7310And6991(eng, desc); migrateErr != nil {
		t.Fatal(migrateErr)
	}

	// Read back everything the migration is expected to have written.
	truncState, err := loadTruncatedState(eng, desc.RangeID)
	if err != nil {
		t.Fatal(err)
	}

	hardState, err := loadHardState(eng, desc.RangeID)
	if err != nil {
		t.Fatal(err)
	}

	raftApplied, leaseApplied, err := loadAppliedIndex(eng, desc.RangeID)
	if err != nil {
		t.Fatal(err)
	}

	// The truncated state must sit at the initial log term and index.
	wantTS := roachpb.RaftTruncatedState{
		Term:  raftInitialLogTerm,
		Index: raftInitialLogIndex,
	}
	if wantTS != truncState {
		t.Errorf("expected %+v, got %+v", &wantTS, &truncState)
	}

	// The synthesized hard state must mark the initial index as committed.
	wantHS := raftpb.HardState{
		Term:   raftInitialLogTerm,
		Commit: raftInitialLogIndex,
	}
	if !reflect.DeepEqual(wantHS, hardState) {
		t.Errorf("expected %+v, got %+v", &wantHS, &hardState)
	}

	// The raft applied index is the initial log index and the lease applied
	// index is zero.
	wantRaftApplied := uint64(raftInitialLogIndex)
	wantLeaseApplied := uint64(0)
	if !(wantRaftApplied == raftApplied && wantLeaseApplied == leaseApplied) {
		t.Errorf("expected (raftApplied,leaseApplied)=(%d,%d), got (%d,%d)",
			wantRaftApplied, wantLeaseApplied, raftApplied, leaseApplied)
	}
}
6 changes: 3 additions & 3 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ func (r *Replica) Desc() *roachpb.RangeDescriptor {
}

// setDesc atomically sets the range's descriptor. This method calls
// processRangeDescriptorUpdate() to make the range manager handle the
// descriptor update.
// processRangeDescriptorUpdate() to make the Store handle the descriptor
// update.
func (r *Replica) setDesc(desc *roachpb.RangeDescriptor) error {
r.setDescWithoutProcessUpdate(desc)
if r.store == nil {
Expand Down Expand Up @@ -1459,7 +1459,7 @@ func (r *Replica) handleRaftReady() error {

if !raft.IsEmptySnap(rd.Snapshot) {
var err error
lastIndex, err = r.applySnapshot(rd.Snapshot, normalSnapshot)
lastIndex, err = r.applySnapshot(rd.Snapshot, rd.HardState)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (r *Replica) EndTransaction(
if err := r.runCommitTrigger(ctx, batch.(engine.Batch), ms, args, reply.Txn); err != nil {
// TODO(tschottdorf): should an error here always amount to a
// ReplicaCorruptionError?
log.Errorf("Range %d transaction commit trigger fail: %s", r.RangeID, err)
log.Error(errors.Wrapf(err, "range %d commit trigger", r.RangeID))
return reply, nil, err
}
}
Expand Down Expand Up @@ -2476,7 +2476,7 @@ func (r *Replica) splitTrigger(
// from it below.
deltaMS, err = writeInitialState(batch, deltaMS, split.NewDesc)
if err != nil {
return errors.Errorf("unable to write initial state: %s", err)
return errors.Wrap(err, "unable to write initial state")
}

rightMS := deltaMS
Expand Down
Loading