Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle retrieval deals with zero price per byte #477

Merged
merged 1 commit into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/retrievalclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ stateDiagram-v2
state "DealStatusCancelled" as 26
state "DealStatusRetryLegacy" as 27
state "DealStatusWaitForAcceptanceLegacy" as 28
state "DealStatusWaitingForLastBlocks" as 29
0 : On entry runs ProposeDeal
4 : On entry runs WaitPaymentChannelReady
5 : On entry runs WaitPaymentChannelReady
Expand Down Expand Up @@ -82,9 +83,12 @@ stateDiagram-v2
13 --> 18 : ClientEventAllBlocksReceived
14 --> 12 : ClientEventAllBlocksReceived
18 --> 18 : ClientEventAllBlocksReceived
29 --> 15 : ClientEventAllBlocksReceived
10 --> 10 : ClientEventBlocksReceived
13 --> 13 : ClientEventBlocksReceived
14 --> 14 : ClientEventBlocksReceived
21 --> 21 : ClientEventBlocksReceived
29 --> 29 : ClientEventBlocksReceived
10 --> 11 : ClientEventSendFunds
14 --> 12 : ClientEventSendFunds
22 --> 23 : ClientEventFundsExpended
Expand All @@ -100,6 +104,7 @@ stateDiagram-v2
19 --> 15 : ClientEventComplete
21 --> 15 : ClientEventCompleteVerified
21 --> 17 : ClientEventEarlyTermination
21 --> 29 : ClientEventWaitForLastBlocks
8 --> 17 : ClientEventCancelComplete
25 --> 26 : ClientEventCancelComplete
23 --> 22 : ClientEventRecheckFunds
Expand Down
Binary file modified docs/retrievalclient.mmd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/retrievalclient.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 8 additions & 0 deletions retrievalmarket/dealstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ const (

// DealStatusWaitForAcceptanceLegacy means we're waiting to hear the results on the legacy protocol
DealStatusWaitForAcceptanceLegacy

// DealStatusClientWaitingForLastBlocks means that the provider has told
// the client that all blocks were sent for the deal, and the client is
// waiting for the last blocks to arrive. This should only happen when
// the deal price per byte is zero (if it's not zero the provider asks
// for final payment after sending the last blocks).
DealStatusClientWaitingForLastBlocks
)

// DealStatuses maps deal status to a human readable representation
Expand Down Expand Up @@ -139,4 +146,5 @@ var DealStatuses = map[DealStatus]string{
DealStatusCancelled: "DealStatusCancelled",
DealStatusRetryLegacy: "DealStatusRetryLegacy",
DealStatusWaitForAcceptanceLegacy: "DealStatusWaitForAcceptanceLegacy",
DealStatusClientWaitingForLastBlocks: "DealStatusWaitingForLastBlocks",
}
6 changes: 6 additions & 0 deletions retrievalmarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ const (

// ClientEventCancel runs when a user cancels a deal
ClientEventCancel

// ClientEventWaitForLastBlocks is fired when the provider has told
// the client that all blocks were sent for the deal, and the client is
// waiting for the last blocks to arrive
ClientEventWaitForLastBlocks
)

// ClientEvents is a human readable map of client event name -> event description
Expand Down Expand Up @@ -152,6 +157,7 @@ var ClientEvents = map[ClientEvent]string{
ClientEventVoucherShortfall: "ClientEventVoucherShortfall",
ClientEventRecheckFunds: "ClientEventRecheckFunds",
ClientEventCancel: "ClientEventCancel",
ClientEventWaitForLastBlocks: "ClientEventWaitForLastBlocks",
}

// ProviderEvent is an event that occurs in a deal lifecycle on the provider
Expand Down
11 changes: 10 additions & 1 deletion retrievalmarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,17 @@ var ClientEvents = fsm.Events{
FromMany(paymentChannelCreationStates...).ToJustRecord().
FromMany(rm.DealStatusSendFunds, rm.DealStatusFundsNeeded).ToJustRecord().
From(rm.DealStatusFundsNeededLastPayment).To(rm.DealStatusSendFundsLastPayment).
From(rm.DealStatusClientWaitingForLastBlocks).To(rm.DealStatusCompleted).
Action(func(deal *rm.ClientDealState) error {
deal.AllBlocksReceived = true
return nil
}),
fsm.Event(rm.ClientEventBlocksReceived).
FromMany(rm.DealStatusOngoing,
rm.DealStatusFundsNeeded,
rm.DealStatusFundsNeededLastPayment).ToNoChange().
rm.DealStatusFundsNeededLastPayment,
rm.DealStatusCheckComplete,
rm.DealStatusClientWaitingForLastBlocks).ToNoChange().
FromMany(paymentChannelCreationStates...).ToJustRecord().
Action(recordReceived),

Expand Down Expand Up @@ -260,6 +263,12 @@ var ClientEvents = fsm.Events{
return nil
}),

// the provider indicated that all blocks have been sent, so the client
// should wait for the last blocks to arrive (only needed when price
// per byte is zero)
fsm.Event(rm.ClientEventWaitForLastBlocks).
From(rm.DealStatusCheckComplete).To(rm.DealStatusClientWaitingForLastBlocks),

