Commit
Merge pull request #628 from libp2p/test/fix-hung-test
fix all flaky tests
Stebalien authored May 13, 2020
2 parents a7fb794 + 297911c commit 9be7169
Showing 5 changed files with 123 additions and 77 deletions.
95 changes: 52 additions & 43 deletions dht_test.go
@@ -32,7 +32,6 @@ import (
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p-testing/ci"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"

@@ -129,7 +128,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option)
return d
}

func setupDHTS(t *testing.T, ctx context.Context, n int) []*IpfsDHT {
func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*IpfsDHT {
addrs := make([]ma.Multiaddr, n)
dhts := make([]*IpfsDHT, n)
peers := make([]peer.ID, n)
@@ -138,7 +137,7 @@ func setupDHTS(t *testing.T, ctx context.Context, n int) []*IpfsDHT {
sanityPeersMap := make(map[string]struct{})

for i := 0; i < n; i++ {
dhts[i] = setupDHT(ctx, t, false)
dhts[i] = setupDHT(ctx, t, false, options...)
peers[i] = dhts[i].PeerID()
addrs[i] = dhts[i].host.Addrs()[0]

@@ -673,8 +672,9 @@ func TestLocalProvides(t *testing.T) {
}

// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) {
// test "well-formed-ness" (>= minPeers peers in every routing table)
t.Helper()

checkTables := func() bool {
totalPeers := 0
@@ -699,11 +699,12 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i
for {
select {
case <-timeoutA:
logger.Debugf("did not reach well-formed routing tables by %s", timeout)
return false // failed
t.Errorf("failed to reach well-formed routing tables after %s", timeout)
return
case <-time.After(5 * time.Millisecond):
if checkTables() {
return true // succeeded
// succeeded
return
}
}
}
@@ -760,6 +761,7 @@ func TestRefresh(t *testing.T) {
}

waitForWellFormedTables(t, dhts, 7, 10, 10*time.Second)

cancelT()

if u.Debug {
@@ -830,12 +832,12 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
}

func TestPeriodicRefresh(t *testing.T) {
if ci.IsRunning() {
t.Skip("skipping on CI. highly timing dependent")
}
if testing.Short() {
t.SkipNow()
}
if detectrace.WithRace() {
t.Skip("skipping due to race detector max goroutines")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -894,7 +896,9 @@ func TestPeriodicRefresh(t *testing.T) {
}

func TestProvidesMany(t *testing.T) {
t.Skip("this test doesn't work")
if detectrace.WithRace() {
t.Skip("skipping due to race detector max goroutines")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -1149,9 +1153,6 @@ func TestConnectCollision(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
if ci.IsRunning() {
t.Skip("Skipping on CI.")
}

runTimes := 10

@@ -1337,7 +1338,7 @@ func minInt(a, b int) int {
}

func TestFindPeerQueryMinimal(t *testing.T) {
testFindPeerQuery(t, 2, 22, 11)
testFindPeerQuery(t, 2, 22, 1)
}

func TestFindPeerQuery(t *testing.T) {
@@ -1348,62 +1349,70 @@ func TestFindPeerQuery(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
if curFileLimit() < 1024 {
t.Skip("insufficient file descriptors available")
}
testFindPeerQuery(t, 20, 80, 16)
testFindPeerQuery(t, 5, 40, 3)
}

// NOTE: You must have ATLEAST (minRTRefreshThreshold+1) test peers before using this.
func testFindPeerQuery(t *testing.T,
bootstrappers, // Number of nodes connected to the querying node
leafs, // Number of nodes that might be connected to from the bootstrappers
bootstrapperLeafConns int, // Number of connections each bootstrapper has to the leaf nodes
bootstrapConns int, // Number of bootstrappers each leaf should connect to.
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs)
dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs, BucketSize(4))
defer func() {
for _, d := range dhts {
d.Close()
d.host.Close()
}
}()

t.Log("connecting")

mrand := rand.New(rand.NewSource(42))
guy := dhts[0]
others := dhts[1:]
for i := 0; i < bootstrappers; i++ {
for j := 0; j < bootstrapperLeafConns; j++ {
v := mrand.Intn(leafs)
connectNoSync(t, ctx, others[i], others[bootstrappers+v])
for i := 0; i < leafs; i++ {
for _, v := range mrand.Perm(bootstrappers)[:bootstrapConns] {
connectNoSync(t, ctx, others[v], others[bootstrappers+i])
}
}

for i := 0; i < bootstrappers; i++ {
connectNoSync(t, ctx, guy, others[i])
}

t.Log("waiting for routing tables")

// give some time for things to settle down
waitForWellFormedTables(t, dhts, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second)
waitForWellFormedTables(t, dhts, bootstrapConns, bootstrapConns, 5*time.Second)

for _, d := range dhts {
if len(d.RoutingTable().ListPeers()) > 0 {
if err := <-d.RefreshRoutingTable(); err != nil {
t.Fatal(err)
}
}
t.Log("refreshing")

var wg sync.WaitGroup
for _, dht := range dhts {
wg.Add(1)
go func(d *IpfsDHT) {
<-d.RefreshRoutingTable()
wg.Done()
}(dht)
}

var reachableIds []peer.ID
for i, d := range dhts {
lp := len(d.host.Network().Peers())
if i != 0 && lp > 0 {
reachableIds = append(reachableIds, d.PeerID())
}
wg.Wait()

t.Log("waiting for routing tables again")

// Wait for refresh to work. At least one bucket should be full.
waitForWellFormedTables(t, dhts, 4, 0, 5*time.Second)

var peers []peer.ID
for _, d := range others {
peers = append(peers, d.PeerID())
}
t.Logf("%d reachable ids", len(reachableIds))

t.Log("querying")

val := "foobar"
rtval := kb.ConvertKey(val)
@@ -1418,7 +1427,7 @@ func testFindPeerQuery(t *testing.T,

sort.Sort(peer.IDSlice(outpeers))

exp := kb.SortClosestPeers(reachableIds, rtval)[:minInt(guy.bucketSize, len(reachableIds))]
exp := kb.SortClosestPeers(peers, rtval)[:minInt(guy.bucketSize, len(peers))]
t.Logf("got %d peers", len(outpeers))
got := kb.SortClosestPeers(outpeers, rtval)

@@ -1588,19 +1597,19 @@ func TestHandleRemotePeerProtocolChanges(t *testing.T) {
connect(t, ctx, dhtA, dhtB)

// now assert both have each other in their RT
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second), "both RT should have one peer each")
waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second)

// dhtB becomes a client
require.NoError(t, dhtB.setMode(modeClient))

// which means that dhtA should evict it from it's RT
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second), "dHTA routing table should have 0 peers")
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second)

// dhtB becomes a server
require.NoError(t, dhtB.setMode(modeServer))

// which means dhtA should have it in the RT again because of fixLowPeers
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second), "dHTA routing table should have 1 peers")
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second)
}

func TestGetSetPluggedProtocol(t *testing.T) {
65 changes: 48 additions & 17 deletions ext_test.go
@@ -20,20 +20,21 @@ import (

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
"github.com/stretchr/testify/require"
)

// Test that one hung request to a peer doesn't prevent another request
// using that same peer from obeying its context.
func TestHungRequest(t *testing.T) {
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mn, err := mocknet.FullMeshLinked(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()

os := []Option{testPrefix, DisableAutoRefresh()}
os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
@@ -46,8 +47,18 @@ func TestHungRequest(t *testing.T) {
})
}

require.NoError(t, hosts[0].Peerstore().AddProtocols(hosts[1].ID(), protocol.ConvertToStrings(d.serverProtocols)...))
d.peerFound(ctx, hosts[1].ID(), true)
err = mn.ConnectAllButSelf()
if err != nil {
t.Fatal("failed to connect peers", err)
}

// Wait at a bit for a peer in our routing table.
for i := 0; i < 100 && d.routingTable.Size() == 0; i++ {
time.Sleep(10 * time.Millisecond)
}
if d.routingTable.Size() == 0 {
t.Fatal("failed to fill routing table")
}

ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
defer cancel1()
@@ -66,15 +77,19 @@ func TestHungRequest(t *testing.T) {
t.Errorf("expected to fail with deadline exceeded, got: %s", ctx2.Err())
}
select {
case <-done:
t.Errorf("GetClosestPeers should not have returned yet")
case err = <-done:
t.Error("GetClosestPeers should not have returned yet", err)
default:
err = <-done
if err != context.DeadlineExceeded {
t.Errorf("expected the deadline to be exceeded, got %s", err)
}
}

if d.routingTable.Size() == 0 {
// make sure we didn't just disconnect
t.Fatal("expected peers in the routing table")
}
}

func TestGetFailures(t *testing.T) {
@@ -202,6 +217,11 @@ func TestGetFailures(t *testing.T) {
t.Fatal("shouldnt have provider peers")
}
}

if d.routingTable.Size() == 0 {
// make sure we didn't just disconnect
t.Fatal("expected peers in the routing table")
}
}

func TestNotFound(t *testing.T) {
@@ -217,16 +237,12 @@ func TestNotFound(t *testing.T) {
}
hosts := mn.Hosts()

os := []Option{testPrefix, DisableAutoRefresh()}
os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}

for _, p := range hosts {
d.peerFound(ctx, p.ID(), true)
}

// Reply with random peers to every message
for _, host := range hosts {
host := host // shadow loop var
@@ -239,7 +255,8 @@ func TestNotFound(t *testing.T) {

pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
// this isn't an error, it just means the stream has died.
return
}

switch pmes.GetType() {
@@ -255,13 +272,23 @@ func TestNotFound(t *testing.T) {

resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
return
}
default:
panic("Shouldnt recieve this.")
}
})
}
for _, peer := range hosts {
if host == peer {
continue
}
_ = peer.Peerstore().AddProtocols(host.ID(), protocol.ConvertToStrings(d.serverProtocols)...)
}
}

for _, p := range hosts {
d.peerFound(ctx, p.ID(), true)
}

// long timeout to ensure timing is not at play.
@@ -275,6 +302,10 @@ func TestNotFound(t *testing.T) {
}
switch err {
case routing.ErrNotFound:
if d.routingTable.Size() == 0 {
// make sure we didn't just disconnect
t.Fatal("expected peers in the routing table")
}
//Success!
return
case u.ErrTimeout:
@@ -299,7 +330,7 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()

os := []Option{testPrefix, DisableAutoRefresh()}
os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
@@ -371,7 +402,7 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
os := []Option{testPrefix, DisableAutoRefresh()}
os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
(Diffs for the remaining 3 changed files are not shown.)
