From 861a73aebe9135438a7a6581dadaa0ad88b31934 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Tue, 23 Apr 2024 20:25:58 +0100 Subject: [PATCH 1/3] added downloader request count --- erigon-lib/downloader/downloader.go | 78 ++++--------------- .../downloader/downloader_grpc_server.go | 4 + 2 files changed, 18 insertions(+), 64 deletions(-) diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 84648574583..5a10db7e6ba 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -24,7 +24,6 @@ import ( "encoding/json" "errors" "fmt" - "math" "math/rand" "net/http" "net/url" @@ -34,7 +33,6 @@ import ( "reflect" "runtime" "sort" - "strconv" "strings" "sync" "sync/atomic" @@ -109,6 +107,7 @@ type downloadProgress struct { } type AggStats struct { + Requested int MetadataReady, FilesTotal int32 LastMetadataUpdate *time.Time PeersUnique int32 @@ -150,52 +149,15 @@ func insertCloudflareHeaders(req *http.Request) { } } -// retryBackoff performs exponential backoff based on the attempt number and limited -// by the provided minimum and maximum durations. -// -// It also tries to parse Retry-After response header when a http.StatusTooManyRequests -// (HTTP Code 429) is found in the resp parameter. Hence it will return the number of -// seconds the server states it may be ready to process more requests from this client. -func calcBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration { - if resp != nil { - if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable { - if s, ok := resp.Header["Retry-After"]; ok { - if sleep, err := strconv.ParseInt(s[0], 10, 64); err == nil { - return time.Second * time.Duration(sleep) - } - } - } - } - - mult := math.Pow(2, float64(attemptNum)) * float64(min) - sleep := time.Duration(mult) - if float64(sleep) != mult || sleep > max { - sleep = max - } - - return sleep -} - func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err error) { - defer func() { - if r := recover(); r != nil { - if resp != nil && resp.Body != nil { - resp.Body.Close() - resp.Body = nil - } - - err = fmt.Errorf("http client panic: %s", r) - } - }() - insertCloudflareHeaders(req) resp, err = r.Transport.RoundTrip(req) + delay := 500 * time.Millisecond attempts := 1 retry := true - const minDelay = 500 * time.Millisecond const maxDelay = 5 * time.Second const maxAttempts = 10 @@ -219,24 +181,11 @@ func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err r.downloader.stats.WebseedBytesDownload.Add(resp.ContentLength) retry = false - // the first two statuses here have been observed from cloudflare - // during testing. The remainder are generally understood to be - // retriable http responses, calcBackoff will use the Retry-After - // header if its availible - case http.StatusInternalServerError, http.StatusBadGateway, - http.StatusRequestTimeout, http.StatusTooEarly, - http.StatusTooManyRequests, http.StatusServiceUnavailable, - http.StatusGatewayTimeout: - + case http.StatusInternalServerError, http.StatusBadGateway: r.downloader.stats.WebseedServerFails.Add(1) - if resp.Body != nil { - resp.Body.Close() - resp.Body = nil - } - attempts++ - delayTimer := time.NewTimer(calcBackoff(minDelay, maxDelay, attempts, resp)) + delayTimer := time.NewTimer(delay) select { case <-delayTimer.C: @@ -244,10 +193,14 @@ func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err resp, err = r.Transport.RoundTrip(req) r.downloader.stats.WebseedTripCount.Add(1) + if err == nil && delay < maxDelay { + delay = delay + (time.Duration(rand.Intn(200-75)+75)*delay)/100 + } + case <-req.Context().Done(): err = req.Context().Err() } - retry = attempts < maxAttempts + retry = attempts > maxAttempts default: r.downloader.stats.WebseedBytesDownload.Add(resp.ContentLength) @@ -1827,7 +1780,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { prevStats, stats := d.stats, d.stats - stats.Completed = true + stats.Completed = len(torrents) == stats.Requested stats.BytesDownload = uint64(connStats.BytesReadUsefulIntendedData.Int64()) stats.BytesUpload = uint64(connStats.BytesWrittenData.Int64()) @@ -1927,11 +1880,8 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { } // more detailed statistic: download rate of each peer (for each file) - if !torrentComplete && progress != 0 { - if _, ok := downloading[torrentName]; ok { - downloading[torrentName] = progress - } - + if _, ok := downloading[torrentName]; ok { + downloading[torrentName] = progress d.logger.Log(d.verbosity, "[snapshots] progress", "file", torrentName, "progress", fmt.Sprintf("%.2f%%", progress), "peers", len(peersOfThisFile), "webseeds", len(weebseedPeersOfThisFile)) d.logger.Log(d.verbosity, "[snapshots] webseed peers", webseedRates...) d.logger.Log(d.verbosity, "[snapshots] bittorrent peers", rates...) @@ -2586,8 +2536,8 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC if err != nil { return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err) } - //c, err = NewMdbxPieceCompletion(db) - c, err = NewMdbxPieceCompletionBatch(db) + c, err = NewMdbxPieceCompletion(db) + //c, err = NewMdbxPieceCompletionBatch(db) if err != nil { return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err) } diff --git a/erigon-lib/downloader/downloader_grpc_server.go b/erigon-lib/downloader/downloader_grpc_server.go index 4e0aa0edd34..6923c2db923 100644 --- a/erigon-lib/downloader/downloader_grpc_server.go +++ b/erigon-lib/downloader/downloader_grpc_server.go @@ -58,6 +58,10 @@ func (s *GrpcServer) Add(ctx context.Context, request *proto_downloader.AddReque logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() + s.d.lock.Lock() + s.d.stats.Requested += len(request.Items) + s.d.lock.Unlock() + for i, it := range request.Items { if it.Path == "" { return nil, fmt.Errorf("field 'path' is required") From b0053d855166811aed6f35cc76e25ea8297bb8fb Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Tue, 23 Apr 2024 20:46:09 +0100 Subject: [PATCH 2/3] reinsert missed lines --- erigon-lib/downloader/downloader.go | 66 +++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 9 deletions(-) diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 5a10db7e6ba..dec8657f6dd 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -24,6 +24,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/rand" "net/http" "net/url" @@ -33,6 +34,7 @@ import ( "reflect" "runtime" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -149,15 +151,52 @@ func insertCloudflareHeaders(req *http.Request) { } } +// retryBackoff performs exponential backoff based on the attempt number and limited +// by the provided minimum and maximum durations. +// +// It also tries to parse Retry-After response header when a http.StatusTooManyRequests +// (HTTP Code 429) is found in the resp parameter. Hence it will return the number of +// seconds the server states it may be ready to process more requests from this client. +func calcBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration { + if resp != nil { + if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable { + if s, ok := resp.Header["Retry-After"]; ok { + if sleep, err := strconv.ParseInt(s[0], 10, 64); err == nil { + return time.Second * time.Duration(sleep) + } + } + } + } + + mult := math.Pow(2, float64(attemptNum)) * float64(min) + sleep := time.Duration(mult) + if float64(sleep) != mult || sleep > max { + sleep = max + } + + return sleep +} + func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err error) { + defer func() { + if r := recover(); r != nil { + if resp != nil && resp.Body != nil { + resp.Body.Close() + resp.Body = nil + } + + err = fmt.Errorf("http client panic: %s", r) + } + }() + insertCloudflareHeaders(req) resp, err = r.Transport.RoundTrip(req) - delay := 500 * time.Millisecond attempts := 1 retry := true + const minDelay = 500 * time.Millisecond const maxDelay = 5 * time.Second const maxAttempts = 10 @@ -181,11 +220,24 @@ func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err r.downloader.stats.WebseedBytesDownload.Add(resp.ContentLength) retry = false - case http.StatusInternalServerError, http.StatusBadGateway: + // the first two statuses here have been observed from cloudflare + // during testing. The remainder are generally understood to be + // retriable http responses, calcBackoff will use the Retry-After + // header if its availible + case http.StatusInternalServerError, http.StatusBadGateway, + http.StatusRequestTimeout, http.StatusTooEarly, + http.StatusTooManyRequests, http.StatusServiceUnavailable, + http.StatusGatewayTimeout: + r.downloader.stats.WebseedServerFails.Add(1) + if resp.Body != nil { + resp.Body.Close() + resp.Body = nil + } + attempts++ - delayTimer := time.NewTimer(delay) + delayTimer := time.NewTimer(calcBackoff(minDelay, maxDelay, attempts, resp)) select { case <-delayTimer.C: @@ -193,10 +245,6 @@ func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err resp, err = r.Transport.RoundTrip(req) r.downloader.stats.WebseedTripCount.Add(1) - if err == nil && delay < maxDelay { - delay = delay + (time.Duration(rand.Intn(200-75)+75)*delay)/100 - } - case <-req.Context().Done(): err = req.Context().Err() } @@ -2536,8 +2584,8 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC if err != nil { return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err) } - c, err = NewMdbxPieceCompletion(db) - //c, err = NewMdbxPieceCompletionBatch(db) + //c, err = NewMdbxPieceCompletion(db) + c, err = NewMdbxPieceCompletionBatch(db) if err != nil { return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err) } From 8da8b05cc95486a83e432a38a98668e22740a2e3 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Tue, 23 Apr 2024 21:00:05 +0100 Subject: [PATCH 3/3] missed fix --- erigon-lib/downloader/downloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index dec8657f6dd..6c8adb14b0e 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -248,7 +248,7 @@ func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err case <-req.Context().Done(): err = req.Context().Err() } - retry = attempts > maxAttempts + retry = attempts < maxAttempts default: r.downloader.stats.WebseedBytesDownload.Add(resp.ContentLength)