// after cancelling a deal is complete
fsm.Event(rm.ClientEventCancelComplete).
From(rm.DealStatusFailing).To(rm.DealStatusErrored).
Expand Down
19 changes: 16 additions & 3 deletions retrievalmarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,22 @@ func CancelDeal(ctx fsm.Context, environment ClientDealEnvironment, deal rm.Clie

// CheckComplete verifies that a provider that completed without a last payment requested did in fact send us all the data
func CheckComplete(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error {
if !deal.AllBlocksReceived {
return ctx.Trigger(rm.ClientEventEarlyTermination)
// This function is called when the provider tells the client that it has
// sent all the blocks, so check if all blocks have been received.
if deal.AllBlocksReceived {
return ctx.Trigger(rm.ClientEventCompleteVerified)
}

return ctx.Trigger(rm.ClientEventCompleteVerified)
// If the deal price per byte is zero, wait for the last blocks to
// arrive
if deal.PricePerByte.IsZero() {
return ctx.Trigger(rm.ClientEventWaitForLastBlocks)
}

// If the deal price per byte is non-zero, the provider should only
// have sent the complete message after receiving the last payment
// from the client, which should happen after all blocks have been
// received. So if they haven't been received the provider is trying
// to terminate the deal early.
return ctx.Trigger(rm.ClientEventEarlyTermination)
}
12 changes: 10 additions & 2 deletions retrievalmarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,12 +548,12 @@ func TestCancelDeal(t *testing.T) {
require.Equal(t, retrievalmarket.DealStatusCancelled, dealState.Status)
})
}

func TestCheckComplete(t *testing.T) {
ctx := context.Background()
eventMachine, err := fsm.NewEventProcessor(retrievalmarket.ClientDealState{}, "Status", clientstates.ClientEvents)
require.NoError(t, err)
runCheckComplete := func(t *testing.T,
dealState *retrievalmarket.ClientDealState) {
runCheckComplete := func(t *testing.T, dealState *retrievalmarket.ClientDealState) {
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
environment := &fakeEnvironment{node, nil, nil, nil}
fsmCtx := fsmtest.NewTestContext(ctx, eventMachine)
Expand All @@ -576,6 +576,14 @@ func TestCheckComplete(t *testing.T) {
require.Equal(t, retrievalmarket.DealStatusErrored, dealState.Status)
require.Equal(t, "Provider sent complete status without sending all data", dealState.Message)
})

t.Run("when not all blocks are received and deal price per byte is zero", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusCheckComplete)
dealState.PricePerByte = abi.NewTokenAmount(0)
dealState.AllBlocksReceived = false
runCheckComplete(t, dealState)
require.Equal(t, retrievalmarket.DealStatusClientWaitingForLastBlocks, dealState.Status)
})
}

var defaultTotalFunds = abi.NewTokenAmount(4000000)
Expand Down
5 changes: 3 additions & 2 deletions retrievalmarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat

