From 66007fb4641d60e5a6eaedcecf4eabbeea71505a Mon Sep 17 00:00:00 2001 From: Hamdi Allam Date: Tue, 24 Oct 2023 16:50:46 -0400 Subject: [PATCH 1/2] indexer FilterLog client-side consistency --- indexer/etl/etl.go | 26 +++++++++++---- indexer/node/client.go | 72 ++++++++++++++++++++++++++---------------- indexer/node/mocks.go | 4 +-- 3 files changed, 65 insertions(+), 37 deletions(-) diff --git a/indexer/etl/etl.go b/indexer/etl/etl.go index c69d1a6e7397..f11c69ec7862 100644 --- a/indexer/etl/etl.go +++ b/indexer/etl/etl.go @@ -3,6 +3,7 @@ package etl import ( "context" "errors" + "fmt" "math/big" "time" @@ -105,22 +106,33 @@ func (etl *ETL) processBatch(headers []types.Header) error { } headersWithLog := make(map[common.Hash]bool, len(headers)) - logs, err := etl.EthClient.FilterLogs(ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}) + filterQuery := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts} + logs, err := etl.EthClient.FilterLogs(filterQuery) if err != nil { batchLog.Info("failed to extract logs", "err", err) return err } - if len(logs) > 0 { - batchLog.Info("detected logs", "size", len(logs)) + + if logs.ToBlockHeader.Number.Cmp(lastHeader.Number) != 0 { + // Warn and simply wait for the provider to synchronize state + batchLog.Warn("mismatch in FilterLog#ToBlock number", "queried_to_block_number", lastHeader.Number, "reported_to_block_number", logs.ToBlockHeader.Number) + return fmt.Errorf("mismatch in FilterLog#ToBlock number") + } else if logs.ToBlockHeader.Hash() != lastHeader.Hash() { + batchLog.Error("mismatch in FitlerLog#ToBlock block hash!!!", "queried_to_block_hash", lastHeader.Hash().String(), "reported_to_block_hash", logs.ToBlockHeader.Hash().String()) + return fmt.Errorf("mismatch in FitlerLog#ToBlock block hash!!!") + } + + if len(logs.Logs) > 0 { + batchLog.Info("detected logs", "size", len(logs.Logs)) } - for i := range logs { - log := logs[i] + for i := range logs.Logs { + log := logs.Logs[i] if _, ok := headerMap[log.BlockHash]; !ok { // NOTE. Definitely an error state if the none of the headers were re-orged out in between // the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has // been appropriately set or when we get to natively handling reorgs. - batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index) + batchLog.Error("log found with block hash not in the batch", "block_hash", logs.Logs[i].BlockHash, "log_index", logs.Logs[i].Index) return errors.New("parsed log with a block hash not in the batch") } @@ -130,6 +142,6 @@ func (etl *ETL) processBatch(headers []types.Header) error { // ensure we use unique downstream references for the etl batch headersRef := headers - etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs, HeadersWithLog: headersWithLog} + etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog} return nil } diff --git a/indexer/node/client.go b/indexer/node/client.go index 6e974e0b792c..363ddc26f30c 100644 --- a/indexer/node/client.go +++ b/indexer/node/client.go @@ -39,7 +39,7 @@ type EthClient interface { TxByHash(common.Hash) (*types.Transaction, error) StorageHash(common.Address, *big.Int) (common.Hash, error) - FilterLogs(ethereum.FilterQuery) ([]types.Log, error) + FilterLogs(ethereum.FilterQuery) (Logs, error) } type clnt struct { @@ -122,15 +122,12 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea } count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1 + headers := make([]types.Header, count) batchElems := make([]rpc.BatchElem, count) + for i := uint64(0); i < count; i++ { height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i)) - batchElems[i] = rpc.BatchElem{ - Method: "eth_getBlockByNumber", - Args: []interface{}{toBlockNumArg(height), false}, - Result: new(types.Header), - Error: nil, - } + batchElems[i] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(height), false}, Result: &headers[i]} } ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) @@ -144,23 +141,21 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea // - Ensure integrity that they build on top of each other // - Truncate out headers that do not exist (endHeight > "latest") size := 0 - headers := make([]types.Header, count) for i, batchElem := range batchElems { if batchElem.Error != nil { - return nil, batchElem.Error + if size == 0 { + return nil, batchElem.Error + } else { + break // try return whatever headers are available + } } else if batchElem.Result == nil { break } - header, ok := batchElem.Result.(*types.Header) - if !ok { - return nil, fmt.Errorf("unable to transform rpc response %v into types.Header", batchElem.Result) - } - if i > 0 && header.ParentHash != headers[i-1].Hash() { - return nil, fmt.Errorf("queried header %s does not follow parent %s", header.Hash(), headers[i-1].Hash()) + if i > 0 && headers[i].ParentHash != headers[i-1].Hash() { + return nil, fmt.Errorf("queried header %s does not follow parent %s", headers[i].Hash(), headers[i-1].Hash()) } - headers[i] = *header size = size + 1 } @@ -197,19 +192,43 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common return proof.StorageHash, nil } -// FilterLogs returns logs that fit the query parameters -func (c *clnt) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) { - ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) - defer cancel() +type Logs struct { + Logs []types.Log + ToBlockHeader *types.Header +} - var result []types.Log +// FilterLogs returns logs that fit the query parameters. The underlying request is a batch +// request including `eth_getBlockHeaderByNumber` to allow the caller to check that connected +// node has the state necessary to fulfill this request +func (c *clnt) FilterLogs(query ethereum.FilterQuery) (Logs, error) { arg, err := toFilterArg(query) if err != nil { - return nil, err + return Logs{}, err } - err = c.rpc.CallContext(ctxwt, &result, "eth_getLogs", arg) - return result, err + var logs []types.Log + var header types.Header + + batchElems := make([]rpc.BatchElem, 2) + batchElems[0] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(query.ToBlock), false}, Result: &header} + batchElems[1] = rpc.BatchElem{Method: "eth_getLogs", Args: []interface{}{arg}, Result: &logs} + + ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) + defer cancel() + err = c.rpc.BatchCallContext(ctxwt, batchElems) + if err != nil { + return Logs{}, err + } + + if batchElems[0].Error != nil { + return Logs{}, fmt.Errorf("unable to query for the `FilterQuery#ToBlock` header: %w", batchElems[0].Error) + } + + if batchElems[1].Error != nil { + return Logs{}, fmt.Errorf("unable to query logs: %w", batchElems[1].Error) + } + + return Logs{Logs: logs, ToBlockHeader: &header}, nil } // Modeled off op-service/client.go. We can refactor this once the client/metrics portion @@ -262,10 +281,7 @@ func toBlockNumArg(number *big.Int) string { } func toFilterArg(q ethereum.FilterQuery) (interface{}, error) { - arg := map[string]interface{}{ - "address": q.Addresses, - "topics": q.Topics, - } + arg := map[string]interface{}{"address": q.Addresses, "topics": q.Topics} if q.BlockHash != nil { arg["blockHash"] = *q.BlockHash if q.FromBlock != nil || q.ToBlock != nil { diff --git a/indexer/node/mocks.go b/indexer/node/mocks.go index 115a81767b2e..b18196307ee1 100644 --- a/indexer/node/mocks.go +++ b/indexer/node/mocks.go @@ -41,7 +41,7 @@ func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int return args.Get(0).(common.Hash), args.Error(1) } -func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) { +func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) { args := m.Called(query) - return args.Get(0).([]types.Log), args.Error(1) + return args.Get(0).(Logs), args.Error(1) } From 6050d330ed017da1e36092201a5b185744fc92ec Mon Sep 17 00:00:00 2001 From: Hamdi Allam Date: Wed, 25 Oct 2023 11:00:25 -0400 Subject: [PATCH 2/2] comment update --- indexer/node/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/node/client.go b/indexer/node/client.go index 363ddc26f30c..be046f930b48 100644 --- a/indexer/node/client.go +++ b/indexer/node/client.go @@ -198,7 +198,7 @@ type Logs struct { } // FilterLogs returns logs that fit the query parameters. The underlying request is a batch -// request including `eth_getBlockHeaderByNumber` to allow the caller to check that connected +// request including `eth_getBlockByNumber` to allow the caller to check that connected // node has the state necessary to fulfill this request func (c *clnt) FilterLogs(query ethereum.FilterQuery) (Logs, error) { arg, err := toFilterArg(query)