Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch send deals #297

Merged
merged 15 commits into from
Jun 6, 2023
2 changes: 1 addition & 1 deletion api/clients/mix_msgclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (msgClient *MixMsgClient) PushMessage(ctx context.Context, msg *types.Messa
return cid.Undef, err
}

log.Warnf("push message %s to venus-messager", msgID.String())
log.Infof("push message %s to venus-messager", msgID.String())

return msgID, nil
}
Expand Down
67 changes: 61 additions & 6 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,20 @@ func (m *MarketNodeImpl) MarketImportDealData(ctx context.Context, propCid cid.C
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi, true)
ref := types.ImportDataRef{
ProposalCID: propCid,
File: path,
}

res, err := m.StorageProvider.ImportDataForDeals(ctx, []*types.ImportDataRef{&ref}, false)
if err != nil {
return err
}
if len(res[0].Message) > 0 {
return fmt.Errorf(res[0].Message)
}

return nil
}

func (m *MarketNodeImpl) MarketImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error {
Expand Down Expand Up @@ -1092,21 +1105,31 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add
return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus)
}

func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string, skipCommP bool) error {
func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string, skipCommP bool) error {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid)
if err != nil {
return err
}
if err := jwtclient.CheckPermissionByMiner(ctx, m.AuthClient, deal.Proposal.Provider); err != nil {
return err
}
fi, err := os.Open(fname)

ref := &types.ImportDataRef{
ProposalCID: dealPropCid,
File: file,
}
res, err := m.DealsBatchImportData(ctx, types.ImportDataRefs{
Refs: []*types.ImportDataRef{ref},
SkipCommP: skipCommP,
})
if err != nil {
return fmt.Errorf("failed to open given file: %w", err)
return err
}
if len(res[0].Message) > 0 {
return fmt.Errorf(res[0].Message)
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi, skipCommP)
return nil
}

func (m *MarketNodeImpl) GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*types.DealInfo, error) {
Expand Down Expand Up @@ -1250,3 +1273,35 @@ func (m *MarketNodeImpl) MarketGetReserved(ctx context.Context, addr address.Add
}
return m.FundAPI.MarketGetReserved(ctx, addr)
}

func (m *MarketNodeImpl) DealsBatchImportData(ctx context.Context, refs types.ImportDataRefs) ([]*types.ImportDataResult, error) {
refLen := len(refs.Refs)
results := make([]*types.ImportDataResult, 0, refLen)
validRefs := make([]*types.ImportDataRef, 0, refLen)
for _, ref := range refs.Refs {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, ref.ProposalCID)
if err != nil {
results = append(results, &types.ImportDataResult{
ProposalCID: ref.ProposalCID,
Message: err.Error(),
})
continue
}
if err := jwtclient.CheckPermissionByMiner(ctx, m.AuthClient, deal.Proposal.Provider); err != nil {
results = append(results, &types.ImportDataResult{
ProposalCID: ref.ProposalCID,
Message: err.Error(),
})
continue
}
validRefs = append(validRefs, ref)
}

res, err := m.StorageProvider.ImportDataForDeals(ctx, validRefs, refs.SkipCommP)
if err != nil {
return nil, err
}
results = append(results, res...)

return results, nil
}
4 changes: 2 additions & 2 deletions cli/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var StatsCmds = &cli.Command{
Usage: "Stats about deals, sectors, and other things",
Subcommands: []*cli.Command{
StatsPowerCmd,
StatsDealskCmd,
StatsDealsCmd,
},
}

Expand Down Expand Up @@ -254,7 +254,7 @@ var StatsPowerCmd = &cli.Command{
},
}

var StatsDealskCmd = &cli.Command{
var StatsDealsCmd = &cli.Command{
Name: "deals",
Description: "Statistics on active market deals",
Action: func(cctx *cli.Context) error {
Expand Down
102 changes: 102 additions & 0 deletions cli/storage-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"text/tabwriter"
Expand Down Expand Up @@ -42,6 +43,7 @@ var storageDealsCmds = &cli.Command{
Usage: "Manage storage deals and related configuration",
Subcommands: []*cli.Command{
dealsImportDataCmd,
dealsBatchImportDataCmd,
importOfflineDealCmd,
dealsListCmd,
updateStorageDealStateCmd,
Expand Down Expand Up @@ -97,6 +99,106 @@ var dealsImportDataCmd = &cli.Command{
},
}

var dealsBatchImportDataCmd = &cli.Command{
Name: "batch-import-data",
Usage: "Batch import data for deals",
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "proposals",
Usage: "proposal cid and car file, eg. --proposals <proposal_cid>,<path_to_car_file> --proposals <proposal_cid>,<path_to_car_file>",
},
&cli.StringFlag{
Name: "manifest",
Usage: `A file containing proposal cid and piece cid, eg.
proposalCID,pieceCID
baadfdxxx,badddxxx
basdefxxx,baefaxxx
`,
},
&cli.BoolFlag{
Name: "skip-commp",
Usage: "skip calculate the piece-cid, please use with caution",
},
&cli.BoolFlag{
Name: "really-do-it",
Usage: "Actually send transaction performing the action",
},
&cli.StringFlag{
Name: "car-dir",
Usage: "Directory of car files",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := NewMarketNode(cctx)
if err != nil {
return err
}
defer closer()

ctx := DaemonContext(cctx)

var proposalFiles []string
var refs []*market.ImportDataRef
if cctx.IsSet("proposals") {
proposalFiles = cctx.StringSlice("proposals")
} else {
manifest := cctx.String("manifest")
if len(manifest) == 0 {
return fmt.Errorf("must pass proposals or manifest")
}
data, err := os.ReadFile(manifest)
if err != nil {
return err
}
proposalFiles = strings.Split(string(data), "\n")
}
carDir := cctx.String("car-dir")
for _, proposalFile := range proposalFiles {
arr := strings.Split(proposalFile, ",")
if len(arr) != 2 {
continue
}
proposalCID, err := cid.Parse(arr[0])
if err == nil && len(arr[1]) != 0 {
ref := &market.ImportDataRef{
ProposalCID: proposalCID,
File: arr[1],
}
if len(carDir) != 0 {
ref.File = filepath.Join(carDir, ref.File)
}

refs = append(refs, ref)
}
}

var skipCommP bool
if cctx.IsSet("skip-commp") {
if !cctx.IsSet("really-do-it") {
return fmt.Errorf("pass --really-do-it to actually execute this action")
}
skipCommP = true
}
res, err := api.DealsBatchImportData(ctx, market.ImportDataRefs{
Refs: refs,
SkipCommP: skipCommP,
})
if err != nil {
return err
}

for _, r := range res {
if len(r.Message) == 0 {
fmt.Printf("import data success: %s\n", r.ProposalCID)
} else {
fmt.Printf("import data failed, deal: %s, error: %s\n", r.ProposalCID, r.Message)
}
}

return nil
},
}

var importOfflineDealCmd = &cli.Command{
Name: "import-offlinedeal",
Usage: "Manually import offline deal",
Expand Down
Loading