diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index a781f8602f7..b49dd9837d3 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -1007,7 +1007,7 @@ func (d *Downloader) mainLoop(silent bool) error { switch { case len(t.PeerConns()) > 0: - d.logger.Debug("[snapshots] Downloading from torrent", "file", t.Name(), "peers", len(t.PeerConns())) + d.logger.Debug("[snapshots] Downloading from BitTorrent", "file", t.Name(), "peers", len(t.PeerConns())) delete(waiting, t.Name()) d.torrentDownload(t, downloadComplete, sem) case len(t.WebseedPeerConns()) > 0: @@ -2108,7 +2108,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash, if d.alreadyHaveThisName(name) || !IsSnapNameAllowed(name) { return nil } - isProhibited, err := d.torrentFiles.newDownloadsAreProhibited(name) + isProhibited, err := d.torrentFiles.NewDownloadsAreProhibited(name) if err != nil { return err } @@ -2145,12 +2145,8 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash, // TOOD: add `d.webseeds.Complete` chan - to prevent race - Discover is also async // TOOD: maybe run it in goroutine and return channel - to select with p2p - ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name) + ts, ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name) if ok && err == nil { - ts, err := d.torrentFiles.LoadByPath(filepath.Join(d.SnapDir(), name+".torrent")) - if err != nil { - return - } _, _, err = addTorrentFile(ctx, ts, d.torrentClient, d.db, d.webseeds) if err != nil { return diff --git a/erigon-lib/downloader/downloader_grpc_server.go b/erigon-lib/downloader/downloader_grpc_server.go index c5981b4134b..d9e2a9b2f49 100644 --- a/erigon-lib/downloader/downloader_grpc_server.go +++ b/erigon-lib/downloader/downloader_grpc_server.go @@ -46,7 +46,7 @@ type GrpcServer struct { } func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) { - return &emptypb.Empty{}, s.d.torrentFiles.prohibitNewDownloads(req.Type) + return &emptypb.Empty{}, s.d.torrentFiles.ProhibitNewDownloads(req.Type) } // Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast) diff --git a/erigon-lib/downloader/torrent_files.go b/erigon-lib/downloader/torrent_files.go index 35fc23a8b2c..91a90e48b6a 100644 --- a/erigon-lib/downloader/torrent_files.go +++ b/erigon-lib/downloader/torrent_files.go @@ -57,9 +57,37 @@ func (tf *TorrentFiles) Create(name string, res []byte) error { tf.lock.Lock() defer tf.lock.Unlock() - return tf.create(filepath.Join(tf.dir, name), res) + return tf.create(name, res) } -func (tf *TorrentFiles) create(torrentFilePath string, res []byte) error { + +func (tf *TorrentFiles) CreateIfNotProhibited(name string, res []byte) (ts *torrent.TorrentSpec, prohibited, created bool, err error) { + tf.lock.Lock() + defer tf.lock.Unlock() + prohibited, err = tf.newDownloadsAreProhibited(name) + if err != nil { + return nil, false, false, err + } + + if !tf.exists(name) && !prohibited { + err = tf.create(name, res) + if err != nil { + return nil, false, false, err + } + } + + ts, err = tf.load(filepath.Join(tf.dir, name)) + if err != nil { + return nil, false, false, err + } + return ts, prohibited, false, nil +} + +func (tf *TorrentFiles) create(name string, res []byte) error { + if !strings.HasSuffix(name, ".torrent") { + name += ".torrent" + } + torrentFilePath := filepath.Join(tf.dir, name) + if len(res) == 0 { return fmt.Errorf("try to write 0 bytes to file: %s", torrentFilePath) } @@ -132,9 +160,13 @@ const ProhibitNewDownloadsFileName = "prohibit_new_downloads.lock" // Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast) // After "download once" - Erigon will produce and seed new files // Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts) -func (tf *TorrentFiles) prohibitNewDownloads(t string) error { +func (tf *TorrentFiles) ProhibitNewDownloads(t string) error { tf.lock.Lock() defer tf.lock.Unlock() + return tf.prohibitNewDownloads(t) +} + +func (tf *TorrentFiles) prohibitNewDownloads(t string) error { // open or create file ProhibitNewDownloadsFileName f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_RDONLY, 0644) if err != nil { @@ -174,9 +206,13 @@ func (tf *TorrentFiles) prohibitNewDownloads(t string) error { return f.Sync() } -func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) { +func (tf *TorrentFiles) NewDownloadsAreProhibited(name string) (bool, error) { tf.lock.Lock() defer tf.lock.Unlock() + return tf.newDownloadsAreProhibited(name) +} + +func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) { f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_APPEND|os.O_RDONLY, 0644) if err != nil { return false, err @@ -185,11 +221,11 @@ func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) { var prohibitedList []string torrentListJsonBytes, err := io.ReadAll(f) if err != nil { - return false, fmt.Errorf("newDownloadsAreProhibited: read file: %w", err) + return false, fmt.Errorf("NewDownloadsAreProhibited: read file: %w", err) } if len(torrentListJsonBytes) > 0 { if err := json.Unmarshal(torrentListJsonBytes, &prohibitedList); err != nil { - return false, fmt.Errorf("newDownloadsAreProhibited: unmarshal: %w", err) + return false, fmt.Errorf("NewDownloadsAreProhibited: unmarshal: %w", err) } } for _, p := range prohibitedList { diff --git a/erigon-lib/downloader/webseed.go b/erigon-lib/downloader/webseed.go index a707bee68dd..fcd3b6fe2c6 100644 --- a/erigon-lib/downloader/webseed.go +++ b/erigon-lib/downloader/webseed.go @@ -15,7 +15,6 @@ import ( "sync" "github.com/anacrolix/torrent" - "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/chain/snapcfg" "golang.org/x/sync/errgroup" @@ -338,7 +337,7 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u } // check if we need to prohibit new downloads for some files for name := range manifestResponse { - prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name) + prohibited, err := d.torrentFiles.NewDownloadsAreProhibited(name) if prohibited || err != nil { delete(manifestResponse, name) } @@ -356,7 +355,7 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u } // check if we need to prohibit new downloads for some files for name := range response { - prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name) + prohibited, err := d.torrentFiles.NewDownloadsAreProhibited(name) if prohibited || err != nil { delete(response, name) } @@ -576,34 +575,28 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi return webSeedMap } -func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (bool, error) { +func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (ts *torrent.TorrentSpec, ok bool, err error) { urls, ok := d.ByFileName(name) if !ok { - return false, nil + return nil, false, nil } for _, urlStr := range urls { urlStr += ".torrent" parsedUrl, err := url.Parse(urlStr) if err != nil { d.logger.Log(d.verbosity, "[snapshots] callTorrentHttpProvider parse url", "err", err) - continue + continue // it's ok if some HTTP provider failed - try next one } res, err := d.callTorrentHttpProvider(ctx, parsedUrl, name) if err != nil { - d.logger.Log(d.verbosity, "[snapshots] callTorrentHttpProvider", "name", name, "err", err) - continue - } - if d.torrentFiles.Exists(name) { - continue - } - if err := d.torrentFiles.Create(name, res); err != nil { d.logger.Log(d.verbosity, "[snapshots] .torrent from webseed rejected", "name", name, "err", err) - continue + continue // it's ok if some HTTP provider failed - try next one } - return true, nil + ts, _, _, err = d.torrentFiles.CreateIfNotProhibited(name, res) + return ts, ts != nil, err } - return false, nil + return nil, false, nil } func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL, fileName string) ([]byte, error) {