Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate chain sync response indices when fetching messages #3939

Merged
merged 4 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 84 additions & 30 deletions chain/exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@ func NewClient(lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr) Clien
// request options without disrupting external calls. In the future the
// consumers should be forced to use a more standardized service and
// adhere to a single API derived from this function.
func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.ID) (*validatedResponse, error) {
func (c *client) doRequest(
ctx context.Context,
req *Request,
singlePeer *peer.ID,
// In the `GetChainMessages` case, we won't request the headers but we still
// need them to check the integrity of the `CompactedMessages` in the response
// so the tipset blocks need to be provided by the caller.
tipsets []*types.TipSet,
) (*validatedResponse, error) {
// Validate request.
if req.Length == 0 {
return nil, xerrors.Errorf("invalid request of length 0")
Expand Down Expand Up @@ -116,7 +124,7 @@ func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.I
}

// Process and validate response.
validRes, err := c.processResponse(req, res)
validRes, err := c.processResponse(req, res, tipsets)
if err != nil {
log.Warnf("processing peer %s response failed: %s",
peer.String(), err)
Expand Down Expand Up @@ -144,7 +152,7 @@ func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.I
// errors. Peer penalization should happen here then, before returning, so
// we can apply the correct penalties depending on the cause of the error.
// FIXME: Add the `peer` as argument once we implement penalties.
func (c *client) processResponse(req *Request, res *Response) (*validatedResponse, error) {
func (c *client) processResponse(req *Request, res *Response, tipsets []*types.TipSet) (*validatedResponse, error) {
err := res.statusToError()
if err != nil {
return nil, xerrors.Errorf("status error: %s", err)
Expand Down Expand Up @@ -176,6 +184,16 @@ func (c *client) processResponse(req *Request, res *Response) (*validatedRespons
// Check for valid block sets and extract them into `TipSet`s.
validRes.tipsets = make([]*types.TipSet, resLength)
for i := 0; i < resLength; i++ {
if res.Chain[i] == nil {
return nil, xerrors.Errorf("response with nil tipset in pos %d", i)
}
for blockIdx, block := range res.Chain[i].Blocks {
if block == nil {
return nil, xerrors.Errorf("tipset with nil block in pos %d", blockIdx)
// FIXME: Maybe we should move this check to `NewTipSet`.
}
}

validRes.tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks)
if err != nil {
return nil, xerrors.Errorf("invalid tipset blocks at height (head - %d): %w", i, err)
Expand Down Expand Up @@ -210,36 +228,69 @@ func (c *client) processResponse(req *Request, res *Response) (*validatedRespons
// If the headers were also returned check that the compression
// indexes are valid before `toFullTipSets()` is called by the
// consumer.
for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ {
msgs := res.Chain[tipsetIdx].Messages
blocksNum := len(res.Chain[tipsetIdx].Blocks)
if len(msgs.BlsIncludes) != blocksNum {
return nil, xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)",
len(msgs.BlsIncludes), blocksNum)
err := c.validateCompressedIndices(res.Chain)
if err != nil {
return nil, err
}
} else {
// If we didn't request the headers they should have been provided
// by the caller.
if len(tipsets) < len(res.Chain) {
return nil, xerrors.Errorf("not enought tipsets provided for message response validation, needed %d, have %d", len(res.Chain), len(tipsets))
}
chain := make([]*BSTipSet, 0, resLength)
for i, resChain := range res.Chain {
next := &BSTipSet{
Blocks: tipsets[i].Blocks(),
Messages: resChain.Messages,
}
if len(msgs.SecpkIncludes) != blocksNum {
return nil, xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)",
len(msgs.SecpkIncludes), blocksNum)
chain = append(chain, next)
}

err := c.validateCompressedIndices(chain)
if err != nil {
return nil, err
}
}
}

return validRes, nil
}

func (c *client) validateCompressedIndices(chain []*BSTipSet) error {
resLength := len(chain)
for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ {
msgs := chain[tipsetIdx].Messages
blocksNum := len(chain[tipsetIdx].Blocks)

if len(msgs.BlsIncludes) != blocksNum {
return xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)",
len(msgs.BlsIncludes), blocksNum)
}

if len(msgs.SecpkIncludes) != blocksNum {
return xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)",
len(msgs.SecpkIncludes), blocksNum)
}

for blockIdx := 0; blockIdx < blocksNum; blockIdx++ {
for _, mi := range msgs.BlsIncludes[blockIdx] {
if int(mi) >= len(msgs.Bls) {
return xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Bls))
}
for blockIdx := 0; blockIdx < blocksNum; blockIdx++ {
for _, mi := range msgs.BlsIncludes[blockIdx] {
if int(mi) >= len(msgs.Bls) {
return nil, xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Bls))
}
}
for _, mi := range msgs.SecpkIncludes[blockIdx] {
if int(mi) >= len(msgs.Secpk) {
return nil, xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Secpk))
}
}
}

