diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c7fdea7261e..c25068f60c2 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -2,7 +2,9 @@ package chain import ( "context" + "os" "sort" + "strings" "sync" "github.com/filecoin-project/lotus/chain/types" @@ -11,6 +13,14 @@ import ( const BootstrapPeerThreshold = 2 +var coalesceForksParents = false + +func init() { + if os.Getenv("LOTUS_SYNC_REL_PARENT") == "yes" { + coalesceForksParents = true + } +} + const ( BSStateInit = 0 BSStateSelected = 1 @@ -152,6 +162,19 @@ func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket { return &stb } +func (sbs *syncBucketSet) String() string { + var bStrings []string + for _, b := range sbs.buckets { + var tsStrings []string + for _, t := range b.tips { + tsStrings = append(tsStrings, t.String()) + } + bStrings = append(bStrings, "["+strings.Join(tsStrings, ",")+"]") + } + + return "{" + strings.Join(bStrings, ";") + "}" +} + func (sbs *syncBucketSet) RelatedToAny(ts *types.TipSet) bool { for _, b := range sbs.buckets { if b.sameChainAs(ts) { @@ -198,13 +221,17 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) { } func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket { + var bOut *syncTargetBucket for _, b := range sbs.buckets { if b.sameChainAs(ts) { sbs.removeBucket(b) - return b + if bOut == nil { + bOut = &syncTargetBucket{} + } + bOut.tips = append(bOut.tips, b.tips...) } } - return nil + return bOut } func (sbs *syncBucketSet) Heaviest() *types.TipSet { @@ -224,8 +251,7 @@ func (sbs *syncBucketSet) Empty() bool { } type syncTargetBucket struct { - tips []*types.TipSet - count int + tips []*types.TipSet } func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { @@ -239,12 +265,14 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { if ts.Parents() == t.Key() { return true } + if coalesceForksParents && ts.Parents() == t.Parents() { + return true + } } return false } func (stb *syncTargetBucket) add(ts *types.TipSet) { - stb.count++ for _, t := range stb.tips { if t.Equals(ts) { @@ -294,7 +322,6 @@ func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { } func (sm *syncManager) syncScheduler() { - for { select { case ts, ok := <-sm.incomingTipSets: @@ -326,7 +353,8 @@ func (sm *syncManager) scheduleIncoming(ts *types.TipSet) { var relatedToActiveSync bool for _, acts := range sm.activeSyncs { if ts.Equals(acts) { - break + // ignore, we are already syncing it + return } if ts.Parents() == acts.Key() { @@ -376,7 +404,9 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) { sm.nextSyncTarget = relbucket sm.workerChan = sm.syncTargets } else { - sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket) + for _, t := range relbucket.tips { + sm.syncQueue.Insert(t) + } } return } diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 269b3a62e07..709e03a4108 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -67,6 +67,69 @@ func assertGetSyncOp(t *testing.T, c chan *syncOp, ts *types.TipSet) { } } +func TestSyncManagerEdgeCase(t *testing.T) { + ctx := context.Background() + + a := mock.TipSet(mock.MkBlock(genTs, 1, 1)) + t.Logf("a: %s", a) + b1 := mock.TipSet(mock.MkBlock(a, 1, 2)) + t.Logf("b1: %s", b1) + b2 := mock.TipSet(mock.MkBlock(a, 2, 3)) + t.Logf("b2: %s", b2) + c1 := mock.TipSet(mock.MkBlock(b1, 2, 4)) + t.Logf("c1: %s", c1) + c2 := mock.TipSet(mock.MkBlock(b2, 1, 5)) + t.Logf("c2: %s", c2) + d1 := mock.TipSet(mock.MkBlock(c1, 1, 6)) + t.Logf("d1: %s", d1) + e1 := mock.TipSet(mock.MkBlock(d1, 1, 7)) + t.Logf("e1: %s", e1) + + runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", a) + assertGetSyncOp(t, stc, a) + + sm.SetPeerHead(ctx, "peer1", b1) + sm.SetPeerHead(ctx, "peer1", b2) + // b1 and b2 are being processed + + b1op := <-stc + b2op := <-stc + if !b1op.ts.Equals(b1) { + b1op, b2op = b2op, b1op + } + + sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0 + sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1 + sm.SetPeerHead(ctx, "peer3", b2) // b2 is related to c2 and even though it is actively synced it is put into activeSyncTips index 0 + sm.SetPeerHead(ctx, "peer1", a) // a is related to b2 and is put into activeSyncTips index 0 + + b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0 + // even though correct one is index 1 + + b2op.done() + // b2 completes and is not related to c1, so it leaves activeSyncTips as it is + + waitUntilAllWorkersAreDone(stc) + + if len(sm.activeSyncTips.buckets) != 0 { + t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String()) + } + }) +} + +func waitUntilAllWorkersAreDone(stc chan *syncOp) { + for i := 0; i < 10; { + select { + case so := <-stc: + so.done() + default: + i++ + time.Sleep(10 * time.Millisecond) + } + } +} + func TestSyncManager(t *testing.T) { ctx := context.Background()