Skip to content

Commit

Permalink
feat: add tests that don't make assumptions on the public network, an…
Browse files Browse the repository at this point in the history
…d switch to running them
  • Loading branch information
aschmahmann committed Jul 4, 2024
1 parent 31f4ae4 commit 7330a68
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 46 deletions.
11 changes: 0 additions & 11 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ jobs:
- uses: actions/checkout@v4
with:
path: 'ipfs-check'
- name: Build and Start
run: |
cd ipfs-check
go build -o ipfs-check
./ipfs-check &
sleep 300
- uses: ipfs/download-ipfs-distribution-action@v1
- name: Configure Kubo Gateway
run: |
ipfs init;
- uses: ipfs/start-ipfs-daemon-action@v1
- name: Tests
working-directory: ipfs-check
run: |
Expand Down
13 changes: 8 additions & 5 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type kademlia interface {
}

type daemon struct {
h host.Host
dht kademlia
dhtMessenger *dhtpb.ProtocolMessenger
h host.Host
dht kademlia
dhtMessenger *dhtpb.ProtocolMessenger
createTestHost func() (host.Host, error)
}

func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
Expand Down Expand Up @@ -85,7 +86,9 @@ func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
return nil, err
}

return &daemon{h: h, dht: d, dhtMessenger: pm}, nil
return &daemon{h: h, dht: d, dhtMessenger: pm, createTestHost: func() (host.Host, error) {
return libp2p.New(libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}))
}}, nil
}

func (d *daemon) mustStart() {
Expand Down Expand Up @@ -151,7 +154,7 @@ func (d *daemon) runCheck(query url.Values) (*output, error) {
}
}

testHost, err := libp2p.New(libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}))
testHost, err := d.createTestHost()
if err != nil {
return nil, fmt.Errorf("server error: %w", err)
}
Expand Down
157 changes: 157 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package main

import (
"context"
"github.com/aschmahmann/ipfs-check/test"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
mplex "github.com/libp2p/go-libp2p-mplex"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func TestBasicIntegration(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testDHTPrefix := protocol.TestingID
testDHTID := protocol.TestingID + "/kad/1.0.0"

dhtHost, err := libp2p.New()
require.NoError(t, err)
defer dhtHost.Close()
dhtServer, err := dht.New(ctx, dhtHost, dht.Mode(dht.ModeServer), dht.ProtocolPrefix(testDHTPrefix))
require.NoError(t, err)
defer dhtServer.Close()

go func() {
rm, err := NewResourceManager()
require.NoError(t, err)

c, err := connmgr.NewConnManager(600, 900, connmgr.WithGracePeriod(time.Second*30))
require.NoError(t, err)

queryHost, err := libp2p.New(
libp2p.DefaultMuxers,
libp2p.Muxer(mplex.ID, mplex.DefaultTransport),
libp2p.ConnectionManager(c),
libp2p.ResourceManager(rm),
libp2p.EnableHolePunching(),
)
require.NoError(t, err)

pm, err := dhtProtocolMessenger(testDHTID, queryHost)
require.NoError(t, err)
queryDHT, err := dht.New(ctx, queryHost, dht.ProtocolPrefix(testDHTPrefix), dht.BootstrapPeers(peer.AddrInfo{ID: dhtHost.ID(), Addrs: dhtHost.Addrs()}))
require.NoError(t, err)

d := &daemon{
h: queryHost,
dht: queryDHT,
dhtMessenger: pm,
createTestHost: func() (host.Host, error) {
return libp2p.New(libp2p.DefaultMuxers,
libp2p.Muxer(mplex.ID, mplex.DefaultTransport),
libp2p.EnableHolePunching())
},
}
_ = startServer(ctx, d, ":1234")
}()

h, err := libp2p.New()
defer h.Close()
require.NoError(t, err)
bn := bsnet.NewFromIpfsHost(h, routinghelpers.Null{})
bstore := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore()))
bswap := bsserver.New(ctx, bn, bstore)
bn.Start(bswap)
defer bswap.Close()
dhtClient, err := dht.New(ctx, h, dht.ProtocolPrefix(testDHTPrefix), dht.Mode(dht.ModeClient), dht.BootstrapPeers(peer.AddrInfo{ID: dhtHost.ID(), Addrs: dhtHost.Addrs()}))
require.NoError(t, err)
defer dhtClient.Close()
err = dhtClient.Bootstrap(ctx)
require.NoError(t, err)
for dhtClient.RoutingTable().Size() == 0 {
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-time.After(time.Millisecond * 5):
}
}

