diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index d124bf0a..f7ed90d9 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -79,7 +79,7 @@ var ClientEvents = fsm.Events{ FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring). To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.ClientDeal, err error) error { - deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error() + deal.Message = xerrors.Errorf("failed to complete data transfer: %w", err).Error() return nil }), diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index dcb74a73..08c27efd 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -622,6 +622,128 @@ func TestRestartClient(t *testing.T) { } } +// TestBounceConnectionDataTransfer tests that when the the connection is +// broken and then restarted, the data transfer will resume and the deal will +// complete successfully. +func TestBounceConnectionDataTransfer(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Configure data-transfer to make 5 attempts, backing off 1s each time + dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1) + td := shared_testutil.NewLibp2pTestData(ctx, t) + td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry) + + // Configure data-transfer to automatically restart when connection goes down + restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{ + AcceptTimeout: 100 * time.Millisecond, + Interval: 100 * time.Millisecond, + MinBytesTransferred: 1, + ChecksPerInterval: 10, + RestartBackoff: 200 * time.Millisecond, + MaxConsecutiveRestarts: 5, + CompleteTimeout: 100 * time.Millisecond, + }) + smState := testnodes.NewStorageMarketState() + depGen := dependencies.NewDepGenerator() + depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport, counter *storedcounter.StoredCounter) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, dir, transferNetwork, transport, counter, restartConf) + } + deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay) + h := testharness.NewHarnessWithTestData(t, td, deps, true, false) + + client := h.Client + clientHost := h.TestData.Host1.ID() + providerHost := h.TestData.Host2.ID() + + // start client and provider + shared_testutil.StartAndWaitForReady(ctx, t, h.Provider) + shared_testutil.StartAndWaitForReady(ctx, t, h.Client) + + // set ask price where we'll accept any price + err := h.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000) + require.NoError(t, err) + + // Bounce connection after this many bytes have been queued for sending + bounceConnectionAt := map[uint64]bool{ + 1000: false, + 5000: false, + } + h.DTClient.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { + //t.Logf("dt-clnt %s: %s %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()], channelState.Message()) + if event.Code == datatransfer.DataQueuedProgress { + //t.Logf(" > qued %d", channelState.Queued()) + + // Check if enough bytes have been queued that the connection + // should be bounced + for at, already := range bounceConnectionAt { + if channelState.Sent() > at && !already { + bounceConnectionAt[at] = true + + // Break the connection + queued := channelState.Queued() + t.Logf(" breaking connection after sending %d bytes", queued) + h.TestData.MockNet.DisconnectPeers(clientHost, providerHost) + h.TestData.MockNet.UnlinkPeers(clientHost, providerHost) + + go func() { + // Restore the connection + time.Sleep(100 * time.Millisecond) + t.Logf(" restoring connection from bounce at %d bytes", queued) + h.TestData.MockNet.LinkPeers(clientHost, providerHost) + }() + } + } + } + //if event.Code == datatransfer.DataSentProgress { + // t.Logf(" > sent %d", channelState.Sent()) + //} + }) + //h.DTProvider.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { + // if event.Code == datatransfer.DataReceivedProgress { + // t.Logf(" > rcvd %d", channelState.Received()) + // } + //}) + + result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}, false, false) + proposalCid := result.ProposalCid + t.Log("storage deal proposed") + + // This wait group will complete after the deal has completed on both the + // client and provider + expireWg := sync.WaitGroup{} + expireWg.Add(1) + _ = h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + if event == storagemarket.ProviderEventDealExpired { + expireWg.Done() + } + }) + + expireWg.Add(1) + _ = client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + if event == storagemarket.ClientEventDealExpired { + expireWg.Done() + } + }) + + // Wait till both client and provider have completed the deal + waitGroupWait(ctx, &expireWg) + t.Log("---------- finished waiting for expected events-------") + + // Ensure the client and provider both reached the final state + cd, err := client.GetLocalDeal(ctx, proposalCid) + require.NoError(t, err) + shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, cd.State) + + providerDeals, err := h.Provider.ListLocalDeals() + require.NoError(t, err) + + pd := providerDeals[0] + require.Equal(t, pd.ProposalCid, proposalCid) + shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State) +} + // TestCancelDataTransfer tests that cancelling a data transfer cancels the deal func TestCancelDataTransfer(t *testing.T) { run := func(t *testing.T, cancelByClient bool, hasConnectivity bool) {