Skip to content

Commit

Permalink
Merge pull request #7827 from ethereum-optimism/10-24-indexer_FilterL…
Browse files Browse the repository at this point in the history
…og_client-side_consistency

feat(indexer): FilterLogs client-side consistency
  • Loading branch information
hamdiallam authored Oct 25, 2023
2 parents bf53047 + 6050d33 commit ed341cd
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 37 deletions.
26 changes: 19 additions & 7 deletions indexer/etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package etl
import (
"context"
"errors"
"fmt"
"math/big"
"time"

Expand Down Expand Up @@ -105,22 +106,33 @@ func (etl *ETL) processBatch(headers []types.Header) error {
}

headersWithLog := make(map[common.Hash]bool, len(headers))
logs, err := etl.EthClient.FilterLogs(ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts})
filterQuery := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
logs, err := etl.EthClient.FilterLogs(filterQuery)
if err != nil {
batchLog.Info("failed to extract logs", "err", err)
return err
}
if len(logs) > 0 {
batchLog.Info("detected logs", "size", len(logs))

if logs.ToBlockHeader.Number.Cmp(lastHeader.Number) != 0 {
// Warn and simply wait for the provider to synchronize state
batchLog.Warn("mismatch in FilterLog#ToBlock number", "queried_to_block_number", lastHeader.Number, "reported_to_block_number", logs.ToBlockHeader.Number)
return fmt.Errorf("mismatch in FilterLog#ToBlock number")
} else if logs.ToBlockHeader.Hash() != lastHeader.Hash() {
batchLog.Error("mismatch in FitlerLog#ToBlock block hash!!!", "queried_to_block_hash", lastHeader.Hash().String(), "reported_to_block_hash", logs.ToBlockHeader.Hash().String())
return fmt.Errorf("mismatch in FitlerLog#ToBlock block hash!!!")
}

if len(logs.Logs) > 0 {
batchLog.Info("detected logs", "size", len(logs.Logs))
}

for i := range logs {
log := logs[i]
for i := range logs.Logs {
log := logs.Logs[i]
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
// been appropriately set or when we get to natively handling reorgs.
batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index)
batchLog.Error("log found with block hash not in the batch", "block_hash", logs.Logs[i].BlockHash, "log_index", logs.Logs[i].Index)
return errors.New("parsed log with a block hash not in the batch")
}

Expand All @@ -130,6 +142,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {

// ensure we use unique downstream references for the etl batch
headersRef := headers
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs, HeadersWithLog: headersWithLog}
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
return nil
}
72 changes: 44 additions & 28 deletions indexer/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type EthClient interface {
TxByHash(common.Hash) (*types.Transaction, error)

StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) ([]types.Log, error)
FilterLogs(ethereum.FilterQuery) (Logs, error)
}

type clnt struct {
Expand Down Expand Up @@ -122,15 +122,12 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
}

count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
headers := make([]types.Header, count)
batchElems := make([]rpc.BatchElem, count)

for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
batchElems[i] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{toBlockNumArg(height), false},
Result: new(types.Header),
Error: nil,
}
batchElems[i] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(height), false}, Result: &headers[i]}
}

ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
Expand All @@ -144,23 +141,21 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
// - Ensure integrity that they build on top of each other
// - Truncate out headers that do not exist (endHeight > "latest")
size := 0
headers := make([]types.Header, count)
for i, batchElem := range batchElems {
if batchElem.Error != nil {
return nil, batchElem.Error
if size == 0 {
return nil, batchElem.Error
} else {
break // try return whatever headers are available
}
} else if batchElem.Result == nil {
break
}

header, ok := batchElem.Result.(*types.Header)
if !ok {
return nil, fmt.Errorf("unable to transform rpc response %v into types.Header", batchElem.Result)
}
if i > 0 && header.ParentHash != headers[i-1].Hash() {
return nil, fmt.Errorf("queried header %s does not follow parent %s", header.Hash(), headers[i-1].Hash())
if i > 0 && headers[i].ParentHash != headers[i-1].Hash() {
return nil, fmt.Errorf("queried header %s does not follow parent %s", headers[i].Hash(), headers[i-1].Hash())
}

headers[i] = *header
size = size + 1
}

Expand Down Expand Up @@ -197,19 +192,43 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
return proof.StorageHash, nil
}

// FilterLogs returns logs that fit the query parameters
func (c *clnt) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
type Logs struct {
Logs []types.Log
ToBlockHeader *types.Header
}

var result []types.Log
// FilterLogs returns logs that fit the query parameters. The underlying request is a batch
// request including `eth_getBlockByNumber` to allow the caller to check that connected
// node has the state necessary to fulfill this request
func (c *clnt) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
arg, err := toFilterArg(query)
if err != nil {
return nil, err
return Logs{}, err
}

err = c.rpc.CallContext(ctxwt, &result, "eth_getLogs", arg)
return result, err
var logs []types.Log
var header types.Header

batchElems := make([]rpc.BatchElem, 2)
batchElems[0] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(query.ToBlock), false}, Result: &header}
batchElems[1] = rpc.BatchElem{Method: "eth_getLogs", Args: []interface{}{arg}, Result: &logs}

ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
err = c.rpc.BatchCallContext(ctxwt, batchElems)
if err != nil {
return Logs{}, err
}

if batchElems[0].Error != nil {
return Logs{}, fmt.Errorf("unable to query for the `FilterQuery#ToBlock` header: %w", batchElems[0].Error)
}

if batchElems[1].Error != nil {
return Logs{}, fmt.Errorf("unable to query logs: %w", batchElems[1].Error)
}

return Logs{Logs: logs, ToBlockHeader: &header}, nil
}

// Modeled off op-service/client.go. We can refactor this once the client/metrics portion
Expand Down Expand Up @@ -262,10 +281,7 @@ func toBlockNumArg(number *big.Int) string {
}

func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,
"topics": q.Topics,
}
arg := map[string]interface{}{"address": q.Addresses, "topics": q.Topics}
if q.BlockHash != nil {
arg["blockHash"] = *q.BlockHash
if q.FromBlock != nil || q.ToBlock != nil {
Expand Down
4 changes: 2 additions & 2 deletions indexer/node/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int
return args.Get(0).(common.Hash), args.Error(1)
}

func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
args := m.Called(query)
return args.Get(0).([]types.Log), args.Error(1)
return args.Get(0).(Logs), args.Error(1)
}

0 comments on commit ed341cd

Please sign in to comment.