diff --git a/cli/client.go b/cli/client.go index 7035fb64bd0..01de8801c04 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1,19 +1,25 @@ package cli import ( + "bufio" "context" "encoding/json" "errors" "fmt" "io" + "math/rand" "os" "path/filepath" "sort" "strconv" + "strings" + "sync" + "sync/atomic" "text/tabwriter" "time" tm "github.com/buger/goterm" + "github.com/chzyer/readline" "github.com/docker/go-units" "github.com/fatih/color" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -74,6 +80,7 @@ var clientCmd = &cli.Command{ WithCategory("storage", clientQueryAskCmd), WithCategory("storage", clientListDeals), WithCategory("storage", clientGetDealCmd), + WithCategory("storage", clientListAsksCmd), WithCategory("data", clientImportCmd), WithCategory("data", clientDropCmd), WithCategory("data", clientLocalCmd), @@ -468,16 +475,26 @@ func interactiveDeal(cctx *cli.Context) error { } defer closer() ctx := ReqContext(cctx) + ctx, cancel := context.WithCancel(ctx) + defer cancel() state := "import" + gib := types.NewInt(1 << 30) var data cid.Cid var days int - var maddr address.Address - var ask storagemarket.StorageAsk - var epochPrice big.Int + var maddrs []address.Address + var ask []storagemarket.StorageAsk + var epochPrices []big.Int + var dur time.Duration var epochs abi.ChainEpoch var verified bool + var ds lapi.DataSize + + // find + var candidateAsks []*storagemarket.StorageAsk + var budget types.FIL + var dealCount int64 var a address.Address if from := cctx.String("from"); from != "" { @@ -494,10 +511,24 @@ func interactiveDeal(cctx *cli.Context) error { a = def } + fromBal, err := api.WalletBalance(ctx, a) + if err != nil { + return xerrors.Errorf("checking from address balance: %w", err) + } + printErr := func(err error) { fmt.Printf("%s %s\n", color.RedString("Error:"), err.Error()) } + cs := readline.NewCancelableStdin(os.Stdin) + go func() { + <-ctx.Done() + cs.Close() // nolint:errcheck + }() + + rl := bufio.NewReader(cs) + +uiLoop: for { // TODO: better exit handling if err := ctx.Err(); err != nil { @@ -508,8 +539,8 @@ func interactiveDeal(cctx *cli.Context) error { case "import": fmt.Print("Data CID (from " + color.YellowString("lotus client import") + "): ") - var cidStr string - _, err := fmt.Scan(&cidStr) + _cidStr, _, err := rl.ReadLine() + cidStr := string(_cidStr) if err != nil { printErr(xerrors.Errorf("reading cid string: %w", err)) continue @@ -521,59 +552,36 @@ func interactiveDeal(cctx *cli.Context) error { continue } - state = "duration" - case "duration": - fmt.Print("Deal duration (days): ") - - _, err := fmt.Scan(&days) + color.Blue(".. calculating data size\n") + ds, err = api.ClientDealSize(ctx, data) if err != nil { - printErr(xerrors.Errorf("parsing duration: %w", err)) - continue - } - - if days < int(build.MinDealDuration/builtin.EpochsInDay) { - printErr(xerrors.Errorf("minimum duration is %d days", int(build.MinDealDuration/builtin.EpochsInDay))) - continue + return err } - state = "miner" - case "miner": - fmt.Print("Miner Address (f0..): ") - var maddrStr string - - _, err := fmt.Scan(&maddrStr) - if err != nil { - printErr(xerrors.Errorf("reading miner address: %w", err)) - continue - } + state = "duration" + case "duration": + fmt.Print("Deal duration (days): ") - maddr, err = address.NewFromString(maddrStr) + _daystr, _, err := rl.ReadLine() + daystr := string(_daystr) if err != nil { - printErr(xerrors.Errorf("parsing miner address: %w", err)) - continue + return err } - state = "query" - case "query": - color.Blue(".. querying miner ask") - - mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + _, err = fmt.Sscan(daystr, &days) if err != nil { - printErr(xerrors.Errorf("failed to get peerID for miner: %w", err)) - state = "miner" + printErr(xerrors.Errorf("parsing duration: %w", err)) continue } - a, err := api.ClientQueryAsk(ctx, *mi.PeerId, maddr) - if err != nil { - printErr(xerrors.Errorf("failed to query ask: %w", err)) - state = "miner" + if days < int(build.MinDealDuration/builtin.EpochsInDay) { + printErr(xerrors.Errorf("minimum duration is %d days", int(build.MinDealDuration/builtin.EpochsInDay))) continue } - ask = *a + dur = 24 * time.Hour * time.Duration(days) + epochs = abi.ChainEpoch(dur / (time.Duration(build.BlockDelaySecs) * time.Second)) - // TODO: run more validation state = "verified" case "verified": ts, err := api.ChainHead(ctx) @@ -587,26 +595,20 @@ func interactiveDeal(cctx *cli.Context) error { } if dcap == nil { - state = "confirm" + state = "miner" continue } - color.Blue(".. checking verified deal eligibility\n") - ds, err := api.ClientDealSize(ctx, data) - if err != nil { - return err - } - if dcap.Uint64() < uint64(ds.PieceSize) { color.Yellow(".. not enough DataCap available for a verified deal\n") - state = "confirm" + state = "miner" continue } fmt.Print("\nMake this a verified deal? (yes/no): ") - var yn string - _, err = fmt.Scan(&yn) + _yn, _, err := rl.ReadLine() + yn := string(_yn) if err != nil { return err } @@ -621,34 +623,162 @@ func interactiveDeal(cctx *cli.Context) error { continue } - state = "confirm" - case "confirm": - fromBal, err := api.WalletBalance(ctx, a) + state = "miner" + case "miner": + fmt.Print("Miner Addresses (f0.. f0..), none to find: ") + + _maddrsStr, _, err := rl.ReadLine() + maddrsStr := string(_maddrsStr) if err != nil { - return xerrors.Errorf("checking from address balance: %w", err) + printErr(xerrors.Errorf("reading miner address: %w", err)) + continue } - color.Blue(".. calculating data size\n") - ds, err := api.ClientDealSize(ctx, data) + for _, s := range strings.Fields(maddrsStr) { + maddr, err := address.NewFromString(strings.TrimSpace(s)) + if err != nil { + printErr(xerrors.Errorf("parsing miner address: %w", err)) + continue uiLoop + } + + maddrs = append(maddrs, maddr) + } + + state = "query" + if len(maddrs) == 0 { + state = "find" + } + case "find": + asks, err := getAsks(ctx, api) if err != nil { return err } - dur := 24 * time.Hour * time.Duration(days) + for _, ask := range asks { + if ask.MinPieceSize > ds.PieceSize { + continue + } + if ask.MaxPieceSize < ds.PieceSize { + continue + } + candidateAsks = append(candidateAsks, ask) + } - epochs = abi.ChainEpoch(dur / (time.Duration(build.BlockDelaySecs) * time.Second)) - // TODO: do some more or epochs math (round to miner PP, deal start buffer) + fmt.Printf("Found %d candidate asks\n", len(candidateAsks)) + state = "find-budget" + case "find-budget": + fmt.Printf("Proposing from %s, Current Balance: %s\n", a, types.FIL(fromBal)) + fmt.Print("Maximum budget (FIL): ") // TODO: Propose some default somehow? - pricePerGib := ask.Price - if verified { - pricePerGib = ask.VerifiedPrice + _budgetStr, _, err := rl.ReadLine() + budgetStr := string(_budgetStr) + if err != nil { + printErr(xerrors.Errorf("reading miner address: %w", err)) + continue } - gib := types.NewInt(1 << 30) + budget, err = types.ParseFIL(budgetStr) + if err != nil { + printErr(xerrors.Errorf("parsing FIL: %w", err)) + continue uiLoop + } - // TODO: price is based on PaddedPieceSize, right? - epochPrice = types.BigDiv(types.BigMul(pricePerGib, types.NewInt(uint64(ds.PieceSize))), gib) - totalPrice := types.BigMul(epochPrice, types.NewInt(uint64(epochs))) + var goodAsks []*storagemarket.StorageAsk + for _, ask := range candidateAsks { + p := ask.Price + if verified { + p = ask.VerifiedPrice + } + + epochPrice := types.BigDiv(types.BigMul(p, types.NewInt(uint64(ds.PieceSize))), gib) + totalPrice := types.BigMul(epochPrice, types.NewInt(uint64(epochs))) + + if totalPrice.LessThan(abi.TokenAmount(budget)) { + goodAsks = append(goodAsks, ask) + } + } + candidateAsks = goodAsks + fmt.Printf("%d asks within budget\n", len(candidateAsks)) + state = "find-count" + case "find-count": + fmt.Print("Deals to make (1): ") + dealcStr, _, err := rl.ReadLine() + if err != nil { + printErr(xerrors.Errorf("reading deal count: %w", err)) + continue + } + + dealCount, err = strconv.ParseInt(string(dealcStr), 10, 64) + if err != nil { + return err + } + + color.Blue(".. Picking miners") + + // TODO: some better strategy (this tries to pick randomly) + var pickedAsks []*storagemarket.StorageAsk + pickLoop: + for i := 0; i < 64; i++ { + rand.Shuffle(len(candidateAsks), func(i, j int) { + candidateAsks[i], candidateAsks[j] = candidateAsks[j], candidateAsks[i] + }) + + remainingBudget := abi.TokenAmount(budget) + pickedAsks = []*storagemarket.StorageAsk{} + + for _, ask := range candidateAsks { + p := ask.Price + if verified { + p = ask.VerifiedPrice + } + + epochPrice := types.BigDiv(types.BigMul(p, types.NewInt(uint64(ds.PieceSize))), gib) + totalPrice := types.BigMul(epochPrice, types.NewInt(uint64(epochs))) + + if totalPrice.GreaterThan(remainingBudget) { + continue + } + + pickedAsks = append(pickedAsks, ask) + remainingBudget = big.Sub(remainingBudget, totalPrice) + + if len(pickedAsks) == int(dealCount) { + break pickLoop + } + } + } + + for _, pickedAsk := range pickedAsks { + maddrs = append(maddrs, pickedAsk.Miner) + ask = append(ask, *pickedAsk) + } + + state = "confirm" + case "query": + color.Blue(".. querying miner asks") + + for _, maddr := range maddrs { + mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + printErr(xerrors.Errorf("failed to get peerID for miner: %w", err)) + state = "miner" + continue uiLoop + } + + a, err := api.ClientQueryAsk(ctx, *mi.PeerId, maddr) + if err != nil { + printErr(xerrors.Errorf("failed to query ask: %w", err)) + state = "miner" + continue uiLoop + } + + ask = append(ask, *a) + } + + // TODO: run more validation + state = "confirm" + case "confirm": + // TODO: do some more or epochs math (round to miner PP, deal start buffer) fmt.Printf("-----\n") fmt.Printf("Proposing from %s\n", a) @@ -656,15 +786,41 @@ func interactiveDeal(cctx *cli.Context) error { fmt.Printf("\n") fmt.Printf("Piece size: %s (Payload size: %s)\n", units.BytesSize(float64(ds.PieceSize)), units.BytesSize(float64(ds.PayloadSize))) fmt.Printf("Duration: %s\n", dur) - fmt.Printf("Total price: ~%s (%s per epoch)\n", types.FIL(totalPrice), types.FIL(epochPrice)) + + pricePerGib := big.Zero() + for _, a := range ask { + p := a.Price + if verified { + p = a.VerifiedPrice + } + pricePerGib = big.Add(pricePerGib, p) + epochPrice := types.BigDiv(types.BigMul(p, types.NewInt(uint64(ds.PieceSize))), gib) + epochPrices = append(epochPrices, epochPrice) + + mpow, err := api.StateMinerPower(ctx, a.Miner, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting power (%s): %w", a.Miner, err) + } + + if len(ask) > 1 { + totalPrice := types.BigMul(epochPrice, types.NewInt(uint64(epochs))) + fmt.Printf("Miner %s (Power:%s) price: ~%s (%s per epoch)\n", color.YellowString(a.Miner.String()), color.GreenString(types.SizeStr(mpow.MinerPower.QualityAdjPower)), color.BlueString(types.FIL(totalPrice).String()), types.FIL(epochPrice)) + } + } + + // TODO: price is based on PaddedPieceSize, right? + epochPrice := types.BigDiv(types.BigMul(pricePerGib, types.NewInt(uint64(ds.PieceSize))), gib) + totalPrice := types.BigMul(epochPrice, types.NewInt(uint64(epochs))) + + fmt.Printf("Total price: ~%s (%s per epoch)\n", color.CyanString(types.FIL(totalPrice).String()), types.FIL(epochPrice)) fmt.Printf("Verified: %v\n", verified) state = "accept" case "accept": fmt.Print("\nAccept (yes/no): ") - var yn string - _, err := fmt.Scan(&yn) + _yn, _, err := rl.ReadLine() + yn := string(_yn) if err != nil { return err } @@ -680,30 +836,34 @@ func interactiveDeal(cctx *cli.Context) error { state = "execute" case "execute": - color.Blue(".. executing") - proposal, err := api.ClientStartDeal(ctx, &lapi.StartDealParams{ - Data: &storagemarket.DataRef{ - TransferType: storagemarket.TTGraphsync, - Root: data, - }, - Wallet: a, - Miner: maddr, - EpochPrice: epochPrice, - MinBlocksDuration: uint64(epochs), - DealStartEpoch: abi.ChainEpoch(cctx.Int64("start-epoch")), - FastRetrieval: cctx.Bool("fast-retrieval"), - VerifiedDeal: verified, - }) - if err != nil { - return err - } + color.Blue(".. executing\n") + + for i, maddr := range maddrs { + proposal, err := api.ClientStartDeal(ctx, &lapi.StartDealParams{ + Data: &storagemarket.DataRef{ + TransferType: storagemarket.TTGraphsync, + Root: data, + }, + Wallet: a, + Miner: maddr, + EpochPrice: epochPrices[i], + MinBlocksDuration: uint64(epochs), + DealStartEpoch: abi.ChainEpoch(cctx.Int64("start-epoch")), + FastRetrieval: cctx.Bool("fast-retrieval"), + VerifiedDeal: verified, + }) + if err != nil { + return err + } - encoder, err := GetCidEncoder(cctx) - if err != nil { - return err + encoder, err := GetCidEncoder(cctx) + if err != nil { + return err + } + + fmt.Printf("Deal (%s) CID: %s\n", maddr, color.GreenString(encoder.Encode(*proposal))) } - fmt.Println("\nDeal CID:", color.GreenString(encoder.Encode(*proposal))) return nil default: return xerrors.Errorf("unknown state: %s", state) @@ -944,6 +1104,152 @@ var clientRetrieveCmd = &cli.Command{ }, } +var clientListAsksCmd = &cli.Command{ + Name: "list-asks", + Usage: "List asks for top miners", + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + asks, err := getAsks(ctx, api) + if err != nil { + return err + } + + for _, ask := range asks { + fmt.Printf("%s: min:%s max:%s price:%s/GiB/Epoch verifiedPrice:%s/GiB/Epoch\n", ask.Miner, + types.SizeStr(types.NewInt(uint64(ask.MinPieceSize))), + types.SizeStr(types.NewInt(uint64(ask.MaxPieceSize))), + types.FIL(ask.Price), + types.FIL(ask.VerifiedPrice), + ) + } + + return nil + }, +} + +func getAsks(ctx context.Context, api lapi.FullNode) ([]*storagemarket.StorageAsk, error) { + color.Blue(".. getting miner list") + miners, err := api.StateListMiners(ctx, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting miner list: %w", err) + } + + var lk sync.Mutex + var found int64 + var withMinPower []address.Address + done := make(chan struct{}) + + go func() { + defer close(done) + + var wg sync.WaitGroup + wg.Add(len(miners)) + + throttle := make(chan struct{}, 50) + for _, miner := range miners { + throttle <- struct{}{} + go func(miner address.Address) { + defer wg.Done() + defer func() { + <-throttle + }() + + power, err := api.StateMinerPower(ctx, miner, types.EmptyTSK) + if err != nil { + return + } + + if power.HasMinPower { // TODO: Lower threshold + atomic.AddInt64(&found, 1) + lk.Lock() + withMinPower = append(withMinPower, miner) + lk.Unlock() + } + }(miner) + } + }() + +loop: + for { + select { + case <-time.After(150 * time.Millisecond): + fmt.Printf("\r* Found %d miners with power", atomic.LoadInt64(&found)) + case <-done: + break loop + } + } + fmt.Printf("\r* Found %d miners with power\n", atomic.LoadInt64(&found)) + + color.Blue(".. querying asks") + + var asks []*storagemarket.StorageAsk + var queried, got int64 + + done = make(chan struct{}) + go func() { + defer close(done) + + var wg sync.WaitGroup + wg.Add(len(withMinPower)) + + throttle := make(chan struct{}, 50) + for _, miner := range withMinPower { + throttle <- struct{}{} + go func(miner address.Address) { + defer wg.Done() + defer func() { + <-throttle + atomic.AddInt64(&queried, 1) + }() + + ctx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() + + mi, err := api.StateMinerInfo(ctx, miner, types.EmptyTSK) + if err != nil { + return + } + if mi.PeerId == nil { + return + } + + ask, err := api.ClientQueryAsk(ctx, *mi.PeerId, miner) + if err != nil { + return + } + + atomic.AddInt64(&got, 1) + lk.Lock() + asks = append(asks, ask) + lk.Unlock() + }(miner) + } + }() + +loop2: + for { + select { + case <-time.After(150 * time.Millisecond): + fmt.Printf("\r* Queried %d asks, got %d responses", atomic.LoadInt64(&queried), atomic.LoadInt64(&got)) + case <-done: + break loop2 + } + } + fmt.Printf("\r* Queried %d asks, got %d responses\n", atomic.LoadInt64(&queried), atomic.LoadInt64(&got)) + + sort.Slice(asks, func(i, j int) bool { + return asks[i].Price.LessThan(asks[j].Price) + }) + + return asks, nil +} + var clientQueryAskCmd = &cli.Command{ Name: "query-ask", Usage: "Find a miners ask", diff --git a/go.mod b/go.mod index a6a074e4985..b648523d1f8 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 + github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/cockroachdb/pebble v0.0.0-20200916222308-4e219a90ba5b github.com/coreos/go-systemd/v22 v22.0.0 github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e diff --git a/go.sum b/go.sum index 4cf7b362844..b7a717c186a 100644 --- a/go.sum +++ b/go.sum @@ -120,8 +120,11 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=