Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry fetch logs #317

Merged
merged 4 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ docker_manifests:
image_templates:
- 'avaplatform/awm-relayer:{{ .Tag }}-amd64'
- 'avaplatform/awm-relayer:{{ .Tag }}-arm64'
# If tag is an rc, do not push the latest tag
skip_push: auto
release:
# Repo in which the release will be created.
# Default is extracted from the origin remote URL or empty if its private hosted.
Expand Down
40 changes: 33 additions & 7 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ package types
import (
"context"
"errors"
"math/big"
"fmt"
"time"

avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/subnet-evm/core/types"
Expand All @@ -19,6 +20,11 @@ import (
var WarpPrecompileLogFilter = warp.WarpABI.Events["SendWarpMessage"].ID
var ErrInvalidLog = errors.New("invalid warp message log")

const (
filterLogsRetries = 5
retryInterval = 1 * time.Second
)

// WarpBlockInfo describes the block height and logs needed to process Warp messages.
// WarpBlockInfo instances are populated by the subscriber, and forwared to the
// Listener to process
Expand All @@ -44,12 +50,7 @@ func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBl
)
// Check if the block contains warp logs, and fetch them from the client if it does
if header.Bloom.Test(WarpPrecompileLogFilter[:]) {
logs, err = ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{
Topics: [][]common.Hash{{WarpPrecompileLogFilter}},
Addresses: []common.Address{warp.ContractAddress},
FromBlock: big.NewInt(int64(header.Number.Uint64())),
ToBlock: big.NewInt(int64(header.Number.Uint64())),
})
logs, err = fetchWarpLogsWithRetries(ethClient, header, filterLogsRetries, retryInterval)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -101,3 +102,28 @@ func UnpackWarpMessage(unsignedMsgBytes []byte) (*avalancheWarp.UnsignedMessage,
}
return unsignedMsg, nil
}

// The node serving the filter logs request may be behind the node serving the block header request,
// so we retry a few times to ensure we get the logs
func fetchWarpLogsWithRetries(ethClient ethclient.Client, header *types.Header, numRetries int, retryInterval time.Duration) ([]types.Log, error) {
var (
logs []types.Log
err error
)

for i := 0; i < numRetries; i++ {
logs, err = ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{
Topics: [][]common.Hash{{WarpPrecompileLogFilter}},
Addresses: []common.Address{warp.ContractAddress},
FromBlock: header.Number,
ToBlock: header.Number,
})
if err == nil {
return logs, nil
}
if i != numRetries-1 {
time.Sleep(retryInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to consider some type of exponential back off between retries if the height difference between nodes can potentially be significant.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of the nonce ordering fix, I'm going to implement a generic retry helper, and will consider adding exponential backoffs there.

}
}
return nil, fmt.Errorf("failed to fetch warp logs for block %d after %d retries: %w", header.Number.Uint64(), filterLogsRetries, err)
}