mas, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
require.NoError(t, err)
hostAddr := mas[0]

t.Run("Data on reachable peer that's advertised", func(t *testing.T) {
testData := []byte(t.Name())
mh, err := multihash.Sum(testData, multihash.SHA2_256, -1)
require.NoError(t, err)
testCid := cid.NewCidV1(cid.Raw, mh)
testBlock, err := blocks.NewBlockWithCid(testData, testCid)
require.NoError(t, err)
err = bstore.Put(ctx, testBlock)
require.NoError(t, err)
err = dhtClient.Provide(ctx, testCid, true)
require.NoError(t, err)

obj := test.Query(t, "http://localhost:1234", testCid.String(), hostAddr.String())

obj.Value("CidInDHT").Boolean().IsTrue()
obj.Value("ConnectionError").String().IsEmpty()
obj.Value("DataAvailableOverBitswap").Object().Value("Error").String().IsEmpty()
obj.Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsTrue()
obj.Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue()
})

t.Run("Data on reachable peer that's not advertised", func(t *testing.T) {
testData := []byte(t.Name())
mh, err := multihash.Sum(testData, multihash.SHA2_256, -1)
require.NoError(t, err)
testCid := cid.NewCidV1(cid.Raw, mh)
testBlock, err := blocks.NewBlockWithCid(testData, testCid)
require.NoError(t, err)
err = bstore.Put(ctx, testBlock)
require.NoError(t, err)

obj := test.Query(t, "http://localhost:1234", testCid.String(), hostAddr.String())

obj.Value("CidInDHT").Boolean().IsFalse()
obj.Value("ConnectionError").String().IsEmpty()
obj.Value("DataAvailableOverBitswap").Object().Value("Error").String().IsEmpty()
obj.Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsTrue()
obj.Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue()
})

t.Run("Data that's advertised but not served", func(t *testing.T) {
testData := []byte(t.Name())
mh, err := multihash.Sum(testData, multihash.SHA2_256, -1)
require.NoError(t, err)
testCid := cid.NewCidV1(cid.Raw, mh)
require.NoError(t, err)
err = dhtClient.Provide(ctx, testCid, true)
require.NoError(t, err)

obj := test.Query(t, "http://localhost:1234", testCid.String(), hostAddr.String())

obj.Value("CidInDHT").Boolean().IsTrue()
obj.Value("ConnectionError").String().IsEmpty()
obj.Value("DataAvailableOverBitswap").Object().Value("Error").String().IsEmpty()
obj.Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsFalse()
obj.Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue()
})
}
75 changes: 46 additions & 29 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"log"
"net"
Expand Down Expand Up @@ -28,44 +29,60 @@ func main() {
Usage: "run the accelerated DHT client",
},
}
app.Action = func(ctx *cli.Context) error {
daemon, err := newDaemon(ctx.Context, ctx.Bool("accelerated-dht"))
app.Action = func(cctx *cli.Context) error {
ctx := cctx.Context
d, err := newDaemon(ctx, cctx.Bool("accelerated-dht"))
if err != nil {
return err
}
return startServer(ctx, d, cctx.String("address"))
}

l, err := net.Listen("tcp", ctx.String("address"))
if err != nil {
return err
}
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}

log.Printf("listening on %v\n", l.Addr())
func startServer(ctx context.Context, d *daemon, tcpListener string) error {
l, err := net.Listen("tcp", tcpListener)
if err != nil {
return err
}

daemon.mustStart()
log.Printf("listening on %v\n", l.Addr())

log.Printf("ready to start serving")
d.mustStart()

// 1. Is the peer findable in the DHT?
// 2. Does the multiaddr work? If not, what's the error?
// 3. Is the CID in the DHT?
// 4. Does the peer respond that it has the given data over Bitswap?
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Access-Control-Allow-Origin", "*")
data, err := daemon.runCheck(r.URL.Query())
if err == nil {
w.Header().Add("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(data)
} else {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
}
})
log.Printf("ready to start serving")

