From 8a5e93e9541a2e4e169b70a676e65749016c577f Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 14 Dec 2021 15:40:43 +1100 Subject: [PATCH] fix(requestmanager): make collect test requests with uuids sortable --- requestmanager/requestmanager_test.go | 79 ++++++++++++++------------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 8af232de..317074dc 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -41,7 +41,7 @@ func TestNormalSimultaneousFetch(t *testing.T) { returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector()) - requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2) + requestRecords := readNNetworkRequests(requestCtx, t, td, 2) td.tcm.AssertProtected(t, peers[0]) td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag(), requestRecords[1].gsr.ID().Tag()) @@ -130,7 +130,7 @@ func TestCancelRequestInProgress(t *testing.T) { returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2) + requestRecords := readNNetworkRequests(requestCtx, t, td, 2) td.tcm.AssertProtected(t, peers[0]) td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag(), requestRecords[1].gsr.ID().Tag()) @@ -148,7 +148,7 @@ func TestCancelRequestInProgress(t *testing.T) { td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), firstBlocks) td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3) cancel1() - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] require.True(t, rr.gsr.IsCancel()) require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID()) @@ -194,7 +194,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { _, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1) + requestRecords := readNNetworkRequests(requestCtx, t, td, 1) td.tcm.AssertProtected(t, peers[0]) td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag()) @@ -215,7 +215,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { require.NoError(t, err) postCancel <- struct{}{} - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] require.True(t, rr.gsr.IsCancel()) require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID()) @@ -243,7 +243,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] firstBlocks := td.blockChain.Blocks(0, 3) firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true) @@ -275,7 +275,7 @@ func TestFailedRequest(t *testing.T) { returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] td.tcm.AssertProtected(t, peers[0]) td.tcm.AssertProtectedWithTags(t, peers[0], rr.gsr.ID().Tag()) @@ -299,7 +299,7 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) { returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] // async loaded response responds immediately td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) @@ -330,7 +330,7 @@ func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) { }) returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] // async loaded response responds immediately td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) @@ -358,7 +358,7 @@ func TestRequestReturnsMissingBlocks(t *testing.T) { returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false) firstResponses := []gsmsg.GraphSyncResponse{ @@ -436,7 +436,7 @@ func TestEncodingExtensions(t *testing.T) { td.responseHooks.Register(hook) returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] gsr := rr.gsr returnedData1, found := gsr.Extension(td.extensionName1) @@ -474,7 +474,7 @@ func TestEncodingExtensions(t *testing.T) { testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data") require.Equal(t, expectedData, received, "did not receive correct extension data from resposne") - rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr = readNNetworkRequests(requestCtx, t, td, 1)[0] receivedUpdateData, has := rr.gsr.Extension(td.extensionName1) require.True(t, has) require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension") @@ -510,7 +510,7 @@ func TestEncodingExtensions(t *testing.T) { testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data") require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne") - rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr = readNNetworkRequests(requestCtx, t, td, 1)[0] receivedUpdateData, has = rr.gsr.Extension(td.extensionName1) require.True(t, has) require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension") @@ -550,7 +550,7 @@ func TestBlockHooks(t *testing.T) { td.blockHooks.Register(hook) returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] gsr := rr.gsr returnedData1, found := gsr.Extension(td.extensionName1) @@ -602,7 +602,7 @@ func TestBlockHooks(t *testing.T) { }) td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), firstBlocks) - ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + ur := readNNetworkRequests(requestCtx, t, td, 1)[0] receivedUpdateData, has := ur.gsr.Extension(td.extensionName1) require.True(t, has) require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension") @@ -666,7 +666,7 @@ func TestBlockHooks(t *testing.T) { }) td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), nextBlocks) - ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + ur = readNNetworkRequests(requestCtx, t, td, 1)[0] receivedUpdateData, has = ur.gsr.Extension(td.extensionName1) require.True(t, has) require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension") @@ -715,7 +715,7 @@ func TestOutgoingRequestHooks(t *testing.T) { returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1) returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2) + requestRecords := readNNetworkRequests(requestCtx, t, td, 2) dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey) require.True(t, has) @@ -773,7 +773,7 @@ func TestOutgoingRequestListeners(t *testing.T) { returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1) - requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1) + requestRecords := readNNetworkRequests(requestCtx, t, td, 1) // Should have fired by now select { @@ -836,7 +836,7 @@ func TestPauseResume(t *testing.T) { // Start request returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] // Start processing responses md := metadataForBlocks(td.blockChain.AllBlocks(), true) @@ -862,7 +862,7 @@ func TestPauseResume(t *testing.T) { <-holdForPause // read the outgoing cancel request - pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + pauseCancel := readNNetworkRequests(requestCtx, t, td, 1)[0] require.True(t, pauseCancel.gsr.IsCancel()) // verify no further responses come through @@ -875,7 +875,7 @@ func TestPauseResume(t *testing.T) { require.NoError(t, err) // verify the correct new request with Do-no-send-cids & other extensions - resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + resumedRequest := readNNetworkRequests(requestCtx, t, td, 1)[0] doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs) doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData) require.NoError(t, err) @@ -922,7 +922,7 @@ func TestPauseResumeExternal(t *testing.T) { // Start request returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + rr := readNNetworkRequests(requestCtx, t, td, 1)[0] // Start processing responses md := metadataForBlocks(td.blockChain.AllBlocks(), true) @@ -942,7 +942,7 @@ func TestPauseResumeExternal(t *testing.T) { <-holdForPause // read the outgoing cancel request - pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + pauseCancel := readNNetworkRequests(requestCtx, t, td, 1)[0] require.True(t, pauseCancel.gsr.IsCancel()) // verify no further responses come through @@ -955,7 +955,7 @@ func TestPauseResumeExternal(t *testing.T) { require.NoError(t, err) // verify the correct new request with Do-no-send-cids & other extensions - resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + resumedRequest := readNNetworkRequests(requestCtx, t, td, 1)[0] doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs) doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData) require.NoError(t, err) @@ -991,7 +991,7 @@ func TestStats(t *testing.T) { _, _ = td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector()) _, _ = td.requestManager.NewRequest(requestCtx, peers[1], td.blockChain.TipLink, td.blockChain.Selector()) - requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3) + requestRecords := readNNetworkRequests(requestCtx, t, td, 3) peerState := td.requestManager.PeerState(peers[0]) require.Len(t, peerState.RequestStates, 2) @@ -1026,25 +1026,22 @@ func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64, } } -func readNNetworkRequests(ctx context.Context, - t *testing.T, - requestRecordChan <-chan requestRecord, - count int) []requestRecord { - requestRecords := make([]requestRecord, 0, count) +func readNNetworkRequests(ctx context.Context, t *testing.T, td *testData, count int) []requestRecord { + requestRecords := make(map[graphsync.RequestID]requestRecord, count) for i := 0; i < count; i++ { var rr requestRecord - testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i)) - requestRecords = append(requestRecords, rr) + testutil.AssertReceive(ctx, t, td.requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i)) + requestRecords[rr.gsr.ID()] = rr } // because of the simultaneous request queues it's possible for the requests to go to the network layer out of order // if the requests are queued at a near identical time - // TODO: howdo? - /* - sort.Slice(requestRecords, func(i, j int) bool { - return requestRecords[i].gsr.ID() < requestRecords[j].gsr.ID() - }) - */ - return requestRecords + sorted := make([]requestRecord, 0, len(requestRecords)) + for _, id := range td.requestIds { + if rr, ok := requestRecords[id]; ok { + sorted = append(sorted, rr) + } + } + return sorted } func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata { @@ -1091,6 +1088,7 @@ type testData struct { outgoingRequestProcessingListeners *listeners.OutgoingRequestProcessingListeners taskqueue *taskqueue.WorkerTaskQueue executor *executor.Executor + requestIds []graphsync.RequestID } func newTestData(ctx context.Context, t *testing.T) *testData { @@ -1127,5 +1125,8 @@ func newTestData(ctx context.Context, t *testing.T) *testData { Name: td.extensionName2, Data: td.extensionData2, } + td.requestHooks.Register(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) { + td.requestIds = append(td.requestIds, request.ID()) + }) return td }