From 09f83922208205df52f2d54b5244b5c25126e892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?= Date: Sun, 23 Jun 2024 20:27:34 +0200 Subject: [PATCH] new 'next' method for updater to get the next token to update in each iteration and support sorted scan --- cmd/census3/main.go | 2 +- scanner/providers/web3/erc20_provider.go | 3 +- scanner/providers/web3/erc721_provider.go | 3 +- scanner/providers/web3/erc777_provider.go | 3 +- scanner/scanner.go | 2 +- scanner/updater.go | 172 ++++++++++++++-------- 6 files changed, 114 insertions(+), 71 deletions(-) diff --git a/cmd/census3/main.go b/cmd/census3/main.go index 2e951e07..5fe4a6e5 100644 --- a/cmd/census3/main.go +++ b/cmd/census3/main.go @@ -209,7 +209,7 @@ func main() { log.Fatal(err) } // start the token updater with the database and the provider manager - updater := scanner.NewUpdater(database, w3p, pm, filtersDB) + updater := scanner.NewUpdater(database, w3p, pm, filtersDB, config.scannerCoolDown) // start the holder scanner with the database and the provider manager hc := scanner.NewScanner(database, updater, w3p, pm, config.scannerCoolDown) // if the admin token is not defined, generate a random one diff --git a/scanner/providers/web3/erc20_provider.go b/scanner/providers/web3/erc20_provider.go index 81edce35..adee6a9a 100644 --- a/scanner/providers/web3/erc20_provider.go +++ b/scanner/providers/web3/erc20_provider.go @@ -159,6 +159,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro alreadyProcessedLogs := uint64(0) balances := make(map[common.Address]*big.Int) // iterate the logs and update the balances + log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs)) for _, currentLog := range logs { // skip the log if it has been removed if currentLog.Removed { @@ -208,7 +209,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro balances[logData.From] = new(big.Int).Neg(logData.Value) } } - 
log.Infow("saving blocks", + log.Infow("logs parsed", "count", len(balances), "new_logs", newTransfers, "already_processed_logs", alreadyProcessedLogs, diff --git a/scanner/providers/web3/erc721_provider.go b/scanner/providers/web3/erc721_provider.go index e7404c9a..d5a0253e 100644 --- a/scanner/providers/web3/erc721_provider.go +++ b/scanner/providers/web3/erc721_provider.go @@ -152,7 +152,6 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr if errors.Is(err, ErrTooManyRequests) { log.Warnf("too many requests, the provider will continue in the next iteration from block %d", lastBlock) } - log.Warnw("logs received", "number_of_logs", len(logs), "last_block", lastBlock) // encode the number of new transfers newTransfers := uint64(0) alreadyProcessedLogs := uint64(0) @@ -399,6 +398,6 @@ func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error) if _, err := hashFn.Write([]byte(transferID)); err != nil { return false, err } - hID := hashFn.Sum(nil) + hID := hashFn.Sum(nil)[:8] return p.filter.TestAndAdd(hID, nil) } diff --git a/scanner/providers/web3/erc777_provider.go b/scanner/providers/web3/erc777_provider.go index 2ea955f0..f3fbbba6 100644 --- a/scanner/providers/web3/erc777_provider.go +++ b/scanner/providers/web3/erc777_provider.go @@ -152,7 +152,6 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr if errors.Is(err, ErrTooManyRequests) { log.Warnf("too many requests, the provider will continue in the next iteration from block %d", lastBlock) } - log.Warnw("logs received", "number_of_logs", len(logs), "last_block", lastBlock) // encode the number of new transfers newTransfers := uint64(0) alreadyProcessedLogs := uint64(0) @@ -399,6 +398,6 @@ func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error) if _, err := hashFn.Write([]byte(transferID)); err != nil { return false, err } - hID := hashFn.Sum(nil) + hID := hashFn.Sum(nil)[:8] return 
p.filter.TestAndAdd(hID, nil) } diff --git a/scanner/scanner.go b/scanner/scanner.go index 367f0f8f..d0406a4d 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -109,7 +109,6 @@ func (s *Scanner) Start(ctx context.Context) { continue } } - log.Infow("checking token in the updater queue", "address", token.Address.Hex(), "chainID", token.ChainID, @@ -435,6 +434,7 @@ func (s *Scanner) prepareToken(token *ScannerToken) error { return err } token.CreationBlock = creationBlock + token.LastBlock = creationBlock token.Ready = true } return nil diff --git a/scanner/updater.go b/scanner/updater.go index 7c8090f4..a43a1663 100644 --- a/scanner/updater.go +++ b/scanner/updater.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -51,62 +52,73 @@ type Updater struct { ctx context.Context cancel context.CancelFunc - db *db.DB - networks *web3.Web3Pool - providers *manager.ProviderManager - queue map[string]*UpdateRequest - queueMtx sync.Mutex - processing sync.Map - waiter sync.WaitGroup - kvdb dvotedb.Database + db *db.DB + networks *web3.Web3Pool + providers *manager.ProviderManager + sortedQueue []string + queue map[string]*UpdateRequest + queueMtx sync.Mutex + processing sync.Map + nextReq atomic.Uint64 + waiter sync.WaitGroup + kvdb dvotedb.Database + coolDown time.Duration } // NewUpdater creates a new instance of Updater. func NewUpdater(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager, - kvdb dvotedb.Database, + kvdb dvotedb.Database, coolDown time.Duration, ) *Updater { return &Updater{ - db: db, - networks: networks, - providers: pm, - queue: make(map[string]*UpdateRequest), - kvdb: kvdb, + db: db, + networks: networks, + providers: pm, + sortedQueue: []string{}, + queue: make(map[string]*UpdateRequest), + kvdb: kvdb, + coolDown: coolDown, } } // Start starts the updater process in a goroutine. 
func (u *Updater) Start(ctx context.Context, concurrentTokens int) { u.ctx, u.cancel = context.WithCancel(ctx) + sem := make(chan struct{}, concurrentTokens) + defer close(sem) for { select { case <-u.ctx.Done(): return default: - pending := u.pendingRequests() - if len(pending) == 0 { - time.Sleep(coolDown) + req, id := u.next() + if req == nil { + log.Info("no more requests to process, sleeping...") + time.Sleep(u.coolDown) continue } - sem := make(chan struct{}, concurrentTokens) - defer close(sem) - for id, req := range u.pendingRequests() { - u.processing.Store(id, true) - sem <- struct{}{} - go func(id string, req *UpdateRequest) { - defer func() { - <-sem - u.processing.Store(id, false) - }() - if err := u.process(req); err != nil { - log.Errorf("Error processing update request: %v", err) - return - } - // update the request in the queue - u.queueMtx.Lock() - u.queue[id] = req - u.queueMtx.Unlock() - }(id, req) - } + sem <- struct{}{} + u.waiter.Add(1) + go func(id string, req UpdateRequest) { + defer func() { + u.waiter.Done() + <-sem + }() + log.Infow("processing token", + "address", req.Address.Hex(), + "from", req.CreationBlock, + "to", req.EndBlock, + "current", req.LastBlock) + res, err := u.process(id, req) + if err != nil { + log.Errorf("error processing update request: %v", err) + return + } + // update the request in the queue + log.Infow("updating request in the queue", "lastBlock", req.LastBlock, "done", req.Done) + if err := u.SetRequest(id, &res); err != nil { + log.Errorf("error updating request in the queue: %v", err) + } + }(id, *req) } } } @@ -127,12 +139,20 @@ func (u *Updater) RequestStatus(id string, deleteOnDone bool) *UpdateRequest { if !ok { return nil } - res := *req if deleteOnDone && req.Done { + // remove the request from the processing map u.processing.Delete(id) + // remove the request from the queue delete(u.queue, id) + // remove the request from the sorted queue + for i, v := range u.sortedQueue { + if v == id { + 
u.sortedQueue = append(u.sortedQueue[:i], u.sortedQueue[i+1:]...) + break + } + } } - return &res + return req } // SetRequest adds a new request to the queue. It will return an error if the @@ -157,8 +177,10 @@ func (u *Updater) SetRequest(id string, req *UpdateRequest) error { } u.queueMtx.Lock() defer u.queueMtx.Unlock() + if _, exists := u.queue[id]; !exists { + u.sortedQueue = append(u.sortedQueue, id) + } u.queue[id] = req - u.processing.Store(id, false) return nil } @@ -195,17 +217,42 @@ func RequestID(address common.Address, chainID uint64, externalID string) (strin return hex.EncodeToString(bHash[:4]), nil } -func (u *Updater) pendingRequests() map[string]*UpdateRequest { +func (u *Updater) next() (*UpdateRequest, string) { u.queueMtx.Lock() defer u.queueMtx.Unlock() - queue := map[string]*UpdateRequest{} - for k, v := range u.queue { - if processing, ok := u.processing.Load(k); v.Done || !ok || processing.(bool) { - continue + // if the queue is empty return nil + if len(u.sortedQueue) == 0 { + return nil, "" + } + // get the next request in the queue, if the next request is out of the + // range of the sorted queue, return nil and set the next request index to 0 + i := u.nextReq.Load() + max := uint64(len(u.sortedQueue)) + if i >= max { + u.nextReq.Store(0) + return nil, "" + } + // iterate over the sorted queue to find the next request that is not being + // processed or already done + for ; i < max; i++ { + id := u.sortedQueue[i] + req, exists := u.queue[id] + if !exists { + // if the request is not found, remove the ID from the sorted queue and + // return nil setting the next request index to 0 + u.sortedQueue = append(u.sortedQueue[:i], u.sortedQueue[i+1:]...) 
+ u.nextReq.Store(0) + return nil, "" + } + // if request is not done and not being processed, return it + if isProcessing, ok := u.processing.Load(id); !req.Done && (!ok || !isProcessing.(bool)) { + u.nextReq.Store(i + 1) + return req, id } - queue[k] = v } - return queue + // if the next request is not found, set the next request index to 0 + u.nextReq.Store(0) + return nil, "" } // process iterates over the current queue items, getting the token holders @@ -213,30 +260,28 @@ func (u *Updater) pendingRequests() map[string]*UpdateRequest { // equal to the end block. It updates th status of the request in the queue. It // will return an error if the provider is not found, the token is external or // there is an error getting the token holders balances. -func (u *Updater) process(req *UpdateRequest) error { - // log the start of the process - log.Infow("rescanning token", - "address", req.Address.Hex(), - "from", req.CreationBlock, - "to", req.EndBlock, - "current", req.LastBlock) +func (u *Updater) process(id string, req UpdateRequest) (UpdateRequest, error) { + // set the request as processing and defer to set it as not processing + u.processing.Store(id, true) + defer u.processing.Store(id, false) + // create a context with a timeout to avoid blocking the process ctx, cancel := context.WithTimeout(u.ctx, UPDATE_TIMEOUT) defer cancel() // get the provider by token type provider, err := u.providers.GetProvider(ctx, req.Type) if err != nil { - return fmt.Errorf("error getting provider for token: %v", err) + return req, fmt.Errorf("error getting provider for token: %v", err) } // if the token is a external token, return an error if !provider.IsExternal() { chainAddress, ok := u.networks.ChainAddress(req.ChainID, req.Address.Hex()) if !ok { - return fmt.Errorf("error getting chain address for token: %v", err) + return req, fmt.Errorf("error getting chain address for token: %v", err) } // load filter of the token from the database filter, err := treedb.LoadTree(u.kvdb, 
chainAddress) if err != nil { - return err + return req, err } // set the reference of the token to update in the provider if err := provider.SetRef(web3provider.Web3ProviderRef{ @@ -245,7 +290,7 @@ func (u *Updater) process(req *UpdateRequest) error { CreationBlock: req.CreationBlock, Filter: filter, }); err != nil { - return fmt.Errorf("error setting provider reference: %v", err) + return req, fmt.Errorf("error setting provider reference: %v", err) } } // update the last block number of the provider to the last block of @@ -257,7 +302,7 @@ func (u *Updater) process(req *UpdateRequest) error { ChainID: req.ChainID, }) if err != nil { - return fmt.Errorf("error getting token holders from database: %v", err) + return req, fmt.Errorf("error getting token holders from database: %v", err) } currentHolders := map[common.Address]*big.Int{} for _, holder := range results { @@ -275,7 +320,7 @@ func (u *Updater) process(req *UpdateRequest) error { } // set the current holders in the provider if err := provider.SetLastBalances(ctx, nil, currentHolders, req.LastBlock); err != nil { - return fmt.Errorf("error setting last balances in provider: %v", err) + return req, fmt.Errorf("error setting last balances in provider: %v", err) } // get range balances from the provider, it will check itereate again // over transfers logs, checking if there are new transfers using the @@ -296,7 +341,7 @@ func (u *Updater) process(req *UpdateRequest) error { } } if err != nil { - return fmt.Errorf("error getting token holders balances: %v", err) + return req, fmt.Errorf("error getting token holders balances: %v", err) } log.Debugw("new logs received", "address", req.Address.Hex(), @@ -311,13 +356,12 @@ func (u *Updater) process(req *UpdateRequest) error { ChainID: req.ChainID, }, balances, delta.NewLogsCount, delta.Block, delta.Synced, delta.TotalSupply) if err != nil { - return fmt.Errorf("error saving token holders balances: %v", err) + return req, fmt.Errorf("error saving token holders 
balances: %v", err) } log.Debugw("token holders balances updated", "token", req.Address.Hex(), "chainID", req.ChainID, "created", created, "updated", updated) - log.Infow("updating request in the queue", "lastBlock", req.LastBlock, "done", req.Done) - return nil + return req, nil }