Skip to content

Commit

Permalink
polygon/sync,p2p: chain tip block fetch improvements (#12224)
Browse files Browse the repository at this point in the history
relates to #12182

improves p2p block fetching performance at tip by:
- fetching the blocks announced in a new block hashes event by hash, using a
new method FetchBlocksBackwardsByHash, instead of by number. When fetching by
number the peer responds with its canonical block, which in some cases is not
the same block as the hash in the new block hashes event, causing us to miss
adding that block to our canonical chain builder
- adding an option to override the default fetcher configs related to p2p
request retries, timeout, etc. - at tip we want a 1 sec timeout (instead of
5) and 0 retries when calling FetchBlockByHash (there will be more block
events in the queue after it) to maximise performance
- adding some additional helpful logging for debugging purposes


After PR things look much better
<img width="1111" alt="Screenshot 2024-10-08 at 09 43 59"
src="https://github.com/user-attachments/assets/be36fc95-a902-4615-9243-c10f92c903fe">
  • Loading branch information
taratorio authored Oct 8, 2024
1 parent 9c84bf5 commit 4d3b905
Show file tree
Hide file tree
Showing 12 changed files with 1,078 additions and 284 deletions.
216 changes: 175 additions & 41 deletions polygon/p2p/fetcher_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,68 @@ import (
"errors"
"fmt"
"reflect"
"slices"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/generics"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/eth/protocols/eth"
)

// RequestIdGenerator is a function that produces the uint64 request id which
// the fetcher attaches to an outgoing p2p request and later uses to filter the
// matching response from the peer.
type RequestIdGenerator func() uint64

// FetcherConfig holds the tunables of a fetcher. The fetch methods derive a
// per-call copy of it via CopyWithOptions, so individual calls can override
// these values.
type FetcherConfig struct {
// responseTimeout is the duration handed to awaitResponse when waiting for a
// peer to answer a single request.
responseTimeout time.Duration
// retryBackOff is consumed by fetchWithRetry (not visible here) - presumably
// the pause between consecutive attempts; TODO confirm.
retryBackOff time.Duration
// maxRetries is consumed by fetchWithRetry (not visible here) - presumably
// the number of additional attempts after the first one fails; TODO confirm.
maxRetries uint64
}

type Fetcher interface {
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error)
FetchHeaders(
ctx context.Context,
start uint64,
end uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Header], error)

// FetchBodies fetches block bodies for the given headers from a peer. Blocks until data is received.
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error)
// FetchBlocks fetches headers and bodies for a given [start, end) range from a peer and
// assembles them into blocks. Blocks until data is received.
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error)
FetchBodies(
ctx context.Context,
headers []*types.Header,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Body], error)

// FetchBlocksBackwardsByHash fetches a number of blocks backwards starting from a block hash. Max amount is 1024
// blocks back. Blocks until data is received.
FetchBlocksBackwardsByHash(
ctx context.Context,
hash common.Hash,
amount uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Block], error)
}

func NewFetcher(
logger log.Logger,
config FetcherConfig,
messageListener MessageListener,
messageSender MessageSender,
requestIdGenerator RequestIdGenerator,
) Fetcher {
return newFetcher(config, messageListener, messageSender, requestIdGenerator)
return newFetcher(logger, config, messageListener, messageSender, requestIdGenerator)
}

