diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index c6e0c8b803f..f3b6d412881 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/util/adt" lru "github.com/hashicorp/golang-lru" blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-blockservice" bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -44,6 +45,10 @@ var ErrSoftFailure = errors.New("soft validation failure") var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power") func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) { + // Timeout after (block time + propagation delay). This is useless at + // this point. + timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second + for { msg, err := bsub.Next(ctx) if err != nil { @@ -64,15 +69,22 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha src := msg.GetFrom() go func() { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // NOTE: we could also share a single session between + // all requests but that may have other consequences. + ses := blockservice.NewSession(ctx, bserv) + start := build.Clock.Now() log.Debug("about to fetch messages for block from pubsub") - bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages) + bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages) if err != nil { log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src) return } - smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages) + smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages) if err != nil { log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src) return @@ -97,7 +109,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha func FetchMessagesByCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, ) ([]*types.Message, error) { out := make([]*types.Message, len(cids)) @@ -126,7 +138,7 @@ func FetchMessagesByCids( // FIXME: Duplicate of above. func FetchSignedMessagesByCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, ) ([]*types.SignedMessage, error) { out := make([]*types.SignedMessage, len(cids)) @@ -156,12 +168,12 @@ func FetchSignedMessagesByCids( // blocks we did not request. func fetchCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, cb func(int, blocks.Block) error, ) error { // FIXME: Why don't we use the context here? - fetchedBlocks := bserv.GetBlocks(context.TODO(), cids) + fetchedBlocks := bserv.GetBlocks(ctx, cids) cidIndex := make(map[cid.Cid]int) for i, c := range cids {