Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use time-based counter instead of stored counter for transfer ID #169

Merged
merged 1 commit into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
tn "github.com/filecoin-project/go-data-transfer/benchmarks/testnet"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
Expand Down Expand Up @@ -170,8 +168,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
storer := storeutil.StorerForBlockstore(bstore)
gs := gsimpl.New(ctx, gsNet, loader, storer, gsimpl.RejectAllRequestsByDefault())
transport := gstransport.NewTransport(p, gs)
dtCounter := storedcounter.New(dstore, datastore.NewKey("/data-transfers/counter"))
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport, dtCounter)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport)
if err != nil {
return Instance{}, err
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.14
require (
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe h
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
Expand Down
8 changes: 3 additions & 5 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channelmonitor"
"github.com/filecoin-project/go-data-transfer/channels"
Expand All @@ -39,10 +37,10 @@ type manager struct {
channels *channels.Channels
peerID peer.ID
transport datatransfer.Transport
storedCounter *storedcounter.StoredCounter
cidLists cidlists.CIDLists
channelMonitor *channelmonitor.Monitor
channelMonitorCfg *channelmonitor.Config
transferIDGen *timeCounter
}

type internalEvent struct {
Expand Down Expand Up @@ -88,7 +86,7 @@ func ChannelRestartConfig(cfg channelmonitor.Config) DataTransferOption {
}

// NewDataTransfer initializes a new instance of a data transfer manager
func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter, options ...DataTransferOption) (datatransfer.Manager, error) {
func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, options ...DataTransferOption) (datatransfer.Manager, error) {
m := &manager{
dataTransferNetwork: dataTransferNetwork,
validatedTypes: registry.NewRegistry(),
Expand All @@ -99,7 +97,7 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
readySub: pubsub.New(readyDispatcher),
peerID: dataTransferNetwork.ID(),
transport: transport,
storedCounter: storedCounter,
transferIDGen: newTimeCounter(),
}

cidLists, err := cidlists.NewCIDLists(cidListsDir)
Expand Down
9 changes: 2 additions & 7 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels"
. "github.com/filecoin-project/go-data-transfer/impl"
Expand Down Expand Up @@ -330,8 +328,7 @@ func TestDataTransferInitiating(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter, verify.options...)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, verify.options...)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -575,11 +572,10 @@ func TestDataTransferRestartInitiating(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
h.voucherValidator = testutil.NewStubbedValidator()

// setup data transfer``
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -620,7 +616,6 @@ type harness struct {
network *testutil.FakeNetwork
transport *testutil.FakeTransport
ds datastore.Batching
storedCounter *storedcounter.StoredCounter
dt datatransfer.Manager
voucherValidator *testutil.StubbedValidator
stor ipld.Node
Expand Down
48 changes: 22 additions & 26 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channelmonitor"
"github.com/filecoin-project/go-data-transfer/encoding"
Expand Down Expand Up @@ -106,10 +104,10 @@ func TestRoundTrip(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -261,10 +259,10 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -383,7 +381,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
host1 := gsData.Host1 // initiator, data sender

tp1 := gsData.SetupGSTransportHost1()
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)

Expand Down Expand Up @@ -411,9 +409,7 @@ func TestManyReceiversAtOnce(t *testing.T) {

dtDs := namespace.Wrap(ds, datastore.NewKey("datatransfer"))

storedCounter := storedcounter.New(ds, datastore.NewKey("counter"))

receiver, err := NewDataTransfer(dtDs, os.TempDir(), dtnet, gsTransport, storedCounter)
receiver, err := NewDataTransfer(dtDs, os.TempDir(), dtnet, gsTransport)
require.NoError(t, err)
err = receiver.Start(gsData.Ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -671,12 +667,12 @@ func TestAutoRestart(t *testing.T) {
MaxConsecutiveRestarts: 5,
CompleteTimeout: 100 * time.Millisecond,
})
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, gsData.StoredCounter1, restartConf)
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, initiator)
defer initiator.Stop(ctx)

responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt, gsData.StoredCounter2)
responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, responder)
defer responder.Stop(ctx)
Expand Down Expand Up @@ -821,10 +817,10 @@ func TestRoundTripCancelledRequest(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -962,10 +958,10 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
var chid datatransfer.ChannelID
Expand Down Expand Up @@ -1076,10 +1072,10 @@ func TestPauseAndResume(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
finished := make(chan struct{}, 2)
Expand Down Expand Up @@ -1215,10 +1211,10 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -1285,14 +1281,14 @@ func TestDataTransferSubscribing(t *testing.T) {
sv := testutil.NewStubbedValidator()
sv.StubErrorPull()
sv.StubErrorPush()
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
voucher := testutil.FakeDTType{Data: "applesauce"}
baseCid := testutil.GenerateCids(1)[0]

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
subscribe1Calls := make(chan struct{}, 1)
Expand Down Expand Up @@ -1424,7 +1420,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
gsData.GsNet2.SetDelegate(gsr)

tp1 := gsData.SetupGSTransportHost1()
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
voucherResult := testutil.NewFakeDTType()
Expand Down Expand Up @@ -1512,7 +1508,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {

gs1 := gsData.SetupGraphsyncHost1()
tp1 := tp.NewTransport(host1.ID(), gs1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
t.Run("when it's not our extension, does not error and does not validate", func(t *testing.T) {
Expand Down Expand Up @@ -1555,7 +1551,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
sv := testutil.NewStubbedValidator()
sv.ExpectSuccessPull()

dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
Expand Down Expand Up @@ -1587,7 +1583,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) {
sv := testutil.NewStubbedValidator()
sv.ExpectErrorPull()
dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
Expand Down
9 changes: 2 additions & 7 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels"
. "github.com/filecoin-project/go-data-transfer/impl"
Expand Down Expand Up @@ -566,8 +564,7 @@ func TestDataTransferResponding(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -989,8 +986,7 @@ func TestDataTransferRestartResponding(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -1036,7 +1032,6 @@ type receiverHarness struct {
sv *testutil.StubbedValidator
srv *testutil.StubbedRevalidator
ds datastore.Batching
storedCounter *storedcounter.StoredCounter
dt datatransfer.Manager
stor ipld.Node
voucher *testutil.FakeDTType
Expand Down
12 changes: 6 additions & 6 deletions impl/restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestRestartPush(t *testing.T) {
require.NoError(t, rh.dt1.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp1 := rh.gsData.SetupGSTransportHost1()
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1)
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt1.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt1)
Expand All @@ -71,7 +71,7 @@ func TestRestartPush(t *testing.T) {
require.NoError(t, rh.dt2.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp2 := rh.gsData.SetupGSTransportHost2()
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2)
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt2.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt2)
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestRestartPull(t *testing.T) {
require.NoError(t, rh.dt2.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp2 := rh.gsData.SetupGSTransportHost2()
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2)
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt2.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt2)
Expand All @@ -277,7 +277,7 @@ func TestRestartPull(t *testing.T) {
require.NoError(t, rh.dt1.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp1 := rh.gsData.SetupGSTransportHost1()
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1)
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt1.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt1)
Expand Down Expand Up @@ -475,10 +475,10 @@ func newRestartHarness(t *testing.T) *restartHarness {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)

dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)

sv := testutil.NewStubbedValidator()
Expand Down
Loading