Skip to content

Commit

Permalink
downloader: manual .lock remove may lead to race and creation of data…
Browse files Browse the repository at this point in the history
… files without .torrent (#9782)

```
if !prohibited(file) && !exists(file) {
   create(file)
}
```
such pattern is always race - even if `prohibited`/`exists()`/`create()`
are atomic inside.
to eliminate race - need move towards analog of `CompareAndSwap` -
`CreateIfNotProhibited` which in 1 mutex lock will: check_lock,
existence, create.
  • Loading branch information
AskAlexSharov authored Apr 11, 2024
1 parent a5d82cf commit c756a55
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 30 deletions.
10 changes: 3 additions & 7 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ func (d *Downloader) mainLoop(silent bool) error {

switch {
case len(t.PeerConns()) > 0:
d.logger.Debug("[snapshots] Downloading from torrent", "file", t.Name(), "peers", len(t.PeerConns()))
d.logger.Debug("[snapshots] Downloading from BitTorrent", "file", t.Name(), "peers", len(t.PeerConns()))
delete(waiting, t.Name())
d.torrentDownload(t, downloadComplete, sem)
case len(t.WebseedPeerConns()) > 0:
Expand Down Expand Up @@ -2108,7 +2108,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
if d.alreadyHaveThisName(name) || !IsSnapNameAllowed(name) {
return nil
}
isProhibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
isProhibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
if err != nil {
return err
}
Expand Down Expand Up @@ -2145,12 +2145,8 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
// TOOD: add `d.webseeds.Complete` chan - to prevent race - Discover is also async
// TOOD: maybe run it in goroutine and return channel - to select with p2p

ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name)
ts, ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name)
if ok && err == nil {
ts, err := d.torrentFiles.LoadByPath(filepath.Join(d.SnapDir(), name+".torrent"))
if err != nil {
return
}
_, _, err = addTorrentFile(ctx, ts, d.torrentClient, d.db, d.webseeds)
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type GrpcServer struct {
}

func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, s.d.torrentFiles.prohibitNewDownloads(req.Type)
return &emptypb.Empty{}, s.d.torrentFiles.ProhibitNewDownloads(req.Type)
}

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
Expand Down
48 changes: 42 additions & 6 deletions erigon-lib/downloader/torrent_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,37 @@ func (tf *TorrentFiles) Create(name string, res []byte) error {

tf.lock.Lock()
defer tf.lock.Unlock()
return tf.create(filepath.Join(tf.dir, name), res)
return tf.create(name, res)
}
func (tf *TorrentFiles) create(torrentFilePath string, res []byte) error {

func (tf *TorrentFiles) CreateIfNotProhibited(name string, res []byte) (ts *torrent.TorrentSpec, prohibited, created bool, err error) {
tf.lock.Lock()
defer tf.lock.Unlock()
prohibited, err = tf.newDownloadsAreProhibited(name)
if err != nil {
return nil, false, false, err
}

if !tf.exists(name) && !prohibited {
err = tf.create(name, res)
if err != nil {
return nil, false, false, err
}
}

ts, err = tf.load(filepath.Join(tf.dir, name))
if err != nil {
return nil, false, false, err
}
return ts, prohibited, false, nil
}

func (tf *TorrentFiles) create(name string, res []byte) error {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
torrentFilePath := filepath.Join(tf.dir, name)

if len(res) == 0 {
return fmt.Errorf("try to write 0 bytes to file: %s", torrentFilePath)
}
Expand Down Expand Up @@ -132,9 +160,13 @@ const ProhibitNewDownloadsFileName = "prohibit_new_downloads.lock"
// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
func (tf *TorrentFiles) ProhibitNewDownloads(t string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.prohibitNewDownloads(t)
}

func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
// open or create file ProhibitNewDownloadsFileName
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_RDONLY, 0644)
if err != nil {
Expand Down Expand Up @@ -174,9 +206,13 @@ func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
return f.Sync()
}

func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
func (tf *TorrentFiles) NewDownloadsAreProhibited(name string) (bool, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.newDownloadsAreProhibited(name)
}

func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_APPEND|os.O_RDONLY, 0644)
if err != nil {
return false, err
Expand All @@ -185,11 +221,11 @@ func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
var prohibitedList []string
torrentListJsonBytes, err := io.ReadAll(f)
if err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: read file: %w", err)
return false, fmt.Errorf("NewDownloadsAreProhibited: read file: %w", err)
}
if len(torrentListJsonBytes) > 0 {
if err := json.Unmarshal(torrentListJsonBytes, &prohibitedList); err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: unmarshal: %w", err)
return false, fmt.Errorf("NewDownloadsAreProhibited: unmarshal: %w", err)
}
}
for _, p := range prohibitedList {
Expand Down
25 changes: 9 additions & 16 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"sync"

"github.com/anacrolix/torrent"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -338,7 +337,7 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
}
// check if we need to prohibit new downloads for some files
for name := range manifestResponse {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
prohibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(manifestResponse, name)
}
Expand All @@ -356,7 +355,7 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
}
// check if we need to prohibit new downloads for some files
for name := range response {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
prohibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(response, name)
}
Expand Down Expand Up @@ -576,34 +575,28 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
return webSeedMap
}

func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (bool, error) {
func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (ts *torrent.TorrentSpec, ok bool, err error) {
urls, ok := d.ByFileName(name)
if !ok {
return false, nil
return nil, false, nil
}
for _, urlStr := range urls {
urlStr += ".torrent"
parsedUrl, err := url.Parse(urlStr)
if err != nil {
d.logger.Log(d.verbosity, "[snapshots] callTorrentHttpProvider parse url", "err", err)
continue
continue // it's ok if some HTTP provider failed - try next one
}
res, err := d.callTorrentHttpProvider(ctx, parsedUrl, name)
if err != nil {
d.logger.Log(d.verbosity, "[snapshots] callTorrentHttpProvider", "name", name, "err", err)
continue
}
if d.torrentFiles.Exists(name) {
continue
}
if err := d.torrentFiles.Create(name, res); err != nil {
d.logger.Log(d.verbosity, "[snapshots] .torrent from webseed rejected", "name", name, "err", err)
continue
continue // it's ok if some HTTP provider failed - try next one
}
return true, nil
ts, _, _, err = d.torrentFiles.CreateIfNotProhibited(name, res)
return ts, ts != nil, err
}

return false, nil
return nil, false, nil
}

func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL, fileName string) ([]byte, error) {
Expand Down

0 comments on commit c756a55

Please sign in to comment.