Commit

feat: update to go-data-transfer v1.2.9 (#508) (#504)
dirkmc authored Mar 22, 2021
1 parent 1f259af commit 20da4f9
Showing 2 changed files with 123 additions and 1 deletion.
storagemarket/impl/clientstates/client_fsm.go: 2 changes (1 addition, 1 deletion)
@@ -79,7 +79,7 @@ var ClientEvents = fsm.Events{
FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error()
deal.Message = xerrors.Errorf("failed to complete data transfer: %w", err).Error()
return nil
}),

storagemarket/integration_test.go: 122 changes (122 additions, 0 deletions)
@@ -622,6 +622,128 @@ func TestRestartClient(t *testing.T) {
}
}

// TestBounceConnectionDataTransfer tests that when the connection is
// broken and then restarted, the data transfer will resume and the deal will
// complete successfully.
func TestBounceConnectionDataTransfer(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// Configure data-transfer to make 5 attempts, backing off 1s each time
dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1)
td := shared_testutil.NewLibp2pTestData(ctx, t)
td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)

// Configure data-transfer to automatically restart when connection goes down
restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{
AcceptTimeout: 100 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesTransferred: 1,
ChecksPerInterval: 10,
RestartBackoff: 200 * time.Millisecond,
MaxConsecutiveRestarts: 5,
CompleteTimeout: 100 * time.Millisecond,
})
smState := testnodes.NewStorageMarketState()
depGen := dependencies.NewDepGenerator()
depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport, counter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
return dtimpl.NewDataTransfer(ds, dir, transferNetwork, transport, counter, restartConf)
}
deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay)
h := testharness.NewHarnessWithTestData(t, td, deps, true, false)

client := h.Client
clientHost := h.TestData.Host1.ID()
providerHost := h.TestData.Host2.ID()

// start client and provider
shared_testutil.StartAndWaitForReady(ctx, t, h.Provider)
shared_testutil.StartAndWaitForReady(ctx, t, h.Client)

// set a zero ask price so that any deal price is accepted
err := h.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000)
require.NoError(t, err)

// Bounce connection after this many bytes have been queued for sending
bounceConnectionAt := map[uint64]bool{
1000: false,
5000: false,
}
h.DTClient.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
//t.Logf("dt-clnt %s: %s %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()], channelState.Message())
if event.Code == datatransfer.DataQueuedProgress {
//t.Logf(" > qued %d", channelState.Queued())

// Check if enough bytes have been queued that the connection
// should be bounced
for at, already := range bounceConnectionAt {
if channelState.Sent() > at && !already {
bounceConnectionAt[at] = true

// Break the connection
queued := channelState.Queued()
t.Logf(" breaking connection after sending %d bytes", queued)
h.TestData.MockNet.DisconnectPeers(clientHost, providerHost)
h.TestData.MockNet.UnlinkPeers(clientHost, providerHost)

go func() {
// Restore the connection
time.Sleep(100 * time.Millisecond)
t.Logf(" restoring connection from bounce at %d bytes", queued)
h.TestData.MockNet.LinkPeers(clientHost, providerHost)
}()
}
}
}
//if event.Code == datatransfer.DataSentProgress {
// t.Logf(" > sent %d", channelState.Sent())
//}
})
//h.DTProvider.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
// if event.Code == datatransfer.DataReceivedProgress {
// t.Logf(" > rcvd %d", channelState.Received())
// }
//})

result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}, false, false)
proposalCid := result.ProposalCid
t.Log("storage deal proposed")

// This wait group will complete after the deal has completed on both the
// client and provider
expireWg := sync.WaitGroup{}
expireWg.Add(1)
_ = h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if event == storagemarket.ProviderEventDealExpired {
expireWg.Done()
}
})

expireWg.Add(1)
_ = client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
if event == storagemarket.ClientEventDealExpired {
expireWg.Done()
}
})

// Wait till both client and provider have completed the deal
waitGroupWait(ctx, &expireWg)
t.Log("---------- finished waiting for expected events-------")

// Ensure the client and provider both reached the final state
cd, err := client.GetLocalDeal(ctx, proposalCid)
require.NoError(t, err)
shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, cd.State)

providerDeals, err := h.Provider.ListLocalDeals()
require.NoError(t, err)

pd := providerDeals[0]
require.Equal(t, pd.ProposalCid, proposalCid)
shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State)
}

// TestCancelDataTransfer tests that cancelling a data transfer cancels the deal
func TestCancelDataTransfer(t *testing.T) {
run := func(t *testing.T, cancelByClient bool, hasConnectivity bool) {
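For reference, the restart behaviour exercised by TestBounceConnectionDataTransfer above can be wired up outside the test harness using the same constructors. The sketch below is not part of this commit: the helper name, import paths, and timeout values are illustrative assumptions, and the datastore, transfer transport, stored counter, and libp2p host are assumed to be created elsewhere.

package example // hypothetical helper, not part of this commit

import (
	"time"

	datatransfer "github.com/filecoin-project/go-data-transfer"
	"github.com/filecoin-project/go-data-transfer/channelmonitor"
	dtimpl "github.com/filecoin-project/go-data-transfer/impl"
	dtnet "github.com/filecoin-project/go-data-transfer/network"
	"github.com/filecoin-project/go-storedcounter"
	"github.com/ipfs/go-datastore"
	"github.com/libp2p/go-libp2p-core/host"
)

// newRestartingDataTransfer mirrors the setup in TestBounceConnectionDataTransfer:
// retries at the libp2p stream layer plus automatic channel restarts when data
// stops flowing. The timeout values below are placeholders, not recommendations.
func newRestartingDataTransfer(ds datastore.Batching, dir string, h host.Host,
	transport datatransfer.Transport, counter *storedcounter.StoredCounter) (datatransfer.Manager, error) {

	// Retry opening the libp2p stream up to 5 times, backing off 1s each time
	retry := dtnet.RetryParameters(time.Second, time.Second, 5, 1)
	net := dtnet.NewFromLibp2pHost(h, retry)

	// Restart the channel automatically when too little data moves between checks
	restart := dtimpl.ChannelRestartConfig(channelmonitor.Config{
		AcceptTimeout:          30 * time.Second,
		Interval:               time.Minute,
		MinBytesTransferred:    1024,
		ChecksPerInterval:      10,
		RestartBackoff:         20 * time.Second,
		MaxConsecutiveRestarts: 5,
		CompleteTimeout:        30 * time.Second,
	})

	return dtimpl.NewDataTransfer(ds, dir, net, transport, counter, restart)
}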
