diff --git a/api/clients/mix_msgclient.go b/api/clients/mix_msgclient.go index a88dd80b..105d01ee 100644 --- a/api/clients/mix_msgclient.go +++ b/api/clients/mix_msgclient.go @@ -109,7 +109,7 @@ func (msgClient *MixMsgClient) PushMessage(ctx context.Context, msg *types.Messa return cid.Undef, err } - log.Warnf("push message %s to venus-messager", msgID.String()) + log.Infof("push message %s to venus-messager", msgID.String()) return msgID, nil } diff --git a/api/impl/venus_market.go b/api/impl/venus_market.go index e5c1b921..9b0e268b 100644 --- a/api/impl/venus_market.go +++ b/api/impl/venus_market.go @@ -223,7 +223,20 @@ func (m *MarketNodeImpl) MarketImportDealData(ctx context.Context, propCid cid.C } defer fi.Close() //nolint:errcheck - return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi, true) + ref := types.ImportDataRef{ + ProposalCID: propCid, + File: path, + } + + res, err := m.StorageProvider.ImportDataForDeals(ctx, []*types.ImportDataRef{&ref}, false) + if err != nil { + return err + } + if len(res[0].Message) > 0 { + return fmt.Errorf(res[0].Message) + } + + return nil } func (m *MarketNodeImpl) MarketImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error { @@ -1092,7 +1105,7 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus) } -func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string, skipCommP bool) error { +func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string, skipCommP bool) error { deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid) if err != nil { return err @@ -1100,13 +1113,23 @@ func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Ci if err := jwtclient.CheckPermissionByMiner(ctx, m.AuthClient, deal.Proposal.Provider); err != nil { return err } - fi, err := os.Open(fname) + + ref := &types.ImportDataRef{ + ProposalCID: dealPropCid, + File: file, + } + res, err := m.DealsBatchImportData(ctx, types.ImportDataRefs{ + Refs: []*types.ImportDataRef{ref}, + SkipCommP: skipCommP, + }) if err != nil { - return fmt.Errorf("failed to open given file: %w", err) + return err + } + if len(res[0].Message) > 0 { + return fmt.Errorf(res[0].Message) } - defer fi.Close() //nolint:errcheck - return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi, skipCommP) + return nil } func (m *MarketNodeImpl) GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*types.DealInfo, error) { @@ -1250,3 +1273,35 @@ func (m *MarketNodeImpl) MarketGetReserved(ctx context.Context, addr address.Add } return m.FundAPI.MarketGetReserved(ctx, addr) } + +func (m *MarketNodeImpl) DealsBatchImportData(ctx context.Context, refs types.ImportDataRefs) ([]*types.ImportDataResult, error) { + refLen := len(refs.Refs) + results := make([]*types.ImportDataResult, 0, refLen) + validRefs := make([]*types.ImportDataRef, 0, refLen) + for _, ref := range refs.Refs { + deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, ref.ProposalCID) + if err != nil { + results = append(results, &types.ImportDataResult{ + ProposalCID: ref.ProposalCID, + Message: err.Error(), + }) + continue + } + if err := jwtclient.CheckPermissionByMiner(ctx, m.AuthClient, deal.Proposal.Provider); err != nil { + results = append(results, &types.ImportDataResult{ + ProposalCID: ref.ProposalCID, + Message: err.Error(), + }) + continue + } + validRefs = append(validRefs, ref) 
+ } + + res, err := m.StorageProvider.ImportDataForDeals(ctx, validRefs, refs.SkipCommP) + if err != nil { + return nil, err + } + results = append(results, res...) + + return results, nil +} diff --git a/cli/stats.go b/cli/stats.go index 85ade6cd..7a754123 100644 --- a/cli/stats.go +++ b/cli/stats.go @@ -25,7 +25,7 @@ var StatsCmds = &cli.Command{ Usage: "Stats about deals, sectors, and other things", Subcommands: []*cli.Command{ StatsPowerCmd, - StatsDealskCmd, + StatsDealsCmd, }, } @@ -254,7 +254,7 @@ var StatsPowerCmd = &cli.Command{ }, } -var StatsDealskCmd = &cli.Command{ +var StatsDealsCmd = &cli.Command{ Name: "deals", Description: "Statistics on active market deals", Action: func(cctx *cli.Context) error { diff --git a/cli/storage-deals.go b/cli/storage-deals.go index af439150..55b50ec1 100644 --- a/cli/storage-deals.go +++ b/cli/storage-deals.go @@ -6,6 +6,7 @@ import ( "io" "io/ioutil" "os" + "path/filepath" "sort" "strings" "text/tabwriter" @@ -42,6 +43,7 @@ var storageDealsCmds = &cli.Command{ Usage: "Manage storage deals and related configuration", Subcommands: []*cli.Command{ dealsImportDataCmd, + dealsBatchImportDataCmd, importOfflineDealCmd, dealsListCmd, updateStorageDealStateCmd, @@ -97,6 +99,106 @@ var dealsImportDataCmd = &cli.Command{ }, } +var dealsBatchImportDataCmd = &cli.Command{ + Name: "batch-import-data", + Usage: "Batch import data for deals", + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "proposals", + Usage: "proposal cid and car file, eg. --proposals , --proposals ,", + }, + &cli.StringFlag{ + Name: "manifest", + Usage: `A file containing proposal cid and piece cid, eg. +proposalCID,pieceCID +baadfdxxx,badddxxx +basdefxxx,baefaxxx +`, + }, + &cli.BoolFlag{ + Name: "skip-commp", + Usage: "skip calculate the piece-cid, please use with caution", + }, + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "Actually send transaction performing the action", + }, + &cli.StringFlag{ + Name: "car-dir", + Usage: "Directory of car files", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := DaemonContext(cctx) + + var proposalFiles []string + var refs []*market.ImportDataRef + if cctx.IsSet("proposals") { + proposalFiles = cctx.StringSlice("proposals") + } else { + manifest := cctx.String("manifest") + if len(manifest) == 0 { + return fmt.Errorf("must pass proposals or manifest") + } + data, err := os.ReadFile(manifest) + if err != nil { + return err + } + proposalFiles = strings.Split(string(data), "\n") + } + carDir := cctx.String("car-dir") + for _, proposalFile := range proposalFiles { + arr := strings.Split(proposalFile, ",") + if len(arr) != 2 { + continue + } + proposalCID, err := cid.Parse(arr[0]) + if err == nil && len(arr[1]) != 0 { + ref := &market.ImportDataRef{ + ProposalCID: proposalCID, + File: arr[1], + } + if len(carDir) != 0 { + ref.File = filepath.Join(carDir, ref.File) + } + + refs = append(refs, ref) + } + } + + var skipCommP bool + if cctx.IsSet("skip-commp") { + if !cctx.IsSet("really-do-it") { + return fmt.Errorf("pass --really-do-it to actually execute this action") + } + skipCommP = true + } + res, err := api.DealsBatchImportData(ctx, market.ImportDataRefs{ + Refs: refs, + SkipCommP: skipCommP, + }) + if err != nil { + return err + } + + for _, r := range res { + if len(r.Message) == 0 { + fmt.Printf("import data success: %s\n", r.ProposalCID) + } else { + fmt.Printf("import data failed, deal: %s, error: %s\n", r.ProposalCID, r.Message) + 
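// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): driving the new batch-import
// API programmatically instead of through the CLI command above. Only
// ImportDataRef, ImportDataRefs, ImportDataResult and the DealsBatchImportData
// signature come from this diff; the batchImporter interface is a local
// placeholder for the real venus-market client interface (its name is not
// shown here), and the usual context/fmt/cid imports are assumed.
type batchImporter interface {
	DealsBatchImportData(ctx context.Context, refs market.ImportDataRefs) ([]*market.ImportDataResult, error)
}

// batchImport maps proposal CIDs to car file paths, submits them in one call,
// and reports per-deal failures via the Message field of each result.
func batchImport(ctx context.Context, api batchImporter, pairs map[cid.Cid]string) error {
	refs := make([]*market.ImportDataRef, 0, len(pairs))
	for proposal, carFile := range pairs {
		refs = append(refs, &market.ImportDataRef{ProposalCID: proposal, File: carFile})
	}
	res, err := api.DealsBatchImportData(ctx, market.ImportDataRefs{Refs: refs})
	if err != nil {
		return err
	}
	for _, r := range res {
		if len(r.Message) > 0 {
			fmt.Printf("import data failed, deal: %s, error: %s\n", r.ProposalCID, r.Message)
		}
	}
	return nil
}
// ---------------------------------------------------------------------------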
} + } + + return nil + }, +} + var importOfflineDealCmd = &cli.Command{ Name: "import-offlinedeal", Usage: "Manually import offline deal", diff --git a/client/client.go b/client/client.go index bdb4fe17..120fb794 100644 --- a/client/client.go +++ b/client/client.go @@ -61,6 +61,7 @@ import ( "github.com/filecoin-project/venus-market/v2/api/clients/signer" "github.com/filecoin-project/venus-market/v2/config" "github.com/filecoin-project/venus-market/v2/imports" + "github.com/filecoin-project/venus-market/v2/models/repo" marketNetwork "github.com/filecoin-project/venus-market/v2/network" "github.com/filecoin-project/venus-market/v2/retrievalprovider" "github.com/filecoin-project/venus-market/v2/storageprovider" @@ -100,6 +101,9 @@ type API struct { Cfg *config.MarketClientConfig Signer signer.ISigner + + OfflineDealRepo repo.ClientOfflineDealRepo + DealTracker *DealTracker } func calcDealExpiration(minDuration uint64, md *dline.Info, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -121,15 +125,38 @@ func (a *API) importManager() *imports.Manager { return a.Imports } -func (a *API) ClientStartDeal(ctx context.Context, params *types.StartDealParams) (*cid.Cid, error) { +func (a *API) ClientBatchDeal(ctx context.Context, params []*types.DealParams) (*types.DealResults, error) { + if len(params) == 0 { + return &types.DealResults{}, nil + } + + var res types.DealResults + for _, param := range params { + proposalCid, err := a.dealStarter(ctx, param, true) + if err != nil { + res.Results = append(res.Results, &types.DealResult{ + ProposalCID: cid.Undef, + Message: err.Error(), + }) + continue + } + res.Results = append(res.Results, &types.DealResult{ + ProposalCID: *proposalCid, + }) + } + + return &res, nil +} + +func (a *API) ClientStartDeal(ctx context.Context, params *types.DealParams) (*cid.Cid, error) { return a.dealStarter(ctx, params, false) } -func (a *API) ClientStatelessDeal(ctx context.Context, params *types.StartDealParams) (*cid.Cid, error) { +func (a *API) ClientStatelessDeal(ctx context.Context, params *types.DealParams) (*cid.Cid, error) { return a.dealStarter(ctx, params, true) } -func (a *API) dealStarter(ctx context.Context, params *types.StartDealParams, isStateless bool) (*cid.Cid, error) { +func (a *API) dealStarter(ctx context.Context, params *types.DealParams, isStateless bool) (*cid.Cid, error) { if isStateless { if params.Data.TransferType != storagemarket.TTManual { return nil, fmt.Errorf("invalid transfer type %s for stateless storage deal", params.Data.TransferType) @@ -270,39 +297,76 @@ func (a *API) dealStarter(ctx context.Context, params *types.StartDealParams, is Proposal: *dealProposal, ClientSignature: *dealProposalSig, } - dStream, err := network.NewFromLibp2pHost(a.Host, + + dealProposalIpld, err := cborutil.AsIpld(dealProposalSigned) + if err != nil { + return nil, fmt.Errorf("serializing proposal node failed: %w", err) + } + + offlineDeal := &types.ClientOfflineDeal{ + ClientDealProposal: *dealProposalSigned, + ProposalCID: dealProposalIpld.Cid(), + DataRef: params.Data, + State: storagemarket.StorageDealUnknown, + SlashEpoch: -1, + FastRetrieval: params.FastRetrieval, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := a.OfflineDealRepo.SaveDeal(ctx, offlineDeal); err != nil { + return nil, fmt.Errorf("save offline deal failed: %v", err) + } + + offlineDeal.State = storagemarket.StorageDealWaitingForData + offlineDeal.UpdatedAt = time.Now() + proposalCID, err := a.sendOfflineDealProposal(ctx, params.Miner, *mi.PeerId, dealProposalSigned, 
params.Data, params.Wallet) + if err != nil { + offlineDeal.State = storagemarket.StorageDealError + offlineDeal.Message = err.Error() + } + if dbErr := a.OfflineDealRepo.SaveDeal(ctx, offlineDeal); dbErr != nil { + log.Errorf("save deal %s failed: %v", proposalCID, dbErr) + } + + return proposalCID, err +} + +func (a *API) sendOfflineDealProposal(ctx context.Context, + miner address.Address, + minerPeerID peer.ID, + dealProposal *vTypes.ClientDealProposal, + dataRef *storagemarket.DataRef, + signAddr address.Address, +) (*cid.Cid, error) { + stream, err := network.NewFromLibp2pHost(a.Host, // params duplicated from .../node/modules/client.go // https://github.com/filecoin-project/lotus/pull/5961#discussion_r629768011 network.RetryParameters(time.Second, 5*time.Minute, 15, 5), - ).NewDealStream(ctx, *mi.PeerId) + ).NewDealStream(ctx, minerPeerID) if err != nil { - return nil, fmt.Errorf("opening dealstream to %s/%s failed: %w", params.Miner, *mi.PeerId, err) + return nil, fmt.Errorf("opening dealstream to %s/%v failed: %w", miner, minerPeerID, err) } - if err = dStream.WriteDealProposal(network.Proposal{ + if err = stream.WriteDealProposal(network.Proposal{ FastRetrieval: true, - DealProposal: dealProposalSigned, - Piece: &storagemarket.DataRef{ - TransferType: storagemarket.TTManual, - Root: params.Data.Root, - PieceCid: params.Data.PieceCid, - PieceSize: params.Data.PieceSize, - }, + DealProposal: dealProposal, + Piece: dataRef, }); err != nil { return nil, fmt.Errorf("sending deal proposal failed: %w", err) } - resp, _, err := dStream.ReadDealResponse() + resp, _, err := stream.ReadDealResponse() if err != nil { return nil, fmt.Errorf("reading proposal response failed: %w", err) } - dealProposalIpld, err := cborutil.AsIpld(dealProposalSigned) + dealProposalIpld, err := cborutil.AsIpld(dealProposal) if err != nil { return nil, fmt.Errorf("serializing proposal node failed: %w", err) } + proposalCID := dealProposalIpld.Cid() - if !dealProposalIpld.Cid().Equals(resp.Response.Proposal) { + if !proposalCID.Equals(resp.Response.Proposal) { return nil, fmt.Errorf("provider returned proposal cid %s but we expected %s", resp.Response.Proposal, dealProposalIpld.Cid()) } @@ -310,7 +374,7 @@ func (a *API) dealStarter(ctx context.Context, params *types.StartDealParams, is return nil, fmt.Errorf("provider returned unexpected state %d for proposal %s, with message: %s", resp.Response.State, resp.Response.Proposal, resp.Response.Message) } - return &resp.Response.Proposal, nil + return &proposalCID, nil } func (a *API) ClientListDeals(ctx context.Context) ([]types.DealInfo, error) { @@ -358,7 +422,11 @@ func (a *API) transfersByID(ctx context.Context) (map[datatransfer.ChannelID]typ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*types.DealInfo, error) { v, err := a.SMDealClient.GetLocalDeal(ctx, d) if err != nil { - return nil, err + deal, err := a.OfflineDealRepo.GetDeal(ctx, d) + if err != nil { + return nil, err + } + return deal.DealInfo(), nil } di := a.newDealInfo(ctx, v) @@ -1494,3 +1562,60 @@ func (a *API) dealBlockstore(root cid.Cid) (bstore.Blockstore, func(), error) { func (a *API) DefaultAddress(ctx context.Context) (address.Address, error) { return address.Address(a.Cfg.DefaultMarketAddress), nil } + +func (a *API) ClientGetVerifiedDealDistribution(ctx context.Context, providers []address.Address, client address.Address) (*types.DealDistribution, error) { + var verifiedDealProposals []*vTypes.ClientDealProposal + + // offline deal + offlineDeals, err := 
a.OfflineDealRepo.ListDeal(ctx) + if err != nil { + return nil, err + } + for i, deal := range offlineDeals { + if !storageprovider.IsTerminateState(deal.State) && deal.Proposal.VerifiedDeal { + verifiedDealProposals = append(verifiedDealProposals, &offlineDeals[i].ClientDealProposal) + } + } + + deals, err := a.SMDealClient.ListLocalDeals(ctx) + if err != nil { + return nil, err + } + for i, deal := range deals { + if !storageprovider.IsTerminateState(deal.State) && deal.Proposal.VerifiedDeal { + verifiedDealProposals = append(verifiedDealProposals, &deals[i].ClientDealProposal) + } + } + + dd := statDealDistribution(verifiedDealProposals) + res := &types.DealDistribution{} + for _, pd := range dd.ProvidersDistribution { + for _, provider := range providers { + if pd.Provider == provider { + res.ProvidersDistribution = append(res.ProvidersDistribution, pd) + } + } + } + for _, rd := range dd.ReplicasDistribution { + if rd.Client == client { + res.ReplicasDistribution = append(res.ReplicasDistribution, rd) + break + } + } + + return res, nil +} + +func (a *API) ClientListOfflineDeals(ctx context.Context) ([]types.DealInfo, error) { + deals, err := a.OfflineDealRepo.ListDeal(ctx) + if err != nil { + return nil, err + } + + res := make([]types.DealInfo, 0, len(deals)) + for _, deal := range deals { + res = append(res, *deal.DealInfo()) + } + + return res, nil +} diff --git a/client/dela_tracker.go b/client/dela_tracker.go new file mode 100644 index 00000000..597e18ba --- /dev/null +++ b/client/dela_tracker.go @@ -0,0 +1,204 @@ +package client + +import ( + "context" + "fmt" + "io" + "strings" + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus-market/v2/models/repo" + "github.com/filecoin-project/venus-market/v2/storageprovider" + v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + shared "github.com/filecoin-project/venus/venus-shared/types" + types "github.com/filecoin-project/venus/venus-shared/types/market/client" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "go.uber.org/fx" +) + +const ( + maxEOFCount = 3 +) + +var dealTrackerLog = logging.Logger("deal-tracker") + +type DealTracker struct { + full v1api.FullNode + dealRepo repo.ClientOfflineDealRepo + stream *ClientStream + // todo: loop update miner info? 
+ minerInfo map[address.Address]shared.MinerInfo + eofErrs map[cid.Cid]int +} + +func NewDealTracker(lc fx.Lifecycle, + full v1api.FullNode, + offlineDealRepo repo.ClientOfflineDealRepo, + stream *ClientStream, +) *DealTracker { + dt := &DealTracker{ + full: full, + dealRepo: offlineDealRepo, + stream: stream, + minerInfo: make(map[address.Address]shared.MinerInfo), + eofErrs: make(map[cid.Cid]int), + } + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go dt.loopRefreshDealState(ctx) + + return nil + }, + }) + + return dt +} + +type dealInfos struct { + activeDeals, inactiveDeals []*types.ClientOfflineDeal + miners map[address.Address]struct{} +} + +func (dt *DealTracker) loadDeals(ctx context.Context) (*dealInfos, error) { + deals, err := dt.dealRepo.ListDeal(ctx) + if err != nil { + return nil, err + } + + infos := &dealInfos{miners: make(map[address.Address]struct{})} + for _, deal := range deals { + if storageprovider.IsTerminateState(deal.State) { + continue + } + if deal.State != storagemarket.StorageDealActive { + infos.inactiveDeals = append(infos.inactiveDeals, deal) + } else { + infos.activeDeals = append(infos.activeDeals, deal) + } + infos.miners[deal.Proposal.Provider] = struct{}{} + } + + return infos, nil +} + +func (dt *DealTracker) loopRefreshDealState(ctx context.Context) { + infos, err := dt.loadDeals(ctx) + if err == nil { + dt.refreshDealState(ctx, infos) + dt.checkSlash(ctx, infos.activeDeals) + } + + ticker := time.NewTicker(time.Minute * 3) + defer ticker.Stop() + + slashTicker := time.NewTimer(time.Hour * 6) + defer slashTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + infos, err := dt.loadDeals(ctx) + if err != nil { + dealTrackerLog.Infof("list offline deal failed: %v", err) + continue + } + dt.refreshDealState(ctx, infos) + case <-slashTicker.C: + infos, err := dt.loadDeals(ctx) + if err != nil { + dealTrackerLog.Infof("list offline deal failed: %v", err) + continue + } + dt.checkSlash(ctx, infos.activeDeals) + } + } +} + +func (dt *DealTracker) updateMinerCache(ctx context.Context, miners map[address.Address]struct{}) error { + for miner := range miners { + _, ok := dt.minerInfo[miner] + if !ok { + minerInfo, err := dt.full.StateMinerInfo(ctx, miner, shared.EmptyTSK) + if err != nil { + return fmt.Errorf("got miner info failed: %v", err) + } + dt.minerInfo[miner] = minerInfo + } + } + + return nil +} + +func (dt *DealTracker) refreshDealState(ctx context.Context, infos *dealInfos) { + if err := dt.updateMinerCache(ctx, infos.miners); err != nil { + dealTrackerLog.Info(err.Error()) + } + + for _, deal := range infos.inactiveDeals { + proposalCID := deal.ProposalCID + minerInfo, ok := dt.minerInfo[deal.Proposal.Provider] + if !ok || minerInfo.PeerId == nil { + dealTrackerLog.Debugf("deal %s not found miner peer", proposalCID) + continue + } + if dt.eofErrs[proposalCID] >= maxEOFCount { + continue + } + status, err := dt.stream.GetDealState(ctx, deal, minerInfo) + if err != nil { + if strings.Contains(err.Error(), io.EOF.Error()) { + dt.eofErrs[proposalCID]++ + } + dealTrackerLog.Infof("failed to got deal status: %v %v", proposalCID, err) + continue + } + var needUpdate bool + if deal.State != status.State { + deal.State = status.State + needUpdate = true + } + if deal.Message != status.Message { + deal.Message = status.Message + needUpdate = true + } + if deal.DealID != uint64(status.DealID) { + deal.DealID = uint64(status.DealID) + needUpdate = true + } + if status.AddFundsCid != nil { + deal.AddFundsCid = 
status.AddFundsCid + needUpdate = true + } + if status.PublishCid != nil { + deal.PublishMessage = status.PublishCid + needUpdate = true + } + if needUpdate { + dt.persistDeal(ctx, deal) + } + } +} + +func (dt *DealTracker) checkSlash(ctx context.Context, deals []*types.ClientOfflineDeal) { + for _, deal := range deals { + md, err := dt.full.StateMarketStorageDeal(ctx, abi.DealID(deal.DealID), shared.EmptyTSK) + if err == nil && md.State.SlashEpoch > -1 { + deal.State = storagemarket.StorageDealSlashed + dt.persistDeal(ctx, deal) + } + } +} + +func (dt *DealTracker) persistDeal(ctx context.Context, deal *types.ClientOfflineDeal) { + deal.UpdatedAt = time.Now() + if err := dt.dealRepo.SaveDeal(ctx, deal); err != nil { + dealTrackerLog.Errorf("failed to save deal: %s %v", deal.ProposalCID, err) + } +} diff --git a/client/modules.go b/client/modules.go index 1abcd7f3..7b522ca1 100644 --- a/client/modules.go +++ b/client/modules.go @@ -134,8 +134,14 @@ func RetrievalBlockstoreAccessor(r *config.HomeDir) (retrievalmarket.BlockstoreA return retrievalprovider.NewCARBlockstoreAccessor(dir), nil } -func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer network.ClientDataTransfer, discovery *discoveryimpl.Local, - deals badger.ClientDatastore, scn storagemarket.StorageClientNode, accessor storagemarket.BlockstoreAccessor, j journal.Journal, +func StorageClient(lc fx.Lifecycle, + h host.Host, + dataTransfer network.ClientDataTransfer, + discovery *discoveryimpl.Local, + deals badger.ClientDatastore, + scn storagemarket.StorageClientNode, + accessor storagemarket.BlockstoreAccessor, + j journal.Journal, ) (storagemarket.StorageClient, error) { // go-fil-markets protocol retries: // 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour @@ -233,4 +239,6 @@ var MarketClientOpts = builder.Options( builder.Override(new(retrievalmarket.BlockstoreAccessor), RetrievalBlockstoreAccessor), builder.Override(new(retrievalmarket.RetrievalClient), RetrievalClient), builder.Override(new(storagemarket.StorageClient), StorageClient), + builder.Override(new(*ClientStream), NewClientStream), + builder.Override(new(*DealTracker), NewDealTracker), ) diff --git a/client/stat.go b/client/stat.go new file mode 100644 index 00000000..b526c46c --- /dev/null +++ b/client/stat.go @@ -0,0 +1,80 @@ +package client + +import ( + "github.com/filecoin-project/go-address" + shared "github.com/filecoin-project/venus/venus-shared/types" + types "github.com/filecoin-project/venus/venus-shared/types/market/client" +) + +func statDealDistribution(deals []*shared.ClientDealProposal) *types.DealDistribution { + providersDistribution := make(map[address.Address]*types.ProviderDistribution) + replicasDistribution := make(map[address.Address]map[address.Address]*types.ProviderDistribution) + rdTotal := make(map[address.Address]uint64) + + for _, deal := range deals { + provider := deal.Proposal.Provider + pieceCID := deal.Proposal.PieceCID.String() + pieceSize := uint64(deal.Proposal.PieceSize) + providersDistribution[provider] = fillProviderDistribution(providersDistribution[provider], pieceCID, pieceSize, provider) + + client := deal.Proposal.Client + tmp, ok := replicasDistribution[client] + if !ok { + tmp = make(map[address.Address]*types.ProviderDistribution) + } + tmp[provider] = fillProviderDistribution(tmp[provider], pieceCID, pieceSize, provider) + replicasDistribution[client] = tmp + rdTotal[client] += pieceSize + } + + var pds []*types.ProviderDistribution + for _, pd := range providersDistribution { + pd.DuplicationPercentage = 
float64(pd.Total-pd.Uniq) / float64(pd.Total) + pds = append(pds, pd) + } + + var rds []*types.ReplicaDistribution + for client, pds := range replicasDistribution { + var uniq uint64 + uniqPiece := make(map[string]struct{}) + total := rdTotal[client] + rd := &types.ReplicaDistribution{Client: client, Total: total, ReplicasPercentage: make(map[string]float64, len(pds))} + + for provider, pd := range pds { + rd.ReplicasPercentage[provider.String()] = float64(pd.Total) / float64(total) + pd.DuplicationPercentage = float64(pd.Total-pd.Uniq) / float64(pd.Total) + + for pieceCID, size := range pd.UniqPieces { + if _, ok := uniqPiece[pieceCID]; !ok { + uniqPiece[pieceCID] = struct{}{} + uniq += size + } + } + rd.ReplicasDistribution = append(rd.ReplicasDistribution, pd) + } + rd.Uniq = uniq + rd.DuplicationPercentage = float64(rd.Total-rd.Uniq) / float64(rd.Total) + rds = append(rds, rd) + } + + return &types.DealDistribution{ + ProvidersDistribution: pds, + ReplicasDistribution: rds, + } +} + +func fillProviderDistribution(pd *types.ProviderDistribution, pieceCID string, pieceSize uint64, provider address.Address) *types.ProviderDistribution { + if pd == nil { + pd = &types.ProviderDistribution{ + Provider: provider, + UniqPieces: make(map[string]uint64), + } + } + pd.Total += pieceSize + if _, ok := pd.UniqPieces[pieceCID]; !ok { + pd.UniqPieces[pieceCID] = pieceSize + pd.Uniq += pieceSize + } + + return pd +} diff --git a/client/stat_test.go b/client/stat_test.go new file mode 100644 index 00000000..a49f4140 --- /dev/null +++ b/client/stat_test.go @@ -0,0 +1,90 @@ +package client + +import ( + "testing" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/testutil" + shared "github.com/filecoin-project/venus/venus-shared/types" + types "github.com/filecoin-project/venus/venus-shared/types/market/client" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/assert" +) + +func TestStat(t *testing.T) { + pieceCIDs := make([]cid.Cid, 10) + deals := make([]*shared.ClientDealProposal, 20) + providers := make([]address.Address, 10) + clients := make([]address.Address, 5) + pieceSize := abi.PaddedPieceSize(10000) + + testutil.Provide(t, &pieceCIDs) + testutil.Provide(t, &deals) + testutil.Provide(t, &providers) + testutil.Provide(t, &clients) + + var verifiedDeal []*shared.ClientDealProposal + expectProvidersDistribution := make(map[address.Address]*types.ProviderDistribution) + expectReplicasDistribution := make(map[address.Address]*types.ReplicaDistribution) + for i := 0; i < len(deals); i++ { + deal := deals[i] + deal.Proposal.PieceCID = pieceCIDs[i%len(pieceCIDs)] + deal.Proposal.PieceSize = pieceSize + deal.Proposal.Client = clients[i%len(clients)] + deal.Proposal.Provider = providers[i%len(providers)] + if i%2 == 0 { + deal.Proposal.VerifiedDeal = true + verifiedDeal = append(verifiedDeal, deal) + if i < len(deals)/2 { + pd := &types.ProviderDistribution{ + Provider: deal.Proposal.Provider, + Total: uint64(deal.Proposal.PieceSize) * 2, + Uniq: uint64(deal.Proposal.PieceSize), + DuplicationPercentage: 0.5, + UniqPieces: map[string]uint64{ + deal.Proposal.PieceCID.String(): uint64(pieceSize), + }, + } + expectProvidersDistribution[pd.Provider] = pd + } + } + } + + for i := range []int{0, 1, 2, 3, 4} { + provider := verifiedDeal[i].Proposal.Provider + if i == 1 || i == 3 { + provider = verifiedDeal[i+5].Proposal.Provider + } + pd := expectProvidersDistribution[provider] + rd := 
&types.ReplicaDistribution{ + Client: verifiedDeal[i].Proposal.Client, + Total: uint64(verifiedDeal[i].Proposal.PieceSize) * 2, + Uniq: uint64(verifiedDeal[i].Proposal.PieceSize) * 1, + DuplicationPercentage: 0.5, + ReplicasPercentage: map[string]float64{ + provider.String(): 1, + }, + ReplicasDistribution: []*types.ProviderDistribution{ + pd, + }, + } + expectReplicasDistribution[rd.Client] = rd + } + + dd := statDealDistribution(verifiedDeal) + + assert.Len(t, dd.ProvidersDistribution, 5) + for _, pd := range dd.ProvidersDistribution { + expect := expectProvidersDistribution[pd.Provider] + assert.NotNil(t, expect) + assert.Equal(t, expect, pd) + } + + assert.Len(t, dd.ReplicasDistribution, 5) + for _, rd := range dd.ReplicasDistribution { + expect := expectReplicasDistribution[rd.Client] + assert.NotNil(t, expect) + assert.Equal(t, expect, rd) + } +} diff --git a/client/stream.go b/client/stream.go new file mode 100644 index 00000000..46479755 --- /dev/null +++ b/client/stream.go @@ -0,0 +1,95 @@ +package client + +import ( + "context" + "fmt" + "time" + + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/venus-market/v2/utils" + "github.com/filecoin-project/venus/venus-shared/types" + "github.com/filecoin-project/venus/venus-shared/types/market/client" + "github.com/libp2p/go-libp2p/core/host" +) + +type ClientStream struct { + h host.Host + node storagemarket.StorageClientNode + net network.StorageMarketNetwork +} + +func NewClientStream(h host.Host, node storagemarket.StorageClientNode) *ClientStream { + // go-fil-markets protocol retries: + // 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour + marketsRetryParams := network.RetryParameters(time.Second, 5*time.Minute, 15, 5) + net := network.NewFromLibp2pHost(h, marketsRetryParams) + + return &ClientStream{h: h, node: node, net: net} +} + +func (cs *ClientStream) GetDealState(ctx context.Context, + deal *client.ClientOfflineDeal, + minerInfo types.MinerInfo, +) (*storagemarket.ProviderDealState, error) { + if len(minerInfo.Multiaddrs) > 0 { + multiaddr, err := utils.ConvertMultiaddr(minerInfo.Multiaddrs) + if err == nil { + cs.net.AddAddrs(*minerInfo.PeerId, multiaddr) + } + } + s, err := cs.net.NewDealStatusStream(ctx, *minerInfo.PeerId) + if err != nil { + return nil, fmt.Errorf("failed to open stream to miner: %w", err) + } + defer s.Close() //nolint + + buf, err := cborutil.Dump(deal.ProposalCID) + if err != nil { + return nil, fmt.Errorf("failed serialize deal status request: %w", err) + } + signature, err := cs.node.SignBytes(ctx, deal.Proposal.Client, buf) + if err != nil { + return nil, fmt.Errorf("failed to sign deal status request: %w", err) + } + + if err := s.WriteDealStatusRequest(network.DealStatusRequest{Proposal: deal.ProposalCID, Signature: *signature}); err != nil { + return nil, fmt.Errorf("failed to send deal status request: %w", err) + } + + resp, origBytes, err := s.ReadDealStatusResponse() + if err != nil { + return nil, fmt.Errorf("failed to read deal status response: %w", err) + } + + valid, err := cs.verifyStatusResponseSignature(ctx, minerInfo.Worker, resp, origBytes) + if err != nil { + return nil, err + } + + if !valid { + return nil, fmt.Errorf("invalid deal status response signature") + } + + return &resp.DealState, nil +} + +func (cs *ClientStream) verifyStatusResponseSignature(ctx context.Context, + miner 
address.Address, + response network.DealStatusResponse, + origBytes []byte, +) (bool, error) { + tok, _, err := cs.node.GetChainHead(ctx) + if err != nil { + return false, fmt.Errorf("getting chain head: %w", err) + } + + valid, err := cs.node.VerifySignature(ctx, response.Signature, miner, origBytes, tok) + if err != nil { + return false, fmt.Errorf("validating signature: %w", err) + } + + return valid, nil +} diff --git a/cmd/market-client/storage.go b/cmd/market-client/storage.go index 4a85807a..6fdc708b 100644 --- a/cmd/market-client/storage.go +++ b/cmd/market-client/storage.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "bytes" "context" "encoding/json" "errors" @@ -374,13 +375,16 @@ var storageAsksListCmd = &cli.Command{ var storageDealsCmd = &cli.Command{ Name: "deals", - Usage: "query storage asks", + Usage: "storage deals", Subcommands: []*cli.Command{ storageDealsInitCmd, + storageDelesBatchCmd, storageDealsListCmd, storageDealsStatsCmd, storageDealsGetCmd, storageDealsInspectCmd, + verifiedDealStatsCmd, + storageDealsExportCmd, }, } @@ -458,58 +462,16 @@ The minimum value is 518400 (6 months).`, } // [data, miner, price, dur] - - data, err := cid.Parse(cctx.Args().Get(0)) - if err != nil { - return err - } - - miner, err := address.NewFromString(cctx.Args().Get(1)) - if err != nil { - return err - } - - price, err := types.ParseFIL(cctx.Args().Get(2)) + p, err := dealParamsFromContext(cctx, api, fapi, false) if err != nil { return err } - dur, err := strconv.ParseInt(cctx.Args().Get(3), 10, 32) + data, err := cid.Parse(p.firstArg) if err != nil { return err } - var provCol big.Int - if pcs := cctx.String("provider-collateral"); pcs != "" { - pc, err := big.FromString(pcs) - if err != nil { - return fmt.Errorf("failed to parse provider-collateral: %w", err) - } - provCol = pc - } - - if abi.ChainEpoch(dur) < MinDealDuration { - return fmt.Errorf("minimum deal duration is %d blocks", MinDealDuration) - } - if abi.ChainEpoch(dur) > MaxDealDuration { - return fmt.Errorf("maximum deal duration is %d blocks", MaxDealDuration) - } - - var a address.Address - if from := cctx.String("from"); from != "" { - faddr, err := address.NewFromString(from) - if err != nil { - return fmt.Errorf("failed to parse 'from' address: %w", err) - } - a = faddr - } else { - def, err := api.DefaultAddress(ctx) - if err != nil { - return err - } - a = def - } - ref := &storagemarket.DataRef{ TransferType: storagemarket.TTGraphsync, Root: data, @@ -533,42 +495,25 @@ The minimum value is 518400 (6 months).`, ref.TransferType = storagemarket.TTManual } - // Check if the address is a verified client - dcap, err := fapi.StateVerifiedClientStatus(ctx, a, types.EmptyTSK) - if err != nil { - return err - } - - isVerified := dcap != nil - - // If the user has explicitly set the --verified-deal flag - if cctx.IsSet("verified-deal") { - // If --verified-deal is true, but the address is not a verified - // client, return an error - verifiedDealParam := cctx.Bool("verified-deal") - if verifiedDealParam && !isVerified { - return fmt.Errorf("address %s does not have verified client status", a) - } - - // Override the default - isVerified = verifiedDealParam - } - - sdParams := &client.StartDealParams{ + sdParams := &client.DealParams{ Data: ref, - Wallet: a, - Miner: miner, - EpochPrice: types.BigInt(price), - MinBlocksDuration: uint64(dur), + Wallet: p.from, + Miner: p.miner[0], + EpochPrice: types.BigInt(p.price), + MinBlocksDuration: uint64(p.dur), DealStartEpoch: abi.ChainEpoch(cctx.Int64("start-epoch")), 
FastRetrieval: cctx.Bool("fast-retrieval"), - VerifiedDeal: isVerified, - ProviderCollateral: provCol, + VerifiedDeal: p.isVerified, + ProviderCollateral: p.provCol, + } + + if p.isVerified && p.dcap < uint64(sdParams.Data.PieceSize.Padded()) { + return fmt.Errorf("not enough datacap, need %d, has: %d", p.dcap, sdParams.Data.PieceSize.Padded()) } var proposal *cid.Cid - if cctx.Bool("manual-stateless-deal") { - if ref.TransferType != storagemarket.TTManual || price.Int64() != 0 { + if p.statelessDeal { + if ref.TransferType != storagemarket.TTManual || p.price.Int64() != 0 { return errors.New("when manual-stateless-deal is enabled, you must also provide a 'price' of 0 and specify 'manual-piece-cid' and 'manual-piece-size'") } proposal, err = api.ClientStatelessDeal(ctx, sdParams) @@ -585,7 +530,7 @@ The minimum value is 518400 (6 months).`, return err } - afmt.Println(encoder.Encode(*proposal)) + afmt.Println("proposal cid: ", encoder.Encode(*proposal)) return nil }, @@ -1027,7 +972,7 @@ uiLoop: color.Blue(".. executing\n") for i, maddr := range maddrs { - proposal, err := api.ClientStartDeal(ctx, &client.StartDealParams{ + proposal, err := api.ClientStartDeal(ctx, &client.DealParams{ Data: &storagemarket.DataRef{ TransferType: storagemarket.TTGraphsync, Root: data, @@ -1084,6 +1029,10 @@ var storageDealsListCmd = &cli.Command{ Name: "watch", Usage: "watch deal updates in real-time, rather than a one time list", }, + &cli.BoolFlag{ + Name: "offline", + Usage: "only print offline deals", + }, }, Action: func(cctx *cli.Context) error { if cctx.IsSet("color") { @@ -1107,42 +1056,50 @@ var storageDealsListCmd = &cli.Command{ watch := cctx.Bool("watch") showFailed := cctx.Bool("show-failed") - localDeals, err := api.ClientListDeals(ctx) - if err != nil { - return err - } - - if watch { - updates, err := api.ClientGetDealUpdates(ctx) + var localDeals []client.DealInfo + if cctx.Bool("offline") { + localDeals, err = api.ClientListOfflineDeals(ctx) + if err != nil { + return err + } + } else { + localDeals, err = api.ClientListDeals(ctx) if err != nil { return err } - for { - tm.Clear() - tm.MoveCursor(1, 1) - - err = outputClientStorageDeals(ctx, tm.Screen, fapi, localDeals, verbose, showFailed) + if watch { + updates, err := api.ClientGetDealUpdates(ctx) if err != nil { return err } - tm.Flush() - - select { - case <-ctx.Done(): - return nil - case updated := <-updates: - var found bool - for i, existing := range localDeals { - if existing.ProposalCid.Equals(updated.ProposalCid) { - localDeals[i] = updated - found = true - break - } + for { + tm.Clear() + tm.MoveCursor(1, 1) + + err = outputClientStorageDeals(ctx, tm.Screen, fapi, localDeals, verbose, showFailed) + if err != nil { + return err } - if !found { - localDeals = append(localDeals, updated) + + tm.Flush() + + select { + case <-ctx.Done(): + return nil + case updated := <-updates: + var found bool + for i, existing := range localDeals { + if existing.ProposalCid.Equals(updated.ProposalCid) { + localDeals[i] = updated + found = true + break + } + } + if !found { + localDeals = append(localDeals, updated) + } } } } @@ -1538,3 +1495,451 @@ var storageDealsInspectCmd = &cli.Command{ return inspectDealCmd(ctx, api, cctx.String("proposal-cid"), cctx.Int("deal-id")) }, } + +type params struct { + firstArg string // may data cid or car dir + from address.Address + miner []address.Address + price types.FIL + dur int64 + provCol big.Int + statelessDeal bool + isVerified bool + dcap uint64 +} + +func dealParamsFromContext(cctx *cli.Context, api 
clientapi.IMarketClient, fapi v1api.FullNode, isBatch bool) (*params, error) { + var start int + var addrs []string + var miners []address.Address + if !isBatch { + // [data, miner, price, dur] + start = 2 + addrs = []string{cctx.Args().Get(1)} + } else { + // [price, dur] + addrs = cctx.StringSlice("miner") + } + + if len(addrs) == 0 { + return nil, fmt.Errorf("must pass miner") + } + for _, addrStr := range addrs { + miner, err := address.NewFromString(addrStr) + if err != nil { + return nil, err + } + miners = append(miners, miner) + } + + price, err := types.ParseFIL(cctx.Args().Get(start)) + if err != nil { + return nil, err + } + dur, err := strconv.ParseInt(cctx.Args().Get(start+1), 10, 32) + if err != nil { + return nil, err + } + + var provCol big.Int + if pcs := cctx.String("provider-collateral"); pcs != "" { + pc, err := big.FromString(pcs) + if err != nil { + return nil, fmt.Errorf("failed to parse provider-collateral: %w", err) + } + provCol = pc + } + + if abi.ChainEpoch(dur) < MinDealDuration { + return nil, fmt.Errorf("minimum deal duration is %d blocks", MinDealDuration) + } + if abi.ChainEpoch(dur) > MaxDealDuration { + return nil, fmt.Errorf("maximum deal duration is %d blocks", MaxDealDuration) + } + + var a address.Address + if from := cctx.String("from"); from != "" { + faddr, err := address.NewFromString(from) + if err != nil { + return nil, fmt.Errorf("failed to parse 'from' address: %w", err) + } + a = faddr + } else { + def, err := api.DefaultAddress(cctx.Context) + if err != nil { + return nil, err + } + a = def + } + + // Check if the address is a verified client + dcap, err := fapi.StateVerifiedClientStatus(cctx.Context, a, types.EmptyTSK) + if err != nil { + return nil, err + } + + isVerified := dcap != nil + + // If the user has explicitly set the --verified-deal flag + if cctx.IsSet("verified-deal") { + // If --verified-deal is true, but the address is not a verified + // client, return an error + verifiedDealParam := cctx.Bool("verified-deal") + if verifiedDealParam && !isVerified { + return nil, fmt.Errorf("address %s does not have verified client status", a) + } + + // Override the default + isVerified = verifiedDealParam + } + + p := ¶ms{ + firstArg: cctx.Args().Get(0), + miner: miners, + from: a, + price: price, + dur: dur, + provCol: provCol, + statelessDeal: cctx.Bool("manual-stateless-deal"), + isVerified: isVerified, + } + if dcap != nil { + p.dcap = uint64(dcap.Int64()) + } + + return p, nil +} + +var storageDelesBatchCmd = &cli.Command{ + Name: "batch", + Usage: "Batch storage deals with miners", + Description: `Make deals with miners. +price is measured in FIL/Epoch. Miners usually don't accept a bid +lower than their advertised ask (which is in FIL/GiB/Epoch). You can check a miners listed price +with './market-client storage asks query '. +duration is how long the miner should store the data for, in blocks. +The minimum value is 518400 (6 months).`, + ArgsUsage: "[price duration]", + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "miner", + Usage: "The address of the miner you wish to make a deal with, eg. 
--miner t010001 --miner t010002", + }, + &cli.StringFlag{ + Name: "manifest", + Usage: "Path to the manifest file", + Required: true, + }, + &cli.StringFlag{ + Name: "from", + Usage: "specify address to fund the deal with", + }, + &cli.Int64Flag{ + Name: "start-epoch", + Usage: "specify the epoch that the deal should start at", + Value: -1, + }, + &cli.BoolFlag{ + Name: "fast-retrieval", + Usage: "indicates that data should be available for fast retrieval", + Value: true, + }, + &cli.BoolFlag{ + Name: "verified-deal", + Usage: "indicate that the deal counts towards verified client total", + DefaultText: "true if client is verified, false otherwise", + }, + &cli.StringFlag{ + Name: "provider-collateral", + Usage: "specify the requested provider collateral the miner should put up", + }, + &cli.IntFlag{ + Name: "start-index", + Usage: "Starting from the nth deal", + }, + &cli.IntFlag{ + Name: "end-index", + Usage: "At the end of the nth deal", + }, + &cli.BoolFlag{ + Name: "filter", + Usage: "f+ requirements for LDN", + }, + &cli2.CidBaseFlag, + }, + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 2 { + return fmt.Errorf("must pass two arguments") + } + + fapi, fcloser, err := cli2.NewFullNode(cctx) + if err != nil { + return err + } + defer fcloser() + + api, closer, err := cli2.NewMarketClientNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := cli2.ReqContext(cctx) + p, err := dealParamsFromContext(cctx, api, fapi, true) + if err != nil { + return err + } + + transferType := storagemarket.TTManual + if p.price.Int64() != 0 { + return fmt.Errorf("you must provide a 'price' of 0") + } + + manifests, err := loadManifest(cctx.String("manifest")) + if err != nil { + return fmt.Errorf("load manifest error: %v", err) + } + + var params []*client.DealParams + var selector *selector + startIdx := cctx.Int("start-index") + endIdx := cctx.Int("end-index") + currDatacap := p.dcap + minerDeal := make(map[address.Address]int) + + if p.isVerified { + dd, err := api.ClientGetVerifiedDealDistribution(ctx, p.miner, p.from) + if err != nil { + return err + } + selector = newSelector(dd, p.from, p.miner) + } + + for i, m := range manifests { + if i < startIdx { + continue + } + if endIdx > 0 && i >= endIdx { + break + } + + dataRef := &storagemarket.DataRef{ + TransferType: transferType, + Root: m.payloadCID, + PieceCid: &m.pieceCID, + PieceSize: m.pieceSize, + RawBlockSize: m.payloadSize, + } + + miner := p.miner[i%len(p.miner)] + if p.isVerified { + paddedPiecedSize := uint64(m.pieceSize.Padded()) + if currDatacap < paddedPiecedSize { + fmt.Printf("datacap %d less than piece size %d\n", currDatacap, paddedPiecedSize) + break + } + + if cctx.IsSet("filter") { + miner = selector.selectMiner(m.pieceCID, paddedPiecedSize) + if miner.Empty() { + selector.printError() + break + } + } + currDatacap -= paddedPiecedSize + } + params = append(params, fillDealParams(cctx, p, dataRef, miner)) + minerDeal[miner]++ + } + fmt.Printf("has %d deals need to publish", len(params)) + if len(params) == 0 { + fmt.Println() + return nil + } + for miner, count := range minerDeal { + fmt.Printf(", %s: %d", miner, count) + } + fmt.Println() + + res, err := api.ClientBatchDeal(ctx, params) + if err != nil { + return err + } + + for i, r := range res.Results { + root := params[i].Data.Root.String() + if len(r.Message) == 0 { + fmt.Printf("create deal success, proposal cid: %v\n", r.ProposalCID) + } else { + fmt.Printf("create deal failed, playload cid: %v, error: %v\n", root, r.Message) + } + } + + 
return nil + }, +} + +var verifiedDealStatsCmd = &cli.Command{ + Name: "verified-deal-stat", + Usage: "Print the distribution of verified deals", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "provider", + Usage: "provider address", + }, + &cli.StringFlag{ + Name: "client", + Usage: "datacap address", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := cli2.NewMarketClientNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := cli2.ReqContext(cctx) + var provider, clientAddr address.Address + if cctx.IsSet("provider") { + provider, err = address.NewFromString(cctx.String("provider")) + if err != nil { + return err + } + } else if cctx.IsSet("client") { + clientAddr, err = address.NewFromString(cctx.String("client")) + if err != nil { + return err + } + } else { + return fmt.Errorf("must pass --provider of --client") + } + + dd, err := api.ClientGetVerifiedDealDistribution(ctx, []address.Address{provider}, clientAddr) + if err != nil { + return err + } + + newProviderWriter := func() *tablewriter.TableWriter { + return tablewriter.New( + tablewriter.Col("Provider"), + tablewriter.Col("Total"), + tablewriter.Col("Percentage"), + tablewriter.Col("Uniq"), + tablewriter.Col("DuplicationPercentage"), + ) + } + + sizeStr := func(v uint64) string { + return fmt.Sprintf("%s (%d B)", types.SizeStr(types.NewInt(v)), v) + } + + writeProviderDistribution := func(writer *tablewriter.TableWriter, pd *client.ProviderDistribution, percentage float64) { + rows := map[string]interface{}{ + "Provider": pd.Provider, + "Total": sizeStr(pd.Total), + "Uniq": sizeStr(pd.Uniq), + "DuplicationPercentage": fmt.Sprintf("%.2f%s", 100*pd.DuplicationPercentage, "%"), + } + if percentage != 0 { + rows["Percentage"] = fmt.Sprintf("%.2f%s", 100*percentage, "%") + } + writer.Write(rows) + } + + writeReplicasDistribution := func(buf *bytes.Buffer, rd *client.ReplicaDistribution) { + writer := newProviderWriter() + for _, pd := range rd.ReplicasDistribution { + writeProviderDistribution(writer, pd, rd.ReplicasPercentage[pd.Provider.String()]) + } + fmt.Fprintf(buf, "Client: %s\n", rd.Client) + fmt.Fprintf(buf, "Total: %s\n", sizeStr(rd.Total)) + fmt.Fprintf(buf, "Uniq: %s\n", sizeStr(rd.Uniq)) + fmt.Fprintf(buf, "DuplicationPercentage: %.2f%s\n", rd.DuplicationPercentage*100, "%") + _ = writer.Flush(buf) + buf.WriteString("\n") + } + + for _, pd := range dd.ProvidersDistribution { + if pd.Provider == provider { + writer := newProviderWriter() + writeProviderDistribution(writer, pd, 0) + + return writer.Flush(os.Stdout) + } + } + + buf := new(bytes.Buffer) + for _, rd := range dd.ReplicasDistribution { + if rd.Client == clientAddr { + writeReplicasDistribution(buf, rd) + break + } + } + fmt.Fprint(os.Stdout, buf.String()) + + return nil + }, +} + +var storageDealsExportCmd = &cli.Command{ + Name: "export", + Usage: "Export deal proposal cid and piece cid when the deal status is StorageDealWaitingForData", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "output", + Usage: "output result to file", + }, + &cli.IntFlag{ + Name: "count", + Usage: "number of exported deals", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := cli2.NewMarketClientNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := cli2.ReqContext(cctx) + + var deals []*client.DealInfo + res, err := api.ClientListOfflineDeals(ctx) + if err != nil { + return err + } + for i := range res { + if res[i].State == storagemarket.StorageDealWaitingForData { + deals = append(deals, &res[i]) + 
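// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): submitting an already prepared
// slice of offline deal proposals through the new ClientBatchDeal endpoint and
// reporting per-proposal outcomes, mirroring what the `batch` command above
// does after building its params. The batchDealer interface is a local
// placeholder for the market-client API interface; DealParams, DealResults and
// DealResult come from the client API changes in this diff.
type batchDealer interface {
	ClientBatchDeal(ctx context.Context, params []*client.DealParams) (*client.DealResults, error)
}

func submitBatch(ctx context.Context, api batchDealer, params []*client.DealParams) error {
	res, err := api.ClientBatchDeal(ctx, params)
	if err != nil {
		return err
	}
	// results are appended in the same order as params, one per proposal
	for i, r := range res.Results {
		if len(r.Message) > 0 {
			// a failed proposal carries no CID, so key the error on the payload root
			fmt.Printf("create deal failed, payload cid: %s, error: %s\n", params[i].Data.Root, r.Message)
			continue
		}
		fmt.Printf("create deal success, proposal cid: %s\n", r.ProposalCID)
	}
	return nil
}
// ---------------------------------------------------------------------------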
} + } + if len(deals) == 0 { + fmt.Println("no deals need export") + return nil + } + + if cctx.IsSet("output") { + count := cctx.Int("count") + buf := &bytes.Buffer{} + buf.WriteString("proposalCID,pieceCID\n") + for i, deal := range deals { + if count > 0 && i >= count { + continue + } + buf.WriteString(fmt.Sprintf("%s,%s\n", deal.ProposalCid.String(), deal.PieceCID.String())) + } + + return os.WriteFile(cctx.String("output"), buf.Bytes(), 0o755) + } + writer := tablewriter.New(tablewriter.Col("ProposalCID"), tablewriter.Col("PieceCID")) + for _, deal := range deals { + writer.Write(map[string]interface{}{ + "ProposalCID": deal.ProposalCid.String(), + "PieceCID": deal.PieceCID.String(), + }) + } + + return writer.Flush(os.Stdout) + }, +} diff --git a/cmd/market-client/util.go b/cmd/market-client/util.go new file mode 100644 index 00000000..4777a981 --- /dev/null +++ b/cmd/market-client/util.go @@ -0,0 +1,223 @@ +package main + +import ( + "encoding/csv" + "fmt" + "os" + "strconv" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/types" + "github.com/filecoin-project/venus/venus-shared/types/market/client" + "github.com/ipfs/go-cid" + "github.com/urfave/cli/v2" +) + +func fillDealParams(cctx *cli.Context, p *params, ref *storagemarket.DataRef, miner address.Address) *client.DealParams { + return &client.DealParams{ + Data: ref, + Wallet: p.from, + Miner: miner, + EpochPrice: types.BigInt(p.price), + MinBlocksDuration: uint64(p.dur), + DealStartEpoch: abi.ChainEpoch(cctx.Int64("start-epoch")), + FastRetrieval: cctx.Bool("fast-retrieval"), + VerifiedDeal: p.isVerified, + ProviderCollateral: p.provCol, + } +} + +type manifest struct { + payloadCID cid.Cid + payloadSize uint64 + pieceCID cid.Cid + pieceSize abi.UnpaddedPieceSize +} + +func loadManifest(path string) ([]*manifest, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + + records, err := csv.NewReader(f).ReadAll() + if err != nil { + return nil, err + } + + manifests := make([]*manifest, 0, len(records)) + for i, record := range records { + // skip title: payload_cid,filename,piece_cid,payload_size,piece_size,detail or payload_cid,filename,detail + if i == 0 { + continue + } + + if len(record) == 3 { + payloadCID, err := cid.Parse(record[0]) + if err == nil { + manifests = append(manifests, &manifest{payloadCID: payloadCID}) + } + } else if len(record) == 6 { + payloadCID, err := cid.Parse(record[0]) + if err != nil { + continue + } + pieceCID, err := cid.Parse(record[2]) + if err != nil { + continue + } + payloadSize, err := strconv.ParseUint(record[3], 10, 64) + if err != nil { + continue + } + pieceSize, err := strconv.Atoi(record[4]) + if err == nil { + manifests = append(manifests, &manifest{payloadCID: payloadCID, payloadSize: payloadSize, + pieceCID: pieceCID, pieceSize: abi.UnpaddedPieceSize(pieceSize)}) + } + } + } + + return manifests, nil +} + +type selector struct { + pds map[address.Address]*client.ProviderDistribution + rds map[address.Address]*client.ReplicaDistribution + miners []address.Address + clientAddr address.Address + + errs map[address.Address]error +} + +func newSelector(dd *client.DealDistribution, clientAddr address.Address, miners []address.Address) *selector { + s := &selector{ + pds: make(map[address.Address]*client.ProviderDistribution, len(dd.ProvidersDistribution)), + rds: 
make(map[address.Address]*client.ReplicaDistribution, len(dd.ReplicasDistribution)), + miners: miners, + clientAddr: clientAddr, + + errs: make(map[address.Address]error, len(miners)), + } + s.init(dd) + + return s +} + +func (s *selector) init(dd *client.DealDistribution) { + for _, pd := range dd.ProvidersDistribution { + s.pds[pd.Provider] = pd + } + for _, miner := range s.miners { + if _, ok := s.pds[miner]; !ok { + s.pds[miner] = &client.ProviderDistribution{ + Provider: miner, + UniqPieces: make(map[string]uint64), + } + } + } + + for _, rd := range dd.ReplicasDistribution { + s.rds[rd.Client] = rd + } + if _, ok := s.rds[s.clientAddr]; !ok { + s.rds[s.clientAddr] = &client.ReplicaDistribution{ + Client: s.clientAddr, + ReplicasPercentage: map[string]float64{}, + } + } +} + +func (s *selector) selectMiner(pieceCID cid.Cid, pieceSize uint64) address.Address { + // clean error + s.errs = make(map[address.Address]error) + + var foundMiner address.Address + for _, miner := range s.miners { + err := s.checkDuplication(miner, pieceCID, pieceSize) + if err == nil { + err = s.checkRatio(miner, s.clientAddr, pieceSize) + } + if err == nil { + foundMiner = miner + break + } + s.errs[miner] = err + } + + if !foundMiner.Empty() { + s.update(s.clientAddr, foundMiner, pieceCID, pieceSize) + } + + return foundMiner +} + +// Storage provider should not be storing duplicate data for more than 20%. +func (s *selector) checkDuplication(miner address.Address, pieceCID cid.Cid, pieceSize uint64) error { + pd, ok := s.pds[miner] + if !ok { + return fmt.Errorf("not found provider distribution") + } + + total := pd.Total + pieceSize + uniq := pd.Uniq + _, ok = pd.UniqPieces[pieceCID.String()] + if !ok { + uniq += pieceSize + } + + duplicationPercentage := float64(total-uniq) / float64(total) + if duplicationPercentage > 0.2 && duplicationPercentage > pd.DuplicationPercentage { + return fmt.Errorf("duplication percentage %.2f%s greater than %s", duplicationPercentage*100, "%", "20%") + } + + return nil +} + +// Storage provider should not exceed 25% of total datacap. +func (s *selector) checkRatio(miner address.Address, clientAddr address.Address, pieceSize uint64) error { + rd, ok := s.rds[clientAddr] + if !ok { + return fmt.Errorf("not found replicas distribution: %v", clientAddr) + } + + oldPercent := rd.ReplicasPercentage[miner.String()] + total := rd.Total + pieceSize + // Not checking a small number of deals in the front + if 10*pieceSize >= total { + return nil + } + percent := (float64(rd.Total)*oldPercent + float64(pieceSize)) / float64(total) + if percent > 0.25 && percent > oldPercent { + return fmt.Errorf("replicas percentage %0.2f%s greater than %s", percent*100, "%", "25%") + } + + return nil +} + +func (s *selector) update(clientAddr address.Address, miner address.Address, pieceCID cid.Cid, pieceSize uint64) { + pd := s.pds[miner] + pd.Total += pieceSize + if _, ok := pd.UniqPieces[pieceCID.String()]; !ok { + pd.UniqPieces[pieceCID.String()] = pieceSize + pd.Uniq += pieceSize + } + pd.DuplicationPercentage = (float64(pd.Total-pd.Uniq) / float64(pd.Total)) + + rd := s.rds[clientAddr] + total := rd.Total + rd.Total += pieceSize + percent := rd.ReplicasPercentage[miner.String()] + rd.ReplicasPercentage[miner.String()] = (float64(total)*percent + float64(pieceSize)) / float64(rd.Total) + // todo: Calculate the proportion of each miner ? 
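// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): combining loadManifest with the
// selector above to pick a provider for each piece under the two LDN filter
// rules it enforces (a provider should hold at most ~20% duplicated data, and
// at most ~25% of a client's datacap). The DealDistribution would normally come
// from ClientGetVerifiedDealDistribution; everything below uses only helpers
// defined in this file plus the address/cid/client imports already present.
func pickMiners(dd *client.DealDistribution, clientAddr address.Address, miners []address.Address, manifests []*manifest) map[cid.Cid]address.Address {
	sel := newSelector(dd, clientAddr, miners)
	picked := make(map[cid.Cid]address.Address, len(manifests))
	for _, m := range manifests {
		miner := sel.selectMiner(m.pieceCID, uint64(m.pieceSize.Padded()))
		if miner.Empty() {
			// every candidate violated a rule for this piece; show why and skip it
			sel.printError()
			continue
		}
		picked[m.pieceCID] = miner
	}
	return picked
}
// ---------------------------------------------------------------------------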
+} + +func (s *selector) printError() { + fmt.Println("select all miners failed: ") + for miner, err := range s.errs { + fmt.Printf("miner: %s, error: %v\n", miner, err) + } + fmt.Println() +} diff --git "a/docs/zh/\346\211\271\351\207\217\345\217\221\345\215\225.md" "b/docs/zh/\346\211\271\351\207\217\345\217\221\345\215\225.md" new file mode 100644 index 00000000..76b56bbf --- /dev/null +++ "b/docs/zh/\346\211\271\351\207\217\345\217\221\345\215\225.md" @@ -0,0 +1,144 @@ +## 批量发单 + +通过 `market-client` 批量发布离线订单。 + +批量发单命令:`market-client storage deals batch [command options] [miners price duration]`,具体使用和发送单个订单类似,新增了一些 Option。 + +部分 Option: + +* --miner 接收订单的矿工,可以是多个 `--miner=t010001` `--miner=t010002`,多个会平均分配订单。 +* --manifest `go-graphsplit` 生成的 `manifest.csv` 文件,包含 `payload cid`、`piece cid` 和 `piece size` 等信息 +* --start-index 从 `manifest.csv` 第 N 条订单开始 +* --end-index 在 `manifest.csv` 第 N 条订单结束,start-index - end-index 就是本次预期发布的订单总数 +* --filter 是否启动过滤,默认是 false,目前有两条过滤规则 + * 存储提供者重复的数据不能超过20% + * 一个发单者对同一个存储提供者发的datacap订单不能超过25% + +## 例子 + +### 准备订单数据 + +使用 [go-graphsplit](https://github.com/filedrive-team/go-graphsplit#usage) 生成订单数据,`go-graphsplit` 是一种用于将大型数据集分割成固定大小的 car 文件的工具。 + +``` +# --car-dir 指定文件目录存储生成的 car 文件和 manifest.csv 文件 +# --slice-size 指定car文件大小 +# --graph-name 图的名字,可随意填 +# --calc-commp 是否计算 commp,必须要计算,离线订单需要piece cid 和 piece size +# --rename 生成的car文件默认是以 payloadcid.car 作为文件名,加上本 flag 后以 piececid 作为文件名,建议加上本flag,方便后面数据导入 +# path-to-source-file 指定用于生成car文件的原始数据 + +./graphsplit chunk --car-dir --slice-size --graph-name --calc-commp --rename + +# 结果 +会在指定的目录生成 car 文件和 manifest.csv 文件 +``` + +manifest.csv 文件内容,发单的时候会使用到 playload_cid、piece_cid、payload_size 和 payload_size。 +``` +playload_cid,filename,piece_cid,payload_size,payload_size,detail +bafybeib6kembdoggfwssslpcrxgpbhxmeyretwcnaspx5ff2r4rybt73gq,test-total-19-part-1.car,baga6ea4seaqevygou3u5mlxl6prwifaw2n4e6eanrbdl7wm2guzfm2r5tool4fi,14241759,16646144,"{""Name"":"""",""Hash"":""bafybeib6kembdoggfwssslpcrxgpbhxmeyretwcnaspx5ff2r4rybt73gq"",""Size"":0,""Link"":[{""Name"":""filecoin-ffi"",""Hash"":""bafybeibum5nzwz54733lioyzwhqrod2zauax2jaa4qiu5hdqw7fidzl47m"",""Size"":14240979,""Link"":[{""Name"":""libfilcrypto.a.00000001"",""Hash"":""bafybeigptobclymvmmlxn4n764dey2p6hgot6h34vtte5fzm67l2znwz4q"",""Size"":14240905,""Link"":null}]}]}" +bafybeihh6sq3w5qmzrxakde5wwjkw7wnbwwxm5yvfo3nd5zjw25vtgcqom,test-total-19-part-2.car,baga6ea4seaqh4vblymafqc3tskmy5jilgs462gsbvuu6mxnsql364bhbj5omcji,14241759,16646144,"{""Name"":"""",""Hash"":""bafybeihh6sq3w5qmzrxakde5wwjkw7wnbwwxm5yvfo3nd5zjw25vtgcqom"",""Size"":0,""Link"":[{""Name"":""filecoin-ffi"",""Hash"":""bafybeigiprvgstre4ctcwleocm2gg52hzc32ccnjr66phud5qrnbmjyqli"",""Size"":14240979,""Link"":[{""Name"":""libfilcrypto.a.00000002"",""Hash"":""bafybeia4fxflzrihwbjkjayasrtvmnocvq5xn3pesendcjnlog47fchd6q"",""Size"":14240905,""Link"":null}]}]}" +bafybeihw7afqagfvg2uhmigyunsgvxeidaqfbg6odi4jwbus6557kqhsd4,test-total-19-part-3.car,baga6ea4seaqij5nrrmypz22lchdrjcaowjdjk67agjntbcfbcbbrcrnrdbn7kfy,14241759,16646144,"{""Name"":"""",""Hash"":""bafybeihw7afqagfvg2uhmigyunsgvxeidaqfbg6odi4jwbus6557kqhsd4"",""Size"":0,""Link"":[{""Name"":""filecoin-ffi"",""Hash"":""bafybeicn4bdznskvsjt3on2633mm3uh5gungf37hmfhsizraugq4uleosu"",""Size"":14240979,""Link"":[{""Name"":""libfilcrypto.a.00000003"",""Hash"":""bafybeidi5kz5hhfwcqechj4czbyat3tlyyd4d6rigigkks66ysjy5agfke"",""Size"":14240905,""Link"":null}]}]}" 
+
+### Publish deals in batches
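+
+In the examples below the trailing positional arguments `0 518400` correspond to the price and duration parts of `[miners price duration]`: a price of 0 and a duration of 518400 epochs (roughly 180 days at 30 s per epoch), while the miners are given through `--miner`.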
+
+1. Publish regular offline deals
+```
+./market-client storage deals batch --from --manifest --end-index 10 --start-index 5 --miner=t019150 --miner=t018682 0 518400
+
+# Result
+has 5 deals need to publish, t018682: 3, t019150: 2
+create deal success, proposal cid: bafyreihwvsr3vfsdbrxagtdjzsemngtc3r3xra2gaunbs6pjb63lyodl6a
+create deal success, proposal cid: bafyreid2oakcs2di6lq5mv3e4h5sybkezl3zn7lhvf656zeqentcsqetem
+create deal success, proposal cid: bafyreidmzdh7zee7inm65cdxmuwyxyv7uwtoo2hiphsjfseyymoir2z4nm
+create deal success, proposal cid: bafyreigt663jymtilnxocfybs27w6b564yxfczdpsx47fbsqwzrj54zxz4
+create deal success, proposal cid: bafyreigtemnxftqg65gtwsw3rwfvqaqzbb47d4r75ipksjlypcs56y7qei
+```
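+
+Here `--start-index 5 --end-index 10` selects 10 - 5 = 5 deals from `manifest.csv`, which matches the `has 5 deals need to publish` line above, and the two `--miner` targets share them roughly evenly (3 + 2).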
+
+2. Publish datacap deals in batches
+
+> Please add the `--verified-deal` flag
+
+```
+./market-client storage deals batch --from --manifest --end-index 15 --start-index 10 --verified-deal --miner=t019150 --miner=t018682 0 518400
+```
+
+### Query deals
+
+1. Query a single deal
+
+```
+./market-client storage deals get bafyreify4mm46vfdqsogcfhki44vi3ll5n2h35p5fflcclew53fgm3yieu
+```
+
+2. List all offline deals
+
+```
+./market-client storage deals list --offline
+```
+
+### Import deal data in batches
+
+**Prerequisite: the car files generated by `go-graphsplit` are named after the piece cid.**
+
+1. Export the offline deals that are waiting for data import
+
+All deals in the `StorageDealWaitingForData` state can be exported; the exported data contains the proposal cid and the piece cid.
+
+```
+./market-client storage deals export --output proposal_piece.txt
+
+cat proposal_piece.txt
+
+proposalCID,pieceCID
+bafyreihwvsr3vfsdbrxagtdjzsemngtc3r3xra2gaunbs6pjb63lyodl6a,baga6ea4seaqbj3yywnq3yisdxy4zlf4if2whlm5sdjcz7ricm2wrow2b7rc2uja
+bafyreid2oakcs2di6lq5mv3e4h5sybkezl3zn7lhvf656zeqentcsqetem,baga6ea4seaqcdstiui27aajpz2dcpx2f6brimxhfvepgxljwsweicul32pkeofq
+bafyreidmzdh7zee7inm65cdxmuwyxyv7uwtoo2hiphsjfseyymoir2z4nm,baga6ea4seaqlrwtnhj322vczuuiy2ekb4kjftbf3ho6f4bgy6k5rnzh67eia4lq
+bafyreigt663jymtilnxocfybs27w6b564yxfczdpsx47fbsqwzrj54zxz4,baga6ea4seaqdgfsfsdtpnsgwlwhtj4ecvk7432gaqheltfrzun3vju3yc3d7cnq
+bafyreigtemnxftqg65gtwsw3rwfvqaqzbb47d4r75ipksjlypcs56y7qei,baga6ea4seaqknn4cstmtmscdhebaxmv5dopnxqthwbdmvuslbuv5dcupzvw46ni
+```
+
+2. Import the deal data in batches
+
+```
+# --cardir the directory containing the car files; if the car files have already been put into the piecestore, this flag can be omitted
+./venus-market storage deal batch-import-data --manifest --cardir
+
+# Result
+import data success: bafyreihwvsr3vfsdbrxagtdjzsemngtc3r3xra2gaunbs6pjb63lyodl6a
+import data success: bafyreid2oakcs2di6lq5mv3e4h5sybkezl3zn7lhvf656zeqentcsqetem
+import data success: bafyreidmzdh7zee7inm65cdxmuwyxyv7uwtoo2hiphsjfseyymoir2z4nm
+import data success: bafyreigt663jymtilnxocfybs27w6b564yxfczdpsx47fbsqwzrj54zxz4
+import data success: bafyreigtemnxftqg65gtwsw3rwfvqaqzbb47d4r75ipksjlypcs56y7qei
+```
+
+### Query the distribution statistics
+
+1. View a storage provider's data duplication
+
+```
+./market-client storage deals verified-deal-stat --provider t019150
+
+# Result
+Provider   Total       Uniq        DuplicationPercentage
+t019150    981467136   612368384   37.61%
+```
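+
+In this output DuplicationPercentage is (Total - Uniq) / Total, i.e. (981467136 - 612368384) / 981467136 ≈ 37.61%, the same formula the deal filter uses for its 20% duplication rule.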
+
+2. View the replica distribution of a client
+
+```
+./market-client storage deals verified-deal-stat --client t3wivhkdivcxj5zp2l4wjkzon232s52smnd5m3na66ujl5nel75jggguhgaa3zbhjo3as4epf5ytxl6ly3qoha
+
+# Result
+Client: t3wivhkdivcxj5zp2l4wjkzon232s52smnd5m3na66ujl5nel75jggguhgaa3zbhjo3as4epf5ytxl6ly3qoha
+Total: 16.17 GiB / 17362337792 B
+Uniq: 14.77 GiB / 15860764672 B
+DuplicationPercentage: 8.65%
+Provider   Total                        Percentage   Uniq                         DuplicationPercentage
+t019150    620 MiB / 650117120 B        3.74%        620 MiB / 650117120 B        0.00%
+t018682    15.56 GiB / 16712220672 B    96.26%       14.42 GiB / 15487471616 B    7.33%
+```
diff --git a/go.mod b/go.mod
index 05ab9378..39dfdc96 100644
--- a/go.mod
+++ b/go.mod
@@ -30,7 +30,7 @@ require (
 	github.com/filecoin-project/go-statestore v0.2.0
 	github.com/filecoin-project/specs-actors/v2 v2.3.6
 	github.com/filecoin-project/specs-actors/v7 v7.0.1
-	github.com/filecoin-project/venus v1.11.2-0.20230525071843-ab19674dd7b3
+	github.com/filecoin-project/venus v1.11.2-0.20230602071244-7b32f3e62010
 	github.com/filecoin-project/venus-auth v1.11.1-0.20230511013901-7829b3effbcd
 	github.com/filecoin-project/venus-messager v1.11.0
 	github.com/golang/mock v1.6.0
@@ -40,7 +40,7 @@ require (
 	github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
 	github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
 	github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
-	github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230525072131-90ce0561276b
+	github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230526091733-452ecda21333
 	github.com/ipfs/go-blockservice v0.5.0
 	github.com/ipfs/go-cid v0.3.2
 	github.com/ipfs/go-cidutil v0.1.0
diff --git a/go.sum b/go.sum
index 8b76a88e..b7abad12 100644
--- a/go.sum
+++ b/go.sum
@@ -468,8 +468,8 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893
 github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
 github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
 github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
-github.com/filecoin-project/venus v1.11.2-0.20230525071843-ab19674dd7b3 h1:xUiO9SyFj6NdpFIZohgo1fqqmR0DEZm5pzew+0dsBMM=
-github.com/filecoin-project/venus v1.11.2-0.20230525071843-ab19674dd7b3/go.mod h1:9QctTOegFH+hZ5icsuR2BRRnZMhyLDQa5uJAbOog76M=
+github.com/filecoin-project/venus v1.11.2-0.20230602071244-7b32f3e62010 h1:rMfjmjOF184lcuS3FNlJ8vbzZiVhKAT4RIig4vr7YB4=
+github.com/filecoin-project/venus v1.11.2-0.20230602071244-7b32f3e62010/go.mod h1:9QctTOegFH+hZ5icsuR2BRRnZMhyLDQa5uJAbOog76M=
 github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
 github.com/filecoin-project/venus-auth v1.11.1-0.20230511013901-7829b3effbcd h1:l02UJuEbSUIBi3NC/+17K2gBbAzsUNQg42rNCXskOBc=
 github.com/filecoin-project/venus-auth v1.11.1-0.20230511013901-7829b3effbcd/go.mod h1:PoTmfEn5lljjAQThBzX0+friJYGgi7Z3VLLujkOkCT4=
@@ -837,8 +837,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.
 github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ=
 github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8=
 github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA=
-github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230525072131-90ce0561276b h1:WJYSYAcvHqo6ht8TRPX8tt1R9ki7fpa2zraiHIw1DUc=
-github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230525072131-90ce0561276b/go.mod h1:fJzlqs3UfzmPXyuIfFc0tuUWJ7L+7hmcBxFl6jsRUWE=
+github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230526091733-452ecda21333 h1:mnqvXRJqKVsCrs6NF8xZFZD01TmI/MInvGzODAYeT50=
+github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230526091733-452ecda21333/go.mod h1:fJzlqs3UfzmPXyuIfFc0tuUWJ7L+7hmcBxFl6jsRUWE=
 github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
 github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
 github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
diff --git a/models/badger/client_offline_deal.go b/models/badger/client_offline_deal.go
new file mode 100644
index 00000000..0394f2f6
--- /dev/null
+++ b/models/badger/client_offline_deal.go
@@ -0,0 +1,75 @@
+package badger
+
+import (
+    "context"
+    "encoding/json"
+
+    "github.com/filecoin-project/venus-market/v2/models/repo"
+    types "github.com/filecoin-project/venus/venus-shared/types/market/client"
+    "github.com/ipfs/go-cid"
+    "github.com/ipfs/go-datastore"
+    "github.com/ipfs/go-datastore/query"
+)
+
+func NewBadgerClientOfflineDealRepo(ds ClientOfflineDealsDS) repo.ClientOfflineDealRepo {
+    return &badgerClientOfflineDealRepo{ds: ds}
+}
+
+type badgerClientOfflineDealRepo struct {
+    ds datastore.Batching
+}
+
+func (r *badgerClientOfflineDealRepo) SaveDeal(ctx context.Context, d *types.ClientOfflineDeal) error {
+    key := keyFromProposalCID(d.ProposalCID)
+    data, err := json.Marshal(d)
+    if err != nil {
+        return err
+    }
+    return r.ds.Put(ctx, key, data)
+}
+
+func (r *badgerClientOfflineDealRepo) GetDeal(ctx context.Context, proposalCID cid.Cid) (*types.ClientOfflineDeal, error) {
+    key := keyFromProposalCID(proposalCID)
+    data, err := r.ds.Get(ctx, key)
+    if err != nil {
+        return nil, err
+    }
+    var d types.ClientOfflineDeal
+    err = json.Unmarshal(data, &d)
+    if err != nil {
+        return nil, err
+    }
+
+    return &d, nil
+}
+
+func (r *badgerClientOfflineDealRepo) ListDeal(ctx context.Context) (deals []*types.ClientOfflineDeal, err error) {
+    result, err := r.ds.Query(ctx, query.Query{})
+    if err != nil {
+        return nil, err
+    }
+    defer func() {
+        // do not let a successful Close overwrite an earlier error
+        if closeErr := result.Close(); closeErr != nil && err == nil {
+            err = closeErr
+        }
+    }()
+
+    for entry := range result.Next() {
+        if entry.Error != nil {
+            return nil, entry.Error
+        }
+        var d types.ClientOfflineDeal
+        err = json.Unmarshal(entry.Value, &d)
+        if err != nil {
+            return nil, err
+        }
+
+        deals = append(deals, &d)
+    }
+
+    return deals, nil
+}
+
+var _ repo.ClientOfflineDealRepo = (*badgerClientOfflineDealRepo)(nil)
+
+func keyFromProposalCID(proposalCID cid.Cid) datastore.Key {
+    return datastore.KeyWithNamespaces([]string{proposalCID.String()})
+}
diff --git a/models/badger/client_offline_deal_test.go b/models/badger/client_offline_deal_test.go
new file mode 100644
index 00000000..b4f063e4
--- /dev/null
+++ b/models/badger/client_offline_deal_test.go
@@ -0,0 +1,53 @@
+package badger
+
+import (
+    "context"
+    "testing"
+
+
"github.com/filecoin-project/venus/venus-shared/testutil" + vTypes "github.com/filecoin-project/venus/venus-shared/types" + types "github.com/filecoin-project/venus/venus-shared/types/market/client" + "github.com/stretchr/testify/assert" +) + +func TestClientOfflineDeal(t *testing.T) { + ds, err := NewDatastore("") + assert.NoError(t, err) + r := NewBadgerClientOfflineDealRepo(ds) + + deals := make([]*types.ClientOfflineDeal, 10) + testutil.Provide(t, &deals) + + ctx := context.Background() + + t.Run("save deal", func(t *testing.T) { + for i, deal := range deals { + if i%2 == 0 { + deals[i].AddFundsCid = nil + } + assert.NoError(t, r.SaveDeal(ctx, deal)) + } + }) + + t.Run("get deal", func(t *testing.T) { + for _, deal := range deals { + res, err := r.GetDeal(ctx, deal.ProposalCID) + assert.NoError(t, err) + labelByte, err := deal.Proposal.Label.ToBytes() + assert.NoError(t, err) + labelStr, err := res.Proposal.Label.ToString() + assert.NoError(t, err) + assert.Equal(t, string(labelByte), labelStr) + res.Proposal.Label, err = vTypes.NewLabelFromBytes([]byte(labelStr)) + assert.NoError(t, err) + assert.Equal(t, deal, res) + } + }) + + t.Run("list deal", func(t *testing.T) { + res, err := r.ListDeal(ctx) + assert.NoError(t, err) + + assert.Len(t, res, len(deals)) + }) +} diff --git a/models/badger/db.go b/models/badger/db.go index 3ff6b7be..e9edee65 100644 --- a/models/badger/db.go +++ b/models/badger/db.go @@ -33,6 +33,7 @@ const ( // client dealClient = "/deals/client" dealLocal = "/deals/local" + offlineDeal = "/deals/offline" retrievalClient = "/retrievals/client" clientTransfer = "/datatransfer/client/transfers" ) @@ -89,6 +90,9 @@ type ClientDatastore datastore.Batching // /metadata/deals/local type ClientDealsDS datastore.Batching +// /metadata/deals/offline +type ClientOfflineDealsDS datastore.Batching + // /metadata/retrievals/client type RetrievalClientDS datastore.Batching @@ -165,6 +169,11 @@ func NewClientDatastore(ds MetadataDS) ClientDatastore { return namespace.Wrap(ds, datastore.NewKey(dealClient)) } +// NewClientOfflineDealStore creates a datastore for the client to store its offline deals +func NewClientOfflineDealStore(ds MetadataDS) ClientOfflineDealsDS { + return namespace.Wrap(ds, datastore.NewKey(offlineDeal)) +} + // for discover func NewClientDealsDS(ds MetadataDS) ClientDealsDS { return namespace.Wrap(ds, datastore.NewKey(dealLocal)) diff --git a/models/module.go b/models/module.go index b512a2ff..071fa097 100644 --- a/models/module.go +++ b/models/module.go @@ -58,7 +58,9 @@ var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option { builder.Override(new(badger2.RetrievalClientDS), badger2.NewRetrievalClientDS), builder.Override(new(badger2.ImportClientDS), badger2.NewImportClientDS), builder.Override(new(badger2.ClientTransferDS), badger2.NewClientTransferDS), + builder.Override(new(badger2.ClientOfflineDealsDS), badger2.NewClientOfflineDealStore), builder.Override(new(repo.Repo), badger2.NewMigratedBadgerRepo), + builder.Override(new(repo.ClientOfflineDealRepo), badger2.NewBadgerClientOfflineDealRepo), ), ), ) diff --git a/models/repo/repo.go b/models/repo/repo.go index 4954c360..15e1f44f 100644 --- a/models/repo/repo.go +++ b/models/repo/repo.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" fbig "github.com/filecoin-project/go-state-types/big" types "github.com/filecoin-project/venus/venus-shared/types/market" + types2 "github.com/filecoin-project/venus/venus-shared/types/market/client" "github.com/ipfs/go-cid" 
"github.com/libp2p/go-libp2p/core/peer" ) @@ -113,6 +114,12 @@ type TxRepo interface { StorageDealRepo() StorageDealRepo } +type ClientOfflineDealRepo interface { + SaveDeal(ctx context.Context, deal *types2.ClientOfflineDeal) error + GetDeal(ctx context.Context, proposalCID cid.Cid) (*types2.ClientOfflineDeal, error) + ListDeal(ctx context.Context) ([]*types2.ClientOfflineDeal, error) +} + var ErrNotFound = errors.New("record not found") func UniformNotFoundErrors() { diff --git a/storageprovider/deal_assigner.go b/storageprovider/deal_assigner.go index 92f6309c..bc9cdf69 100644 --- a/storageprovider/deal_assigner.go +++ b/storageprovider/deal_assigner.go @@ -107,7 +107,7 @@ func (ps *dealAssigner) GetDeals(ctx context.Context, mAddr address.Address, pag for _, md := range mds { // TODO: 要排除不可密封状态的订单? - if md.DealID > 0 && !isTerminateState(md) { + if md.DealID > 0 && !IsTerminateState(md.State) { dis = append(dis, &types.DealInfo{ DealInfo: piecestore.DealInfo{ DealID: md.DealID, @@ -157,7 +157,7 @@ func (ps *dealAssigner) GetUnPackedDeals(ctx context.Context, miner address.Addr for _, md := range mds { // TODO: 要排除不可密封状态的订单? - if md.DealID == 0 || isTerminateState(md) { + if md.DealID == 0 || IsTerminateState(md.State) { continue } if ((spec.MaxPieceSize > 0 && uint64(md.Proposal.PieceSize)+curPieceSize < spec.MaxPieceSize) || spec.MaxPieceSize == 0) && numberPiece+1 < spec.MaxPiece { diff --git a/storageprovider/deal_handler.go b/storageprovider/deal_handler.go index 12ee8536..b4c2140c 100644 --- a/storageprovider/deal_handler.go +++ b/storageprovider/deal_handler.go @@ -639,22 +639,22 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleError(ctx context.Contex if deal.PiecePath != filestore.Path("") { fs, err := storageDealPorcess.tf(deal.Proposal.Provider) if err != nil { - log.Warnf("get temp file store for %s: %w", deal.Proposal.Provider, err) + log.Warnf("get temp file store for %s: %v", deal.Proposal.Provider, err) } else { err = fs.Delete(deal.PiecePath) if err != nil { - log.Warnf("deleting piece at path %s: %w", deal.PiecePath, err) + log.Warnf("deleting piece at path %s: %v", deal.PiecePath, err) } } } if deal.MetadataPath != filestore.Path("") { fs, err := storageDealPorcess.tf(deal.Proposal.Provider) if err != nil { - log.Warnf("get temp file store for %s: %w", deal.Proposal.Provider, err) + log.Warnf("get temp file store for %s: %v", deal.Proposal.Provider, err) } else { err = fs.Delete(deal.MetadataPath) if err != nil { - log.Warnf("deleting piece at path %s: %w", deal.MetadataPath, err) + log.Warnf("deleting piece at path %s: %v", deal.MetadataPath, err) } } } diff --git a/storageprovider/deal_tracker.go b/storageprovider/deal_tracker.go index 5a781598..af61e6ca 100644 --- a/storageprovider/deal_tracker.go +++ b/storageprovider/deal_tracker.go @@ -171,7 +171,7 @@ func (dealTracker *DealTracker) checkSlash(ctx metrics.MetricsCtx, addr address. 
 	if err != nil {
 		return fmt.Errorf("get market deal info for sector %d of miner %s %w", deal.SectorNumber, addr, err)
 	}
-	if dealProposal.State.SlashEpoch > -1 { // include in sector
+	if dealProposal.State.SlashEpoch > -1 {
 		err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealSlashed, "")
 		if err != nil {
 			return fmt.Errorf("update deal status to slash for sector %d of miner %s %w", deal.SectorNumber, addr, err)
 		}
diff --git a/storageprovider/storage_provider.go b/storageprovider/storage_provider.go
index b72707b0..0711110e 100644
--- a/storageprovider/storage_provider.go
+++ b/storageprovider/storage_provider.go
@@ -6,9 +6,11 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"os"
 	"time"
 
 	"github.com/ipfs-force-community/metrics"
+	"go.uber.org/fx"
 
 	"github.com/hannahhoward/go-pubsub"
 	"github.com/ipfs/go-cid"
@@ -21,6 +23,7 @@ import (
 	commp "github.com/filecoin-project/go-fil-commp-hashhash"
 	"github.com/filecoin-project/go-padreader"
 	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/go-state-types/big"
 	"github.com/filecoin-project/go-state-types/exitcode"
 
 	"github.com/filecoin-project/go-fil-markets/filestore"
@@ -109,8 +112,8 @@ type StorageProvider interface {
 	// GetStorageCollateral returns the current collateral balance
 	GetStorageCollateral(ctx context.Context, mAddr address.Address) (storagemarket.Balance, error)
 
-	// ImportDataForDeal manually imports data for an offline storage deal
-	ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error
+	// ImportDataForDeals manually imports data for a batch of offline storage deals
+	ImportDataForDeals(ctx context.Context, refs []*types.ImportDataRef, skipCommP bool) ([]*types.ImportDataResult, error)
 
 	// ImportPublishedDeal manually import published deals to storage deals
 	ImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error
@@ -123,6 +126,8 @@ type StorageProvider interface {
 }
 
 type StorageProviderImpl struct {
+	ctx context.Context
+
 	net smnet.StorageMarketNetwork
 
 	tf config.TransferFileStoreConfigFunc
@@ -146,6 +151,7 @@ type StorageProviderImpl struct {
 // NewStorageProvider returns a new storage provider
 func NewStorageProvider(
 	mCtx metrics.MetricsCtx,
+	lc fx.Lifecycle,
 	storedAsk IStorageAsk,
 	h host.Host,
 	tf config.TransferFileStoreConfigFunc,
@@ -162,6 +168,8 @@ func NewStorageProvider(
 	net := smnet.NewFromLibp2pHost(h)
 
 	spV2 := &StorageProviderImpl{
+		ctx: metrics.LifecycleCtx(mCtx, lc),
+
 		net: net,
 
 		tf: tf,
@@ -229,9 +237,9 @@ func (p *StorageProviderImpl) start(ctx context.Context) error {
 	return nil
 }
 
-func isTerminateState(deal *types.MinerDeal) bool {
-	if deal.State == storagemarket.StorageDealSlashed || deal.State == storagemarket.StorageDealExpired ||
-		deal.State == storagemarket.StorageDealError || deal.State == storagemarket.StorageDealFailing {
+func IsTerminateState(state storagemarket.StorageDealStatus) bool {
+	if state == storagemarket.StorageDealSlashed || state == storagemarket.StorageDealExpired ||
+		state == storagemarket.StorageDealError || state == storagemarket.StorageDealFailing {
 		return true
 	}
 
@@ -240,7 +248,7 @@ func isTerminateState(deal *types.MinerDeal) bool {
 
 func (p *StorageProviderImpl) restartDeals(ctx context.Context, deals []*types.MinerDeal) error {
 	for _, deal := range deals {
-		if isTerminateState(deal) {
+		if IsTerminateState(deal.State) {
 			continue
 		}
 
@@ -261,19 +269,70 @@ func (p *StorageProviderImpl) Stop() error {
 	return p.net.StopHandlingRequests()
 }
-// ImportDataForDeal manually imports data for an offline storage deal
-// It will verify that the data in the passed io.Reader matches the expected piece
-// cid for the given deal or it will error
-// If can find the car file from the piece store, read it directly without copying the car file to the local directory.
-// If skipCommP is true, do not compare piece cid.
-func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error {
+// ImportDataForDeals manually imports data for a batch of offline storage deals
+func (p *StorageProviderImpl) ImportDataForDeals(ctx context.Context, refs []*types.ImportDataRef, skipCommP bool) ([]*types.ImportDataResult, error) {
 	// TODO: be able to check if we have enough disk space
-	d, err := p.dealStore.GetDeal(ctx, propCid)
-	if err != nil {
-		return fmt.Errorf("failed getting deal %s: %w", propCid, err)
+	results := make([]*types.ImportDataResult, 0, len(refs))
+	minerDeals := make(map[address.Address][]*types.MinerDeal)
+	for _, ref := range refs {
+		d, err := p.dealStore.GetDeal(ctx, ref.ProposalCID)
+		if err != nil {
+			results = append(results, &types.ImportDataResult{
+				ProposalCID: ref.ProposalCID,
+				Message:     fmt.Errorf("failed getting deal: %v", err).Error(),
+			})
+			continue
+		}
+		if err := p.importDataForDeal(ctx, d, ref, skipCommP); err != nil {
+			results = append(results, &types.ImportDataResult{
+				ProposalCID: ref.ProposalCID,
+				Message:     err.Error(),
+			})
+			continue
+		}
+		minerDeals[d.Proposal.Provider] = append(minerDeals[d.Proposal.Provider], d)
+	}
+
+	for provider, deals := range minerDeals {
+		res, err := p.batchReserverFunds(p.ctx, deals)
+		if err != nil {
+			log.Errorf("batch reserve funds for %s failed: %v", provider, err)
+			for _, deal := range deals {
+				results = append(results, &types.ImportDataResult{
+					ProposalCID: deal.ProposalCid,
+					Message:     err.Error(),
+				})
+			}
+			continue
+		}
+
+		for _, deal := range deals {
+			if err := res[deal.ProposalCid]; err != nil {
+				results = append(results, &types.ImportDataResult{
+					ProposalCID: deal.ProposalCid,
+					Message:     err.Error(),
+				})
+				continue
+			}
+			results = append(results, &types.ImportDataResult{
+				ProposalCID: deal.ProposalCid,
+			})
+
+			go func(deal *types.MinerDeal) {
+				err := p.dealProcess.HandleOff(p.ctx, deal)
+				if err != nil {
+					log.Errorf("deal %s handle off err: %s", deal.ProposalCid, err)
+				}
+			}(deal)
+		}
 	}
-	if isTerminateState(d) {
+	return results, nil
+}
+
+func (p *StorageProviderImpl) importDataForDeal(ctx context.Context, d *types.MinerDeal, ref *types.ImportDataRef, skipCommP bool) error {
+	propCid := d.ProposalCid
+	if IsTerminateState(d.State) {
 		return fmt.Errorf("deal %s is terminate state", propCid)
 	}
@@ -309,6 +368,11 @@ func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid
 	} else {
 		log.Debugf("not found %s in piece storage", d.Proposal.PieceCID)
 
+		data, err := os.Open(ref.File)
+		if err != nil {
+			return err
+		}
+		// close the locally opened car file once the import returns
+		defer data.Close() //nolint:errcheck
+
 		fs, err := p.tf(d.Proposal.Provider)
 		if err != nil {
 			return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
@@ -399,14 +463,74 @@ func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid
 	if err := p.dealStore.SaveDeal(ctx, d); err != nil {
 		return fmt.Errorf("save deal(%d) failed:%w", d.DealID, err)
 	}
+
 	p.eventPublisher.Publish(storagemarket.ProviderEventManualDataReceived, d)
-	go func() {
-		err := p.dealProcess.HandleOff(context.TODO(), d)
+
+	return nil
+}
+
+// batchReserverFunds reserves funds for a batch of deals that belong to the same provider.
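+// It sums the ProviderCollateral of all deals in the batch and reserves the total with a single
+// ReserveFunds message, while fundList keeps the per-deal amounts so that each deal's
+// FundsReserved can still be tracked individually.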
+func (p *StorageProviderImpl) batchReserverFunds(ctx context.Context, deals []*types.MinerDeal) (map[cid.Cid]error, error) {
+	handleError := func(deals []*types.MinerDeal, evt storagemarket.ProviderEvent, err error) (map[cid.Cid]error, error) {
+		for _, deal := range deals {
+			p.eventPublisher.Publish(evt, deal)
+			_ = p.dealProcess.HandleError(ctx, deal, err)
+		}
+		return nil, err
+	}
+
+	res := make(map[cid.Cid]error, len(deals))
+	provider := deals[0].Proposal.Provider
+	allFunds := big.NewInt(0)
+	fundList := make([]big.Int, 0, len(deals))
+	for _, d := range deals {
+		allFunds = big.Add(allFunds, d.Proposal.ProviderCollateral)
+		fundList = append(fundList, d.Proposal.ProviderCollateral)
+	}
+
+	tok, _, err := p.spn.GetChainHead(ctx)
+	if err != nil {
+		return handleError(deals, storagemarket.ProviderEventNodeErrored, fmt.Errorf("acquiring chain head: %v", err))
+	}
+
+	waddr, err := p.spn.GetMinerWorkerAddress(ctx, provider, tok)
+	if err != nil {
+		return handleError(deals, storagemarket.ProviderEventNodeErrored, fmt.Errorf("looking up miner worker: %v", err))
+	}
+
+	mcid, err := p.spn.ReserveFunds(ctx, waddr, provider, allFunds)
+	if err != nil {
+		return handleError(deals, storagemarket.ProviderEventNodeErrored, fmt.Errorf("reserving funds: %v", err))
+	}
+
+	for i, deal := range deals {
+		p.eventPublisher.Publish(storagemarket.ProviderEventFundsReserved, deal)
+		res[deal.ProposalCid] = nil
+
+		if deal.FundsReserved.Nil() {
+			deal.FundsReserved = fundList[i]
+		} else {
+			deal.FundsReserved = big.Add(deal.FundsReserved, fundList[i])
+		}
+
+		// if no message was sent, and there was no error, funds were already available
+		if mcid != cid.Undef {
+			deal.AddFundsCid = &mcid
+			deal.State = storagemarket.StorageDealProviderFunding
+		} else {
+			p.eventPublisher.Publish(storagemarket.ProviderEventFunded, deal)
+			deal.State = storagemarket.StorageDealPublish // PublishDeal
+		}
+
+		p.eventPublisher.Publish(storagemarket.ProviderEventFundingInitiated, deal)
+		err = p.dealStore.SaveDeal(ctx, deal)
 		if err != nil {
-			log.Errorf("deal %s handle off err: %s", propCid, err)
+			_ = p.dealProcess.HandleError(ctx, deal, fmt.Errorf("failed to save deal to database: %v", err))
+			res[deal.ProposalCid] = err
 		}
-	}()
-	return nil
+	}
+
+	return res, nil
 }
 
 // ImportPublishedDeal manually import published deals for an storage deal
diff --git a/storageprovider/storageprovider_test.go b/storageprovider/storageprovider_test.go
index 5abea06e..22cf15d6 100644
--- a/storageprovider/storageprovider_test.go
+++ b/storageprovider/storageprovider_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/ipfs/go-cid"
 	"github.com/stretchr/testify/assert"
+	"go.uber.org/fx/fxtest"
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-fil-markets/filestore"
@@ -114,6 +115,7 @@ func TestStorageProviderImpl_ImportOfflineDeal(t *testing.T) {
 }
 
 func setup(t *testing.T) StorageProvider {
+	lc := fxtest.NewLifecycle(t)
 	ctx := context.Background()
 	spn := newMockProviderNode()
@@ -152,7 +154,7 @@ func setup(t *testing.T) StorageProvider {
 		}
 		return store, nil
 	}
-	provider, err := NewStorageProvider(ctx, ask, h, tf, psManager, dt, spn, nil, r, addrMgr, nil, nil, NewEventPublishAdapter(r))
+	provider, err := NewStorageProvider(ctx, lc, ask, h, tf, psManager, dt, spn, nil, r, addrMgr, nil, nil, NewEventPublishAdapter(r))
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/utils/converters.go b/utils/converters.go
index 0d395202..22880706 100644
--- a/utils/converters.go
+++ b/utils/converters.go
@@ -11,16 +11,25 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket" ) -func NewStorageProviderInfo(miner address.Address, worker address.Address, sectorSize abi.SectorSize, peer peer.ID, addrs []abi.Multiaddrs) storagemarket.StorageProviderInfo { +func ConvertMultiaddr(addrs [][]byte) ([]multiaddr.Multiaddr, error) { multiaddrs := make([]multiaddr.Multiaddr, 0, len(addrs)) for _, a := range addrs { maddr, err := multiaddr.NewMultiaddrBytes(a) if err != nil { - return storagemarket.StorageProviderInfo{} + return nil, err } multiaddrs = append(multiaddrs, maddr) } + return multiaddrs, nil +} + +func NewStorageProviderInfo(miner address.Address, worker address.Address, sectorSize abi.SectorSize, peer peer.ID, addrs []abi.Multiaddrs) storagemarket.StorageProviderInfo { + multiaddrs, err := ConvertMultiaddr(addrs) + if err != nil { + return storagemarket.StorageProviderInfo{} + } + return storagemarket.StorageProviderInfo{ Address: miner, Worker: worker,