func newFetcher(
logger log.Logger,
config FetcherConfig,
messageListener MessageListener,
messageSender MessageSender,
requestIdGenerator RequestIdGenerator,
) *fetcher {
return &fetcher{
logger: logger,
config: config,
messageListener: messageListener,
messageSender: messageSender,
Expand All @@ -73,6 +91,7 @@ func newFetcher(
}

type fetcher struct {
logger log.Logger
config FetcherConfig
messageListener MessageListener
messageSender MessageSender
Expand All @@ -84,7 +103,13 @@ type FetcherResponse[T any] struct {
TotalSize int
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
func (f *fetcher) FetchHeaders(
ctx context.Context,
start uint64,
end uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Header], error) {
if start >= end {
return FetcherResponse[[]*types.Header]{}, &ErrInvalidFetchHeadersRange{
start: start,
Expand All @@ -103,8 +128,8 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
if amount%eth.MaxHeadersServe > 0 {
numChunks++
}
totalHeadersSize := 0

totalHeadersSize := 0
headers := make([]*types.Header, 0, amount)
for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ {
chunkStart := start + chunkNum*eth.MaxHeadersServe
Expand All @@ -113,9 +138,14 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
// a node may not respond with all MaxHeadersServe in 1 response,
// so we keep on consuming from last received number (akin to consuming a paging api)
// until we have all headers of the chunk or the peer stopped returning headers
headersChunk, err := fetchWithRetry(f.config, func() (FetcherResponse[[]*types.Header], error) {
return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId)
})
request := &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: chunkStart,
},
Amount: chunkEnd - chunkStart,
}

headersChunk, err := f.fetchHeadersWithRetry(ctx, request, peerId, f.config.CopyWithOptions(opts...))
if err != nil {
return FetcherResponse[[]*types.Header]{}, err
}
Expand All @@ -139,7 +169,12 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
}, nil
}

func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
func (f *fetcher) FetchBodies(
ctx context.Context,
headers []*types.Header,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Body], error) {
var bodies []*types.Body
totalBodiesSize := 0

Expand All @@ -155,9 +190,7 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer
headersChunk = headers
}

bodiesChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[[]*types.Body], error) {
return f.fetchBodies(ctx, headersChunk, peerId)
})
bodiesChunk, err := f.fetchBodiesWithRetry(ctx, headersChunk, peerId, f.config.CopyWithOptions(opts...))
if err != nil {
return FetcherResponse[[]*types.Body]{}, err
}
Expand All @@ -176,29 +209,97 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer
}, nil
}

func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) {
headers, err := f.FetchHeaders(ctx, start, end, peerId)
func (f *fetcher) FetchBlocksBackwardsByHash(
ctx context.Context,
hash common.Hash,
amount uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Block], error) {
if amount == 0 || amount > eth.MaxHeadersServe {
return FetcherResponse[[]*types.Block]{}, fmt.Errorf("%w: amount=%d", ErrInvalidFetchBlocksAmount, amount)
}
request := &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Hash: hash,
},
Amount: amount,
Reverse: true,
}

config := f.config.CopyWithOptions(opts...)
headersResponse, err := f.fetchHeadersWithRetry(ctx, request, peerId, config)
if err != nil {
return FetcherResponse[[]*types.Block]{}, err
}

bodies, err := f.FetchBodies(ctx, headers.Data, peerId)
headers := headersResponse.Data
if len(headers) == 0 {
return FetcherResponse[[]*types.Block]{}, &ErrMissingHeaderHash{requested: hash}
}

startHeader := headers[0]
if startHeader.Hash() != hash {
err := &ErrUnexpectedHeaderHash{requested: hash, received: startHeader.Hash()}
return FetcherResponse[[]*types.Block]{}, err
}

offset := amount - 1 // safe, we check that amount > 0 at function start
startNum := startHeader.Number.Uint64()
if startNum > offset {
startNum = startNum - offset
} else {
startNum = 0
}

slices.Reverse(headers)

if err := f.validateHeadersResponse(headers, startNum, amount); err != nil {
return FetcherResponse[[]*types.Block]{}, err
}

bodiesResponse, err := f.FetchBodies(ctx, headers, peerId, opts...)
if err != nil {
return FetcherResponse[[]*types.Block]{}, err
}

blocks := make([]*types.Block, len(headers.Data))
for i, header := range headers.Data {
blocks[i] = types.NewBlockFromNetwork(header, bodies.Data[i])
bodies := bodiesResponse.Data
blocks := make([]*types.Block, len(headers))
for i, header := range headers {
blocks[i] = types.NewBlockFromNetwork(header, bodies[i])
}

return FetcherResponse[[]*types.Block]{
response := FetcherResponse[[]*types.Block]{
Data: blocks,
TotalSize: headers.TotalSize + bodies.TotalSize,
}, nil
TotalSize: headersResponse.TotalSize + bodiesResponse.TotalSize,
}

return response, nil
}

