diff --git a/chain/exchange/client.go b/chain/exchange/client.go index 57563d5b215..371c50aed70 100644 --- a/chain/exchange/client.go +++ b/chain/exchange/client.go @@ -65,7 +65,15 @@ func NewClient(lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr) Clien // request options without disrupting external calls. In the future the // consumers should be forced to use a more standardized service and // adhere to a single API derived from this function. -func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.ID) (*validatedResponse, error) { +func (c *client) doRequest( + ctx context.Context, + req *Request, + singlePeer *peer.ID, + // In the `GetChainMessages` case, we won't request the headers but we still + // need them to check the integrity of the `CompactedMessages` in the response + // so the tipset blocks need to be provided by the caller. + tipsets []*types.TipSet, +) (*validatedResponse, error) { // Validate request. if req.Length == 0 { return nil, xerrors.Errorf("invalid request of length 0") @@ -116,7 +124,7 @@ func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.I } // Process and validate response. - validRes, err := c.processResponse(req, res) + validRes, err := c.processResponse(req, res, tipsets) if err != nil { log.Warnf("processing peer %s response failed: %s", peer.String(), err) @@ -144,7 +152,7 @@ func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.I // errors. Peer penalization should happen here then, before returning, so // we can apply the correct penalties depending on the cause of the error. // FIXME: Add the `peer` as argument once we implement penalties. -func (c *client) processResponse(req *Request, res *Response) (*validatedResponse, error) { +func (c *client) processResponse(req *Request, res *Response, tipsets []*types.TipSet) (*validatedResponse, error) { err := res.statusToError() if err != nil { return nil, xerrors.Errorf("status error: %s", err) @@ -176,6 +184,16 @@ func (c *client) processResponse(req *Request, res *Response) (*validatedRespons // Check for valid block sets and extract them into `TipSet`s. validRes.tipsets = make([]*types.TipSet, resLength) for i := 0; i < resLength; i++ { + if res.Chain[i] == nil { + return nil, xerrors.Errorf("response with nil tipset in pos %d", i) + } + for blockIdx, block := range res.Chain[i].Blocks { + if block == nil { + return nil, xerrors.Errorf("tipset with nil block in pos %d", blockIdx) + // FIXME: Maybe we should move this check to `NewTipSet`. + } + } + validRes.tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks) if err != nil { return nil, xerrors.Errorf("invalid tipset blocks at height (head - %d): %w", i, err) @@ -210,36 +228,69 @@ func (c *client) processResponse(req *Request, res *Response) (*validatedRespons // If the headers were also returned check that the compression // indexes are valid before `toFullTipSets()` is called by the // consumer. - for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ { - msgs := res.Chain[tipsetIdx].Messages - blocksNum := len(res.Chain[tipsetIdx].Blocks) - if len(msgs.BlsIncludes) != blocksNum { - return nil, xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)", - len(msgs.BlsIncludes), blocksNum) + err := c.validateCompressedIndices(res.Chain) + if err != nil { + return nil, err + } + } else { + // If we didn't request the headers they should have been provided + // by the caller. + if len(tipsets) < len(res.Chain) { + return nil, xerrors.Errorf("not enought tipsets provided for message response validation, needed %d, have %d", len(res.Chain), len(tipsets)) + } + chain := make([]*BSTipSet, 0, resLength) + for i, resChain := range res.Chain { + next := &BSTipSet{ + Blocks: tipsets[i].Blocks(), + Messages: resChain.Messages, } - if len(msgs.SecpkIncludes) != blocksNum { - return nil, xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)", - len(msgs.SecpkIncludes), blocksNum) + chain = append(chain, next) + } + + err := c.validateCompressedIndices(chain) + if err != nil { + return nil, err + } + } + } + + return validRes, nil +} + +func (c *client) validateCompressedIndices(chain []*BSTipSet) error { + resLength := len(chain) + for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ { + msgs := chain[tipsetIdx].Messages + blocksNum := len(chain[tipsetIdx].Blocks) + + if len(msgs.BlsIncludes) != blocksNum { + return xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)", + len(msgs.BlsIncludes), blocksNum) + } + + if len(msgs.SecpkIncludes) != blocksNum { + return xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)", + len(msgs.SecpkIncludes), blocksNum) + } + + for blockIdx := 0; blockIdx < blocksNum; blockIdx++ { + for _, mi := range msgs.BlsIncludes[blockIdx] { + if int(mi) >= len(msgs.Bls) { + return xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)", + mi, len(msgs.Bls)) } - for blockIdx := 0; blockIdx < blocksNum; blockIdx++ { - for _, mi := range msgs.BlsIncludes[blockIdx] { - if int(mi) >= len(msgs.Bls) { - return nil, xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)", - mi, len(msgs.Bls)) - } - } - for _, mi := range msgs.SecpkIncludes[blockIdx] { - if int(mi) >= len(msgs.Secpk) { - return nil, xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)", - mi, len(msgs.Secpk)) - } - } + } + + for _, mi := range msgs.SecpkIncludes[blockIdx] { + if int(mi) >= len(msgs.Secpk) { + return xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)", + mi, len(msgs.Secpk)) } } } } - return validRes, nil + return nil } // GetBlocks implements Client.GetBlocks(). Refer to the godocs there. @@ -259,7 +310,7 @@ func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) Options: Headers, } - validRes, err := c.doRequest(ctx, req, nil) + validRes, err := c.doRequest(ctx, req, nil, nil) if err != nil { return nil, err } @@ -277,7 +328,7 @@ func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipS Options: Headers | Messages, } - validRes, err := c.doRequest(ctx, req, &peer) + validRes, err := c.doRequest(ctx, req, &peer, nil) if err != nil { return nil, err } @@ -288,7 +339,10 @@ func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipS } // GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there. -func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) { +func (c *client) GetChainMessages(ctx context.Context, tipsets []*types.TipSet) ([]*CompactedMessages, error) { + head := tipsets[0] + length := uint64(len(tipsets)) + ctx, span := trace.StartSpan(ctx, "GetChainMessages") if span.IsRecordingEvents() { span.AddAttributes( @@ -304,7 +358,7 @@ func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, lengt Options: Messages, } - validRes, err := c.doRequest(ctx, req, nil) + validRes, err := c.doRequest(ctx, req, nil, tipsets) if err != nil { return nil, err } diff --git a/chain/exchange/interfaces.go b/chain/exchange/interfaces.go index 79d8fd4b1e2..acc0854da67 100644 --- a/chain/exchange/interfaces.go +++ b/chain/exchange/interfaces.go @@ -32,10 +32,9 @@ type Client interface { // or less. GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) - // GetChainMessages fetches messages from the network, from the provided - // tipset *backwards*, returning the messages from as many tipsets as the - // count parameter, or less. - GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) + // GetChainMessages fetches messages from the network, starting from the first provided tipset + // and returning messages from as many tipsets as requested or less. + GetChainMessages(ctx context.Context, tipsets []*types.TipSet) ([]*CompactedMessages, error) // GetFullTipSet fetches a full tipset from a given peer. If successful, // the fetched object contains block headers and all messages in full form. diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go index ac02cf60f31..2114793359f 100644 --- a/chain/exchange/protocol.go +++ b/chain/exchange/protocol.go @@ -139,6 +139,8 @@ func (res *Response) statusToError() error { // FIXME: Rename. type BSTipSet struct { + // List of blocks belonging to a single tipset to which the + // `CompactedMessages` are linked. Blocks []*types.BlockHeader Messages *CompactedMessages } diff --git a/chain/sync.go b/chain/sync.go index 74dc5aa1aa3..b47365b4b78 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1555,7 +1555,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet failed := false for offset := 0; !failed && offset < nreq; { nextI := j + offset - nextHeader := headers[nextI] + lastI := j + nreq var requestErr error var requestResult []*exchange.CompactedMessages @@ -1566,7 +1566,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet log.Infof("fetching messages at %d", startOffset+nextI) } - result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) + result, err := syncer.Exchange.GetChainMessages(ctx, headers[nextI:lastI]) if err != nil { requestErr = multierror.Append(requestErr, err) } else {