Skip to content

Commit

Permalink
feat(libp2p): add v1.0.0 network compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jan 13, 2022
1 parent d354eff commit 9d9b198
Show file tree
Hide file tree
Showing 10 changed files with 1,214 additions and 309 deletions.
2 changes: 1 addition & 1 deletion benchmarks/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (n *network) SendMessage(
rateLimiters[to] = rateLimiter
}

pbMsg, err := mes.ToProto()
pbMsg, err := gsmsg.NewMessageHandler().ToProto(mes)
if err != nil {
return err
}
Expand Down
243 changes: 137 additions & 106 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

Expand All @@ -49,6 +50,22 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)

// nil means use the default protocols
// tests data transfer for the following protocol combinations:
// default protocol -> default protocols
// old protocol -> default protocols
// default protocols -> old protocol
// old protocol -> old protocol
var protocolsForTest = map[string]struct {
host1Protocols []protocol.ID
host2Protocols []protocol.ID
}{
"(v1.1 -> v1.1)": {nil, nil},
"(v1.0 -> v1.1)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, nil},
"(v1.1 -> v1.0)": {nil, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
"(v1.0 -> v1.0)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
}

func TestMakeRequestToNetwork(t *testing.T) {

// create network
Expand Down Expand Up @@ -269,7 +286,6 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
}

func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {

// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
Expand Down Expand Up @@ -317,112 +333,115 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
}

func TestGraphsyncRoundTrip(t *testing.T) {

// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
for pname, ps := range protocolsForTest {
t.Run(pname, func(t *testing.T) {
// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
td := newOptionalGsTestData(ctx, t, ps.host1Protocols, ps.host2Protocols)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
}
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
hookActions.TerminateWithError(errors.New("Missing extension"))
} else {
hookActions.SendExtensionData(td.extensionResponse)
}
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

processUpdateSpan := tracing.FindSpanByTraceString("response(0)")
require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64())
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())

// each verifyBlock span should link to a cacheProcess span that stored it

cacheProcessSpans := tracing.FindSpans("cacheProcess")
cacheProcessLinks := make(map[string]int64)
verifyBlockSpans := tracing.FindSpans("verifyBlock")

for _, verifyBlockSpan := range verifyBlockSpans {
require.Len(t, verifyBlockSpan.Links, 1, "verifyBlock span should have one link")
found := false
for _, cacheProcessSpan := range cacheProcessSpans {
sid := cacheProcessSpan.SpanContext.SpanID().String()
if verifyBlockSpan.Links[0].SpanContext.SpanID().String() == sid {
found = true
cacheProcessLinks[sid] = cacheProcessLinks[sid] + 1
break
}
}
require.True(t, found, "verifyBlock should link to a known cacheProcess span")
}
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
hookActions.TerminateWithError(errors.New("Missing extension"))
} else {
hookActions.SendExtensionData(td.extensionResponse)
}
})
// each cacheProcess span should be linked to one verifyBlock span per block it stored

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

processUpdateSpan := tracing.FindSpanByTraceString("response(0)")
require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64())
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())

// each verifyBlock span should link to a cacheProcess span that stored it

cacheProcessSpans := tracing.FindSpans("cacheProcess")
cacheProcessLinks := make(map[string]int64)
verifyBlockSpans := tracing.FindSpans("verifyBlock")

for _, verifyBlockSpan := range verifyBlockSpans {
require.Len(t, verifyBlockSpan.Links, 1, "verifyBlock span should have one link")
found := false
for _, cacheProcessSpan := range cacheProcessSpans {
sid := cacheProcessSpan.SpanContext.SpanID().String()
if verifyBlockSpan.Links[0].SpanContext.SpanID().String() == sid {
found = true
cacheProcessLinks[sid] = cacheProcessLinks[sid] + 1
break
for _, cacheProcessSpan := range cacheProcessSpans {
blockCount := testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blockCount").AsInt64()
require.Equal(t, cacheProcessLinks[cacheProcessSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed")
}
}
require.True(t, found, "verifyBlock should link to a known cacheProcess span")
}

// each cacheProcess span should be linked to one verifyBlock span per block it stored

for _, cacheProcessSpan := range cacheProcessSpans {
blockCount := testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blockCount").AsInt64()
require.Equal(t, cacheProcessLinks[cacheProcessSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed")
})
}
}

Expand Down Expand Up @@ -1701,6 +1720,10 @@ func assertCancelOrCompleteFunction(gs graphsync.GraphExchange, requestCount int
}

func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
return newOptionalGsTestData(ctx, t, nil, nil)
}

func newOptionalGsTestData(ctx context.Context, t *testing.T, network1Protocols []protocol.ID, network2Protocols []protocol.ID) *gsTestData {
t.Helper()
td := &gsTestData{ctx: ctx}
td.mn = mocknet.New(ctx)
Expand All @@ -1713,8 +1736,16 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
err = td.mn.LinkAll()
require.NoError(t, err, "error linking hosts")

td.gsnet1 = gsnet.NewFromLibp2pHost(td.host1)
td.gsnet2 = gsnet.NewFromLibp2pHost(td.host2)
opts := make([]gsnet.Option, 0)
if network1Protocols != nil {
opts = append(opts, gsnet.GraphsyncProtocols(network1Protocols))
}
td.gsnet1 = gsnet.NewFromLibp2pHost(td.host1, opts...)
opts = make([]gsnet.Option, 0)
if network2Protocols != nil {
opts = append(opts, gsnet.GraphsyncProtocols(network2Protocols))
}
td.gsnet2 = gsnet.NewFromLibp2pHost(td.host2, opts...)
td.blockStore1 = make(map[ipld.Link][]byte)
td.persistence1 = testutil.NewTestStore(td.blockStore1)
td.blockStore2 = make(map[ipld.Link][]byte)
Expand Down Expand Up @@ -1784,7 +1815,7 @@ func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseC
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
require.NotNil(t, finalStub)
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsStringSlice()) == 0 {
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
}
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
}
Loading

0 comments on commit 9d9b198

Please sign in to comment.