return http.Serve(l, nil)
}
// 1. Is the peer findable in the DHT?
// 2. Does the multiaddr work? If not, what's the error?
// 3. Is the CID in the DHT?
// 4. Does the peer respond that it has the given data over Bitswap?
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Access-Control-Allow-Origin", "*")
data, err := d.runCheck(r.URL.Query())
if err == nil {
w.Header().Add("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(data)
} else {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
}
})

err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
done := make(chan error, 1)
go func() {
defer close(done)
done <- http.Serve(l, nil)
}()

select {
case err := <-done:
return err
case <-ctx.Done():
_ = l.Close()
return <-done
}
}
23 changes: 23 additions & 0 deletions test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"fmt"
"os"
"testing"
"time"
)
Expand All @@ -15,9 +16,16 @@ var (
WIKIPEDIA_CID string
WIKIPEDIA_PEER_ID string
WIKIPEDIA_PEER_ADDR string
SKIP_E2E_TESTS bool
)

func init() {
if os.Getenv("E2E_TESTS") == "true" {
SKIP_E2E_TESTS = false
} else {
SKIP_E2E_TESTS = true
return
}
BOOTSTRAP_PEER_ADDR = call("bash", "-c", "ipfs bootstrap list | head -n 1")
// ipfs name resolve /ipns/en.wikipedia-on-ipfs.org => /ipfs/CID, we remove the /ipfs/ prefix
WIKIPEDIA_CID = call("ipfs", "name", "resolve", "/ipns/en.wikipedia-on-ipfs.org")[6:]
Expand All @@ -26,6 +34,9 @@ func init() {
}

func TestEmptyDirOnBoostrapPeer(t *testing.T) {
if SKIP_E2E_TESTS {
t.Skip("Skipping e2e tests")
}
obj := Q(t, EMPTY_DIR_CID, BOOTSTRAP_PEER_ADDR)

obj.Value("CidInDHT").Boolean().IsTrue()
Expand All @@ -36,6 +47,9 @@ func TestEmptyDirOnBoostrapPeer(t *testing.T) {
}

func TestWikipediaOnSomeProviderPeer(t *testing.T) {
if SKIP_E2E_TESTS {
t.Skip("Skipping e2e tests")
}
obj := Q(t, WIKIPEDIA_CID, WIKIPEDIA_PEER_ADDR)
obj.Value("CidInDHT").Boolean().IsTrue()
// It seems that most peers do not provide over bitswap:
Expand All @@ -46,6 +60,9 @@ func TestWikipediaOnSomeProviderPeer(t *testing.T) {
}

func TestRandomFileOnBootstrapPeer(t *testing.T) {
if SKIP_E2E_TESTS {
t.Skip("Skipping e2e tests")
}
t.Skip("the random file CID is marked as \"not found in the DHT\" when calling bootstrap peers")

randomFileCid := call("bash", "-c", "cat /dev/urandom | head | sha256sum | ipfs add --quiet -")
Expand All @@ -65,6 +82,9 @@ func TestRandomFileOnBootstrapPeer(t *testing.T) {
}

func TestRandomFileOnLocalPeer(t *testing.T) {
if SKIP_E2E_TESTS {
t.Skip("Skipping e2e tests")
}
// ipfs id -f "<id>"
nodeId := call("ipfs", "id", "-f", "<id>")
localAddr := fmt.Sprintf("/p2p/%s", nodeId)
Expand All @@ -88,6 +108,9 @@ func TestRandomFileOnLocalPeer(t *testing.T) {
}

func TestRandomFileNeverUploadedOnBootstrapPeer(t *testing.T) {
if SKIP_E2E_TESTS {
t.Skip("Skipping e2e tests")
}
randomFileCid := call("bash", "-c", "cat /dev/urandom | head | sha256sum | ipfs add --quiet --only-hash -")

obj := Q(t, randomFileCid, BOOTSTRAP_PEER_ADDR)
Expand Down
Loading

0 comments on commit 7330a68

Please sign in to comment.