Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize SearchForMessage and GetReceipt #4246

Merged
merged 2 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stmgr

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -507,16 +508,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
return nil, fmt.Errorf("failed to load message: %w", err)
}

r, _, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage())
if err != nil {
return nil, err
}

if r != nil {
return r, nil
}

_, r, _, err = sm.searchBackForMsg(ctx, ts, m)
_, r, _, err := sm.searchBackForMsg(ctx, ts, m)
if err != nil {
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
}
Expand Down Expand Up @@ -674,6 +666,18 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {

cur := from
curActor, err := sm.LoadActor(ctx, m.VMMessage().From, cur)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("failed to load initital tipset")
}

mFromId, err := sm.LookupID(ctx, m.VMMessage().From, from)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("looking up From id address: %w", err)
}

mNonce := m.VMMessage().Nonce

for {
if cur.Height() == 0 {
// it ain't here!
Expand All @@ -686,32 +690,37 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet
default:
}

act, err := sm.LoadActor(ctx, m.VMMessage().From, cur)
if err != nil {
return nil, nil, cid.Cid{}, err
}

// we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for,
// either way, no reason to lookback, it ain't there
if act.Nonce == 0 || act.Nonce < m.VMMessage().Nonce {
if curActor == nil || curActor.Nonce == 0 || curActor.Nonce < mNonce {
return nil, nil, cid.Undef, nil
}

ts, err := sm.cs.LoadTipSet(cur.Parents())
pts, err := sm.cs.LoadTipSet(cur.Parents())
if err != nil {
return nil, nil, cid.Undef, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
return nil, nil, cid.Undef, xerrors.Errorf("failed to load tipset during msg wait searchback: %w", err)
}

r, foundMsg, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage())
if err != nil {
return nil, nil, cid.Undef, fmt.Errorf("checking for message execution during lookback: %w", err)
act, err := sm.LoadActor(ctx, mFromId, pts)
actorNoExist := errors.Is(err, types.ErrActorNotFound)
if err != nil && !actorNoExist {
return nil, nil, cid.Cid{}, xerrors.Errorf("failed to load the actor: %w", err)
}

if r != nil {
return ts, r, foundMsg, nil
// check that between cur and parent tipset the nonce fell into range of our message
if actorNoExist || (curActor.Nonce > mNonce && act.Nonce <= mNonce) {
r, foundMsg, err := sm.tipsetExecutedMessage(cur, m.Cid(), m.VMMessage())
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("checking for message execution during lookback: %w", err)
}

if r != nil {
return pts, r, foundMsg, nil
}
}

cur = ts
cur = pts
curActor = act
}
}

Expand Down
17 changes: 8 additions & 9 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,24 @@ func fetchCids(
cids []cid.Cid,
cb func(int, blocks.Block) error,
) error {
fetchedBlocks := bserv.GetBlocks(ctx, cids)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

cidIndex := make(map[cid.Cid]int)
for i, c := range cids {
cidIndex[c] = i
}
if len(cids) != len(cidIndex) {
return fmt.Errorf("duplicate CIDs in fetchCids input")
}

fetchedBlocks := bserv.GetBlocks(ctx, cids)

for i := 0; i < len(cids); i++ {
select {
case block, ok := <-fetchedBlocks:
if !ok {
// Closed channel, no more blocks fetched, check if we have all
// of the CIDs requested.
// FIXME: Review this check. We don't call the callback on the
// last index?
if i == len(cids)-1 {
break
}

return fmt.Errorf("failed to fetch all messages")
}

Expand Down
63 changes: 63 additions & 0 deletions chain/sub/incoming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package sub

import (
"context"
"testing"

address "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

type getter struct {
msgs []*types.Message
}

func (g *getter) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { panic("NYI") }

func (g *getter) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ch := make(chan blocks.Block, len(g.msgs))
for _, m := range g.msgs {
by, err := m.Serialize()
if err != nil {
panic(err)
}
b, err := blocks.NewBlockWithCid(by, m.Cid())
if err != nil {
panic(err)
}
ch <- b
}
close(ch)
return ch
}

func TestFetchCidsWithDedup(t *testing.T) {
msgs := []*types.Message{}
for i := 0; i < 10; i++ {
msgs = append(msgs, &types.Message{
From: address.TestAddress,
To: address.TestAddress,

Nonce: uint64(i),
})
}
cids := []cid.Cid{}
for _, m := range msgs {
cids = append(cids, m.Cid())
}
g := &getter{msgs}

// the cids have a duplicate
res, err := FetchMessagesByCids(context.TODO(), g, append(cids, cids[0]))

t.Logf("err: %+v", err)
t.Logf("res: %+v", res)
if err == nil {
t.Errorf("there should be an error")
}
if err == nil && (res[0] == nil || res[len(res)-1] == nil) {
t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1])
}
}
22 changes: 17 additions & 5 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ func (syncer *Syncer) Stop() {
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
defer func() {
if err := recover(); err != nil {
log.Errorf("panic in InformNewHead: ", err)
}
}()

ctx := context.Background()
if fts == nil {
log.Errorf("got nil tipset in InformNewHead")
Expand Down Expand Up @@ -1281,9 +1287,11 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet

blockSet := []*types.TipSet{incoming}

// Parent of the new (possibly better) tipset that we need to fetch next.
at := incoming.Parents()

// we want to sync all the blocks until the height above the block we have
// we want to sync all the blocks until the height above our
// best tipset so far
untilHeight := known.Height() + 1

ss.SetHeight(blockSet[len(blockSet)-1].Height())
Expand Down Expand Up @@ -1377,13 +1385,17 @@ loop:
}

base := blockSet[len(blockSet)-1]
if base.Parents() == known.Parents() {
// common case: receiving a block thats potentially part of the same tipset as our best block
if base.IsChildOf(known) {
// common case: receiving blocks that are building on top of our best tipset
return blockSet, nil
}

if types.CidArrsEqual(base.Parents().Cids(), known.Cids()) {
// common case: receiving blocks that are building on top of our best tipset
knownParent, err := syncer.store.LoadTipSet(known.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
}
if base.IsChildOf(knownParent) {
// common case: receiving a block thats potentially part of the same tipset as our best block
return blockSet, nil
}

Expand Down