// ClientDataTransferSubscriber is the function called when an event occurs in a data
// transfer initiated on the client -- it reads the voucher to verify this even occurred
// in a storage market deal, then, based on the data transfer event that occurred, it dispatches
// in a retrieval market deal, then, based on the data transfer event that occurred, it dispatches
// an event to the appropriate state machine
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
Expand All @@ -148,7 +148,8 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
// data transfer events for progress do not affect deal state
err := deals.Send(dealProposal.ID, retrievalEvent, params...)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event %s for state %s: %s",
datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()], err)
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
voucherAmts []abi.TokenAmount
selector ipld.Node
unsealPrice abi.TokenAmount
zeroPricePerByte bool
paramsV1, addFunds bool
skipStores bool
failsUnseal bool
Expand Down Expand Up @@ -260,6 +261,11 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
filesize: 19000,
voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)},
},
{name: "multi-block file retrieval with zero price per byte succeeds",
filename: "lorem.txt",
filesize: 19000,
zeroPricePerByte: true,
},
{name: "multi-block file retrieval succeeds with V1 params and AllSelector",
filename: "lorem.txt",
filesize: 19000,
Expand Down Expand Up @@ -337,6 +343,9 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
paymentIntervalIncrease = uint64(1000)
}
pricePerByte := abi.NewTokenAmount(1000)
if testCase.zeroPricePerByte {
pricePerByte = abi.NewTokenAmount(0)
}
unsealPrice := testCase.unsealPrice
if unsealPrice.Int == nil {
unsealPrice = big.Zero()
Expand Down Expand Up @@ -509,9 +518,11 @@ CurrentInterval: %d
require.NotNil(t, createdChan)
require.Equal(t, expectedTotal, createdChan.amt)
require.Equal(t, clientPaymentChannel, *newLaneAddr)
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
tut.TestVoucherEquality(t, createdVoucher, expectedVoucher)
if !testCase.zeroPricePerByte {
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
tut.TestVoucherEquality(t, createdVoucher, expectedVoucher)
}
assert.Equal(t, retrievalmarket.DealStatusCompleted, clientDealState.Status)
}
ctx, cancel = context.WithTimeout(bgCtx, 5*time.Second)
Expand Down
29 changes: 16 additions & 13 deletions retrievalmarket/impl/requestvalidation/revalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (pr *ProviderRevalidator) loadDealState(channel *channelData) error {
func (pr *ProviderRevalidator) writeDealState(deal rm.ProviderDealState) {
channel := pr.trackedChannels[deal.ChannelID]
channel.totalSent = deal.TotalSent
channel.totalPaidFor = big.Div(big.Max(big.Sub(deal.FundsReceived, deal.UnsealPrice), big.Zero()), deal.PricePerByte).Uint64()
if !deal.PricePerByte.IsZero() {
channel.totalPaidFor = big.Div(big.Max(big.Sub(deal.FundsReceived, deal.UnsealPrice), big.Zero()), deal.PricePerByte).Uint64()
}
channel.interval = deal.CurrentInterval
channel.pricePerByte = deal.PricePerByte
channel.legacyProtocol = deal.LegacyProtocol
Expand Down Expand Up @@ -195,19 +197,20 @@ func (pr *ProviderRevalidator) OnPullDataSent(chid datatransfer.ChannelID, addit
}

channel.totalSent += additionalBytesSent
if channel.totalSent-channel.totalPaidFor >= channel.interval {
paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte)
err := pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent)
if err != nil {
return true, nil, err
}
return true, finalResponse(&rm.DealResponse{
ID: channel.dealID.DealID,
Status: rm.DealStatusFundsNeeded,
PaymentOwed: paymentOwed,
}, channel.legacyProtocol), datatransfer.ErrPause
if channel.pricePerByte.IsZero() || channel.totalSent-channel.totalPaidFor < channel.interval {
return true, nil, pr.env.SendEvent(channel.dealID, rm.ProviderEventBlockSent, channel.totalSent)
}
return true, nil, pr.env.SendEvent(channel.dealID, rm.ProviderEventBlockSent, channel.totalSent)

paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte)
err = pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent)
if err != nil {
return true, nil, err
}
return true, finalResponse(&rm.DealResponse{
ID: channel.dealID.DealID,
Status: rm.DealStatusFundsNeeded,
PaymentOwed: paymentOwed,
}, channel.legacyProtocol), datatransfer.ErrPause
}

// OnPushDataReceived is called on the responder side when more bytes are received
Expand Down
35 changes: 34 additions & 1 deletion retrievalmarket/impl/requestvalidation/revalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ func TestOnPushDataReceived(t *testing.T) {
require.NoError(t, err)
require.Nil(t, voucherResult)
}
func TestOnPullDataSent(t *testing.T) {

func TestOnPullDataSent(t *testing.T) {
deal := *makeDealState(rm.DealStatusOngoing)
dealZeroPricePerByte := deal
dealZeroPricePerByte.PricePerByte = big.Zero()
legacyDeal := deal
legacyDeal.LegacyProtocol = true
testCases := map[string]struct {
Expand Down Expand Up @@ -60,6 +62,15 @@ func TestOnPullDataSent(t *testing.T) {
expectedHandled: true,
dataAmount: uint64(500),
},
"record block zero price per byte": {
deal: dealZeroPricePerByte,
channelID: dealZeroPricePerByte.ChannelID,
expectedID: dealZeroPricePerByte.Identifier(),
expectedEvent: rm.ProviderEventBlockSent,
expectedArgs: []interface{}{dealZeroPricePerByte.TotalSent + uint64(500)},
expectedHandled: true,
dataAmount: uint64(500),
},
"request payment": {
deal: deal,
channelID: deal.ChannelID,
Expand Down Expand Up @@ -125,6 +136,8 @@ func TestOnPullDataSent(t *testing.T) {

func TestOnComplete(t *testing.T) {
deal := *makeDealState(rm.DealStatusOngoing)
dealZeroPricePerByte := deal
dealZeroPricePerByte.PricePerByte = big.Zero()
legacyDeal := deal
legacyDeal.LegacyProtocol = true
channelID := deal.ChannelID
Expand Down Expand Up @@ -210,6 +223,26 @@ func TestOnComplete(t *testing.T) {
deal: deal,
channelID: channelID,
},
"all funds paid zero price per byte": {
unpaidAmount: uint64(0),
expectedEvents: []eventSent{
{
ID: dealZeroPricePerByte.Identifier(),
Event: rm.ProviderEventBlockSent,
Args: []interface{}{dealZeroPricePerByte.TotalSent},
},
{
ID: dealZeroPricePerByte.Identifier(),
Event: rm.ProviderEventBlocksCompleted,
},
},
expectedResult: &rm.DealResponse{
ID: dealZeroPricePerByte.ID,
Status: rm.DealStatusCompleted,
},
deal: dealZeroPricePerByte,
channelID: channelID,
},
"all funds paid, legacyDeal": {
unpaidAmount: uint64(0),
expectedEvents: []eventSent{
Expand Down