Skip to content

Commit

Permalink
Fix setting of watermark on restore from crash
Browse files Browse the repository at this point in the history
Change-Id: I9c66cf9ae2c5fa3e0c09cc372453ed15833417aa
Signed-off-by: jyellick <[email protected]>
  • Loading branch information
jyellick committed Aug 30, 2016
1 parent fdaaff1 commit 1d8114f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 8 deletions.
67 changes: 67 additions & 0 deletions consensus/pbft/pbft-core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,73 @@ func TestReplicaCrash3(t *testing.T) {
}
}

// TestReplicaCrash4 simulates the restart with no checkpoints
// in the store because they have been garbage collected
// the bug occurs because the low watermark is incorrectly set to
// be zero
func TestReplicaCrash4(t *testing.T) {
validatorCount := 4
config := loadConfig()
config.Set("general.K", 2)
config.Set("general.logmultiplier", 2)
net := makePBFTNetwork(validatorCount, config)
defer net.stop()

twoOffline := false
threeOffline := true
net.filterFn = func(src int, dst int, msg []byte) []byte {
if twoOffline && dst == 2 { // 2 is 'offline'
return nil
}
if threeOffline && dst == 3 { // 3 is 'offline'
return nil
}
return msg
}

for i := int64(1); i <= 8; i++ {
net.pbftEndpoints[0].manager.Queue() <- createPbftReqBatch(i, uint64(generateBroadcaster(validatorCount)))
}
net.process() // vp0,1,2 should have a stable checkpoint for seqNo 8
net.process() // this second time is necessary for garbage collection it seams

// Now vp0,1,2 should be in sync with 8 executions in view 0, and vp4 should be offline
for i, pep := range net.pbftEndpoints {

if i == 3 {
// 3 is offline for this test
continue
}

if pep.pbft.view != 0 {
t.Errorf("Expected replica %d to be in view 1, got %d", pep.id, pep.pbft.view)
}

expectedExecutions := uint64(8)
if pep.sc.executions != expectedExecutions {
t.Errorf("Expected %d executions on replica %d, got %d", expectedExecutions, pep.id, pep.sc.executions)
}
}

// Create new pbft instances to restore from persistence
for id := 0; id < 3; id++ {
pe := net.pbftEndpoints[id]
config := loadConfig()
config.Set("general.K", "2")
pe.pbft.close()
pe.pbft = newPbftCore(uint64(id), config, pe.sc, events.NewTimerFactoryImpl(pe.manager))
pe.manager.SetReceiver(pe.pbft)
pe.pbft.N = 4
pe.pbft.f = (4 - 1) / 3
pe.pbft.requestTimeout = 200 * time.Millisecond

expected := uint64(8)
if pe.pbft.h != expected {
t.Errorf("Low watermark should have been %d, got %d", expected, pe.pbft.h)
}
}

}
func TestReplicaPersistQSet(t *testing.T) {
persist := make(map[string][]byte)

Expand Down
16 changes: 8 additions & 8 deletions consensus/pbft/pbft-persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,11 @@ func (instance *pbftCore) restoreState() {
logger.Warningf("Replica %d could not restore reqBatchStore: %s", instance.id, err)
}

instance.restoreLastSeqNo()

chkpts, err := instance.consumer.ReadStateSet("chkpt.")
if err == nil {
highSeq := uint64(0)
lowWatermark := instance.lastExec // This is safe because we will round down in moveWatermarks
for key, id := range chkpts {
var seqNo uint64
if _, err = fmt.Sscanf(key, "chkpt.%d", &seqNo); err != nil {
Expand All @@ -159,20 +161,18 @@ func (instance *pbftCore) restoreState() {
idAsString := base64.StdEncoding.EncodeToString(id)
logger.Debugf("Replica %d found checkpoint %s for seqNo %d", instance.id, idAsString, seqNo)
instance.chkpts[seqNo] = idAsString
if seqNo > highSeq {
highSeq = seqNo
if seqNo < lowWatermark {
lowWatermark = seqNo
}
}
}
instance.moveWatermarks(highSeq)
instance.moveWatermarks(lowWatermark)
} else {
logger.Warningf("Replica %d could not restore checkpoints: %s", instance.id, err)
}

instance.restoreLastSeqNo()

logger.Infof("Replica %d restored state: view: %d, seqNo: %d, pset: %d, qset: %d, reqBatches: %d, chkpts: %d",
instance.id, instance.view, instance.seqNo, len(instance.pset), len(instance.qset), len(instance.reqBatchStore), len(instance.chkpts))
logger.Infof("Replica %d restored state: view: %d, seqNo: %d, pset: %d, qset: %d, reqBatches: %d, chkpts: %d h: %d",
instance.id, instance.view, instance.seqNo, len(instance.pset), len(instance.qset), len(instance.reqBatchStore), len(instance.chkpts), instance.h)
}

func (instance *pbftCore) restoreLastSeqNo() {
Expand Down

0 comments on commit 1d8114f

Please sign in to comment.