Skip to content

Commit

Permalink
Merge pull request #8130 from ethereum-optimism/seb/receipts-provider
Browse files Browse the repository at this point in the history
op-service: Refactor `EthClient` with `ReceiptsProvider` abstraction
  • Loading branch information
trianglesphere authored Dec 5, 2023
2 parents 658d2a6 + 0745c50 commit 1c01380
Show file tree
Hide file tree
Showing 12 changed files with 939 additions and 546 deletions.
109 changes: 33 additions & 76 deletions op-service/sources/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ type EthClientConfig struct {
// If this is 0 then the client does not fall back to less optimal but available methods.
MethodResetDuration time.Duration

// [OPTIONAL] The reth DB path to fetch receipts from
// [OPTIONAL] The reth DB path to fetch receipts from.
// If it is specified, the rethdb receipts fetcher will be used
// and the RPC configuration parameters don't need to be set.
RethDBPath string
}

Expand All @@ -80,6 +82,15 @@ func (c *EthClientConfig) Check() error {
if c.PayloadsCacheSize < 0 {
return fmt.Errorf("invalid payloads cache size: %d", c.PayloadsCacheSize)
}
if c.RethDBPath != "" {
if buildRethdb {
// If the rethdb path is set, we use the rethdb receipts fetcher and skip creating
// an RCP receipts fetcher, so below rpc config parameters don't need to be checked.
return nil
} else {
return fmt.Errorf("rethdb path specified, but built without rethdb support")
}
}
if c.MaxConcurrentRequests < 1 {
return fmt.Errorf("expected at least 1 concurrent request, but max is %d", c.MaxConcurrentRequests)
}
Expand All @@ -96,21 +107,14 @@ func (c *EthClientConfig) Check() error {
type EthClient struct {
client client.RPC

maxBatchSize int
recProvider ReceiptsProvider

trustRPC bool

mustBePostMerge bool

provKind RPCProviderKind

log log.Logger

// cache receipts in bundles per block hash
// We cache the receipts fetching job to not lose progress when we have to retry the `Fetch` call
// common.Hash -> *receiptsFetchingJob
receiptsCache *caching.LRUCache[common.Hash, *receiptsFetchingJob]

// cache transactions in bundles per block hash
// common.Hash -> types.Transactions
transactionsCache *caching.LRUCache[common.Hash, types.Transactions]
Expand All @@ -122,46 +126,6 @@ type EthClient struct {
// cache payloads by hash
// common.Hash -> *eth.ExecutionPayload
payloadsCache *caching.LRUCache[common.Hash, *eth.ExecutionPayload]

// availableReceiptMethods tracks which receipt methods can be used for fetching receipts
// This may be modified concurrently, but we don't lock since it's a single
// uint64 that's not critical (fine to miss or mix up a modification)
availableReceiptMethods ReceiptsFetchingMethod

// lastMethodsReset tracks when availableReceiptMethods was last reset.
// When receipt-fetching fails it falls back to available methods,
// but periodically it will try to reset to the preferred optimal methods.
lastMethodsReset time.Time

// methodResetDuration defines how long we take till we reset lastMethodsReset
methodResetDuration time.Duration

// [OPTIONAL] The reth DB path to fetch receipts from
rethDbPath string
}

func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod {
if now := time.Now(); now.Sub(s.lastMethodsReset) > s.methodResetDuration {
m := AvailableReceiptsFetchingMethods(s.provKind)
if s.availableReceiptMethods != m {
s.log.Warn("resetting back RPC preferences, please review RPC provider kind setting", "kind", s.provKind.String())
}
s.availableReceiptMethods = m
s.lastMethodsReset = now
}
return PickBestReceiptsFetchingMethod(s.provKind, s.availableReceiptMethods, txCount)
}

func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {
if unusableMethod(err) {
// clear the bit of the method that errored
s.availableReceiptMethods &^= m
s.log.Warn("failed to use selected RPC method for receipt fetching, temporarily falling back to alternatives",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods, "err", err)
} else {
s.log.Debug("failed to use selected RPC method for receipt fetching, but method does appear to be available, so we continue to use it",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods&^m, "err", err)
}
}

// NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging,
Expand All @@ -170,22 +134,18 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
if err := config.Check(); err != nil {
return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
}

client = LimitRPC(client, config.MaxConcurrentRequests)
recProvider := newRecProviderFromConfig(client, log, metrics, config)
return &EthClient{
client: client,
maxBatchSize: config.MaxRequestsPerBatch,
trustRPC: config.TrustRPC,
mustBePostMerge: config.MustBePostMerge,
provKind: config.RPCProviderKind,
log: log,
receiptsCache: caching.NewLRUCache[common.Hash, *receiptsFetchingJob](metrics, "receipts", config.ReceiptsCacheSize),
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", config.PayloadsCacheSize),
availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind),
lastMethodsReset: time.Now(),
methodResetDuration: config.MethodResetDuration,
rethDbPath: config.RethDBPath,
client: client,
recProvider: recProvider,
trustRPC: config.TrustRPC,
mustBePostMerge: config.MustBePostMerge,
log: log,
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", config.PayloadsCacheSize),
}, nil
}

Expand Down Expand Up @@ -354,24 +314,21 @@ func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*
func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
info, txs, err := s.InfoAndTxsByHash(ctx, blockHash)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("querying block: %w", err)
}
// Try to reuse the receipts fetcher because is caches the results of intermediate calls. This means
// that if just one of many calls fail, we only retry the failed call rather than all of the calls.
// The underlying fetcher uses the receipts hash to verify receipt integrity.
var job *receiptsFetchingJob
if v, ok := s.receiptsCache.Get(blockHash); ok {
job = v
} else {
txHashes := eth.TransactionsToHashes(txs)
job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes, s.rethDbPath)
s.receiptsCache.Add(blockHash, job)
}
receipts, err := job.Fetch(ctx)

