Skip to content

Commit

Permalink
fix(requestmanager): make collect test requests with uuids sortable
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Dec 15, 2021
1 parent 52d93a4 commit 8a5e93e
Showing 1 changed file with 40 additions and 39 deletions.
79 changes: 40 additions & 39 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNormalSimultaneousFetch(t *testing.T) {
returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
requestRecords := readNNetworkRequests(requestCtx, t, td, 2)

td.tcm.AssertProtected(t, peers[0])
td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag(), requestRecords[1].gsr.ID().Tag())
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestCancelRequestInProgress(t *testing.T) {
returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
requestRecords := readNNetworkRequests(requestCtx, t, td, 2)

td.tcm.AssertProtected(t, peers[0])
td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag(), requestRecords[1].gsr.ID().Tag())
Expand All @@ -148,7 +148,7 @@ func TestCancelRequestInProgress(t *testing.T) {
td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), firstBlocks)
td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
cancel1()
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

require.True(t, rr.gsr.IsCancel())
require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) {

_, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)
requestRecords := readNNetworkRequests(requestCtx, t, td, 1)

td.tcm.AssertProtected(t, peers[0])
td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag())
Expand All @@ -215,7 +215,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) {
require.NoError(t, err)
postCancel <- struct{}{}

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

require.True(t, rr.gsr.IsCancel())
require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) {

returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

firstBlocks := td.blockChain.Blocks(0, 3)
firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestFailedRequest(t *testing.T) {

returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
td.tcm.AssertProtected(t, peers[0])
td.tcm.AssertProtectedWithTags(t, peers[0], rr.gsr.ID().Tag())

Expand All @@ -299,7 +299,7 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {

returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

// async loaded response responds immediately
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks())
Expand Down Expand Up @@ -330,7 +330,7 @@ func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
})
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