for _, mi := range msgs.SecpkIncludes[blockIdx] {
if int(mi) >= len(msgs.Secpk) {
return xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Secpk))
}
}
}
}

return validRes, nil
return nil
}

// GetBlocks implements Client.GetBlocks(). Refer to the godocs there.
Expand All @@ -259,7 +310,7 @@ func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int)
Options: Headers,
}

validRes, err := c.doRequest(ctx, req, nil)
validRes, err := c.doRequest(ctx, req, nil, nil)
if err != nil {
return nil, err
}
Expand All @@ -277,7 +328,7 @@ func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipS
Options: Headers | Messages,
}

validRes, err := c.doRequest(ctx, req, &peer)
validRes, err := c.doRequest(ctx, req, &peer, nil)
if err != nil {
return nil, err
}
Expand All @@ -288,7 +339,10 @@ func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipS
}

// GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there.
func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) {
func (c *client) GetChainMessages(ctx context.Context, tipsets []*types.TipSet) ([]*CompactedMessages, error) {
head := tipsets[0]
length := uint64(len(tipsets))

ctx, span := trace.StartSpan(ctx, "GetChainMessages")
if span.IsRecordingEvents() {
span.AddAttributes(
Expand All @@ -304,7 +358,7 @@ func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, lengt
Options: Messages,
}

validRes, err := c.doRequest(ctx, req, nil)
validRes, err := c.doRequest(ctx, req, nil, tipsets)
if err != nil {
return nil, err
}
Expand Down
7 changes: 3 additions & 4 deletions chain/exchange/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ type Client interface {
// or less.
GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error)

// GetChainMessages fetches messages from the network, from the provided
// tipset *backwards*, returning the messages from as many tipsets as the
// count parameter, or less.
GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error)
// GetChainMessages fetches messages from the network, starting from the first provided tipset
// and returning messages from as many tipsets as requested or less.
GetChainMessages(ctx context.Context, tipsets []*types.TipSet) ([]*CompactedMessages, error)

// GetFullTipSet fetches a full tipset from a given peer. If successful,
// the fetched object contains block headers and all messages in full form.
Expand Down
2 changes: 2 additions & 0 deletions chain/exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func (res *Response) statusToError() error {

// FIXME: Rename.
type BSTipSet struct {
// List of blocks belonging to a single tipset to which the
// `CompactedMessages` are linked.
Blocks []*types.BlockHeader
Messages *CompactedMessages
}
Expand Down
4 changes: 2 additions & 2 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet
failed := false
for offset := 0; !failed && offset < nreq; {
nextI := j + offset
nextHeader := headers[nextI]
lastI := j + nreq

var requestErr error
var requestResult []*exchange.CompactedMessages
Expand All @@ -1566,7 +1566,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet
log.Infof("fetching messages at %d", startOffset+nextI)
}

result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
result, err := syncer.Exchange.GetChainMessages(ctx, headers[nextI:lastI])
if err != nil {
requestErr = multierror.Append(requestErr, err)
} else {
Expand Down