txHashes, block := eth.TransactionsToHashes(txs), eth.ToBlockID(info)
receipts, err := s.recProvider.FetchReceipts(ctx, block, txHashes)
if err != nil {
return nil, nil, err
}

if !s.trustRPC {
if err := validateReceipts(block, info.ReceiptHash(), txHashes, receipts); err != nil {
return info, nil, fmt.Errorf("invalid receipts: %w", err)
}
}

return info, receipts, nil
}

Expand Down
44 changes: 44 additions & 0 deletions op-service/sources/eth_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
crand "crypto/rand"
"math/big"
"math/rand"
"testing"

"github.com/stretchr/testify/mock"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
)

type mockRPC struct {
Expand Down Expand Up @@ -177,3 +179,45 @@ func TestEthClient_WrongInfoByHash(t *testing.T) {
require.Error(t, err, "cannot accept the wrong block")
m.Mock.AssertExpectations(t)
}

func TestEthClient_validateReceipts(t *testing.T) {
require := require.New(t)
mrpc := new(mockRPC)
mrp := new(mockReceiptsProvider)
const numTxs = 4
block, receipts := randomRpcBlockAndReceipts(rand.New(rand.NewSource(420)), numTxs)
txHashes := receiptTxHashes(receipts)
ctx := context.Background()

// mutate a field to make validation fail.
receipts[2].Bloom[0] = 1

mrpc.On("CallContext", ctx, mock.AnythingOfType("**sources.rpcBlock"),
"eth_getBlockByHash", []any{block.Hash, true}).
Run(func(args mock.Arguments) {
*(args[1].(**rpcBlock)) = block
}).
Return([]error{nil}).Once()

mrp.On("FetchReceipts", ctx, block.BlockID(), txHashes).
Return(types.Receipts(receipts), error(nil)).Once()

ethcl := newEthClientWithCaches(nil, numTxs)
ethcl.client = mrpc
ethcl.recProvider = mrp
ethcl.trustRPC = false

_, _, err := ethcl.FetchReceipts(ctx, block.Hash)
require.ErrorContains(err, "invalid receipts")

mrpc.AssertExpectations(t)
mrp.AssertExpectations(t)
}

func newEthClientWithCaches(metrics caching.Metrics, cacheSize int) *EthClient {
return &EthClient{
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", cacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", cacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", cacheSize),
}
}
Loading

0 comments on commit 1c01380

Please sign in to comment.