From 34b7953f487029c9313d1376aba7eac4a2209b02 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Wed, 4 Dec 2024 11:17:08 +0800 Subject: [PATCH 1/5] optimize getLogs --- evmrpc/filter.go | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/evmrpc/filter.go b/evmrpc/filter.go index 2ffb77dd49..7f69595461 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -281,7 +281,7 @@ type LogFetcher struct { includeSyntheticReceipts bool } -func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) ([]*ethtypes.Log, int64, error) { +func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) (res []*ethtypes.Log, end int64, err error) { bloomIndexes := EncodeFilters(crit.Addresses, crit.Topics) if crit.BlockHash != nil { block, err := blockByHashWithRetry(ctx, f.tmClient, crit.BlockHash[:], 1) @@ -314,25 +314,40 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end) } blockHeights := f.FindBlockesByBloom(begin, end, bloomIndexes) - res := []*ethtypes.Log{} - for _, height := range blockHeights { - h := height - block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1) - if err != nil { - return nil, 0, err - } - res = append(res, f.GetLogsForBlock(ctx, block, crit, bloomIndexes)...) - if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog { - res = res[:int(f.filterConfig.maxLog)] - break + wg := sync.WaitGroup{} + res = []*ethtypes.Log{} + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("%s", e) } + }() + slots := make([][]*ethtypes.Log, len(blockHeights)) + for i, height := range blockHeights { + wg.Add(1) + h := height + i := i + go func() { + defer wg.Done() + block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1) + if err != nil { + panic(err) + } + slots[i] = f.GetLogsForBlock(ctx, block, crit, bloomIndexes) + }() + } + wg.Wait() + for _, logs := range slots { + res = append(res, logs...) + } + if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog { + res = res[:int(f.filterConfig.maxLog)] } return res, end, nil } func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log { - possibleLogs := f.FindLogsByBloom(block.Block.Height, filters) + possibleLogs := f.FindLogsByBloom(block, filters) matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) }) for _, l := range matchedLogs { l.BlockHash = common.Hash(block.BlockID.Hash) @@ -356,9 +371,9 @@ func (f *LogFetcher) FindBlockesByBloom(begin, end int64, filters [][]bloomIndex return } -func (f *LogFetcher) FindLogsByBloom(height int64, filters [][]bloomIndexes) (res []*ethtypes.Log) { +func (f *LogFetcher) FindLogsByBloom(block *coretypes.ResultBlock, filters [][]bloomIndexes) (res []*ethtypes.Log) { ctx := f.ctxProvider(LatestCtxHeight) - txHashes := f.k.GetTxHashesOnHeight(ctx, height) + txHashes := f.k.GetTxHashesOnHeight(ctx, block.Block.Height) for _, hash := range txHashes { receipt, err := f.k.GetReceipt(ctx, hash) if err != nil { From 1c5fcd140c940911eb3069f6e703c67f4ee9b2cc Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 4 Dec 2024 14:30:32 -0800 Subject: [PATCH 2/5] optimize getLogs --- evmrpc/filter.go | 72 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 11 deletions(-) diff --git a/evmrpc/filter.go b/evmrpc/filter.go index 7f69595461..27657da2db 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -5,6 +5,8 @@ import ( "encoding/json" "errors" "fmt" + "math" + "sort" "sync" "time" @@ -313,7 +315,17 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr if begin > end { return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end) } - blockHeights := f.FindBlockesByBloom(begin, end, bloomIndexes) + + var blockHeights []int64 + if len(crit.Addresses) != 0 || len(crit.Topics) != 0 { + blockHeights = f.FindBlocksByBloom(begin, end, bloomIndexes) + } else { + blockHeights = make([]int64, end-begin+1) + for i := range blockHeights { + blockHeights[i] = begin + int64(i) + } + } + wg := sync.WaitGroup{} res = []*ethtypes.Log{} defer func() { @@ -355,19 +367,57 @@ func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.Resul return matchedLogs } -func (f *LogFetcher) FindBlockesByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) { - //TODO: parallelize - for height := begin; height <= end; height++ { - if height == 0 { - // no block bloom on genesis height - continue +func (f *LogFetcher) FindBlocksByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) { + numWorkers := int(math.Min(100, float64(end-begin))) + var wg sync.WaitGroup + tasks := make(chan int64, end-begin+1) + results := make(chan int64, end-begin+1) + + // Worker function + worker := func() { + defer wg.Done() + for height := range tasks { + if height == 0 { + continue // Skip genesis height + } + ctx := f.ctxProvider(height) + blockBloom := f.k.GetBlockBloom(ctx) + if MatchFilters(blockBloom, filters) { + results <- height + } } - ctx := f.ctxProvider(height) - blockBloom := f.k.GetBlockBloom(ctx) - if MatchFilters(blockBloom, filters) { - res = append(res, height) + } + + // Start workers + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go worker() + } + + // Send tasks + go func() { + for height := begin; height <= end; height++ { + tasks <- height } + close(tasks) // Close the tasks channel to signal workers + }() + + // Collect results + go func() { + wg.Wait() + close(results) // Close the results channel after workers finish + }() + + // Aggregate results into the final slice + for result := range results { + res = append(res, result) } + + // Sorting in ascending order + sort.Slice(res, func(i, j int) bool { + return res[i] < res[j] + }) + return } From ea05a67cd1901008132476ed640ae1d7c71ee87a Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 4 Dec 2024 14:59:15 -0800 Subject: [PATCH 3/5] Parallelize execution for bloom as well --- evmrpc/filter.go | 127 ++++++++++++++++++++++------------------------- 1 file changed, 58 insertions(+), 69 deletions(-) diff --git a/evmrpc/filter.go b/evmrpc/filter.go index 27657da2db..22f437c037 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -290,7 +290,7 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr if err != nil { return nil, 0, err } - return f.GetLogsForBlock(ctx, block, crit, bloomIndexes), block.Block.Height, nil + return f.GetLogsForBlock(block, crit, bloomIndexes), block.Block.Height, nil } applyOpenEndedLogLimit := f.filterConfig.maxLog > 0 && (crit.FromBlock == nil || crit.ToBlock == nil) latest := f.ctxProvider(LatestCtxHeight).BlockHeight() @@ -316,74 +316,47 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end) } - var blockHeights []int64 - if len(crit.Addresses) != 0 || len(crit.Topics) != 0 { - blockHeights = f.FindBlocksByBloom(begin, end, bloomIndexes) - } else { - blockHeights = make([]int64, end-begin+1) - for i := range blockHeights { - blockHeights[i] = begin + int64(i) - } - } - - wg := sync.WaitGroup{} + // Parallelize execution + numWorkers := int(math.Min(100, float64(end-begin))) + var wg sync.WaitGroup + tasksChan := make(chan int64, end-begin+1) + resultsChan := make(chan *ethtypes.Log, end-begin+1) res = []*ethtypes.Log{} defer func() { if e := recover(); e != nil { err = fmt.Errorf("%s", e) } }() - slots := make([][]*ethtypes.Log, len(blockHeights)) - for i, height := range blockHeights { - wg.Add(1) - h := height - i := i - go func() { - defer wg.Done() - block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1) - if err != nil { - panic(err) + // Send tasks + go func() { + for height := begin; height <= end; height++ { + if height == 0 { + continue // Skip genesis height } - slots[i] = f.GetLogsForBlock(ctx, block, crit, bloomIndexes) - }() - } - wg.Wait() - for _, logs := range slots { - res = append(res, logs...) - } - if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog { - res = res[:int(f.filterConfig.maxLog)] - } - - return res, end, nil -} - -func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log { - possibleLogs := f.FindLogsByBloom(block, filters) - matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) }) - for _, l := range matchedLogs { - l.BlockHash = common.Hash(block.BlockID.Hash) - } - return matchedLogs -} - -func (f *LogFetcher) FindBlocksByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) { - numWorkers := int(math.Min(100, float64(end-begin))) - var wg sync.WaitGroup - tasks := make(chan int64, end-begin+1) - results := make(chan int64, end-begin+1) + tasksChan <- height + } + close(tasksChan) // Close the tasks channel to signal workers + }() // Worker function worker := func() { defer wg.Done() - for height := range tasks { - if height == 0 { - continue // Skip genesis height + for height := range tasksChan { + if len(crit.Addresses) != 0 || len(crit.Topics) != 0 { + providerCtx := f.ctxProvider(height) + blockBloom := f.k.GetBlockBloom(providerCtx) + if !MatchFilters(blockBloom, bloomIndexes) { + continue + } } - ctx := f.ctxProvider(height) - blockBloom := f.k.GetBlockBloom(ctx) - if MatchFilters(blockBloom, filters) { - results <- height + h := height + block, berr := blockByNumberWithRetry(ctx, f.tmClient, &h, 1) + if berr != nil { + panic(berr) + } + matchedLogs := f.GetLogsForBlock(block, crit, bloomIndexes) + for _, log := range matchedLogs { + resultsChan <- log } } } @@ -394,30 +367,46 @@ func (f *LogFetcher) FindBlocksByBloom(begin, end int64, filters [][]bloomIndexe go worker() } - // Send tasks - go func() { - for height := begin; height <= end; height++ { - tasks <- height - } - close(tasks) // Close the tasks channel to signal workers - }() - // Collect results go func() { wg.Wait() - close(results) // Close the results channel after workers finish + close(resultsChan) // Close the results channel after workers finish }() // Aggregate results into the final slice - for result := range results { + for result := range resultsChan { res = append(res, result) } - // Sorting in ascending order + // Sorting res in ascending order sort.Slice(res, func(i, j int) bool { - return res[i] < res[j] + return res[i].BlockNumber < res[j].BlockNumber }) + // Apply rate limit + if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog { + res = res[:int(f.filterConfig.maxLog)] + } + + return res, end, nil +} + +func (f *LogFetcher) GetLogsForBlock(block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log { + possibleLogs := f.FindLogsByBloom(block, filters) + matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) }) + for _, l := range matchedLogs { + l.BlockHash = common.Hash(block.BlockID.Hash) + } + return matchedLogs +} + +func (f *LogFetcher) FindBlocksByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) { + + // Aggregate results into the final slice + for result := range results { + res = append(res, result) + } + return } From 5d1aa28d7da45a771f2e16adea083f6b1254c9a6 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 4 Dec 2024 15:01:02 -0800 Subject: [PATCH 4/5] Fix build --- evmrpc/filter.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/evmrpc/filter.go b/evmrpc/filter.go index 22f437c037..ea2d14244c 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -24,6 +24,8 @@ import ( const TxSearchPerPage = 10 +const MaxNumOfWorkers = 500 + type FilterType byte const ( @@ -317,7 +319,7 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr } // Parallelize execution - numWorkers := int(math.Min(100, float64(end-begin))) + numWorkers := int(math.Min(MaxNumOfWorkers, float64(end-begin))) var wg sync.WaitGroup tasksChan := make(chan int64, end-begin+1) resultsChan := make(chan *ethtypes.Log, end-begin+1) @@ -400,16 +402,6 @@ func (f *LogFetcher) GetLogsForBlock(block *coretypes.ResultBlock, crit filters. return matchedLogs } -func (f *LogFetcher) FindBlocksByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) { - - // Aggregate results into the final slice - for result := range results { - res = append(res, result) - } - - return -} - func (f *LogFetcher) FindLogsByBloom(block *coretypes.ResultBlock, filters [][]bloomIndexes) (res []*ethtypes.Log) { ctx := f.ctxProvider(LatestCtxHeight) txHashes := f.k.GetTxHashesOnHeight(ctx, block.Block.Height) From 3746f950508770792a92af37fdc518bfde6ec272 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 4 Dec 2024 21:26:10 -0800 Subject: [PATCH 5/5] Fix --- evmrpc/filter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/evmrpc/filter.go b/evmrpc/filter.go index ea2d14244c..b0b6de8810 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -319,7 +319,7 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr } // Parallelize execution - numWorkers := int(math.Min(MaxNumOfWorkers, float64(end-begin))) + numWorkers := int(math.Min(MaxNumOfWorkers, float64(end-begin+1))) var wg sync.WaitGroup tasksChan := make(chan int64, end-begin+1) resultsChan := make(chan *ethtypes.Log, end-begin+1)