From fcb7cd0eb08f45ba84e543a60571951d5f069e5f Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Mon, 22 Nov 2021 19:49:38 -0500 Subject: [PATCH 1/3] Shed: Add a util to send a batch of messages --- chain/messagepool/messagepool.go | 6 +- cmd/lotus-shed/main.go | 1 + cmd/lotus-shed/send-csv.go | 104 +++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 cmd/lotus-shed/send-csv.go diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 06343e9c978..e97dfea7b7a 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -196,10 +196,10 @@ func ComputeMinRBF(curPrem abi.TokenAmount) abi.TokenAmount { return types.BigAdd(minPrice, types.NewInt(1)) } -func CapGasFee(mff dtypes.DefaultMaxFeeFunc, msg *types.Message, sendSepc *api.MessageSendSpec) { +func CapGasFee(mff dtypes.DefaultMaxFeeFunc, msg *types.Message, sendSpec *api.MessageSendSpec) { var maxFee abi.TokenAmount - if sendSepc != nil { - maxFee = sendSepc.MaxFee + if sendSpec != nil { + maxFee = sendSpec.MaxFee } if maxFee.Int == nil || maxFee.Equals(big.Zero()) { mf, err := mff() diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index d35fb56dd8b..f4e6627cb3c 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -65,6 +65,7 @@ func main() { fr32Cmd, chainCmd, balancerCmd, + sendCsvCmd, } app := &cli.App{ diff --git a/cmd/lotus-shed/send-csv.go b/cmd/lotus-shed/send-csv.go new file mode 100644 index 00000000000..072605b8d51 --- /dev/null +++ b/cmd/lotus-shed/send-csv.go @@ -0,0 +1,104 @@ +package main + +import ( + "encoding/csv" + "fmt" + "os" + "strings" + + "github.com/ipfs/go-cid" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/exitcode" + + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + lcli "github.com/filecoin-project/lotus/cli" +) + +var sendCsvCmd = &cli.Command{ + Name: "send-csv", + Usage: "Utility for sending a batch of balance transfers", + ArgsUsage: "[sender] [csvfile]", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 2 { + return xerrors.New("must supply sender and path to csv file") + } + + api, closer, err := lcli.GetFullNodeAPIV1(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + sender, err := address.NewFromString(cctx.Args().Get(0)) + if err != nil { + return err + } + + fileReader, err := os.Open(cctx.Args().Get(1)) + if err != nil { + return xerrors.Errorf("read csv: %w", err) + } + + defer fileReader.Close() //nolint:errcheck + r := csv.NewReader(fileReader) + records, err := r.ReadAll() + if err != nil { + return xerrors.Errorf("read csv: %w", err) + } + + var msgCids []cid.Cid + for i, e := range records[1:] { + addr, err := address.NewFromString(e[0]) + if err != nil { + return xerrors.Errorf("failed to parse address in row %d: %w", i, err) + } + + value, err := types.ParseFIL(strings.TrimSpace(e[1])) + if err != nil { + return xerrors.Errorf("failed to parse value balance: %w", err) + } + + smsg, err := api.MpoolPushMessage(ctx, &types.Message{ + To: addr, + From: sender, + Value: abi.TokenAmount(value), + }, nil) + if err != nil { + return err + } + + fmt.Printf("sending %s to %s in msg %s\n", value.String(), addr, smsg.Cid()) + + if i > 0 && i%100 == 0 { + fmt.Printf("catching up until latest message lands") + _, err := api.StateWaitMsg(ctx, smsg.Cid(), 1, lapi.LookbackNoLimit, true) + if err != nil { + return err + } + } + + msgCids = append(msgCids, smsg.Cid()) + } + + fmt.Println("waiting on messages") + + for _, msgCid := range msgCids { + ml, err := api.StateWaitMsg(ctx, msgCid, 5, lapi.LookbackNoLimit, true) + if err != nil { + return err + } + if ml.Receipt.ExitCode != exitcode.Ok { + fmt.Printf("MSG %s NON-ZERO EXITCODE: %s\n", msgCid, ml.Receipt.ExitCode) + } + } + + return nil + }, +} From c9a557e0d97a5240d022573da3f498cbcaef642d Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Wed, 24 Nov 2021 14:34:28 -0500 Subject: [PATCH 2/3] Address review --- cmd/lotus-shed/send-csv.go | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/cmd/lotus-shed/send-csv.go b/cmd/lotus-shed/send-csv.go index 072605b8d51..2ecd6345bf4 100644 --- a/cmd/lotus-shed/send-csv.go +++ b/cmd/lotus-shed/send-csv.go @@ -20,12 +20,19 @@ import ( ) var sendCsvCmd = &cli.Command{ - Name: "send-csv", - Usage: "Utility for sending a batch of balance transfers", - ArgsUsage: "[sender] [csvfile]", + Name: "send-csv", + Usage: "Utility for sending a batch of balance transfers", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "from", + Usage: "specify the account to send funds from", + Required: true, + }, + }, + ArgsUsage: "[csvfile]", Action: func(cctx *cli.Context) error { - if cctx.NArg() != 2 { - return xerrors.New("must supply sender and path to csv file") + if cctx.NArg() != 1 { + return xerrors.New("must supply path to csv file") } api, closer, err := lcli.GetFullNodeAPIV1(cctx) @@ -36,12 +43,12 @@ var sendCsvCmd = &cli.Command{ defer closer() ctx := lcli.ReqContext(cctx) - sender, err := address.NewFromString(cctx.Args().Get(0)) + sender, err := address.NewFromString(cctx.String("from")) if err != nil { return err } - fileReader, err := os.Open(cctx.Args().Get(1)) + fileReader, err := os.Open(cctx.Args().First()) if err != nil { return xerrors.Errorf("read csv: %w", err) } @@ -53,7 +60,11 @@ var sendCsvCmd = &cli.Command{ return xerrors.Errorf("read csv: %w", err) } - var msgCids []cid.Cid + if strings.TrimSpace(records[0][0]) != "Recipient" || strings.TrimSpace(records[0][1]) != "FIL" { + return xerrors.Errorf("expected header row to be \"Recipient, FIL\"") + } + + var msgs []*types.Message for i, e := range records[1:] { addr, err := address.NewFromString(e[0]) if err != nil { @@ -65,16 +76,21 @@ var sendCsvCmd = &cli.Command{ return xerrors.Errorf("failed to parse value balance: %w", err) } - smsg, err := api.MpoolPushMessage(ctx, &types.Message{ + msgs = append(msgs, &types.Message{ To: addr, From: sender, Value: abi.TokenAmount(value), - }, nil) + }) + } + + var msgCids []cid.Cid + for i, msg := range msgs { + smsg, err := api.MpoolPushMessage(ctx, msg, nil) if err != nil { return err } - fmt.Printf("sending %s to %s in msg %s\n", value.String(), addr, smsg.Cid()) + fmt.Printf("sending %s to %s in msg %s\n", msg.Value.String(), msg.To, smsg.Cid()) if i > 0 && i%100 == 0 { fmt.Printf("catching up until latest message lands") From b0a9a272880b7e72cca5d5b33b9f9ff654e0a883 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Thu, 25 Nov 2021 17:21:50 -0500 Subject: [PATCH 3/3] Shed: Allow send-csv to specify params and method --- cmd/lotus-shed/send-csv.go | 48 +++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/cmd/lotus-shed/send-csv.go b/cmd/lotus-shed/send-csv.go index 2ecd6345bf4..ce1c8b68a74 100644 --- a/cmd/lotus-shed/send-csv.go +++ b/cmd/lotus-shed/send-csv.go @@ -2,8 +2,10 @@ package main import ( "encoding/csv" + "encoding/hex" "fmt" "os" + "strconv" "strings" "github.com/ipfs/go-cid" @@ -43,6 +45,12 @@ var sendCsvCmd = &cli.Command{ defer closer() ctx := lcli.ReqContext(cctx) + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return err + } + defer srv.Close() //nolint:errcheck + sender, err := address.NewFromString(cctx.String("from")) if err != nil { return err @@ -60,8 +68,11 @@ var sendCsvCmd = &cli.Command{ return xerrors.Errorf("read csv: %w", err) } - if strings.TrimSpace(records[0][0]) != "Recipient" || strings.TrimSpace(records[0][1]) != "FIL" { - return xerrors.Errorf("expected header row to be \"Recipient, FIL\"") + if strings.TrimSpace(records[0][0]) != "Recipient" || + strings.TrimSpace(records[0][1]) != "FIL" || + strings.TrimSpace(records[0][2]) != "Method" || + strings.TrimSpace(records[0][3]) != "Params" { + return xerrors.Errorf("expected header row to be \"Recipient, FIL, Method, Params\"") } var msgs []*types.Message @@ -76,21 +87,41 @@ var sendCsvCmd = &cli.Command{ return xerrors.Errorf("failed to parse value balance: %w", err) } + method, err := strconv.Atoi(strings.TrimSpace(e[2])) + if err != nil { + return xerrors.Errorf("failed to parse method number: %w", err) + } + + var params []byte + if strings.TrimSpace(e[3]) != "nil" { + params, err = hex.DecodeString(strings.TrimSpace(e[3])) + if err != nil { + return xerrors.Errorf("failed to parse hexparams: %w", err) + } + } + msgs = append(msgs, &types.Message{ - To: addr, - From: sender, - Value: abi.TokenAmount(value), + To: addr, + From: sender, + Value: abi.TokenAmount(value), + Method: abi.MethodNum(method), + Params: params, }) } + if len(msgs) == 0 { + return nil + } + var msgCids []cid.Cid for i, msg := range msgs { smsg, err := api.MpoolPushMessage(ctx, msg, nil) if err != nil { - return err + fmt.Printf("%d, ERROR %s\n", i, err) + continue } - fmt.Printf("sending %s to %s in msg %s\n", msg.Value.String(), msg.To, smsg.Cid()) + fmt.Printf("%d, %s\n", i, smsg.Cid()) if i > 0 && i%100 == 0 { fmt.Printf("catching up until latest message lands") @@ -103,7 +134,7 @@ var sendCsvCmd = &cli.Command{ msgCids = append(msgCids, smsg.Cid()) } - fmt.Println("waiting on messages") + fmt.Println("waiting on messages...") for _, msgCid := range msgCids { ml, err := api.StateWaitMsg(ctx, msgCid, 5, lapi.LookbackNoLimit, true) @@ -115,6 +146,7 @@ var sendCsvCmd = &cli.Command{ } } + fmt.Println("all sent messages succeeded") return nil }, }