Skip to content

Commit

Permalink
Merge "[FAB-5849] calibrate state transfer pace"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Sep 24, 2017
2 parents 281c2c8 + 7de3912 commit 900850f
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 18 deletions.
2 changes: 1 addition & 1 deletion gossip/state/mocks/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (g *GossipMock) Gossip(msg *proto.GossipMessage) {
func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
args := g.Called(acceptor, passThrough)
if args.Get(0) == nil {
return nil, args.Get(1).(<-chan proto.ReceivedMessage)
return nil, args.Get(1).(chan proto.ReceivedMessage)
}
return args.Get(0).(<-chan *proto.GossipMessage), nil
}
Expand Down
5 changes: 3 additions & 2 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,10 @@ func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage)
if max < payload.SeqNum {
max = payload.SeqNum
}
err := s.payloads.Push(payload)

err := s.addPayload(payload, blocking)
if err != nil {
logger.Warningf("Payload with sequence number %d was received earlier", payload.SeqNum)
logger.Warningf("Payload with sequence number %d wasn't added to payload buffer: %v", payload.SeqNum, err)
}
}
return max, nil
Expand Down
97 changes: 82 additions & 15 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func TestNilDirectMsg(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
p.s.handleStateRequest(nil)
Expand All @@ -305,7 +305,7 @@ func TestNilAddPayload(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
err := p.s.AddPayload(nil)
Expand All @@ -318,7 +318,7 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
// Simulate a problem in the ledger
Expand All @@ -339,6 +339,77 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
assert.Contains(t, err.Error(), "cannot query ledger")
}

func TestLargeBlockGap(t *testing.T) {
// Scenario: the peer knows of a peer who has a ledger height much higher
// than itself (500 blocks higher).
// The peer needs to ask blocks in a way such that the size of the payload buffer
// never rises above a certain threshold.

mc := &mockCommitter{}
blocksPassedToLedger := make(chan uint64, 200)
mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) {
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
})
msgsFromPeer := make(chan proto.ReceivedMessage)
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
membership := []discovery.NetworkMember{
{
PKIid: common.PKIidType("a"),
Endpoint: "a",
Properties: &proto.Properties{
LedgerHeight: 500,
},
}}
g.On("PeersOfChannel", mock.Anything).Return(membership)
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, msgsFromPeer)
g.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
msg := arguments.Get(0).(*proto.GossipMessage)
// The peer requested a state request
req := msg.GetStateRequest()
// Construct a skeleton for the response
res := &proto.GossipMessage{
Nonce: msg.Nonce,
Channel: []byte(util.GetTestChainID()),
Content: &proto.GossipMessage_StateResponse{
StateResponse: &proto.RemoteStateResponse{},
},
}
// Populate the response with payloads according to what the peer asked
for seq := req.StartSeqNum; seq <= req.EndSeqNum; seq++ {
rawblock := pcomm.NewBlock(seq, []byte{})
b, _ := pb.Marshal(rawblock)
payload := &proto.Payload{
SeqNum: seq,
Data: b,
}
res.GetStateResponse().Payloads = append(res.GetStateResponse().Payloads, payload)
}
// Finally, send the response down the channel the peer expects to receive it from
sMsg, _ := res.NoopSign()
msgsFromPeer <- &comm.ReceivedMessageImpl{
SignedGossipMessage: sMsg,
}
})
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()

// Process blocks at a speed of 20 Millisecond for each block.
// The imaginative peer that responds to state
// If the payload buffer expands above defMaxBlockDistance*2 + defAntiEntropyBatchSize blocks, fail the test
blockProcessingTime := 20 * time.Millisecond // 10 seconds for total 500 blocks
expectedSequence := 1
for expectedSequence < 500 {
blockSeq := <-blocksPassedToLedger
assert.Equal(t, expectedSequence, int(blockSeq))
// Ensure payload buffer isn't over-populated
assert.True(t, p.s.payloads.Size() <= defMaxBlockDistance*2+defAntiEntropyBatchSize, "payload buffer size is %d", p.s.payloads.Size())
expectedSequence++
time.Sleep(blockProcessingTime)
}
}

func TestOverPopulation(t *testing.T) {
// Scenario: Add to the state provider blocks
// with a gap in between, and ensure that the payload buffer
Expand All @@ -353,7 +424,7 @@ func TestOverPopulation(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
defer p.shutdown()

Expand Down Expand Up @@ -415,7 +486,7 @@ func TestBlockingEnqueue(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
defer p.shutdown()

Expand Down Expand Up @@ -476,7 +547,7 @@ func TestFailures(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
assert.Panics(t, func() {
newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
Expand Down Expand Up @@ -535,7 +606,7 @@ func TestGossipReception(t *testing.T) {
g.On("Accept", mock.Anything, false).Return(rmc, nil).Run(func(_ mock.Arguments) {
signalChan <- struct{}{}
})
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
mc := &mockCommitter{}
receivedChan := make(chan struct{})
Expand Down Expand Up @@ -576,7 +647,7 @@ func TestMetadataCompatibility(t *testing.T) {
finChan <- struct{}{}
})
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
metaState := common.NewNodeMetastate(5)
b, _ := metaState.Bytes()
defaultPeer := discovery.NetworkMember{
Expand Down Expand Up @@ -1149,12 +1220,8 @@ func TestTransferOfPrivateRWSet(t *testing.T) {
return ch
}

commChannelFactory := func(ch chan proto.ReceivedMessage) <-chan proto.ReceivedMessage {
return ch
}

g.On("Accept", mock.Anything, false).Return(gossipChannelFactory(gossipChannel), nil)
g.On("Accept", mock.Anything, true).Return(nil, commChannelFactory(commChannel))
g.On("Accept", mock.Anything, true).Return(nil, commChannel)

g.On("UpdateChannelMetadata", mock.Anything, mock.Anything)
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
Expand Down Expand Up @@ -1331,7 +1398,7 @@ func (t testPeer) Gossip() <-chan *proto.GossipMessage {
return t.gossipChannel
}

func (t testPeer) Comm() <-chan proto.ReceivedMessage {
func (t testPeer) Comm() chan proto.ReceivedMessage {
return t.commChannel
}

Expand Down Expand Up @@ -1372,7 +1439,7 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) {
Return(nil, peer.Comm()).
Once().
On("Accept", mock.Anything, true).
Return(nil, make(<-chan proto.ReceivedMessage))
Return(nil, make(chan proto.ReceivedMessage))

peer.On("UpdateChannelMetadata", mock.Anything, mock.Anything)
peer.coord.On("Close")
Expand Down

0 comments on commit 900850f

Please sign in to comment.