From 4b149450bbcfcd7d590be5e995304debef49545e Mon Sep 17 00:00:00 2001 From: Jan Kalina Date: Mon, 8 Jan 2024 18:06:41 +0100 Subject: [PATCH] Fix race on BasePeerLeecher.done --- .../basepeerleecher/session.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/gossip/basestream/basestreamleecher/basepeerleecher/session.go b/gossip/basestream/basestreamleecher/basepeerleecher/session.go index 19d1ed76..d2cb4d7f 100644 --- a/gossip/basestream/basestreamleecher/basepeerleecher/session.go +++ b/gossip/basestream/basestreamleecher/basepeerleecher/session.go @@ -3,6 +3,7 @@ package basepeerleecher import ( "errors" "sync" + "sync/atomic" "time" ) @@ -37,9 +38,8 @@ type BasePeerLeecher struct { notifyReceivedChunk chan *receivedChunk - quitMu sync.Mutex - quit chan struct{} - done bool + quit chan struct{} + done uint32 wg *sync.WaitGroup @@ -78,16 +78,14 @@ func (d *BasePeerLeecher) Stop() { } func (d *BasePeerLeecher) Terminate() { - d.quitMu.Lock() - defer d.quitMu.Unlock() - if !d.done { + // set done, close the chan if it was not done yet + if atomic.SwapUint32(&d.done, 1) == 0 { close(d.quit) - d.done = true } } func (d *BasePeerLeecher) Stopped() bool { - return d.done + return atomic.LoadUint32(&d.done) != 0 } // NotifyChunkReceived injects new pack infos from a peer @@ -115,8 +113,7 @@ func (d *BasePeerLeecher) loop() { return case op := <-d.notifyReceivedChunk: - if d.done { - d.Terminate() + if d.Stopped() { continue } if len(d.processingChunks) < d.cfg.ParallelChunksDownload*2 {