diff --git a/api/api_storage.go b/api/api_storage.go index 3e029bc9ff3..64d79a1e94d 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -79,6 +79,8 @@ type StorageMiner interface { MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error) MarketSetRetrievalAsk(ctx context.Context, rask *retrievalmarket.Ask) error MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) + MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) + MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index f5203909e60..5f027543edf 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -238,6 +238,8 @@ type StorageMinerStruct struct { MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"` MarketSetRetrievalAsk func(ctx context.Context, rask *retrievalmarket.Ask) error `perm:"admin"` MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"` + MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` + MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` PledgeSector func(context.Context) error `perm:"write"` @@ -1080,6 +1082,14 @@ func (c *StorageMinerStruct) MarketGetRetrievalAsk(ctx context.Context) (*retrie return c.Internal.MarketGetRetrievalAsk(ctx) } +func (c *StorageMinerStruct) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + return c.Internal.MarketListDataTransfers(ctx) +} + +func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { + return c.Internal.MarketDataTransferUpdates(ctx) +} + func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error { return c.Internal.DealsImportData(ctx, dealPropCid, file) } diff --git a/api/types.go b/api/types.go index 9bbbca1f685..0ef5d7acf50 100644 --- a/api/types.go +++ b/api/types.go @@ -2,6 +2,7 @@ package api import ( "encoding/json" + "fmt" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -117,3 +118,35 @@ type DataTransferChannel struct { OtherPeer peer.ID Transferred uint64 } + +// NewDataTransferChannel constructs an API DataTransferChannel type from full channel state snapshot and a host id +func NewDataTransferChannel(hostID peer.ID, channelState datatransfer.ChannelState) DataTransferChannel { + channel := DataTransferChannel{ + TransferID: channelState.TransferID(), + Status: channelState.Status(), + BaseCID: channelState.BaseCID(), + IsSender: channelState.Sender() == hostID, + Message: channelState.Message(), + } + stringer, ok := channelState.Voucher().(fmt.Stringer) + if ok { + channel.Voucher = stringer.String() + } else { + voucherJSON, err := json.Marshal(channelState.Voucher()) + if err != nil { + channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error() + } else { + channel.Voucher = string(voucherJSON) + } + } + if channel.IsSender { + channel.IsInitiator = !channelState.IsPull() + channel.Transferred = channelState.Sent() + channel.OtherPeer = channelState.Recipient() + } else { + channel.IsInitiator = channelState.IsPull() + channel.Transferred = channelState.Received() + channel.OtherPeer = channelState.Sender() + } + return channel +} diff --git a/cli/client.go b/cli/client.go index b3ee6220ba1..8888dc5bc15 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1254,7 +1254,7 @@ var clientListTransfers = &cli.Command{ tm.MoveCursor(1, 1) - outputChannels(tm.Screen, channels, completed, color) + OutputDataTransferChannels(tm.Screen, channels, completed, color) tm.Flush() @@ -1279,12 +1279,13 @@ var clientListTransfers = &cli.Command{ } } } - outputChannels(os.Stdout, channels, completed, color) + OutputDataTransferChannels(os.Stdout, channels, completed, color) return nil }, } -func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) { +// OutputDataTransferChannels generates table output for a list of channels +func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) { sort.Slice(channels, func(i, j int) bool { return channels[i].TransferID < channels[j].TransferID }) @@ -1346,7 +1347,7 @@ func channelStatusString(useColor bool, status datatransfer.Status) string { } } -func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} { +func toChannelOutput(useColor bool, otherPartyColumn string, channel lapi.DataTransferChannel) map[string]interface{} { rootCid := channel.BaseCID.String() rootCid = "..." + rootCid[len(rootCid)-8:] diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index 6775746a9c9..cc704f89199 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -37,6 +37,7 @@ func main() { lcli.WithCategory("chain", infoCmd), lcli.WithCategory("market", storageDealsCmd), lcli.WithCategory("market", retrievalDealsCmd), + lcli.WithCategory("market", dataTransfersCmd), lcli.WithCategory("storage", sectorsCmd), lcli.WithCategory("storage", provingCmd), lcli.WithCategory("storage", storageCmd), diff --git a/cmd/lotus-storage-miner/market.go b/cmd/lotus-storage-miner/market.go index 364db2693c5..2ddc8ee050b 100644 --- a/cmd/lotus-storage-miner/market.go +++ b/cmd/lotus-storage-miner/market.go @@ -9,6 +9,7 @@ import ( "text/tabwriter" "time" + tm "github.com/buger/goterm" "github.com/docker/go-units" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" @@ -506,3 +507,87 @@ var setSealDurationCmd = &cli.Command{ return nodeApi.SectorSetExpectedSealDuration(ctx, time.Duration(delay)) }, } + +var dataTransfersCmd = &cli.Command{ + Name: "data-transfers", + Usage: "Manage data transfers", + Subcommands: []*cli.Command{ + transfersListCmd, + }, +} + +var transfersListCmd = &cli.Command{ + Name: "list", + Usage: "List ongoing data transfers for this miner", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "color", + Usage: "use color in display output", + Value: true, + }, + &cli.BoolFlag{ + Name: "completed", + Usage: "show completed data transfers", + }, + &cli.BoolFlag{ + Name: "watch", + Usage: "watch deal updates in real-time, rather than a one time list", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + channels, err := api.MarketListDataTransfers(ctx) + if err != nil { + return err + } + + completed := cctx.Bool("completed") + color := cctx.Bool("color") + watch := cctx.Bool("watch") + + if watch { + channelUpdates, err := api.MarketDataTransferUpdates(ctx) + if err != nil { + return err + } + + for { + tm.Clear() // Clear current screen + + tm.MoveCursor(1, 1) + + lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color) + + tm.Flush() + + select { + case <-ctx.Done(): + return nil + case channelUpdate := <-channelUpdates: + var found bool + for i, existing := range channels { + if existing.TransferID == channelUpdate.TransferID && + existing.OtherPeer == channelUpdate.OtherPeer && + existing.IsSender == channelUpdate.IsSender && + existing.IsInitiator == channelUpdate.IsInitiator { + channels[i] = channelUpdate + found = true + break + } + } + if !found { + channels = append(channels, channelUpdate) + } + } + } + } + lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color) + return nil + }, +} diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 248b9ec6ac1..af7639ac52b 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -2,7 +2,6 @@ package client import ( "context" - "encoding/json" "fmt" "io" "os" @@ -773,7 +772,7 @@ func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferCh apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels)) for _, channelState := range inProgressChannels { - apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState)) + apiChannels = append(apiChannels, api.NewDataTransferChannel(a.Host.ID(), channelState)) } return apiChannels, nil @@ -783,7 +782,7 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra channels := make(chan api.DataTransferChannel) unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) { - channel := toAPIChannel(a.Host.ID(), channelState) + channel := api.NewDataTransferChannel(a.Host.ID(), channelState) select { case <-ctx.Done(): case channels <- channel: @@ -797,34 +796,3 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra return channels, nil } - -func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel { - channel := api.DataTransferChannel{ - TransferID: channelState.TransferID(), - Status: channelState.Status(), - BaseCID: channelState.BaseCID(), - IsSender: channelState.Sender() == hostID, - Message: channelState.Message(), - } - stringer, ok := channelState.Voucher().(fmt.Stringer) - if ok { - channel.Voucher = stringer.String() - } else { - voucherJSON, err := json.Marshal(channelState.Voucher()) - if err != nil { - channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error() - } else { - channel.Voucher = string(voucherJSON) - } - } - if channel.IsSender { - channel.IsInitiator = !channelState.IsPull() - channel.Transferred = channelState.Sent() - channel.OtherPeer = channelState.Recipient() - } else { - channel.IsInitiator = channelState.IsPull() - channel.Transferred = channelState.Received() - channel.OtherPeer = channelState.Sender() - } - return channel -} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 77052164bc8..daf011203b2 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -8,8 +8,10 @@ import ( "strconv" "time" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/host" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -51,6 +53,8 @@ type StorageMinerAPI struct { StorageMgr *sectorstorage.Manager `optional:"true"` IStorageMgr sectorstorage.SectorManager *stores.Index + DataTransfer dtypes.ProviderDataTransfer + Host host.Host ConsiderOnlineStorageDealsConfigFunc dtypes.ConsiderOnlineStorageDealsConfigFunc SetConsiderOnlineStorageDealsConfigFunc dtypes.SetConsiderOnlineStorageDealsConfigFunc @@ -354,6 +358,39 @@ func (sm *StorageMinerAPI) MarketGetRetrievalAsk(ctx context.Context) (*retrieva return sm.RetrievalProvider.GetAsk(), nil } +func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx) + if err != nil { + return nil, err + } + + apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels)) + for _, channelState := range inProgressChannels { + apiChannels = append(apiChannels, api.NewDataTransferChannel(sm.Host.ID(), channelState)) + } + + return apiChannels, nil +} + +func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { + channels := make(chan api.DataTransferChannel) + + unsub := sm.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) { + channel := api.NewDataTransferChannel(sm.Host.ID(), channelState) + select { + case <-ctx.Done(): + case channels <- channel: + } + }) + + go func() { + defer unsub() + <-ctx.Done() + }() + + return channels, nil +} + func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) { return sm.StorageProvider.ListDeals(ctx) }