diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 1c5951bb5cf..0dcf6e8edf9 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -5,7 +5,6 @@ import ( "sort" "strings" "sync" - "time" "github.com/filecoin-project/lotus/chain/types" peer "github.com/libp2p/go-libp2p-core/peer" @@ -212,16 +211,12 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) { sbs.buckets = nbuckets } -func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket { - var bOut *syncTargetBucket +func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) []*syncTargetBucket { + var bOut []*syncTargetBucket for _, b := range sbs.buckets { if b.sameChainAs(ts) { - if bOut == nil { - sbs.removeBucket(b) - bOut = b - } else { - log.Errorf("REPORT THIS more that one related bucket for %s", ts) - } + sbs.removeBucket(b) + bOut = append(bOut, b) } } return bOut @@ -312,8 +307,6 @@ func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { } func (sm *syncManager) syncScheduler() { - t := time.NewTicker(10 * time.Second) - for { select { case ts, ok := <-sm.incomingTipSets: @@ -330,16 +323,6 @@ func (sm *syncManager) syncScheduler() { case <-sm.stop: log.Info("sync scheduler shutting down") return - case <-t.C: - activeSyncs := make([]types.TipSetKey, 0, len(sm.activeSyncs)) - for tsk := range sm.activeSyncs { - activeSyncs = append(activeSyncs, tsk) - } - sort.Slice(activeSyncs, func(i, j int) bool { - return string(activeSyncs[i].Bytes()) < string(activeSyncs[j].Bytes()) - }) - - log.Infof("activeSyncs: %v, activeSyncTips: %s ", activeSyncs, sm.activeSyncTips.String()) } } } @@ -399,15 +382,18 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) { } delete(sm.activeSyncs, res.ts.Key()) - relbucket := sm.activeSyncTips.PopRelated(res.ts) - if relbucket != nil { + relbuckets := sm.activeSyncTips.PopRelated(res.ts) + if len(relbuckets) != 0 { if res.success { if sm.nextSyncTarget == nil { - sm.nextSyncTarget = relbucket + sm.nextSyncTarget = relbuckets[0] sm.workerChan = sm.syncTargets + relbuckets = relbuckets[1:] } else { - for _, t := range relbucket.tips { - sm.syncQueue.Insert(t) + for _, b := range relbuckets { + for _, t := range b.tips { + sm.syncQueue.Insert(t) + } } } return diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 276e4bb3604..709e03a4108 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -80,12 +80,14 @@ func TestSyncManagerEdgeCase(t *testing.T) { t.Logf("c1: %s", c1) c2 := mock.TipSet(mock.MkBlock(b2, 1, 5)) t.Logf("c2: %s", c2) + d1 := mock.TipSet(mock.MkBlock(c1, 1, 6)) + t.Logf("d1: %s", d1) + e1 := mock.TipSet(mock.MkBlock(d1, 1, 7)) + t.Logf("e1: %s", e1) runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", a) assertGetSyncOp(t, stc, a) - time.Sleep(10 * time.Millisecond) - t.Logf("bootstate: %d", sm.bootstrapState) sm.SetPeerHead(ctx, "peer1", b1) sm.SetPeerHead(ctx, "peer1", b2) @@ -96,7 +98,6 @@ func TestSyncManagerEdgeCase(t *testing.T) { if !b1op.ts.Equals(b1) { b1op, b2op = b2op, b1op } - t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0 sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1 @@ -106,28 +107,29 @@ func TestSyncManagerEdgeCase(t *testing.T) { b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0 // even though correct one is index 1 - time.Sleep(100 * time.Millisecond) - t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) b2op.done() // b2 completes and is not related to c1, so it leaves activeSyncTips as it is - for i := 0; i < 10; { - select { - case so := <-stc: - so.done() - default: - i++ - time.Sleep(10 * time.Millisecond) - } - } - time.Sleep(10 * time.Millisecond) + waitUntilAllWorkersAreDone(stc) + if len(sm.activeSyncTips.buckets) != 0 { t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String()) } - t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) }) } +func waitUntilAllWorkersAreDone(stc chan *syncOp) { + for i := 0; i < 10; { + select { + case so := <-stc: + so.done() + default: + i++ + time.Sleep(10 * time.Millisecond) + } + } +} + func TestSyncManager(t *testing.T) { ctx := context.Background()