Skip to content

Commit

Permalink
use bitswap sessions when fetching messages, and cancel them
Browse files Browse the repository at this point in the history
1. Explicitly use a session when fetching messages for a block. Technically,
GetBlocks would use an internal session so we'd only end up with two, but one
per block is even better.
2. Actually cancel sessions after a timeout (threading through the context).

NOTE: We should seriously consider having a single session for all blocks, but
we can do that in a followup as it may have unintended consequences (e.g., leaks).
  • Loading branch information
Stebalien committed Oct 2, 2020
1 parent efc1b24 commit 6d58def
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/util/adt"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
Expand Down Expand Up @@ -44,6 +45,10 @@ var ErrSoftFailure = errors.New("soft validation failure")
var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power")

func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second

for {
msg, err := bsub.Next(ctx)
if err != nil {
Expand All @@ -64,15 +69,22 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
src := msg.GetFrom()

go func() {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// NOTE: we could also share a single session between
// all requests but that may have other consequences.
ses := blockservice.NewSession(ctx, bserv)

start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages)
bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages)
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src)
return
}

smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages)
smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src)
return
Expand All @@ -97,7 +109,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha

func FetchMessagesByCids(
ctx context.Context,
bserv bserv.BlockService,
bserv bserv.BlockGetter,
cids []cid.Cid,
) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))
Expand Down Expand Up @@ -126,7 +138,7 @@ func FetchMessagesByCids(
// FIXME: Duplicate of above.
func FetchSignedMessagesByCids(
ctx context.Context,
bserv bserv.BlockService,
bserv bserv.BlockGetter,
cids []cid.Cid,
) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))
Expand Down Expand Up @@ -156,12 +168,11 @@ func FetchSignedMessagesByCids(
// blocks we did not request.
func fetchCids(
ctx context.Context,
bserv bserv.BlockService,
bserv bserv.BlockGetter,
cids []cid.Cid,
cb func(int, blocks.Block) error,
) error {
// FIXME: Why don't we use the context here?
fetchedBlocks := bserv.GetBlocks(context.TODO(), cids)
fetchedBlocks := bserv.GetBlocks(ctx, cids)

cidIndex := make(map[cid.Cid]int)
for i, c := range cids {
Expand Down

0 comments on commit 6d58def

Please sign in to comment.