// async loaded response responds immediately
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks())
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestRequestReturnsMissingBlocks(t *testing.T) {

returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
firstResponses := []gsmsg.GraphSyncResponse{
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestEncodingExtensions(t *testing.T) {
td.responseHooks.Register(hook)
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

gsr := rr.gsr
returnedData1, found := gsr.Extension(td.extensionName1)
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestEncodingExtensions(t *testing.T) {
testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")

rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr = readNNetworkRequests(requestCtx, t, td, 1)[0]
receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
require.True(t, has)
require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")
Expand Down Expand Up @@ -510,7 +510,7 @@ func TestEncodingExtensions(t *testing.T) {
testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")

rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr = readNNetworkRequests(requestCtx, t, td, 1)[0]
receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
require.True(t, has)
require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
Expand Down Expand Up @@ -550,7 +550,7 @@ func TestBlockHooks(t *testing.T) {
td.blockHooks.Register(hook)
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

gsr := rr.gsr
returnedData1, found := gsr.Extension(td.extensionName1)
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestBlockHooks(t *testing.T) {
})
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), firstBlocks)

ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
ur := readNNetworkRequests(requestCtx, t, td, 1)[0]
receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
require.True(t, has)
require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")
Expand Down Expand Up @@ -666,7 +666,7 @@ func TestBlockHooks(t *testing.T) {
})
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), nextBlocks)

ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
ur = readNNetworkRequests(requestCtx, t, td, 1)[0]
receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
require.True(t, has)
require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
Expand Down Expand Up @@ -715,7 +715,7 @@ func TestOutgoingRequestHooks(t *testing.T) {
returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
requestRecords := readNNetworkRequests(requestCtx, t, td, 2)

dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
require.True(t, has)
Expand Down Expand Up @@ -773,7 +773,7 @@ func TestOutgoingRequestListeners(t *testing.T) {

returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)
requestRecords := readNNetworkRequests(requestCtx, t, td, 1)

// Should have fired by now
select {
Expand Down Expand Up @@ -836,7 +836,7 @@ func TestPauseResume(t *testing.T) {
// Start request
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

// Start processing responses
md := metadataForBlocks(td.blockChain.AllBlocks(), true)
Expand All @@ -862,7 +862,7 @@ func TestPauseResume(t *testing.T) {
<-holdForPause

// read the outgoing cancel request
pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
pauseCancel := readNNetworkRequests(requestCtx, t, td, 1)[0]
require.True(t, pauseCancel.gsr.IsCancel())

// verify no further responses come through
Expand All @@ -875,7 +875,7 @@ func TestPauseResume(t *testing.T) {
require.NoError(t, err)

// verify the correct new request with Do-no-send-cids & other extensions
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
resumedRequest := readNNetworkRequests(requestCtx, t, td, 1)[0]
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
require.NoError(t, err)
Expand Down Expand Up @@ -922,7 +922,7 @@ func TestPauseResumeExternal(t *testing.T) {
// Start request
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]

// Start processing responses
md := metadataForBlocks(td.blockChain.AllBlocks(), true)
Expand All @@ -942,7 +942,7 @@ func TestPauseResumeExternal(t *testing.T) {
<-holdForPause

// read the outgoing cancel request
pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
pauseCancel := readNNetworkRequests(requestCtx, t, td, 1)[0]
require.True(t, pauseCancel.gsr.IsCancel())

// verify no further responses come through
Expand All @@ -955,7 +955,7 @@ func TestPauseResumeExternal(t *testing.T) {
require.NoError(t, err)

// verify the correct new request with Do-no-send-cids & other extensions
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
resumedRequest := readNNetworkRequests(requestCtx, t, td, 1)[0]
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
require.NoError(t, err)
Expand Down Expand Up @@ -991,7 +991,7 @@ func TestStats(t *testing.T) {
_, _ = td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
_, _ = td.requestManager.NewRequest(requestCtx, peers[1], td.blockChain.TipLink, td.blockChain.Selector())

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3)
requestRecords := readNNetworkRequests(requestCtx, t, td, 3)

peerState := td.requestManager.PeerState(peers[0])
require.Len(t, peerState.RequestStates, 2)
Expand Down Expand Up @@ -1026,25 +1026,22 @@ func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
}
}

func readNNetworkRequests(ctx context.Context,
t *testing.T,
requestRecordChan <-chan requestRecord,
count int) []requestRecord {
requestRecords := make([]requestRecord, 0, count)
func readNNetworkRequests(ctx context.Context, t *testing.T, td *testData, count int) []requestRecord {
requestRecords := make(map[graphsync.RequestID]requestRecord, count)
for i := 0; i < count; i++ {
var rr requestRecord
testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
requestRecords = append(requestRecords, rr)
testutil.AssertReceive(ctx, t, td.requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
requestRecords[rr.gsr.ID()] = rr
}
// because of the simultaneous request queues it's possible for the requests to go to the network layer out of order
// if the requests are queued at a near identical time
// TODO: howdo?
/*
sort.Slice(requestRecords, func(i, j int) bool {
return requestRecords[i].gsr.ID() < requestRecords[j].gsr.ID()
})
*/
return requestRecords
sorted := make([]requestRecord, 0, len(requestRecords))
for _, id := range td.requestIds {
if rr, ok := requestRecords[id]; ok {
sorted = append(sorted, rr)
}
}
return sorted
}

func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
Expand Down Expand Up @@ -1091,6 +1088,7 @@ type testData struct {
outgoingRequestProcessingListeners *listeners.OutgoingRequestProcessingListeners
taskqueue *taskqueue.WorkerTaskQueue
executor *executor.Executor
requestIds []graphsync.RequestID
}

func newTestData(ctx context.Context, t *testing.T) *testData {
Expand Down Expand Up @@ -1127,5 +1125,8 @@ func newTestData(ctx context.Context, t *testing.T) *testData {
Name: td.extensionName2,
Data: td.extensionData2,
}
td.requestHooks.Register(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
td.requestIds = append(td.requestIds, request.ID())
})
return td
}

0 comments on commit 8a5e93e

Please sign in to comment.