Skip to content

Commit

Permalink
fix: handle retrieval deals with zero price per byte (#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored Feb 1, 2021
1 parent 5af7592 commit e061bc5
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 28 deletions.
5 changes: 5 additions & 0 deletions docs/retrievalclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ stateDiagram-v2
state "DealStatusCancelled" as 26
state "DealStatusRetryLegacy" as 27
state "DealStatusWaitForAcceptanceLegacy" as 28
state "DealStatusWaitingForLastBlocks" as 29
0 : On entry runs ProposeDeal
4 : On entry runs WaitPaymentChannelReady
5 : On entry runs WaitPaymentChannelReady
Expand Down Expand Up @@ -82,9 +83,12 @@ stateDiagram-v2
13 --> 18 : ClientEventAllBlocksReceived
14 --> 12 : ClientEventAllBlocksReceived
18 --> 18 : ClientEventAllBlocksReceived
29 --> 15 : ClientEventAllBlocksReceived
10 --> 10 : ClientEventBlocksReceived
13 --> 13 : ClientEventBlocksReceived
14 --> 14 : ClientEventBlocksReceived
21 --> 21 : ClientEventBlocksReceived
29 --> 29 : ClientEventBlocksReceived
10 --> 11 : ClientEventSendFunds
14 --> 12 : ClientEventSendFunds
22 --> 23 : ClientEventFundsExpended
Expand All @@ -100,6 +104,7 @@ stateDiagram-v2
19 --> 15 : ClientEventComplete
21 --> 15 : ClientEventCompleteVerified
21 --> 17 : ClientEventEarlyTermination
21 --> 29 : ClientEventWaitForLastBlocks
8 --> 17 : ClientEventCancelComplete
25 --> 26 : ClientEventCancelComplete
23 --> 22 : ClientEventRecheckFunds
Expand Down
Binary file modified docs/retrievalclient.mmd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/retrievalclient.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 8 additions & 0 deletions retrievalmarket/dealstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ const (

// DealStatusWaitForAcceptanceLegacy means we're waiting to hear the results on the legacy protocol
DealStatusWaitForAcceptanceLegacy

// DealStatusClientWaitingForLastBlocks means that the provider has told
// the client that all blocks were sent for the deal, and the client is
// waiting for the last blocks to arrive. This should only happen when
// the deal price per byte is zero (if it's not zero the provider asks
// for final payment after sending the last blocks).
DealStatusClientWaitingForLastBlocks
)

// DealStatuses maps deal status to a human readable representation
Expand Down Expand Up @@ -139,4 +146,5 @@ var DealStatuses = map[DealStatus]string{
DealStatusCancelled: "DealStatusCancelled",
DealStatusRetryLegacy: "DealStatusRetryLegacy",
DealStatusWaitForAcceptanceLegacy: "DealStatusWaitForAcceptanceLegacy",
DealStatusClientWaitingForLastBlocks: "DealStatusWaitingForLastBlocks",
}
6 changes: 6 additions & 0 deletions retrievalmarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ const (

// ClientEventCancel runs when a user cancels a deal
ClientEventCancel

// ClientEventWaitForLastBlocks is fired when the provider has told
// the client that all blocks were sent for the deal, and the client is
// waiting for the last blocks to arrive
ClientEventWaitForLastBlocks
)

// ClientEvents is a human readable map of client event name -> event description
Expand Down Expand Up @@ -152,6 +157,7 @@ var ClientEvents = map[ClientEvent]string{
ClientEventVoucherShortfall: "ClientEventVoucherShortfall",
ClientEventRecheckFunds: "ClientEventRecheckFunds",
ClientEventCancel: "ClientEventCancel",
ClientEventWaitForLastBlocks: "ClientEventWaitForLastBlocks",
}

// ProviderEvent is an event that occurs in a deal lifecycle on the provider
Expand Down
11 changes: 10 additions & 1 deletion retrievalmarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,17 @@ var ClientEvents = fsm.Events{
FromMany(paymentChannelCreationStates...).ToJustRecord().
FromMany(rm.DealStatusSendFunds, rm.DealStatusFundsNeeded).ToJustRecord().
From(rm.DealStatusFundsNeededLastPayment).To(rm.DealStatusSendFundsLastPayment).
From(rm.DealStatusClientWaitingForLastBlocks).To(rm.DealStatusCompleted).
Action(func(deal *rm.ClientDealState) error {
deal.AllBlocksReceived = true
return nil
}),
fsm.Event(rm.ClientEventBlocksReceived).
FromMany(rm.DealStatusOngoing,
rm.DealStatusFundsNeeded,
rm.DealStatusFundsNeededLastPayment).ToNoChange().
rm.DealStatusFundsNeededLastPayment,
rm.DealStatusCheckComplete,
rm.DealStatusClientWaitingForLastBlocks).ToNoChange().
FromMany(paymentChannelCreationStates...).ToJustRecord().
Action(recordReceived),

Expand Down Expand Up @@ -260,6 +263,12 @@ var ClientEvents = fsm.Events{
return nil
}),