func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
// fetchHeadersWithRetry sends the given headers request to the peer through
// fetchHeaders, using config.responseTimeout as the response timeout, and runs
// that call under fetchWithRetry with the supplied config.
//
// Every failed attempt is logged at debug level together with its 1-based
// attempt number, the peer id and the error. The result of fetchWithRetry is
// returned unchanged.
func (f *fetcher) fetchHeadersWithRetry(
	ctx context.Context,
	request *eth.GetBlockHeadersPacket,
	peerId *PeerId,
	config FetcherConfig,
) (FetcherResponse[[]*types.Header], error) {
	// failedSoFar counts attempts that have already failed; the attempt being
	// logged is therefore failedSoFar+1.
	failedSoFar := 0

	fetchOnce := func() (FetcherResponse[[]*types.Header], error) {
		headers, fetchErr := f.fetchHeaders(ctx, request, peerId, config.responseTimeout)
		if fetchErr == nil {
			return headers, nil
		}

		f.logger.Debug(
			"[p2p.fetcher] fetching headers failure",
			"attempt", failedSoFar+1,
			"peerId", peerId,
			"err", fetchErr,
		)
		failedSoFar++
		return headers, fetchErr
	}

	return fetchWithRetry(config, fetchOnce)
}

func (f *fetcher) fetchHeaders(
ctx context.Context,
request *eth.GetBlockHeadersPacket,
peerId *PeerId,
responseTimeout time.Duration,
) (FetcherResponse[[]*types.Header], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -217,19 +318,14 @@ func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *P

requestId := f.requestIdGenerator()
err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{
RequestId: requestId,
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: start,
},
Amount: end - start,
},
RequestId: requestId,
GetBlockHeadersPacket: request,
})
if err != nil {
return FetcherResponse[[]*types.Header]{}, err
}

message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, responseTimeout, messages, filterBlockHeaders(peerId, requestId))
if err != nil {
return FetcherResponse[[]*types.Header]{}, err
}
Expand All @@ -249,6 +345,7 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
}
}

var prevHash common.Hash
for i, header := range headers {
expectedHeaderNum := start + uint64(i)
currentHeaderNumber := header.Number.Uint64()
Expand All @@ -258,6 +355,21 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
expected: expectedHeaderNum,
}
}

if i == 0 {
prevHash = header.Hash()
continue
}

if prevHash != header.ParentHash {
return &ErrNonSequentialHeaderHashes{
hash: header.Hash(),
parentHash: header.ParentHash,
prevHash: prevHash,
}
}

prevHash = header.Hash()
}

if headersLen < amount {
Expand All @@ -271,7 +383,29 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
return nil
}

func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (*FetcherResponse[[]*types.Body], error) {
// fetchBodiesWithRetry requests the bodies for the given headers from the peer
// through fetchBodies, using config.responseTimeout as the response timeout,
// and runs that call under fetchWithRetry with the supplied config.
//
// Every failed attempt is logged at debug level together with its 1-based
// attempt number, the peer id and the error. The result of fetchWithRetry is
// returned unchanged.
func (f *fetcher) fetchBodiesWithRetry(
	ctx context.Context,
	headers []*types.Header,
	peerId *PeerId,
	config FetcherConfig,
) (*FetcherResponse[[]*types.Body], error) {
	// failedSoFar counts attempts that have already failed; the attempt being
	// logged is therefore failedSoFar+1.
	failedSoFar := 0

	fetchOnce := func() (*FetcherResponse[[]*types.Body], error) {
		bodies, fetchErr := f.fetchBodies(ctx, headers, peerId, config.responseTimeout)
		if fetchErr == nil {
			return bodies, nil
		}

		f.logger.Debug(
			"[p2p.fetcher] fetching bodies failure",
			"attempt", failedSoFar+1,
			"peerId", peerId,
			"err", fetchErr,
		)
		failedSoFar++
		return bodies, fetchErr
	}

	return fetchWithRetry(config, fetchOnce)
}

func (f *fetcher) fetchBodies(
ctx context.Context,
headers []*types.Header,
peerId *PeerId,
responseTimeout time.Duration,
) (*FetcherResponse[[]*types.Body], error) {
// cleanup for the chan message observer
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -303,7 +437,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, responseTimeout, messages, filterBlockBodies(peerId, requestId))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4d3b905

Please sign in to comment.