Skip to content

Commit

Permalink
Merge pull request #8072 from filecoin-project/bloxico/syncer-tests
Browse files Browse the repository at this point in the history
test: chain: unit tests for the syncer & sync manager
  • Loading branch information
arajasek authored Mar 2, 2022
2 parents 8091e70 + 0a03ec5 commit 9c22065
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 22 deletions.
22 changes: 0 additions & 22 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,25 +1244,3 @@ func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
bbr, ok := syncer.bad.Has(blk)
return bbr.String(), ok
}

func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
cur := ts
for i := 0; i < 20; i++ {
cbe := cur.Blocks()[0].BeaconEntries
if len(cbe) > 0 {
return &cbe[len(cbe)-1], nil
}

if cur.Height() == 0 {
return nil, xerrors.Errorf("made it back to genesis block without finding beacon entry")
}

next, err := syncer.store.LoadTipSet(ctx, cur.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load parents when searching back for latest beacon entry: %w", err)
}
cur = next
}

return nil, xerrors.Errorf("found NO beacon entries in the 20 latest tipsets")
}
32 changes: 32 additions & 0 deletions chain/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/stretchr/testify/require"
)

func init() {
Expand Down Expand Up @@ -240,3 +241,34 @@ func TestSyncManager(t *testing.T) {
op3.done()
})
}

func TestSyncManagerBucketSet(t *testing.T) {
ts1 := mock.TipSet(mock.MkBlock(nil, 0, 0))
ts2 := mock.TipSet(mock.MkBlock(ts1, 1, 0))
bucket1 := newSyncTargetBucket(ts1, ts2)
bucketSet := syncBucketSet{buckets: []*syncTargetBucket{bucket1}}

// inserting a tipset (potential sync target) from an existing chain, should add to an existing bucket
//stm: @CHAIN_SYNCER_ADD_SYNC_TARGET_001
ts3 := mock.TipSet(mock.MkBlock(ts2, 2, 0))
bucketSet.Insert(ts3)
require.Equal(t, 1, len(bucketSet.buckets))
require.Equal(t, 3, len(bucketSet.buckets[0].tips))

// inserting a tipset from new chain, should create a new bucket
ts4fork := mock.TipSet(mock.MkBlock(nil, 1, 1))
bucketSet.Insert(ts4fork)
require.Equal(t, 2, len(bucketSet.buckets))
require.Equal(t, 3, len(bucketSet.buckets[0].tips))
require.Equal(t, 1, len(bucketSet.buckets[1].tips))

// Pop removes the best bucket (best sync target), e.g. bucket1
//stm: @CHAIN_SYNCER_SELECT_SYNC_TARGET_001
popped := bucketSet.Pop()
require.Equal(t, popped, bucket1)
require.Equal(t, 1, len(bucketSet.buckets))

// PopRelated removes the bucket containing the given tipset, leaving the set empty
bucketSet.PopRelated(ts4fork)
require.Equal(t, 0, len(bucketSet.buckets))
}
155 changes: 155 additions & 0 deletions chain/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,3 +1098,158 @@ func TestInvalidHeight(t *testing.T) {

tu.mineOnBlock(base, 0, nil, false, true, nil, -1, true)
}

// TestIncomingBlocks mines new blocks and checks if the incoming channel streams new block headers properly
func TestIncomingBlocks(t *testing.T) {
H := 50
tu := prepSyncTest(t, H)

client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())

clientNode := tu.nds[client]
//stm: @CHAIN_SYNCER_INCOMING_BLOCKS_001
incoming, err := clientNode.SyncIncomingBlocks(tu.ctx)
require.NoError(tu.t, err)

tu.connect(client, 0)
tu.waitUntilSync(0, client)
tu.compareSourceState(client)

timeout := time.After(10 * time.Second)

for i := 0; i < 5; i++ {
tu.mineNewBlock(0, nil)
tu.waitUntilSync(0, client)
tu.compareSourceState(client)

// just in case, so we don't get deadlocked
select {
case <-incoming:
case <-timeout:
tu.t.Fatal("TestIncomingBlocks timeout")
}
}
}

// TestSyncManualBadTS tests manually marking and unmarking blocks in the bad TS cache
func TestSyncManualBadTS(t *testing.T) {
// Test setup:
// - source node is fully synced,
// - client node is unsynced
// - client manually marked source's head and it's parent as bad
H := 50
tu := prepSyncTest(t, H)

client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())

sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
require.NoError(tu.t, err)

clientHead, err := tu.nds[client].ChainHead(tu.ctx)
require.NoError(tu.t, err)

require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync in test setup")

//stm: @CHAIN_SYNCER_MARK_BAD_001
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)

sourceHeadParent := sourceHead.Parents().Cids()[0]
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHeadParent)
require.NoError(tu.t, err)

//stm: @CHAIN_SYNCER_CHECK_BAD_001
reason, err := tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")

reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
require.NoError(tu.t, err)
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")

// Assertion 1:
// - client shouldn't be synced after timeout, because the source TS is marked bad.
// - bad block is the first block that should be synced, 1sec should be enough
tu.connect(1, 0)
timeout := time.After(1 * time.Second)
<-timeout

clientHead, err = tu.nds[client].ChainHead(tu.ctx)
require.NoError(tu.t, err)
require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync if source head is bad")

// Assertion 2:
// - after unmarking blocks as bad and reconnecting, source & client should be in sync
//stm: @CHAIN_SYNCER_UNMARK_BAD_001
err = tu.nds[client].SyncUnmarkBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)

reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")

err = tu.nds[client].SyncUnmarkAllBad(tu.ctx)
require.NoError(tu.t, err)

reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
require.NoError(tu.t, err)
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")

tu.disconnect(1, 0)
tu.connect(1, 0)

tu.waitUntilSync(0, client)
tu.compareSourceState(client)
}

// TestState tests fetching the sync worker state before, during & after the sync
func TestSyncState(t *testing.T) {
H := 50
tu := prepSyncTest(t, H)

client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())
clientNode := tu.nds[client]
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
require.NoError(tu.t, err)

// sync state should be empty before the sync
state, err := clientNode.SyncState(tu.ctx)
require.NoError(tu.t, err)
require.Equal(tu.t, len(state.ActiveSyncs), 0)

tu.connect(client, 0)

// wait until sync starts, or at most `timeout` seconds
timeout := time.After(5 * time.Second)
activeSyncs := []api.ActiveSync{}

for len(activeSyncs) == 0 {
//stm: @CHAIN_SYNCER_STATE_001
state, err = clientNode.SyncState(tu.ctx)
require.NoError(tu.t, err)
activeSyncs = state.ActiveSyncs

sleep := time.After(100 * time.Millisecond)
select {
case <-sleep:
case <-timeout:
tu.t.Fatal("TestSyncState timeout")
}
}

// check state during sync
require.Equal(tu.t, len(activeSyncs), 1)
require.True(tu.t, activeSyncs[0].Target.Equals(sourceHead))

tu.waitUntilSync(0, client)
tu.compareSourceState(client)

// check state after sync
state, err = clientNode.SyncState(tu.ctx)
require.NoError(tu.t, err)
require.Equal(tu.t, len(state.ActiveSyncs), 1)
require.Equal(tu.t, state.ActiveSyncs[0].Stage, api.StageSyncComplete)
}

0 comments on commit 9c22065

Please sign in to comment.