// the provider indicated that all blocks have been sent, so the client
// should wait for the last blocks to arrive (only needed when price
// per byte is zero)
fsm.Event(rm.ClientEventWaitForLastBlocks).
From(rm.DealStatusCheckComplete).To(rm.DealStatusClientWaitingForLastBlocks),

// after cancelling a deal is complete
fsm.Event(rm.ClientEventCancelComplete).
From(rm.DealStatusFailing).To(rm.DealStatusErrored).
Expand Down
19 changes: 16 additions & 3 deletions retrievalmarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,22 @@ func CancelDeal(ctx fsm.Context, environment ClientDealEnvironment, deal rm.Clie

// CheckComplete verifies that a provider that completed without a last payment requested did in fact send us all the data
func CheckComplete(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error {
if !deal.AllBlocksReceived {
return ctx.Trigger(rm.ClientEventEarlyTermination)
// This function is called when the provider tells the client that it has
// sent all the blocks, so check if all blocks have been received.
if deal.AllBlocksReceived {
return ctx.Trigger(rm.ClientEventCompleteVerified)
}

return ctx.Trigger(rm.ClientEventCompleteVerified)
// If the deal price per byte is zero, wait for the last blocks to
// arrive
if deal.PricePerByte.IsZero() {
return ctx.Trigger(rm.ClientEventWaitForLastBlocks)
}

// If the deal price per byte is non-zero, the provider should only
// have sent the complete message after receiving the last payment
// from the client, which should happen after all blocks have been
// received. So if they haven't been received the provider is trying
// to terminate the deal early.
return ctx.Trigger(rm.ClientEventEarlyTermination)
}
12 changes: 10 additions & 2 deletions retrievalmarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,12 +548,12 @@ func TestCancelDeal(t *testing.T) {
require.Equal(t, retrievalmarket.DealStatusCancelled, dealState.Status)
})
}

func TestCheckComplete(t *testing.T) {
ctx := context.Background()
eventMachine, err := fsm.NewEventProcessor(retrievalmarket.ClientDealState{}, "Status", clientstates.ClientEvents)
require.NoError(t, err)
runCheckComplete := func(t *testing.T,
dealState *retrievalmarket.ClientDealState) {
runCheckComplete := func(t *testing.T, dealState *retrievalmarket.ClientDealState) {
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
environment := &fakeEnvironment{node, nil, nil, nil}
fsmCtx := fsmtest.NewTestContext(ctx, eventMachine)
Expand All @@ -576,6 +576,14 @@ func TestCheckComplete(t *testing.T) {
require.Equal(t, retrievalmarket.DealStatusErrored, dealState.Status)
require.Equal(t, "Provider sent complete status without sending all data", dealState.Message)
})

t.Run("when not all blocks are received and deal price per byte is zero", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusCheckComplete)
dealState.PricePerByte = abi.NewTokenAmount(0)
dealState.AllBlocksReceived = false
runCheckComplete(t, dealState)
require.Equal(t, retrievalmarket.DealStatusClientWaitingForLastBlocks, dealState.Status)
})
}

var defaultTotalFunds = abi.NewTokenAmount(4000000)
Expand Down
5 changes: 3 additions & 2 deletions retrievalmarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat

