Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: verify and save block messages when receive new bock #6150

Merged
merged 1 commit into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions app/submodule/syncer/syncer_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ func (sa *syncerAPI) SyncerTracker(ctx context.Context) *types.TargetTracker {
}
convertTarget := func(src *syncTypes.Target) *types.Target {
return &types.Target{
State: convertSyncStateStage(src.State),
Base: src.Base,
Current: src.Current,
Start: src.Start,
End: src.End,
Err: src.Err,
ChainInfo: src.ChainInfo,
State: convertSyncStateStage(src.State),
Base: src.Base,
Current: src.Current,
Start: src.Start,
End: src.End,
Err: src.Err,
Head: src.Head,
Sender: src.Sender,
}
}
for _, target := range tracker.History() {
Expand Down Expand Up @@ -136,16 +137,11 @@ func (sa *syncerAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) e
return fmt.Errorf("provided messages did not match block: %v", err)
}

ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header})
if err != nil {
return fmt.Errorf("somehow failed to make a tipset out of a single block: %v", err)
}

if _, err := chainModule.ChainReader.PutObject(ctx, blk.Header); err != nil {
return err
}
localPeer := sa.syncer.NetworkModule.Network.GetPeerID()
ci := types.NewChainInfo(localPeer, localPeer, ts)
ci := types.NewChainInfo(localPeer, localPeer, &types.FullTipSet{Blocks: []*types.FullBlock{fb}})
if err := sa.syncer.SyncProvider.HandleNewTipSet(ci); err != nil {
return fmt.Errorf("sync to submitted block failed: %v", err)
}
Expand Down
14 changes: 10 additions & 4 deletions app/submodule/syncer/syncer_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,13 @@ func (syncer *SyncerSubmodule) handleIncomingBlocks(ctx context.Context, msg pub

blkSvc := blockservice.New(syncer.BlockstoreModule.Blockstore, syncer.NetworkModule.Bitswap)

if _, err := syncer.NetworkModule.FetchMessagesByCids(ctx, blkSvc, bm.BlsMessages); err != nil {
blsMsgs, err := syncer.NetworkModule.FetchMessagesByCids(ctx, blkSvc, bm.BlsMessages)
if err != nil {
log.Errorf("fetch block bls messages failed:%s", err.Error())
return
}
if _, err := syncer.NetworkModule.FetchSignedMessagesByCids(ctx, blkSvc, bm.SecpkMessages); err != nil {
secpMsgs, err := syncer.NetworkModule.FetchSignedMessagesByCids(ctx, blkSvc, bm.SecpkMessages)
if err != nil {
log.Errorf("fetch block signed messages failed:%s", err.Error())
return
}
Expand All @@ -224,8 +226,12 @@ func (syncer *SyncerSubmodule) handleIncomingBlocks(ctx context.Context, msg pub

syncer.NetworkModule.Host.ConnManager().TagPeer(sender, "new-block", 20)

ts, _ := types.NewTipSet([]*types.BlockHeader{header})
chainInfo := types.NewChainInfo(source, sender, ts)
fullBlock := &types.FullBlock{
Header: header,
BLSMessages: blsMsgs,
SECPMessages: secpMsgs,
}
chainInfo := types.NewChainInfo(source, sender, &types.FullTipSet{Blocks: []*types.FullBlock{fullBlock}})

if err = syncer.ChainSyncManager.BlockProposer().SendGossipBlock(chainInfo); err != nil {
log.Errorf("failed to notify syncer of new block, block: %s", err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ func NewManager(
}

return Manager{
dispatcher: dispatcher.NewDispatcher(chainSyncer),
dispatcher: dispatcher.NewDispatcher(struct {
*syncer.Syncer
*consensus.BlockValidator
}{Syncer: chainSyncer, BlockValidator: hv}, submodule.ChainReader),
}, nil
}

Expand Down
57 changes: 44 additions & 13 deletions pkg/chainsync/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package dispatcher
import (
"container/list"
"context"
"fmt"
"runtime/debug"
"sync"
atmoic2 "sync/atomic"
"time"

"github.com/filecoin-project/pubsub"
"github.com/filecoin-project/venus/pkg/chain"
"github.com/filecoin-project/venus/pkg/chainsync/types"
types2 "github.com/filecoin-project/venus/venus-shared/types"
"github.com/streadway/handy/atomic"
Expand All @@ -30,15 +32,16 @@ const LocalIncoming = "incoming"
type dispatchSyncer interface {
Head() *types2.TipSet
HandleNewTipSet(context.Context, *types.Target) error
ValidateMsgMeta(ctx context.Context, fblk *types2.FullBlock) error
}

// NewDispatcher creates a new syncing dispatcher with default queue sizes.
func NewDispatcher(catchupSyncer dispatchSyncer) *Dispatcher {
return NewDispatcherWithSizes(catchupSyncer, DefaultWorkQueueSize, DefaultInQueueSize)
func NewDispatcher(catchupSyncer dispatchSyncer, chainStore *chain.Store) *Dispatcher {
return NewDispatcherWithSizes(catchupSyncer, chainStore, DefaultWorkQueueSize, DefaultInQueueSize)
}

// NewDispatcherWithSizes creates a new syncing dispatcher.
func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize int) *Dispatcher {
func NewDispatcherWithSizes(syncer dispatchSyncer, chainStore *chain.Store, workQueueSize, inQueueSize int) *Dispatcher {
return &Dispatcher{
workTracker: types.NewTargetTracker(workQueueSize),
syncer: syncer,
Expand All @@ -48,6 +51,7 @@ func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize in
cancelControler: list.New(),
maxCount: 1,
incomingPubsub: pubsub.New(50),
chainStore: chainStore,
}
}

Expand Down Expand Up @@ -90,35 +94,62 @@ type Dispatcher struct {
maxCount int64

incomingPubsub *pubsub.PubSub
chainStore *chain.Store
}

// SyncTracker returns the target tracker of syncing
// SyncTracker returnss the target tracker of syncing
func (d *Dispatcher) SyncTracker() *types.TargetTracker {
return d.workTracker
}

func (d *Dispatcher) sendHead(ci *types2.ChainInfo) error {
ctx := context.Background()
fts := ci.FullTipSet
if fts == nil {
return fmt.Errorf("got nil tipset")
}

for _, b := range fts.Blocks {
if err := d.syncer.ValidateMsgMeta(ctx, b); err != nil {
log.Warnf("invalid block %s received: %s", b.Cid(), err)
return fmt.Errorf("validate block %s message meta failed: %v", b.Cid(), err)
}
}

for _, b := range fts.Blocks {
_, err := d.chainStore.PutObject(ctx, b.Header)
if err != nil {
return fmt.Errorf("fail to save block to tipset")
}
}

d.incomingPubsub.Pub(fts.TipSet().Blocks(), LocalIncoming)

return d.addTracker(ci)
}

// SendHello handles chain information from bootstrap peers.
func (d *Dispatcher) SendHello(ci *types2.ChainInfo) error {
return d.addTracker(ci)
return d.sendHead(ci)
}

// SendOwnBlock handles chain info from a node's own mining system
func (d *Dispatcher) SendOwnBlock(ci *types2.ChainInfo) error {
return d.addTracker(ci)
return d.sendHead(ci)
}

// SendGossipBlock handles chain info from new blocks sent on pubsub
func (d *Dispatcher) SendGossipBlock(ci *types2.ChainInfo) error {
return d.addTracker(ci)
return d.sendHead(ci)
}

func (d *Dispatcher) addTracker(ci *types2.ChainInfo) error {
d.incomingPubsub.Pub(ci.Head.Blocks(), LocalIncoming)
d.incoming <- &types.Target{
ChainInfo: *ci,
Base: d.syncer.Head(),
Current: d.syncer.Head(),
Start: time.Now(),
Head: ci.FullTipSet.TipSet(),
Base: d.syncer.Head(),
Current: d.syncer.Head(),
Start: time.Now(),
Sender: ci.Sender,
}
return nil
}
Expand Down Expand Up @@ -152,7 +183,7 @@ func (d *Dispatcher) processIncoming(ctx context.Context) {
// Sort new targets by putting on work queue.
if d.workTracker.Add(target) {
log.Infow("received new tipset", "height", target.Head.Height(), "blocks", target.Head.Len(), "from",
target.ChainInfo.Sender, "current work len", d.workTracker.Len(), "incoming channel len", len(d.incoming))
target.Sender, "current work len", d.workTracker.Len(), "incoming channel len", len(d.incoming))
}
}
}
Expand Down
37 changes: 23 additions & 14 deletions pkg/chainsync/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/venus/pkg/chain"
"github.com/filecoin-project/venus/pkg/testhelpers"

fbig "github.com/filecoin-project/go-state-types/big"
Expand Down Expand Up @@ -36,12 +38,18 @@ func (fs *mockSyncer) HandleNewTipSet(_ context.Context, ci *syncTypes.Target) e
return nil
}

func (fs *mockSyncer) ValidateMsgMeta(ctx context.Context, fblk *types.FullBlock) error {
return nil
}

func TestDispatchStartHappy(t *testing.T) {
tf.UnitTest(t)
s := &mockSyncer{
headsCalled: make([]*types.TipSet, 0),
}
testDispatch := dispatcher.NewDispatcher(s)
builder := chain.NewBuilder(t, address.Undef)

testDispatch := dispatcher.NewDispatcher(s, builder.Store())

cis := []*types.ChainInfo{
// We need to put these in priority order to avoid a race.
Expand Down Expand Up @@ -69,7 +77,7 @@ func TestDispatchStartHappy(t *testing.T) {
waitCh := make(chan struct{})
// stm: @CHAINSYNC_DISPATCHER_REGISTER_CALLBACK_001
testDispatch.RegisterCallback(func(target *syncTypes.Target, _ error) {
if target.Head.Key().Equals(cis[4].Head.Key()) {
if target.Head.Key().Equals(cis[4].FullTipSet.TipSet().Key()) {
waitCh <- struct{}{}
}
})
Expand All @@ -93,10 +101,10 @@ func TestQueueHappy(t *testing.T) {
testQ := syncTypes.NewTargetTracker(20)

// Add syncRequests out of order
sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))}
sR1 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 1, 1001))}
sR2 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 2, 1001))}
sR47 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 47, 1001))}
sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()}
sR1 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 1, 1001).FullTipSet.TipSet()}
sR2 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 2, 1001).FullTipSet.TipSet()}
sR47 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 47, 1001).FullTipSet.TipSet()}

