Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data transfer list command to miner #3191

Merged
merged 1 commit into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type StorageMiner interface {
MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error)
MarketSetRetrievalAsk(ctx context.Context, rask *retrievalmarket.Ask) error
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)

DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
Expand Down
10 changes: 10 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ type StorageMinerStruct struct {
MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
MarketSetRetrievalAsk func(ctx context.Context, rask *retrievalmarket.Ask) error `perm:"admin"`
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`

PledgeSector func(context.Context) error `perm:"write"`

Expand Down Expand Up @@ -1080,6 +1082,14 @@ func (c *StorageMinerStruct) MarketGetRetrievalAsk(ctx context.Context) (*retrie
return c.Internal.MarketGetRetrievalAsk(ctx)
}

func (c *StorageMinerStruct) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
return c.Internal.MarketListDataTransfers(ctx)
}

func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
return c.Internal.MarketDataTransferUpdates(ctx)
}

func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file)
}
Expand Down
33 changes: 33 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"encoding/json"
"fmt"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
Expand Down Expand Up @@ -117,3 +118,35 @@ type DataTransferChannel struct {
OtherPeer peer.ID
Transferred uint64
}

// NewDataTransferChannel constructs an API DataTransferChannel type from full channel state snapshot and a host id
func NewDataTransferChannel(hostID peer.ID, channelState datatransfer.ChannelState) DataTransferChannel {
channel := DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}
9 changes: 5 additions & 4 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ var clientListTransfers = &cli.Command{

tm.MoveCursor(1, 1)

outputChannels(tm.Screen, channels, completed, color)
OutputDataTransferChannels(tm.Screen, channels, completed, color)

tm.Flush()

Expand All @@ -1279,12 +1279,13 @@ var clientListTransfers = &cli.Command{
}
}
}
outputChannels(os.Stdout, channels, completed, color)
OutputDataTransferChannels(os.Stdout, channels, completed, color)
return nil
},
}

func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) {
// OutputDataTransferChannels generates table output for a list of channels
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) {
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
Expand Down Expand Up @@ -1346,7 +1347,7 @@ func channelStatusString(useColor bool, status datatransfer.Status) string {
}
}

func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} {
func toChannelOutput(useColor bool, otherPartyColumn string, channel lapi.DataTransferChannel) map[string]interface{} {
rootCid := channel.BaseCID.String()
rootCid = "..." + rootCid[len(rootCid)-8:]

Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-storage-miner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
lcli.WithCategory("chain", infoCmd),
lcli.WithCategory("market", storageDealsCmd),
lcli.WithCategory("market", retrievalDealsCmd),
lcli.WithCategory("market", dataTransfersCmd),
lcli.WithCategory("storage", sectorsCmd),
lcli.WithCategory("storage", provingCmd),
lcli.WithCategory("storage", storageCmd),
Expand Down
85 changes: 85 additions & 0 deletions cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"text/tabwriter"
"time"

tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
Expand Down Expand Up @@ -506,3 +507,87 @@ var setSealDurationCmd = &cli.Command{
return nodeApi.SectorSetExpectedSealDuration(ctx, time.Duration(delay))
},
}

var dataTransfersCmd = &cli.Command{
Name: "data-transfers",
Usage: "Manage data transfers",
Subcommands: []*cli.Command{
transfersListCmd,
},
}

var transfersListCmd = &cli.Command{
Name: "list",
Usage: "List ongoing data transfers for this miner",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
Value: true,
},
&cli.BoolFlag{
Name: "completed",
Usage: "show completed data transfers",
},
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

channels, err := api.MarketListDataTransfers(ctx)
if err != nil {
return err
}

completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

if watch {
channelUpdates, err := api.MarketDataTransferUpdates(ctx)
if err != nil {
return err
}

for {
tm.Clear() // Clear current screen

tm.MoveCursor(1, 1)

lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color)

tm.Flush()

select {
case <-ctx.Done():
return nil
case channelUpdate := <-channelUpdates:
var found bool
for i, existing := range channels {
if existing.TransferID == channelUpdate.TransferID &&
existing.OtherPeer == channelUpdate.OtherPeer &&
existing.IsSender == channelUpdate.IsSender &&
existing.IsInitiator == channelUpdate.IsInitiator {
channels[i] = channelUpdate
found = true
break
}
}
if !found {
channels = append(channels, channelUpdate)
}
}
}
}
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color)
return nil
},
}
36 changes: 2 additions & 34 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -773,7 +772,7 @@ func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferCh

apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState))
apiChannels = append(apiChannels, api.NewDataTransferChannel(a.Host.ID(), channelState))
}

return apiChannels, nil
Expand All @@ -783,7 +782,7 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra
channels := make(chan api.DataTransferChannel)

unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := toAPIChannel(a.Host.ID(), channelState)
channel := api.NewDataTransferChannel(a.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
Expand All @@ -797,34 +796,3 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra

return channels, nil
}

func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel {
channel := api.DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}
37 changes: 37 additions & 0 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"strconv"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -51,6 +53,8 @@ type StorageMinerAPI struct {
StorageMgr *sectorstorage.Manager `optional:"true"`
IStorageMgr sectorstorage.SectorManager
*stores.Index
DataTransfer dtypes.ProviderDataTransfer
Host host.Host

ConsiderOnlineStorageDealsConfigFunc dtypes.ConsiderOnlineStorageDealsConfigFunc
SetConsiderOnlineStorageDealsConfigFunc dtypes.SetConsiderOnlineStorageDealsConfigFunc
Expand Down Expand Up @@ -354,6 +358,39 @@ func (sm *StorageMinerAPI) MarketGetRetrievalAsk(ctx context.Context) (*retrieva
return sm.RetrievalProvider.GetAsk(), nil
}

func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}

apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, api.NewDataTransferChannel(sm.Host.ID(), channelState))
}

return apiChannels, nil
}

func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel)

unsub := sm.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := api.NewDataTransferChannel(sm.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
}
})

go func() {
defer unsub()
<-ctx.Done()
}()

return channels, nil
}

func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) {
return sm.StorageProvider.ListDeals(ctx)
}
Expand Down