Skip to content

Commit

Permalink
PopRelated should pop all
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Sztandera <[email protected]>
  • Loading branch information
Jakub Sztandera committed Oct 22, 2020
1 parent 24fc7d4 commit a436491
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 42 deletions.
38 changes: 12 additions & 26 deletions chain/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/filecoin-project/lotus/chain/types"
peer "github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -212,16 +211,12 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
sbs.buckets = nbuckets
}

func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
var bOut *syncTargetBucket
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) []*syncTargetBucket {
var bOut []*syncTargetBucket
for _, b := range sbs.buckets {
if b.sameChainAs(ts) {
if bOut == nil {
sbs.removeBucket(b)
bOut = b
} else {
log.Errorf("REPORT THIS more that one related bucket for %s", ts)
}
sbs.removeBucket(b)
bOut = append(bOut, b)
}
}
return bOut
Expand Down Expand Up @@ -312,8 +307,6 @@ func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) {
}

func (sm *syncManager) syncScheduler() {
t := time.NewTicker(10 * time.Second)

for {
select {
case ts, ok := <-sm.incomingTipSets:
Expand All @@ -330,16 +323,6 @@ func (sm *syncManager) syncScheduler() {
case <-sm.stop:
log.Info("sync scheduler shutting down")
return
case <-t.C:
activeSyncs := make([]types.TipSetKey, 0, len(sm.activeSyncs))
for tsk := range sm.activeSyncs {
activeSyncs = append(activeSyncs, tsk)
}
sort.Slice(activeSyncs, func(i, j int) bool {
return string(activeSyncs[i].Bytes()) < string(activeSyncs[j].Bytes())
})

log.Infof("activeSyncs: %v, activeSyncTips: %s ", activeSyncs, sm.activeSyncTips.String())
}
}
}
Expand Down Expand Up @@ -399,15 +382,18 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) {
}

delete(sm.activeSyncs, res.ts.Key())
relbucket := sm.activeSyncTips.PopRelated(res.ts)
if relbucket != nil {
relbuckets := sm.activeSyncTips.PopRelated(res.ts)
if len(relbuckets) != 0 {
if res.success {
if sm.nextSyncTarget == nil {
sm.nextSyncTarget = relbucket
sm.nextSyncTarget = relbuckets[0]
sm.workerChan = sm.syncTargets
relbuckets = relbuckets[1:]
} else {
for _, t := range relbucket.tips {
sm.syncQueue.Insert(t)
for _, b := range relbuckets {
for _, t := range b.tips {
sm.syncQueue.Insert(t)
}
}
}
return
Expand Down
34 changes: 18 additions & 16 deletions chain/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ func TestSyncManagerEdgeCase(t *testing.T) {
t.Logf("c1: %s", c1)
c2 := mock.TipSet(mock.MkBlock(b2, 1, 5))
t.Logf("c2: %s", c2)
d1 := mock.TipSet(mock.MkBlock(c1, 1, 6))
t.Logf("d1: %s", d1)
e1 := mock.TipSet(mock.MkBlock(d1, 1, 7))
t.Logf("e1: %s", e1)

runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
sm.SetPeerHead(ctx, "peer1", a)
assertGetSyncOp(t, stc, a)
time.Sleep(10 * time.Millisecond)
t.Logf("bootstate: %d", sm.bootstrapState)

sm.SetPeerHead(ctx, "peer1", b1)
sm.SetPeerHead(ctx, "peer1", b2)
Expand All @@ -96,7 +98,6 @@ func TestSyncManagerEdgeCase(t *testing.T) {
if !b1op.ts.Equals(b1) {
b1op, b2op = b2op, b1op
}
t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String())

sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0
sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1
Expand All @@ -106,28 +107,29 @@ func TestSyncManagerEdgeCase(t *testing.T) {
b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0
// even though correct one is index 1

time.Sleep(100 * time.Millisecond)
t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String())
b2op.done()
// b2 completes and is not related to c1, so it leaves activeSyncTips as it is
for i := 0; i < 10; {
select {
case so := <-stc:
so.done()
default:
i++
time.Sleep(10 * time.Millisecond)
}
}

time.Sleep(10 * time.Millisecond)
waitUntilAllWorkersAreDone(stc)

if len(sm.activeSyncTips.buckets) != 0 {
t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String())
}
t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String())
})
}

func waitUntilAllWorkersAreDone(stc chan *syncOp) {
for i := 0; i < 10; {
select {
case so := <-stc:
so.done()
default:
i++
time.Sleep(10 * time.Millisecond)
}
}
}

func TestSyncManager(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit a436491

Please sign in to comment.