From febc28f83f71b46d7d121e22beca1d7d40108af4 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 22 Oct 2020 04:57:29 +0200 Subject: [PATCH 1/6] Add log and Insert Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c7fdea7261e..53fd07bc9e3 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -4,6 +4,7 @@ import ( "context" "sort" "sync" + "time" "github.com/filecoin-project/lotus/chain/types" peer "github.com/libp2p/go-libp2p-core/peer" @@ -224,8 +225,7 @@ func (sbs *syncBucketSet) Empty() bool { } type syncTargetBucket struct { - tips []*types.TipSet - count int + tips []*types.TipSet } func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { @@ -244,7 +244,6 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { } func (stb *syncTargetBucket) add(ts *types.TipSet) { - stb.count++ for _, t := range stb.tips { if t.Equals(ts) { @@ -294,6 +293,7 @@ func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { } func (sm *syncManager) syncScheduler() { + t := time.NewTicker(10 * time.Second) for { select { @@ -311,6 +311,16 @@ func (sm *syncManager) syncScheduler() { case <-sm.stop: log.Info("sync scheduler shutting down") return + case <-t.C: + activeSyncs := make([]types.TipSetKey, len(sm.activeSyncs), 0) + for tsk := range sm.activeSyncs { + activeSyncs = append(activeSyncs, tsk) + } + sort.Slice(activeSyncs, func(i, j int) bool { + return string(activeSyncs[i].Bytes()) < string(activeSyncs[j].Bytes()) + }) + + log.Infof("activeSyncs: %v, ", activeSyncs) } } } @@ -376,7 +386,9 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) { sm.nextSyncTarget = relbucket sm.workerChan = sm.syncTargets } else { - sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket) + for _, t := range relbucket.tips { + sm.syncQueue.Insert(t) + } } return } From 5edfc527b0b0eeec035d82792e7ab463f10073fe Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Wed, 21 Oct 2020 22:52:35 +0200 Subject: [PATCH 2/6] More than one bucket Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 53fd07bc9e3..63bc377ec1d 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -199,13 +199,18 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) { } func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket { + var bOut *syncTargetBucket for _, b := range sbs.buckets { if b.sameChainAs(ts) { - sbs.removeBucket(b) - return b + if bOut == nil { + sbs.removeBucket(b) + bOut = b + } else { + log.Errorf("REPORT THIS more that one related bucket for %s", ts) + } } } - return nil + return bOut } func (sbs *syncBucketSet) Heaviest() *types.TipSet { @@ -312,7 +317,7 @@ func (sm *syncManager) syncScheduler() { log.Info("sync scheduler shutting down") return case <-t.C: - activeSyncs := make([]types.TipSetKey, len(sm.activeSyncs), 0) + activeSyncs := make([]types.TipSetKey, 0, len(sm.activeSyncs)) for tsk := range sm.activeSyncs { activeSyncs = append(activeSyncs, tsk) } From 24fc7d4cbdee6871012a1cb0c9f12c52def80307 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 22 Oct 2020 23:33:05 +0200 Subject: [PATCH 3/6] Add reproduction test and fix the issue Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 19 ++++++++++-- chain/sync_manager_test.go | 61 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 63bc377ec1d..1c5951bb5cf 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -3,6 +3,7 @@ package chain import ( "context" "sort" + "strings" "sync" "time" @@ -153,6 +154,19 @@ func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket { return &stb } +func (sbs *syncBucketSet) String() string { + var bStrings []string + for _, b := range sbs.buckets { + var tsStrings []string + for _, t := range b.tips { + tsStrings = append(tsStrings, t.String()) + } + bStrings = append(bStrings, "["+strings.Join(tsStrings, ",")+"]") + } + + return "{" + strings.Join(bStrings, ";") + "}" +} + func (sbs *syncBucketSet) RelatedToAny(ts *types.TipSet) bool { for _, b := range sbs.buckets { if b.sameChainAs(ts) { @@ -325,7 +339,7 @@ func (sm *syncManager) syncScheduler() { return string(activeSyncs[i].Bytes()) < string(activeSyncs[j].Bytes()) }) - log.Infof("activeSyncs: %v, ", activeSyncs) + log.Infof("activeSyncs: %v, activeSyncTips: %s ", activeSyncs, sm.activeSyncTips.String()) } } } @@ -341,7 +355,8 @@ func (sm *syncManager) scheduleIncoming(ts *types.TipSet) { var relatedToActiveSync bool for _, acts := range sm.activeSyncs { if ts.Equals(acts) { - break + // ignore, we are already syncing it + return } if ts.Parents() == acts.Key() { diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 269b3a62e07..276e4bb3604 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -67,6 +67,67 @@ func assertGetSyncOp(t *testing.T, c chan *syncOp, ts *types.TipSet) { } } +func TestSyncManagerEdgeCase(t *testing.T) { + ctx := context.Background() + + a := mock.TipSet(mock.MkBlock(genTs, 1, 1)) + t.Logf("a: %s", a) + b1 := mock.TipSet(mock.MkBlock(a, 1, 2)) + t.Logf("b1: %s", b1) + b2 := mock.TipSet(mock.MkBlock(a, 2, 3)) + t.Logf("b2: %s", b2) + c1 := mock.TipSet(mock.MkBlock(b1, 2, 4)) + t.Logf("c1: %s", c1) + c2 := mock.TipSet(mock.MkBlock(b2, 1, 5)) + t.Logf("c2: %s", c2) + + runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", a) + assertGetSyncOp(t, stc, a) + time.Sleep(10 * time.Millisecond) + t.Logf("bootstate: %d", sm.bootstrapState) + + sm.SetPeerHead(ctx, "peer1", b1) + sm.SetPeerHead(ctx, "peer1", b2) + // b1 and b2 are being processed + + b1op := <-stc + b2op := <-stc + if !b1op.ts.Equals(b1) { + b1op, b2op = b2op, b1op + } + t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) + + sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0 + sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1 + sm.SetPeerHead(ctx, "peer3", b2) // b2 is related to c2 and even though it is actively synced it is put into activeSyncTips index 0 + sm.SetPeerHead(ctx, "peer1", a) // a is related to b2 and is put into activeSyncTips index 0 + + b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0 + // even though correct one is index 1 + + time.Sleep(100 * time.Millisecond) + t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) + b2op.done() + // b2 completes and is not related to c1, so it leaves activeSyncTips as it is + for i := 0; i < 10; { + select { + case so := <-stc: + so.done() + default: + i++ + time.Sleep(10 * time.Millisecond) + } + } + + time.Sleep(10 * time.Millisecond) + if len(sm.activeSyncTips.buckets) != 0 { + t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String()) + } + t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) + }) +} + func TestSyncManager(t *testing.T) { ctx := context.Background() From 810feee5a159de58a166071dc3b05bb08004a689 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 23 Oct 2020 00:33:35 +0200 Subject: [PATCH 4/6] PopRelated should pop all Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 38 ++++++++++++-------------------------- chain/sync_manager_test.go | 34 ++++++++++++++++++---------------- 2 files changed, 30 insertions(+), 42 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 1c5951bb5cf..460405a91ec 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -5,7 +5,6 @@ import ( "sort" "strings" "sync" - "time" "github.com/filecoin-project/lotus/chain/types" peer "github.com/libp2p/go-libp2p-core/peer" @@ -212,16 +211,12 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) { sbs.buckets = nbuckets } -func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket { - var bOut *syncTargetBucket +func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) []*syncTargetBucket { + var bOut []*syncTargetBucket for _, b := range sbs.buckets { if b.sameChainAs(ts) { - if bOut == nil { - sbs.removeBucket(b) - bOut = b - } else { - log.Errorf("REPORT THIS more that one related bucket for %s", ts) - } + sbs.removeBucket(b) + bOut = append(bOut, b) } } return bOut @@ -312,8 +307,6 @@ func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { } func (sm *syncManager) syncScheduler() { - t := time.NewTicker(10 * time.Second) - for { select { case ts, ok := <-sm.incomingTipSets: @@ -330,16 +323,6 @@ func (sm *syncManager) syncScheduler() { case <-sm.stop: log.Info("sync scheduler shutting down") return - case <-t.C: - activeSyncs := make([]types.TipSetKey, 0, len(sm.activeSyncs)) - for tsk := range sm.activeSyncs { - activeSyncs = append(activeSyncs, tsk) - } - sort.Slice(activeSyncs, func(i, j int) bool { - return string(activeSyncs[i].Bytes()) < string(activeSyncs[j].Bytes()) - }) - - log.Infof("activeSyncs: %v, activeSyncTips: %s ", activeSyncs, sm.activeSyncTips.String()) } } } @@ -399,14 +382,17 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) { } delete(sm.activeSyncs, res.ts.Key()) - relbucket := sm.activeSyncTips.PopRelated(res.ts) - if relbucket != nil { + relbuckets := sm.activeSyncTips.PopRelated(res.ts) + if len(relbuckets) != 0 { if res.success { if sm.nextSyncTarget == nil { - sm.nextSyncTarget = relbucket + sm.nextSyncTarget = relbuckets[0] sm.workerChan = sm.syncTargets - } else { - for _, t := range relbucket.tips { + relbuckets = relbuckets[1:] + } + + for _, b := range relbuckets { + for _, t := range b.tips { sm.syncQueue.Insert(t) } } diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 276e4bb3604..709e03a4108 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -80,12 +80,14 @@ func TestSyncManagerEdgeCase(t *testing.T) { t.Logf("c1: %s", c1) c2 := mock.TipSet(mock.MkBlock(b2, 1, 5)) t.Logf("c2: %s", c2) + d1 := mock.TipSet(mock.MkBlock(c1, 1, 6)) + t.Logf("d1: %s", d1) + e1 := mock.TipSet(mock.MkBlock(d1, 1, 7)) + t.Logf("e1: %s", e1) runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", a) assertGetSyncOp(t, stc, a) - time.Sleep(10 * time.Millisecond) - t.Logf("bootstate: %d", sm.bootstrapState) sm.SetPeerHead(ctx, "peer1", b1) sm.SetPeerHead(ctx, "peer1", b2) @@ -96,7 +98,6 @@ func TestSyncManagerEdgeCase(t *testing.T) { if !b1op.ts.Equals(b1) { b1op, b2op = b2op, b1op } - t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0 sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1 @@ -106,28 +107,29 @@ func TestSyncManagerEdgeCase(t *testing.T) { b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0 // even though correct one is index 1 - time.Sleep(100 * time.Millisecond) - t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) b2op.done() // b2 completes and is not related to c1, so it leaves activeSyncTips as it is - for i := 0; i < 10; { - select { - case so := <-stc: - so.done() - default: - i++ - time.Sleep(10 * time.Millisecond) - } - } - time.Sleep(10 * time.Millisecond) + waitUntilAllWorkersAreDone(stc) + if len(sm.activeSyncTips.buckets) != 0 { t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String()) } - t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String()) }) } +func waitUntilAllWorkersAreDone(stc chan *syncOp) { + for i := 0; i < 10; { + select { + case so := <-stc: + so.done() + default: + i++ + time.Sleep(10 * time.Millisecond) + } + } +} + func TestSyncManager(t *testing.T) { ctx := context.Background() From c8fe21c1ef2f5292dbe5a151a305a1203b959d4d Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 23 Oct 2020 01:18:13 +0200 Subject: [PATCH 5/6] PopRelated aggretates the bucket Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 460405a91ec..27643b0549f 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -211,12 +211,15 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) { sbs.buckets = nbuckets } -func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) []*syncTargetBucket { - var bOut []*syncTargetBucket +func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket { + var bOut *syncTargetBucket for _, b := range sbs.buckets { if b.sameChainAs(ts) { sbs.removeBucket(b) - bOut = append(bOut, b) + if bOut == nil { + bOut = &syncTargetBucket{} + } + bOut.tips = append(bOut.tips, b.tips...) } } return bOut @@ -382,17 +385,14 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) { } delete(sm.activeSyncs, res.ts.Key()) - relbuckets := sm.activeSyncTips.PopRelated(res.ts) - if len(relbuckets) != 0 { + relbucket := sm.activeSyncTips.PopRelated(res.ts) + if relbucket != nil { if res.success { if sm.nextSyncTarget == nil { - sm.nextSyncTarget = relbuckets[0] + sm.nextSyncTarget = relbucket sm.workerChan = sm.syncTargets - relbuckets = relbuckets[1:] - } - - for _, b := range relbuckets { - for _, t := range b.tips { + } else { + for _, t := range relbucket.tips { sm.syncQueue.Insert(t) } } From 580a2f4dc6a8c301346684d6021917eb9f954ec0 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 23 Oct 2020 01:35:26 +0200 Subject: [PATCH 6/6] Add option to join by common parents Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 27643b0549f..c25068f60c2 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -2,6 +2,7 @@ package chain import ( "context" + "os" "sort" "strings" "sync" @@ -12,6 +13,14 @@ import ( const BootstrapPeerThreshold = 2 +var coalesceForksParents = false + +func init() { + if os.Getenv("LOTUS_SYNC_REL_PARENT") == "yes" { + coalesceForksParents = true + } +} + const ( BSStateInit = 0 BSStateSelected = 1 @@ -256,6 +265,9 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { if ts.Parents() == t.Key() { return true } + if coalesceForksParents && ts.Parents() == t.Parents() { + return true + } } return false }