Skip to content

Commit

Permalink
Merge pull request #3643 from filecoin-project/revert-3602-feat/delay…
Browse files Browse the repository at this point in the history
…ed-pubsub-join

Revert "only subscribe to pubsub topics once we are synced"
  • Loading branch information
arajasek authored Sep 8, 2020
2 parents 60cae87 + c77f5f6 commit f985f42
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 110 deletions.
31 changes: 3 additions & 28 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"io"
"os"
"strconv"
Expand Down Expand Up @@ -52,8 +51,6 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
var DefaultTipSetCacheSize = 8192
var DefaultMsgMetaCacheSize = 2048

var ErrNotifieeDone = errors.New("notifee is done and should be removed")

func init() {
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
tscs, err := strconv.Atoi(s)
Expand Down Expand Up @@ -361,33 +358,11 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
apply[i], apply[opp] = apply[opp], apply[i]
}

var toremove map[int]struct{}
for i, hcf := range notifees {
err := hcf(revert, apply)
if err != nil {
if err == ErrNotifieeDone {
if toremove == nil {
toremove = make(map[int]struct{})
}
toremove[i] = struct{}{}
} else {
log.Error("head change func errored (BAD): ", err)
}
for _, hcf := range notifees {
if err := hcf(revert, apply); err != nil {
log.Error("head change func errored (BAD): ", err)
}
}

if len(toremove) > 0 {
newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove))
for i, hcf := range notifees {
_, remove := toremove[i]
if remove {
continue
}
newNotifees = append(newNotifees, hcf)
}
notifees = newNotifees
}

case <-ctx.Done():
return
}
Expand Down
3 changes: 0 additions & 3 deletions chain/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)

Expand Down Expand Up @@ -234,7 +233,6 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
node.Repo(sourceRepo),
node.MockHost(tu.mn),
node.Test(),
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),

node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)),
)
Expand Down Expand Up @@ -267,7 +265,6 @@ func (tu *syncTestUtil) addClientNode() int {
node.Repo(repo.NewMemory(nil)),
node.MockHost(tu.mn),
node.Test(),
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),

node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)),
)
Expand Down
89 changes: 13 additions & 76 deletions node/modules/services.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package modules

import (
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
eventbus "github.com/libp2p/go-eventbus"
Expand All @@ -24,7 +22,6 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/dtypes"
Expand Down Expand Up @@ -76,45 +73,14 @@ func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
}

func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) * uint64(time.Second) //nolint
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
ctx := helpers.LifecycleCtx(mctx, lc)

// early check, are we synced at start up?
ts := stmgr.ChainStore().GetHeaviestTipSet()
timestamp := ts.MinTimestamp()
now := uint64(build.Clock.Now().UnixNano())
if timestamp > now-nearsync {
subscribe()
return
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
if err != nil {
panic(err)
}

// we are not synced, subscribe to head changes and wait for sync
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
if len(app) == 0 {
return nil
}

latest := app[0].MinTimestamp()
for _, ts := range app[1:] {
timestamp := ts.MinTimestamp()
if timestamp > latest {
latest = timestamp
}
}

now := uint64(build.Clock.Now().UnixNano())
if latest > now-nearsync {
subscribe()
return store.ErrNotifieeDone
}

return nil
})
}

func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
ctx := helpers.LifecycleCtx(mctx, lc)

v := sub.NewBlockValidator(
h.ID(), chain, stmgr,
func(p peer.ID) {
Expand All @@ -126,53 +92,24 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
panic(err)
}

subscribe := func() {
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))

blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
if err != nil {
panic(err)
}

go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}

if bootstrapper {
subscribe()
return
}

// wait until we are synced within 10 blocks
waitForSync(stmgr, 10, subscribe)
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}

func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
ctx := helpers.LifecycleCtx(mctx, lc)

v := sub.NewMessageValidator(h.ID(), mpool)

if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
if err != nil {
panic(err)
}

subscribe := func() {
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))

msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
if err != nil {
panic(err)
}

go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
v := sub.NewMessageValidator(h.ID(), mpool)

if bootstrapper {
subscribe()
return
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
panic(err)
}

// wait until we are synced within 1 block
waitForSync(stmgr, 1, subscribe)
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}

func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
Expand Down
3 changes: 0 additions & 3 deletions node/test/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
miner2 "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/mockstorage"
Expand Down Expand Up @@ -371,8 +370,6 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner) ([]test

node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),

node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),

genesis,
)
if err != nil {
Expand Down

0 comments on commit f985f42

Please sign in to comment.