diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index bce7be3d56d..87530c666ee 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -3,11 +3,14 @@ package main import ( "context" "encoding/csv" + "encoding/json" "fmt" "io" "os" + "runtime" "strconv" "strings" + "sync" "time" "github.com/filecoin-project/lotus/build" @@ -70,6 +73,266 @@ var auditsCmd = &cli.Command{ chainBalanceStateCmd, chainPledgeCmd, fillBalancesCmd, + duplicatedMessagesCmd, + }, +} + +var duplicatedMessagesCmd = &cli.Command{ + Name: "duplicate-messages", + Usage: "Check for duplicate messages included in a tipset.", + UsageText: `Check for duplicate messages included in a tipset. + +Due to Filecoin's expected consensus, a tipset may include the same message multiple times in +different blocks. The message will only be executed once. + +This command will find such duplicate messages and print them to standard out as newline-delimited +JSON. Status messages in the form of "H: $HEIGHT ($PROGRESS%)" will be printed to standard error for +every day of chain processed. +`, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "parallel", + Usage: "the number of parallel threads for block processing", + DefaultText: "half the number of cores", + }, + &cli.IntFlag{ + Name: "start", + Usage: "the first epoch to check", + DefaultText: "genesis", + }, + &cli.IntFlag{ + Name: "end", + Usage: "the last epoch to check", + DefaultText: "the current head", + }, + &cli.IntSliceFlag{ + Name: "method", + Usage: "filter results by method number", + DefaultText: "all methods", + }, + &cli.StringSliceFlag{ + Name: "include-to", + Usage: "include only messages to the given address (does not perform address resolution)", + DefaultText: "all recipients", + }, + &cli.StringSliceFlag{ + Name: "include-from", + Usage: "include only messages from the given address (does not perform address resolution)", + DefaultText: "all senders", + }, + &cli.StringSliceFlag{ + Name: "exclude-to", + Usage: "exclude messages to the given address (does not perform address resolution)", + }, + &cli.StringSliceFlag{ + Name: "exclude-from", + Usage: "exclude messages from the given address (does not perform address resolution)", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + var head *types.TipSet + if cctx.IsSet("end") { + epoch := abi.ChainEpoch(cctx.Int("end")) + head, err = api.ChainGetTipSetByHeight(ctx, epoch, types.EmptyTSK) + } else { + head, err = api.ChainHead(ctx) + } + if err != nil { + return err + } + + var printLk sync.Mutex + + threads := runtime.NumCPU() / 2 + if cctx.IsSet("parallel") { + threads = cctx.Int("int") + if threads <= 0 { + return fmt.Errorf("parallelism needs to be at least 1") + } + } else if threads == 0 { + threads = 1 // if we have one core, but who are we kidding... + } + + throttle := make(chan struct{}, threads) + + methods := map[abi.MethodNum]bool{} + for _, m := range cctx.IntSlice("method") { + if m < 0 { + return fmt.Errorf("expected method numbers to be non-negative") + } + methods[abi.MethodNum(m)] = true + } + + addressSet := func(flag string) (map[address.Address]bool, error) { + if !cctx.IsSet(flag) { + return nil, nil + } + addrs := cctx.StringSlice(flag) + set := make(map[address.Address]bool, len(addrs)) + for _, addrStr := range addrs { + addr, err := address.NewFromString(addrStr) + if err != nil { + return nil, fmt.Errorf("failed to parse address %s: %w", addrStr, err) + } + set[addr] = true + } + return set, nil + } + + onlyFrom, err := addressSet("include-from") + if err != nil { + return err + } + onlyTo, err := addressSet("include-to") + if err != nil { + return err + } + excludeFrom, err := addressSet("exclude-from") + if err != nil { + return err + } + excludeTo, err := addressSet("exclude-to") + if err != nil { + return err + } + + target := abi.ChainEpoch(cctx.Int("start")) + if target < 0 || target > head.Height() { + return fmt.Errorf("start height must be greater than 0 and less than the end height") + } + totalEpochs := head.Height() - target + + for target <= head.Height() { + select { + case throttle <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + + go func(ts *types.TipSet) { + defer func() { + <-throttle + }() + + type addrNonce struct { + s address.Address + n uint64 + } + anonce := func(m *types.Message) addrNonce { + return addrNonce{ + s: m.From, + n: m.Nonce, + } + } + + msgs := map[addrNonce]map[cid.Cid]*types.Message{} + + processMessage := func(c cid.Cid, m *types.Message) { + // Filter + if len(methods) > 0 && !methods[m.Method] { + return + } + if len(onlyFrom) > 0 && !onlyFrom[m.From] { + return + } + if len(onlyTo) > 0 && !onlyTo[m.To] { + return + } + if excludeFrom[m.From] || excludeTo[m.To] { + return + } + + // Record + msgSet, ok := msgs[anonce(m)] + if !ok { + msgSet = make(map[cid.Cid]*types.Message, 1) + msgs[anonce(m)] = msgSet + } + msgSet[c] = m + } + + encoder := json.NewEncoder(os.Stdout) + + for _, bh := range ts.Blocks() { + bms, err := api.ChainGetBlockMessages(ctx, bh.Cid()) + if err != nil { + fmt.Fprintln(os.Stderr, "ERROR: ", err) + return + } + + for i, m := range bms.BlsMessages { + processMessage(bms.Cids[i], m) + } + + for i, m := range bms.SecpkMessages { + processMessage(bms.Cids[len(bms.BlsMessages)+i], &m.Message) + } + } + for _, ms := range msgs { + if len(ms) == 1 { + continue + } + type Msg struct { + Cid string + Value string + Method uint64 + } + grouped := map[string][]Msg{} + for c, m := range ms { + addr := m.To.String() + grouped[addr] = append(grouped[addr], Msg{ + Cid: c.String(), + Value: types.FIL(m.Value).String(), + Method: uint64(m.Method), + }) + } + printLk.Lock() + err := encoder.Encode(grouped) + if err != nil { + fmt.Fprintln(os.Stderr, "ERROR: ", err) + } + printLk.Unlock() + } + }(head) + + if head.Parents().IsEmpty() { + break + } + + head, err = api.ChainGetTipSet(ctx, head.Parents()) + if err != nil { + return err + } + + if head.Height()%2880 == 0 { + printLk.Lock() + fmt.Fprintf(os.Stderr, "H: %s (%d%%)\n", head.Height(), (100*(head.Height()-target))/totalEpochs) + printLk.Unlock() + } + } + + for i := 0; i < threads; i++ { + select { + case throttle <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + + } + + printLk.Lock() + fmt.Fprintf(os.Stderr, "H: %s (100%%)\n", head.Height()) + printLk.Unlock() + + return nil }, }