diff --git a/app/submodule/syncer/syncer_api.go b/app/submodule/syncer/syncer_api.go index 4eefaba763..c3beea90df 100644 --- a/app/submodule/syncer/syncer_api.go +++ b/app/submodule/syncer/syncer_api.go @@ -32,13 +32,14 @@ func (sa *syncerAPI) SyncerTracker(ctx context.Context) *types.TargetTracker { } convertTarget := func(src *syncTypes.Target) *types.Target { return &types.Target{ - State: convertSyncStateStage(src.State), - Base: src.Base, - Current: src.Current, - Start: src.Start, - End: src.End, - Err: src.Err, - ChainInfo: src.ChainInfo, + State: convertSyncStateStage(src.State), + Base: src.Base, + Current: src.Current, + Start: src.Start, + End: src.End, + Err: src.Err, + Head: src.Head, + Sender: src.Sender, } } for _, target := range tracker.History() { @@ -136,16 +137,11 @@ func (sa *syncerAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) e return fmt.Errorf("provided messages did not match block: %v", err) } - ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header}) - if err != nil { - return fmt.Errorf("somehow failed to make a tipset out of a single block: %v", err) - } - if _, err := chainModule.ChainReader.PutObject(ctx, blk.Header); err != nil { return err } localPeer := sa.syncer.NetworkModule.Network.GetPeerID() - ci := types.NewChainInfo(localPeer, localPeer, ts) + ci := types.NewChainInfo(localPeer, localPeer, &types.FullTipSet{Blocks: []*types.FullBlock{fb}}) if err := sa.syncer.SyncProvider.HandleNewTipSet(ci); err != nil { return fmt.Errorf("sync to submitted block failed: %v", err) } diff --git a/app/submodule/syncer/syncer_submodule.go b/app/submodule/syncer/syncer_submodule.go index c1020a60fe..5de977107d 100644 --- a/app/submodule/syncer/syncer_submodule.go +++ b/app/submodule/syncer/syncer_submodule.go @@ -207,11 +207,13 @@ func (syncer *SyncerSubmodule) handleIncomingBlocks(ctx context.Context, msg pub blkSvc := blockservice.New(syncer.BlockstoreModule.Blockstore, syncer.NetworkModule.Bitswap) - if _, err := syncer.NetworkModule.FetchMessagesByCids(ctx, blkSvc, bm.BlsMessages); err != nil { + blsMsgs, err := syncer.NetworkModule.FetchMessagesByCids(ctx, blkSvc, bm.BlsMessages) + if err != nil { log.Errorf("fetch block bls messages failed:%s", err.Error()) return } - if _, err := syncer.NetworkModule.FetchSignedMessagesByCids(ctx, blkSvc, bm.SecpkMessages); err != nil { + secpMsgs, err := syncer.NetworkModule.FetchSignedMessagesByCids(ctx, blkSvc, bm.SecpkMessages) + if err != nil { log.Errorf("fetch block signed messages failed:%s", err.Error()) return } @@ -224,8 +226,12 @@ func (syncer *SyncerSubmodule) handleIncomingBlocks(ctx context.Context, msg pub syncer.NetworkModule.Host.ConnManager().TagPeer(sender, "new-block", 20) - ts, _ := types.NewTipSet([]*types.BlockHeader{header}) - chainInfo := types.NewChainInfo(source, sender, ts) + fullBlock := &types.FullBlock{ + Header: header, + BLSMessages: blsMsgs, + SECPMessages: secpMsgs, + } + chainInfo := types.NewChainInfo(source, sender, &types.FullTipSet{Blocks: []*types.FullBlock{fullBlock}}) if err = syncer.ChainSyncManager.BlockProposer().SendGossipBlock(chainInfo); err != nil { log.Errorf("failed to notify syncer of new block, block: %s", err) diff --git a/pkg/chainsync/chainsync.go b/pkg/chainsync/chainsync.go index 3d55f30e00..662050d7fd 100644 --- a/pkg/chainsync/chainsync.go +++ b/pkg/chainsync/chainsync.go @@ -54,7 +54,10 @@ func NewManager( } return Manager{ - dispatcher: dispatcher.NewDispatcher(chainSyncer), + dispatcher: dispatcher.NewDispatcher(struct { + *syncer.Syncer + *consensus.BlockValidator + }{Syncer: chainSyncer, BlockValidator: hv}, submodule.ChainReader), }, nil } diff --git a/pkg/chainsync/dispatcher/dispatcher.go b/pkg/chainsync/dispatcher/dispatcher.go index beceeceb9b..de3478d62d 100644 --- a/pkg/chainsync/dispatcher/dispatcher.go +++ b/pkg/chainsync/dispatcher/dispatcher.go @@ -3,12 +3,14 @@ package dispatcher import ( "container/list" "context" + "fmt" "runtime/debug" "sync" atmoic2 "sync/atomic" "time" "github.com/filecoin-project/pubsub" + "github.com/filecoin-project/venus/pkg/chain" "github.com/filecoin-project/venus/pkg/chainsync/types" types2 "github.com/filecoin-project/venus/venus-shared/types" "github.com/streadway/handy/atomic" @@ -30,15 +32,16 @@ const LocalIncoming = "incoming" type dispatchSyncer interface { Head() *types2.TipSet HandleNewTipSet(context.Context, *types.Target) error + ValidateMsgMeta(ctx context.Context, fblk *types2.FullBlock) error } // NewDispatcher creates a new syncing dispatcher with default queue sizes. -func NewDispatcher(catchupSyncer dispatchSyncer) *Dispatcher { - return NewDispatcherWithSizes(catchupSyncer, DefaultWorkQueueSize, DefaultInQueueSize) +func NewDispatcher(catchupSyncer dispatchSyncer, chainStore *chain.Store) *Dispatcher { + return NewDispatcherWithSizes(catchupSyncer, chainStore, DefaultWorkQueueSize, DefaultInQueueSize) } // NewDispatcherWithSizes creates a new syncing dispatcher. -func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize int) *Dispatcher { +func NewDispatcherWithSizes(syncer dispatchSyncer, chainStore *chain.Store, workQueueSize, inQueueSize int) *Dispatcher { return &Dispatcher{ workTracker: types.NewTargetTracker(workQueueSize), syncer: syncer, @@ -48,6 +51,7 @@ func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize in cancelControler: list.New(), maxCount: 1, incomingPubsub: pubsub.New(50), + chainStore: chainStore, } } @@ -90,35 +94,62 @@ type Dispatcher struct { maxCount int64 incomingPubsub *pubsub.PubSub + chainStore *chain.Store } -// SyncTracker returns the target tracker of syncing +// SyncTracker returnss the target tracker of syncing func (d *Dispatcher) SyncTracker() *types.TargetTracker { return d.workTracker } +func (d *Dispatcher) sendHead(ci *types2.ChainInfo) error { + ctx := context.Background() + fts := ci.FullTipSet + if fts == nil { + return fmt.Errorf("got nil tipset") + } + + for _, b := range fts.Blocks { + if err := d.syncer.ValidateMsgMeta(ctx, b); err != nil { + log.Warnf("invalid block %s received: %s", b.Cid(), err) + return fmt.Errorf("validate block %s message meta failed: %v", b.Cid(), err) + } + } + + for _, b := range fts.Blocks { + _, err := d.chainStore.PutObject(ctx, b.Header) + if err != nil { + return fmt.Errorf("fail to save block to tipset") + } + } + + d.incomingPubsub.Pub(fts.TipSet().Blocks(), LocalIncoming) + + return d.addTracker(ci) +} + // SendHello handles chain information from bootstrap peers. func (d *Dispatcher) SendHello(ci *types2.ChainInfo) error { - return d.addTracker(ci) + return d.sendHead(ci) } // SendOwnBlock handles chain info from a node's own mining system func (d *Dispatcher) SendOwnBlock(ci *types2.ChainInfo) error { - return d.addTracker(ci) + return d.sendHead(ci) } // SendGossipBlock handles chain info from new blocks sent on pubsub func (d *Dispatcher) SendGossipBlock(ci *types2.ChainInfo) error { - return d.addTracker(ci) + return d.sendHead(ci) } func (d *Dispatcher) addTracker(ci *types2.ChainInfo) error { - d.incomingPubsub.Pub(ci.Head.Blocks(), LocalIncoming) d.incoming <- &types.Target{ - ChainInfo: *ci, - Base: d.syncer.Head(), - Current: d.syncer.Head(), - Start: time.Now(), + Head: ci.FullTipSet.TipSet(), + Base: d.syncer.Head(), + Current: d.syncer.Head(), + Start: time.Now(), + Sender: ci.Sender, } return nil } @@ -152,7 +183,7 @@ func (d *Dispatcher) processIncoming(ctx context.Context) { // Sort new targets by putting on work queue. if d.workTracker.Add(target) { log.Infow("received new tipset", "height", target.Head.Height(), "blocks", target.Head.Len(), "from", - target.ChainInfo.Sender, "current work len", d.workTracker.Len(), "incoming channel len", len(d.incoming)) + target.Sender, "current work len", d.workTracker.Len(), "incoming channel len", len(d.incoming)) } } } diff --git a/pkg/chainsync/dispatcher/dispatcher_test.go b/pkg/chainsync/dispatcher/dispatcher_test.go index f852d62650..134186f295 100644 --- a/pkg/chainsync/dispatcher/dispatcher_test.go +++ b/pkg/chainsync/dispatcher/dispatcher_test.go @@ -8,6 +8,8 @@ import ( "github.com/ipfs/go-cid" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/venus/pkg/chain" "github.com/filecoin-project/venus/pkg/testhelpers" fbig "github.com/filecoin-project/go-state-types/big" @@ -36,12 +38,18 @@ func (fs *mockSyncer) HandleNewTipSet(_ context.Context, ci *syncTypes.Target) e return nil } +func (fs *mockSyncer) ValidateMsgMeta(ctx context.Context, fblk *types.FullBlock) error { + return nil +} + func TestDispatchStartHappy(t *testing.T) { tf.UnitTest(t) s := &mockSyncer{ headsCalled: make([]*types.TipSet, 0), } - testDispatch := dispatcher.NewDispatcher(s) + builder := chain.NewBuilder(t, address.Undef) + + testDispatch := dispatcher.NewDispatcher(s, builder.Store()) cis := []*types.ChainInfo{ // We need to put these in priority order to avoid a race. @@ -69,7 +77,7 @@ func TestDispatchStartHappy(t *testing.T) { waitCh := make(chan struct{}) // stm: @CHAINSYNC_DISPATCHER_REGISTER_CALLBACK_001 testDispatch.RegisterCallback(func(target *syncTypes.Target, _ error) { - if target.Head.Key().Equals(cis[4].Head.Key()) { + if target.Head.Key().Equals(cis[4].FullTipSet.TipSet().Key()) { waitCh <- struct{}{} } }) @@ -93,10 +101,10 @@ func TestQueueHappy(t *testing.T) { testQ := syncTypes.NewTargetTracker(20) // Add syncRequests out of order - sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))} - sR1 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 1, 1001))} - sR2 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 2, 1001))} - sR47 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 47, 1001))} + sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()} + sR1 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 1, 1001).FullTipSet.TipSet()} + sR2 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 2, 1001).FullTipSet.TipSet()} + sR47 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 47, 1001).FullTipSet.TipSet()} testQ.Add(sR2) testQ.Add(sR47) @@ -108,7 +116,7 @@ func TestQueueHappy(t *testing.T) { // Pop in order out0 := requirePop(t, testQ) - weight := out0.ChainInfo.Head.ParentWeight() + weight := out0.Head.ParentWeight() assert.Equal(t, int64(1001), weight.Int.Int64()) } @@ -117,8 +125,8 @@ func TestQueueDuplicates(t *testing.T) { testQ := syncTypes.NewTargetTracker(20) // Add syncRequests with same height - sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))} - sR0dup := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))} + sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()} + sR0dup := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()} testQ.Add(sR0) testQ.Add(sR0dup) @@ -128,15 +136,15 @@ func TestQueueDuplicates(t *testing.T) { // Pop first := requirePop(t, testQ) - assert.Equal(t, abi.ChainEpoch(0), first.ChainInfo.Head.Height()) + assert.Equal(t, abi.ChainEpoch(0), first.Head.Height()) testQ.Remove(first) } func TestQueueEmptyPopErrors(t *testing.T) { tf.UnitTest(t) testQ := syncTypes.NewTargetTracker(20) - sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1002))} - sR47 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 47, 1001))} + sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1002).FullTipSet.TipSet()} + sR47 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 47, 1001).FullTipSet.TipSet()} // Add 2 testQ.Add(sR47) @@ -187,8 +195,9 @@ func chainInfoWithHeightAndWeight(t *testing.T, h int, weight int64) *types.Chai Data: []byte{0x4}, }, } - b, _ := types.NewTipSet([]*types.BlockHeader{blk}) return &types.ChainInfo{ - Head: b, + FullTipSet: &types.FullTipSet{ + Blocks: []*types.FullBlock{{Header: blk}}, + }, } } diff --git a/pkg/chainsync/syncer/syncer_integration_test.go b/pkg/chainsync/syncer/syncer_integration_test.go index 532c14160c..fad7aad0f1 100644 --- a/pkg/chainsync/syncer/syncer_integration_test.go +++ b/pkg/chainsync/syncer/syncer_integration_test.go @@ -53,20 +53,20 @@ func TestLoadFork(t *testing.T) { right := builder.AppendManyOn(ctx, 3, base) leftTarget := &types.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types2.NewChainInfo("", "", left), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: left, } rightTarget := &types.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types2.NewChainInfo("", "", right), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: right, } // Sync the two branches, which stores all blocks in the underlying stores. // stm: @CHAINSYNC_SYNCER_HANDLE_NEW_TIP_SET_001, @CHAINSYNC_SYNCER_SET_HEAD_001 diff --git a/pkg/chainsync/syncer/syncer_test.go b/pkg/chainsync/syncer/syncer_test.go index b62632b7ae..42bae1a97d 100644 --- a/pkg/chainsync/syncer/syncer_test.go +++ b/pkg/chainsync/syncer/syncer_test.go @@ -34,12 +34,12 @@ func TestOneBlock(t *testing.T) { builder, syncer := setup(ctx, t) t1 := builder.AppendOn(ctx, builder.Genesis(), 1) target := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t1), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t1, } assert.NoError(t, syncer.HandleNewTipSet(ctx, target)) @@ -58,12 +58,12 @@ func TestMultiBlockTip(t *testing.T) { tip := builder.AppendOn(ctx, genesis, 2) target := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", tip), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: tip, } assert.NoError(t, syncer.HandleNewTipSet(ctx, target)) @@ -88,38 +88,38 @@ func TestChainIncremental(t *testing.T) { t4 := builder.AppendOn(ctx, t3, 2) target1 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t1), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t1, } target2 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t2), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t2, } target3 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t3), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t3, } target4 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t4), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t4, } assert.NoError(t, syncer.HandleNewTipSet(ctx, target1)) assert.NoError(t, builder.FlushHead(ctx)) @@ -154,12 +154,12 @@ func TestChainJump(t *testing.T) { t4 := builder.AppendOn(ctx, t3, 2) target1 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t4), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t4, } assert.NoError(t, syncer.HandleNewTipSet(ctx, target1)) assert.NoError(t, builder.FlushHead(ctx)) @@ -186,12 +186,12 @@ func TestIgnoreLightFork(t *testing.T) { // Sync heaviest branch first. target4 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t4), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t4, } assert.NoError(t, syncer.HandleNewTipSet(ctx, target4)) assert.NoError(t, builder.FlushHead(ctx)) @@ -201,12 +201,12 @@ func TestIgnoreLightFork(t *testing.T) { // Lighter fork is processed but not change head. forkHeadTarget := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", forkHead), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: forkHead, } assert.Error(t, syncer.HandleNewTipSet(ctx, forkHeadTarget)) assert.NoError(t, builder.FlushHead(ctx)) @@ -233,12 +233,12 @@ func TestAcceptHeavierFork(t *testing.T) { fork3 := builder.AppendOn(ctx, fork2, 1) main4Target := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", main4), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: main4, } assert.NoError(t, syncer.HandleNewTipSet(ctx, main4Target)) assert.NoError(t, builder.FlushHead(ctx)) @@ -247,12 +247,12 @@ func TestAcceptHeavierFork(t *testing.T) { // Heavier fork updates head3 fork3Target := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", fork3), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: fork3, } assert.NoError(t, syncer.HandleNewTipSet(ctx, fork3Target)) assert.NoError(t, builder.FlushHead(ctx)) @@ -270,12 +270,12 @@ func TestRejectFinalityFork(t *testing.T) { head := builder.AppendManyOn(ctx, int(policy.ChainFinality+2), genesis) target := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", head), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: head, } assert.NoError(t, s.HandleNewTipSet(ctx, target)) @@ -287,12 +287,12 @@ func TestRejectFinalityFork(t *testing.T) { }) forkFinalityHead := builder.AppendManyOn(ctx, int(policy.ChainFinality), forkFinalityBase) forkHeadTarget := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", forkFinalityHead), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: forkFinalityHead, } assert.Error(t, s.HandleNewTipSet(ctx, forkHeadTarget)) } @@ -305,12 +305,12 @@ func TestNoUncessesaryFetch(t *testing.T) { head := builder.AppendManyOn(ctx, 4, genesis) target := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", head), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: head, } assert.NoError(t, s.HandleNewTipSet(ctx, target)) @@ -332,12 +332,12 @@ func TestNoUncessesaryFetch(t *testing.T) { require.NoError(t, err) target2 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", head), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: head, } err = newSyncer.HandleNewTipSet(ctx, target2) assert.Contains(t, err.Error(), "do not sync to a target has synced before") @@ -367,12 +367,12 @@ func TestSubsetParent(t *testing.T) { tipA1A2 := builder.AppendOn(ctx, genesis, 2) tipB1B2B3 := builder.AppendOn(ctx, tipA1A2, 3) target1 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", tipB1B2B3), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: tipB1B2B3, } require.NoError(t, s.HandleNewTipSet(ctx, target1)) @@ -382,12 +382,12 @@ func TestSubsetParent(t *testing.T) { tipC1C2 := builder.AppendOn(ctx, tipB1B2, 2) target2 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", tipC1C2), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: tipC1C2, } assert.NoError(t, s.HandleNewTipSet(ctx, target2)) @@ -397,24 +397,24 @@ func TestSubsetParent(t *testing.T) { tipD1OnC1 := builder.AppendOn(ctx, tipC1, 1) target3 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", tipD1OnC1), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: tipD1OnC1, } assert.NoError(t, s.HandleNewTipSet(ctx, target3)) // A full parent also works fine: {C1, C2} -> D1 tipD1OnC1C2 := builder.AppendOn(ctx, tipC1C2, 1) target4 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", tipD1OnC1C2), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: tipD1OnC1C2, } assert.NoError(t, s.HandleNewTipSet(ctx, target4)) } @@ -436,24 +436,24 @@ func TestBlockNotLinkedRejected(t *testing.T) { b1 := shadowBuilder.AppendOn(ctx, genesis, 1) b2 := shadowBuilder.AppendOn(ctx, b1, 1) target1 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", b2), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: b2, } assert.Error(t, syncer.HandleNewTipSet(ctx, target1)) // Make the same block available from the syncer's builder builder.AppendBlockOn(ctx, genesis) target2 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", b1), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: b1, } assert.NoError(t, syncer.HandleNewTipSet(ctx, target2)) } @@ -525,12 +525,12 @@ func TestSemanticallyBadTipSetFails(t *testing.T) { // Set up a fresh builder without any of this data target1 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", link1), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: link1, } err = syncer.HandleNewTipSet(ctx, target1) require.Error(t, err) @@ -553,12 +553,12 @@ func TestStoresMessageReceipts(t *testing.T) { }) target1 := &syncTypes.Target{ - Base: nil, - Current: nil, - Start: time.Time{}, - End: time.Time{}, - Err: nil, - ChainInfo: *types.NewChainInfo("", "", t1), + Base: nil, + Current: nil, + Start: time.Time{}, + End: time.Time{}, + Err: nil, + Head: t1, } assert.NoError(t, syncer.HandleNewTipSet(ctx, target1)) diff --git a/pkg/chainsync/types/target_tracker.go b/pkg/chainsync/types/target_tracker.go index ae9409336e..94fd166603 100644 --- a/pkg/chainsync/types/target_tracker.go +++ b/pkg/chainsync/types/target_tracker.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/venus/venus-shared/actors/builtin" "github.com/filecoin-project/venus/venus-shared/actors/policy" + "github.com/libp2p/go-libp2p/core/peer" fbig "github.com/filecoin-project/go-state-types/big" "github.com/ipfs/go-cid" @@ -30,7 +31,8 @@ type Target struct { Start time.Time End time.Time Err error - types.ChainInfo + Head *types.TipSet + Sender peer.ID } // IsNeighbor the target t is neighbor or not @@ -242,11 +244,11 @@ func (tq *TargetTracker) Add(t *Target) bool { return false } } else { - delete(tq.targetSet, replaceTarget.ChainInfo.Head.String()) + delete(tq.targetSet, replaceTarget.Head.String()) tq.q[replaceIndex] = t } - tq.targetSet[t.ChainInfo.Head.String()] = t + tq.targetSet[t.Head.String()] = t sortTarget(tq.q) // update lowweight tq.lowWeight = tq.q[len(tq.q)-1].Head.At(0).ParentWeight @@ -388,7 +390,7 @@ func (tq *TargetTracker) Remove(t *Target) { t.End = time.Now() if tq.history.Len() > tq.historySize { tq.history.Remove(tq.history.Front()) // remove olddest - popKey := tq.history.Front().Value.(*Target).ChainInfo.Head.String() + popKey := tq.history.Front().Value.(*Target).Head.String() delete(tq.targetSet, popKey) } tq.history.PushBack(t) diff --git a/pkg/consensus/block_validator.go b/pkg/consensus/block_validator.go index f7005e789a..0775b38981 100644 --- a/pkg/consensus/block_validator.go +++ b/pkg/consensus/block_validator.go @@ -169,7 +169,7 @@ func (bv *BlockValidator) validateBlock(ctx context.Context, blk *types.BlockHea return fmt.Errorf("block was from the future (now=%d, blk=%d): %v", now, blk.Timestamp, ErrTemporal) } if blk.Timestamp > now { - logExpect.Warn("Got block from the future, but within threshold", blk.Timestamp, time.Now().Unix()) + logExpect.Warn("Got block from the future, but within threshold ", blk.Timestamp, time.Now().Unix()) } // get parent beacon diff --git a/pkg/net/helloprotocol/hello_protocol.go b/pkg/net/helloprotocol/hello_protocol.go index 9f773426c1..3281f413ee 100644 --- a/pkg/net/helloprotocol/hello_protocol.go +++ b/pkg/net/helloprotocol/hello_protocol.go @@ -157,20 +157,6 @@ func (h *HelloProtocolHandler) handleNewStream(s net.Stream) { fullTipSet, err := h.loadLocalFullTipset(ctx, types.NewTipSetKey(hello.HeaviestTipSetCids...)) if err != nil { fullTipSet, err = h.exchange.GetFullTipSet(ctx, []peer.ID{from}, types.NewTipSetKey(hello.HeaviestTipSetCids...)) //nolint - if err == nil { - for _, b := range fullTipSet.Blocks { - _, err = h.chainStore.PutObject(ctx, b.Header) - if err != nil { - log.Errorf("fail to save block to tipset") - return - } - _, err = h.messageStore.StoreMessages(ctx, b.SECPMessages, b.BLSMessages) - if err != nil { - log.Errorf("fail to save block to tipset") - return - } - } - } h.host.ConnManager().TagPeer(from, "new-block", 40) } if err != nil { @@ -183,7 +169,7 @@ func (h *HelloProtocolHandler) handleNewStream(s net.Stream) { } // notify the local node of the new `block.ChainInfo` - ci := types.NewChainInfo(from, from, fullTipSet.TipSet()) + ci := types.NewChainInfo(from, from, fullTipSet) h.peerDiscovered(ci) } diff --git a/pkg/net/helloprotocol/hello_protocol_test.go b/pkg/net/helloprotocol/hello_protocol_test.go index 4aee711056..4230a59e7f 100644 --- a/pkg/net/helloprotocol/hello_protocol_test.go +++ b/pkg/net/helloprotocol/hello_protocol_test.go @@ -34,7 +34,7 @@ type mockHelloCallback struct { } func (msb *mockHelloCallback) HelloCallback(ci *types.ChainInfo) { - msb.Called(ci.Sender, ci.Head.Key()) + msb.Called(ci.Sender, ci.FullTipSet.TipSet().Key()) } func TestHelloHandshake(t *testing.T) { diff --git a/venus-shared/api/chain/v0/method.md b/venus-shared/api/chain/v0/method.md index 3fd9b17793..844a36ae2b 100644 --- a/venus-shared/api/chain/v0/method.md +++ b/venus-shared/api/chain/v0/method.md @@ -5472,10 +5472,103 @@ Inputs: { "Source": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", - "Head": { - "Cids": null, - "Blocks": null, - "Height": 0 + "FullTipSet": { + "Blocks": [ + { + "Header": { + "Miner": "f01234", + "Ticket": { + "VRFProof": "Bw==" + }, + "ElectionProof": { + "WinCount": 9, + "VRFProof": "Bw==" + }, + "BeaconEntries": [ + { + "Round": 42, + "Data": "Ynl0ZSBhcnJheQ==" + } + ], + "WinPoStProof": [ + { + "PoStProof": 8, + "ProofBytes": "Ynl0ZSBhcnJheQ==" + } + ], + "Parents": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + ], + "ParentWeight": "0", + "Height": 10101, + "ParentStateRoot": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "ParentMessageReceipts": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "Messages": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "BLSAggregate": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "Timestamp": 42, + "BlockSig": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "ForkSignaling": 42, + "ParentBaseFee": "0" + }, + "BLSMessages": [ + { + "CID": { + "/": "bafy2bzacebbpdegvr3i4cosewthysg5xkxpqfn2wfcz6mv2hmoktwbdxkax4s" + }, + "Version": 42, + "To": "f01234", + "From": "f01234", + "Nonce": 42, + "Value": "0", + "GasLimit": 9, + "GasFeeCap": "0", + "GasPremium": "0", + "Method": 1, + "Params": "Ynl0ZSBhcnJheQ==" + } + ], + "SECPMessages": [ + { + "Message": { + "CID": { + "/": "bafy2bzacebbpdegvr3i4cosewthysg5xkxpqfn2wfcz6mv2hmoktwbdxkax4s" + }, + "Version": 42, + "To": "f01234", + "From": "f01234", + "Nonce": 42, + "Value": "0", + "GasLimit": 9, + "GasFeeCap": "0", + "GasPremium": "0", + "Method": 1, + "Params": "Ynl0ZSBhcnJheQ==" + }, + "Signature": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "CID": { + "/": "bafy2bzacebbpdegvr3i4cosewthysg5xkxpqfn2wfcz6mv2hmoktwbdxkax4s" + } + } + ] + } + ] } } ] @@ -5723,13 +5816,12 @@ Response: "Start": "0001-01-01T00:00:00Z", "End": "0001-01-01T00:00:00Z", "Err": {}, - "Source": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", - "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "Head": { "Cids": null, "Blocks": null, "Height": 0 - } + }, + "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" } ], "Buckets": [ @@ -5748,13 +5840,12 @@ Response: "Start": "0001-01-01T00:00:00Z", "End": "0001-01-01T00:00:00Z", "Err": {}, - "Source": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", - "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "Head": { "Cids": null, "Blocks": null, "Height": 0 - } + }, + "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" } ] } diff --git a/venus-shared/api/chain/v1/method.md b/venus-shared/api/chain/v1/method.md index 2bfdd04c5d..143f53a90e 100644 --- a/venus-shared/api/chain/v1/method.md +++ b/venus-shared/api/chain/v1/method.md @@ -6760,10 +6760,103 @@ Inputs: { "Source": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", - "Head": { - "Cids": null, - "Blocks": null, - "Height": 0 + "FullTipSet": { + "Blocks": [ + { + "Header": { + "Miner": "f01234", + "Ticket": { + "VRFProof": "Bw==" + }, + "ElectionProof": { + "WinCount": 9, + "VRFProof": "Bw==" + }, + "BeaconEntries": [ + { + "Round": 42, + "Data": "Ynl0ZSBhcnJheQ==" + } + ], + "WinPoStProof": [ + { + "PoStProof": 8, + "ProofBytes": "Ynl0ZSBhcnJheQ==" + } + ], + "Parents": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + ], + "ParentWeight": "0", + "Height": 10101, + "ParentStateRoot": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "ParentMessageReceipts": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "Messages": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "BLSAggregate": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "Timestamp": 42, + "BlockSig": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "ForkSignaling": 42, + "ParentBaseFee": "0" + }, + "BLSMessages": [ + { + "CID": { + "/": "bafy2bzacebbpdegvr3i4cosewthysg5xkxpqfn2wfcz6mv2hmoktwbdxkax4s" + }, + "Version": 42, + "To": "f01234", + "From": "f01234", + "Nonce": 42, + "Value": "0", + "GasLimit": 9, + "GasFeeCap": "0", + "GasPremium": "0", + "Method": 1, + "Params": "Ynl0ZSBhcnJheQ==" + } + ], + "SECPMessages": [ + { + "Message": { + "CID": { + "/": "bafy2bzacebbpdegvr3i4cosewthysg5xkxpqfn2wfcz6mv2hmoktwbdxkax4s" + }, + "Version": 42, + "To": "f01234", + "From": "f01234", + "Nonce": 42, + "Value": "0", + "GasLimit": 9, + "GasFeeCap": "0", + "GasPremium": "0", + "Method": 1, + "Params": "Ynl0ZSBhcnJheQ==" + }, + "Signature": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "CID": { + "/": "bafy2bzacebbpdegvr3i4cosewthysg5xkxpqfn2wfcz6mv2hmoktwbdxkax4s" + } + } + ] + } + ] } } ] @@ -7011,13 +7104,12 @@ Response: "Start": "0001-01-01T00:00:00Z", "End": "0001-01-01T00:00:00Z", "Err": {}, - "Source": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", - "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "Head": { "Cids": null, "Blocks": null, "Height": 0 - } + }, + "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" } ], "Buckets": [ @@ -7036,13 +7128,12 @@ Response: "Start": "0001-01-01T00:00:00Z", "End": "0001-01-01T00:00:00Z", "Err": {}, - "Source": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", - "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "Head": { "Cids": null, "Blocks": null, "Height": 0 - } + }, + "Sender": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" } ] } diff --git a/venus-shared/types/api_types.go b/venus-shared/types/api_types.go index 9a1fe9178f..968b7c5302 100644 --- a/venus-shared/types/api_types.go +++ b/venus-shared/types/api_types.go @@ -344,7 +344,12 @@ type Target struct { Start time.Time End time.Time Err error - ChainInfo + Head *TipSet + Sender peer.ID +} + +func (target *Target) String() string { + return fmt.Sprintf("{sender:%s height=%d head=%s}", target.Sender, target.Head.Height(), target.Head.Key()) } type TargetTracker struct { diff --git a/venus-shared/types/chain_info.go b/venus-shared/types/chain_info.go index 6ab4ddd3b0..c49dc63594 100644 --- a/venus-shared/types/chain_info.go +++ b/venus-shared/types/chain_info.go @@ -11,21 +11,22 @@ type ChainInfo struct { // The originator of the TipSetKey propagation wave. Source peer.ID // The peer that sent us the TipSetKey message. - Sender peer.ID - Head *TipSet + Sender peer.ID + FullTipSet *FullTipSet } // NewChainInfo creates a chain info from a peer id a head tipset key and a // chain height. -func NewChainInfo(source peer.ID, sender peer.ID, head *TipSet) *ChainInfo { +func NewChainInfo(source peer.ID, sender peer.ID, fts *FullTipSet) *ChainInfo { return &ChainInfo{ - Source: source, - Sender: sender, - Head: head, + Source: source, + Sender: sender, + FullTipSet: fts, } } // String returns a human-readable string representation of a chain info func (i *ChainInfo) String() string { - return fmt.Sprintf("{source=%s sender:%s height=%d head=%s}", i.Source, i.Sender, i.Head.Height(), i.Head.Key()) + ts := i.FullTipSet.TipSet() + return fmt.Sprintf("{source=%s sender:%s height=%d head=%s}", i.Source, i.Sender, ts.Height(), ts.Key()) }