testQ.Add(sR2)
testQ.Add(sR47)
Expand All @@ -108,7 +116,7 @@ func TestQueueHappy(t *testing.T) {
// Pop in order
out0 := requirePop(t, testQ)

weight := out0.ChainInfo.Head.ParentWeight()
weight := out0.Head.ParentWeight()
assert.Equal(t, int64(1001), weight.Int.Int64())
}

Expand All @@ -117,8 +125,8 @@ func TestQueueDuplicates(t *testing.T) {
testQ := syncTypes.NewTargetTracker(20)

// Add syncRequests with same height
sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))}
sR0dup := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))}
sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()}
sR0dup := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()}

testQ.Add(sR0)
testQ.Add(sR0dup)
Expand All @@ -128,15 +136,15 @@ func TestQueueDuplicates(t *testing.T) {

// Pop
first := requirePop(t, testQ)
assert.Equal(t, abi.ChainEpoch(0), first.ChainInfo.Head.Height())
assert.Equal(t, abi.ChainEpoch(0), first.Head.Height())
testQ.Remove(first)
}

func TestQueueEmptyPopErrors(t *testing.T) {
tf.UnitTest(t)
testQ := syncTypes.NewTargetTracker(20)
sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1002))}
sR47 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 47, 1001))}
sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1002).FullTipSet.TipSet()}
sR47 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 47, 1001).FullTipSet.TipSet()}

// Add 2
testQ.Add(sR47)
Expand Down Expand Up @@ -187,8 +195,9 @@ func chainInfoWithHeightAndWeight(t *testing.T, h int, weight int64) *types.Chai
Data: []byte{0x4},
},
}
b, _ := types.NewTipSet([]*types.BlockHeader{blk})
return &types.ChainInfo{
Head: b,
FullTipSet: &types.FullTipSet{
Blocks: []*types.FullBlock{{Header: blk}},
},
}
}
24 changes: 12 additions & 12 deletions pkg/chainsync/syncer/syncer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,20 @@ func TestLoadFork(t *testing.T) {
right := builder.AppendManyOn(ctx, 3, base)

leftTarget := &types.Target{
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
ChainInfo: *types2.NewChainInfo("", "", left),
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
Head: left,
}
rightTarget := &types.Target{
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
ChainInfo: *types2.NewChainInfo("", "", right),
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
Head: right,
}
// Sync the two branches, which stores all blocks in the underlying stores.
// stm: @CHAINSYNC_SYNCER_HANDLE_NEW_TIP_SET_001, @CHAINSYNC_SYNCER_SET_HEAD_001
Expand Down
Loading
Loading