Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yzang/v6.0.0 hotfix 6 #1978

Open
wants to merge 5 commits into
base: v6.0.0-hotfix-5-branch
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 79 additions & 33 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"

Expand All @@ -22,6 +24,8 @@ import (

const TxSearchPerPage = 10

const MaxNumOfWorkers = 500

type FilterType byte

const (
Expand Down Expand Up @@ -281,14 +285,14 @@ type LogFetcher struct {
includeSyntheticReceipts bool
}

func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) ([]*ethtypes.Log, int64, error) {
func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) (res []*ethtypes.Log, end int64, err error) {
bloomIndexes := EncodeFilters(crit.Addresses, crit.Topics)
if crit.BlockHash != nil {
block, err := blockByHashWithRetry(ctx, f.tmClient, crit.BlockHash[:], 1)
if err != nil {
return nil, 0, err
}
return f.GetLogsForBlock(ctx, block, crit, bloomIndexes), block.Block.Height, nil
return f.GetLogsForBlock(block, crit, bloomIndexes), block.Block.Height, nil
}
applyOpenEndedLogLimit := f.filterConfig.maxLog > 0 && (crit.FromBlock == nil || crit.ToBlock == nil)
latest := f.ctxProvider(LatestCtxHeight).BlockHeight()
Expand All @@ -313,52 +317,94 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr
if begin > end {
return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end)
}
blockHeights := f.FindBlockesByBloom(begin, end, bloomIndexes)
res := []*ethtypes.Log{}
for _, height := range blockHeights {
h := height
block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
if err != nil {
return nil, 0, err

// Parallelize execution
numWorkers := int(math.Min(MaxNumOfWorkers, float64(end-begin+1)))
var wg sync.WaitGroup
tasksChan := make(chan int64, end-begin+1)
resultsChan := make(chan *ethtypes.Log, end-begin+1)
res = []*ethtypes.Log{}
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("%s", e)
}
res = append(res, f.GetLogsForBlock(ctx, block, crit, bloomIndexes)...)
if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog {
res = res[:int(f.filterConfig.maxLog)]
break
}()
// Send tasks
go func() {
for height := begin; height <= end; height++ {
if height == 0 {
continue // Skip genesis height
}
tasksChan <- height
}
close(tasksChan) // Close the tasks channel to signal workers
}()

// Worker function
worker := func() {
defer wg.Done()
for height := range tasksChan {
if len(crit.Addresses) != 0 || len(crit.Topics) != 0 {
providerCtx := f.ctxProvider(height)
blockBloom := f.k.GetBlockBloom(providerCtx)
if !MatchFilters(blockBloom, bloomIndexes) {
continue
}
}
h := height
block, berr := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
if berr != nil {
panic(berr)
}
matchedLogs := f.GetLogsForBlock(block, crit, bloomIndexes)
for _, log := range matchedLogs {
resultsChan <- log
}
}
}

// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker()
}

// Collect results
go func() {
wg.Wait()
close(resultsChan) // Close the results channel after workers finish
}()

// Aggregate results into the final slice
for result := range resultsChan {
res = append(res, result)
}

// Sorting res in ascending order
sort.Slice(res, func(i, j int) bool {
return res[i].BlockNumber < res[j].BlockNumber
})

// Apply rate limit
if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog {
res = res[:int(f.filterConfig.maxLog)]
}

return res, end, nil
}

func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log {
possibleLogs := f.FindLogsByBloom(block.Block.Height, filters)
func (f *LogFetcher) GetLogsForBlock(block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log {
possibleLogs := f.FindLogsByBloom(block, filters)
matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) })
for _, l := range matchedLogs {
l.BlockHash = common.Hash(block.BlockID.Hash)
}
return matchedLogs
}

func (f *LogFetcher) FindBlockesByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) {
//TODO: parallelize
for height := begin; height <= end; height++ {
if height == 0 {
// no block bloom on genesis height
continue
}
ctx := f.ctxProvider(height)
blockBloom := f.k.GetBlockBloom(ctx)
if MatchFilters(blockBloom, filters) {
res = append(res, height)
}
}
return
}

func (f *LogFetcher) FindLogsByBloom(height int64, filters [][]bloomIndexes) (res []*ethtypes.Log) {
func (f *LogFetcher) FindLogsByBloom(block *coretypes.ResultBlock, filters [][]bloomIndexes) (res []*ethtypes.Log) {
ctx := f.ctxProvider(LatestCtxHeight)
txHashes := f.k.GetTxHashesOnHeight(ctx, height)
txHashes := f.k.GetTxHashesOnHeight(ctx, block.Block.Height)
for _, hash := range txHashes {
receipt, err := f.k.GetReceipt(ctx, hash)
if err != nil {
Expand Down
Loading