// ClientDataTransferSubscriber is the function called when an event occurs in a data
// transfer initiated on the client -- it reads the voucher to verify this even occurred
// in a storage market deal, then, based on the data transfer event that occurred, it dispatches
// in a retrieval market deal, then, based on the data transfer event that occurred, it dispatches
// an event to the appropriate state machine
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
Expand All @@ -148,7 +148,8 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
// data transfer events for progress do not affect deal state
err := deals.Send(dealProposal.ID, retrievalEvent, params...)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event %s for state %s: %s",
datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()], err)
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
voucherAmts []abi.TokenAmount
selector ipld.Node
unsealPrice abi.TokenAmount
zeroPricePerByte bool
paramsV1, addFunds bool
skipStores bool
failsUnseal bool
Expand Down Expand Up @@ -260,6 +261,11 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
filesize: 19000,
voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)},
},
{name: "multi-block file retrieval with zero price per byte succeeds",
filename: "lorem.txt",
filesize: 19000,
zeroPricePerByte: true,
},
{name: "multi-block file retrieval succeeds with V1 params and AllSelector",
filename: "lorem.txt",
filesize: 19000,
Expand Down Expand Up @@ -337,6 +343,9 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
paymentIntervalIncrease = uint64(1000)
}
pricePerByte := abi.NewTokenAmount(1000)
if testCase.zeroPricePerByte {
pricePerByte = abi.NewTokenAmount(0)
}
unsealPrice := testCase.unsealPrice
if unsealPrice.Int == nil {
unsealPrice = big.Zero()
Expand Down Expand Up @@ -509,9 +518,11 @@ CurrentInterval: %d
require.NotNil(t, createdChan)
require.Equal(t, expectedTotal, createdChan.amt)
require.Equal(t, clientPaymentChannel, *newLaneAddr)
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
tut.TestVoucherEquality(t, createdVoucher, expectedVoucher)
if !testCase.zeroPricePerByte {
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
tut.TestVoucherEquality(t, createdVoucher, expectedVoucher)
}
assert.Equal(t, retrievalmarket.DealStatusCompleted, clientDealState.Status)
}
ctx, cancel = context.WithTimeout(bgCtx, 5*time.Second)
Expand Down
29 changes: 16 additions & 13 deletions retrievalmarket/impl/requestvalidation/revalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (pr *ProviderRevalidator) loadDealState(channel *channelData) error {
func (pr *ProviderRevalidator) writeDealState(deal rm.ProviderDealState) {
channel := pr.trackedChannels[deal.ChannelID]
channel.totalSent = deal.TotalSent
channel.totalPaidFor = big.Div(big.Max(big.Sub(deal.FundsReceived, deal.UnsealPrice), big.Zero()), deal.PricePerByte).Uint64()
if !deal.PricePerByte.IsZero() {
channel.totalPaidFor = big.Div(big.Max(big.Sub(deal.FundsReceived, deal.UnsealPrice), big.Zero()), deal.PricePerByte).Uint64()
}
channel.interval = deal.CurrentInterval
channel.pricePerByte = deal.PricePerByte
channel.legacyProtocol = deal.LegacyProtocol
Expand Down Expand Up @@ -195,19 +197,20 @@ func (pr *ProviderRevalidator) OnPullDataSent(chid datatransfer.ChannelID, addit
}

channel.totalSent += additionalBytesSent
if channel.totalSent-channel.totalPaidFor >= channel.interval {
paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte)
err := pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent)
if err != nil {
return true, nil, err
}
return true, finalResponse(&rm.DealResponse{
ID: channel.dealID.DealID,
Status: rm.DealStatusFundsNeeded,
PaymentOwed: paymentOwed,
}, channel.legacyProtocol), datatransfer.ErrPause
if channel.pricePerByte.IsZero() || channel.totalSent-channel.totalPaidFor < channel.interval {
return true, nil, pr.env.SendEvent(channel.dealID, rm.ProviderEventBlockSent, channel.totalSent)
}
return true, nil, pr.env.SendEvent(channel.dealID, rm.ProviderEventBlockSent, channel.totalSent)

paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte)
err = pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent)
if err != nil {
return true, nil, err
}
return true, finalResponse(&rm.DealResponse{
ID: channel.dealID.DealID,
Status: rm.DealStatusFundsNeeded,
PaymentOwed: paymentOwed,
}, channel.legacyProtocol), datatransfer.ErrPause
}

// OnPushDataReceived is called on the responder side when more bytes are received
Expand Down
35 changes: 34 additions & 1 deletion retrievalmarket/impl/requestvalidation/revalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ func TestOnPushDataReceived(t *testing.T) {
require.NoError(t, err)
require.Nil(t, voucherResult)
}
func TestOnPullDataSent(t *testing.T) {

func TestOnPullDataSent(t *testing.T) {
deal := *makeDealState(rm.DealStatusOngoing)
dealZeroPricePerByte := deal
dealZeroPricePerByte.PricePerByte = big.Zero()
legacyDeal := deal
legacyDeal.LegacyProtocol = true
testCases := map[string]struct {
Expand Down Expand Up @@ -60,6 +62,15 @@ func TestOnPullDataSent(t *testing.T) {
expectedHandled: true,
dataAmount: uint64(500),
},
"record block zero price per byte": {
deal: dealZeroPricePerByte,
channelID: dealZeroPricePerByte.ChannelID,
expectedID: dealZeroPricePerByte.Identifier(),
expectedEvent: rm.ProviderEventBlockSent,
expectedArgs: []interface{}{dealZeroPricePerByte.TotalSent + uint64(500)},
expectedHandled: true,
dataAmount: uint64(500),
},
"request payment": {
deal: deal,
channelID: deal.ChannelID,
Expand Down Expand Up @@ -125,6 +136,8 @@ func TestOnPullDataSent(t *testing.T) {

func TestOnComplete(t *testing.T) {
deal := *makeDealState(rm.DealStatusOngoing)
dealZeroPricePerByte := deal
dealZeroPricePerByte.PricePerByte = big.Zero()
legacyDeal := deal
legacyDeal.LegacyProtocol = true
channelID := deal.ChannelID
Expand Down Expand Up @@ -210,6 +223,26 @@ func TestOnComplete(t *testing.T) {
deal: deal,
channelID: channelID,
},
"all funds paid zero price per byte": {
unpaidAmount: uint64(0),
expectedEvents: []eventSent{
{
ID: dealZeroPricePerByte.Identifier(),
Event: rm.ProviderEventBlockSent,
Args: []interface{}{dealZeroPricePerByte.TotalSent},
},
{
ID: dealZeroPricePerByte.Identifier(),
Event: rm.ProviderEventBlocksCompleted,
},
},
expectedResult: &rm.DealResponse{
ID: dealZeroPricePerByte.ID,
Status: rm.DealStatusCompleted,
},
deal: dealZeroPricePerByte,
channelID: channelID,
},
"all funds paid, legacyDeal": {
unpaidAmount: uint64(0),
expectedEvents: []eventSent{
Expand Down

0 comments on commit e061bc5

Please sign in to comment.