Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(requestid): use uuids for requestids #313

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (n *network) SendMessage(
rateLimiters[to] = rateLimiter
}

pbMsg, err := mes.ToProto()
pbMsg, err := gsmsg.NewMessageHandler().ToProto(mes)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ipfs/go-graphsync
go 1.16

require (
github.com/google/uuid v1.3.0
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.3
Expand Down
30 changes: 28 additions & 2 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,44 @@ import (
"errors"
"fmt"

"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p-core/peer"
)

// RequestID is a unique identifier for a GraphSync request.
type RequestID int32
type RequestID struct{ string }

// Tag returns an easy way to identify this request id as a graphsync request (for libp2p connections)
func (r RequestID) Tag() string {
return fmt.Sprintf("graphsync-request-%d", r)
return r.String()
}

// String form of a RequestID (should be a well-formed UUIDv4 string)
func (r RequestID) String() string {
return uuid.Must(uuid.FromBytes([]byte(r.string))).String()
}

// Byte form of a RequestID
func (r RequestID) Bytes() []byte {
return []byte(r.string)
}

// Create a new, random RequestID (should be a UUIDv4)
func NewRequestID() RequestID {
u := uuid.New()
return RequestID{string(u[:])}
}

// Create a RequestID from a byte slice
func ParseRequestID(b []byte) (RequestID, error) {
_, err := uuid.FromBytes(b)
if err != nil {
return RequestID{}, err
}
return RequestID{string(b)}, nil
}

// Priority a priority for a GraphSync request.
Expand Down
246 changes: 138 additions & 108 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"testing"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

Expand All @@ -50,6 +50,22 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)

// nil means use the default protocols
// tests data transfer for the following protocol combinations:
// default protocol -> default protocols
// old protocol -> default protocols
// default protocols -> old protocol
// old protocol -> old protocol
var protocolsForTest = map[string]struct {
host1Protocols []protocol.ID
host2Protocols []protocol.ID
}{
"(v1.1 -> v1.1)": {nil, nil},
"(v1.0 -> v1.1)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, nil},
"(v1.1 -> v1.0)": {nil, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
"(v1.0 -> v1.0)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
}

func TestMakeRequestToNetwork(t *testing.T) {

// create network
Expand Down Expand Up @@ -134,7 +150,7 @@ func TestSendResponseToIncomingRequest(t *testing.T) {
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

requestID := graphsync.RequestID(rand.Int31())
requestID := graphsync.NewRequestID()

builder := gsmsg.NewBuilder()
builder.AddRequest(gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32), td.extension))
Expand Down Expand Up @@ -270,7 +286,6 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
}

func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {

// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
Expand Down Expand Up @@ -318,112 +333,115 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
}

func TestGraphsyncRoundTrip(t *testing.T) {

// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
for pname, ps := range protocolsForTest {
t.Run(pname, func(t *testing.T) {
// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
td := newOptionalGsTestData(ctx, t, ps.host1Protocols, ps.host2Protocols)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
}
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
hookActions.TerminateWithError(errors.New("Missing extension"))
} else {
hookActions.SendExtensionData(td.extensionResponse)
}
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

processUpdateSpan := tracing.FindSpanByTraceString("response(0)")
require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64())
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())

// each verifyBlock span should link to a cacheProcess span that stored it

cacheProcessSpans := tracing.FindSpans("cacheProcess")
cacheProcessLinks := make(map[string]int64)
verifyBlockSpans := tracing.FindSpans("verifyBlock")

for _, verifyBlockSpan := range verifyBlockSpans {
require.Len(t, verifyBlockSpan.Links, 1, "verifyBlock span should have one link")
found := false
for _, cacheProcessSpan := range cacheProcessSpans {
sid := cacheProcessSpan.SpanContext.SpanID().String()
if verifyBlockSpan.Links[0].SpanContext.SpanID().String() == sid {
found = true
cacheProcessLinks[sid] = cacheProcessLinks[sid] + 1
break
}
}
require.True(t, found, "verifyBlock should link to a known cacheProcess span")
}
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
hookActions.TerminateWithError(errors.New("Missing extension"))
} else {
hookActions.SendExtensionData(td.extensionResponse)
}
})
// each cacheProcess span should be linked to one verifyBlock span per block it stored

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

processUpdateSpan := tracing.FindSpanByTraceString("response(0)")
require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64())
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())

// each verifyBlock span should link to a cacheProcess span that stored it

cacheProcessSpans := tracing.FindSpans("cacheProcess")
cacheProcessLinks := make(map[string]int64)
verifyBlockSpans := tracing.FindSpans("verifyBlock")

for _, verifyBlockSpan := range verifyBlockSpans {
require.Len(t, verifyBlockSpan.Links, 1, "verifyBlock span should have one link")
found := false
for _, cacheProcessSpan := range cacheProcessSpans {
sid := cacheProcessSpan.SpanContext.SpanID().String()
if verifyBlockSpan.Links[0].SpanContext.SpanID().String() == sid {
found = true
cacheProcessLinks[sid] = cacheProcessLinks[sid] + 1
break
for _, cacheProcessSpan := range cacheProcessSpans {
blockCount := testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blockCount").AsInt64()
require.Equal(t, cacheProcessLinks[cacheProcessSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed")
}
}
require.True(t, found, "verifyBlock should link to a known cacheProcess span")
}

// each cacheProcess span should be linked to one verifyBlock span per block it stored

for _, cacheProcessSpan := range cacheProcessSpans {
blockCount := testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blockCount").AsInt64()
require.Equal(t, cacheProcessLinks[cacheProcessSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed")
})
}
}

Expand Down Expand Up @@ -1702,6 +1720,10 @@ func assertCancelOrCompleteFunction(gs graphsync.GraphExchange, requestCount int
}

func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
return newOptionalGsTestData(ctx, t, nil, nil)
}

func newOptionalGsTestData(ctx context.Context, t *testing.T, network1Protocols []protocol.ID, network2Protocols []protocol.ID) *gsTestData {
t.Helper()
td := &gsTestData{ctx: ctx}
td.mn = mocknet.New(ctx)
Expand All @@ -1714,8 +1736,16 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
err = td.mn.LinkAll()
require.NoError(t, err, "error linking hosts")

td.gsnet1 = gsnet.NewFromLibp2pHost(td.host1)
td.gsnet2 = gsnet.NewFromLibp2pHost(td.host2)
opts := make([]gsnet.Option, 0)
if network1Protocols != nil {
opts = append(opts, gsnet.GraphsyncProtocols(network1Protocols))
}
td.gsnet1 = gsnet.NewFromLibp2pHost(td.host1, opts...)
opts = make([]gsnet.Option, 0)
if network2Protocols != nil {
opts = append(opts, gsnet.GraphsyncProtocols(network2Protocols))
}
td.gsnet2 = gsnet.NewFromLibp2pHost(td.host2, opts...)
td.blockStore1 = make(map[ipld.Link][]byte)
td.persistence1 = testutil.NewTestStore(td.blockStore1)
td.blockStore2 = make(map[ipld.Link][]byte)
Expand Down Expand Up @@ -1784,7 +1814,7 @@ func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseC
traces := testutil.RepeatTraceStrings("processResponses({})->loaderProcess(0)->cacheProcess(0)", responseCount-1)
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
require.NotNil(t, finalStub)
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsInt64Slice()) == 0 {
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsStringSlice()) == 0 {
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
}
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
Expand Down
Loading