Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shed: command to list duplicate messages in tipsets (steb) #5847

Merged
merged 4 commits into from
Apr 29, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package main
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/filecoin-project/lotus/build"
Expand Down Expand Up @@ -70,6 +73,204 @@ var auditsCmd = &cli.Command{
chainBalanceStateCmd,
chainPledgeCmd,
fillBalancesCmd,
duplicatedMessagesCmd,
},
}

var duplicatedMessagesCmd = &cli.Command{
Name: "duplicate-messages",
Usage: "Check for duplicate messages included in a tipset.",
UsageText: `Check for duplicate messages included in a tipset.

Due to Filecoin's expected consensus, a tipset may include the same message multiple times in
different blocks. The message will only be executed once.

This command will find such duplicate messages and print them to standard out as newline-delimited
JSON. Status messages in the form of "H: $HEIGHT ($PROGRESS%)" will be printed to standard error for
every day of chain processed.
`,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "parallel",
Usage: "the number of parallel threads for block processing",
DefaultText: "half the number of cores",
},
&cli.IntFlag{
Name: "start",
Usage: "the first epoch to check",
DefaultText: "genesis",
},
&cli.IntFlag{
Name: "end",
Usage: "the last epoch to check",
DefaultText: "the current head",
},
&cli.IntSliceFlag{
Name: "method",
Usage: "Filter results by method number.",
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
DefaultText: "all methods",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}

defer closer()
ctx := lcli.ReqContext(cctx)

var head *types.TipSet
if cctx.IsSet("end") {
epoch := abi.ChainEpoch(cctx.Int("end"))
head, err = api.ChainGetTipSetByHeight(ctx, epoch, types.EmptyTSK)
} else {
head, err = api.ChainHead(ctx)
}
if err != nil {
return err
}

var printLk sync.Mutex

threads := runtime.NumCPU() / 2
if cctx.IsSet("parallel") {
threads = cctx.Int("int")
if threads <= 0 {
return fmt.Errorf("parallelism needs to be at least 1")
}
} else if threads == 0 {
threads = 1 // if we have one core, but who are we kidding...
}

throttle := make(chan struct{}, threads)

methods := make(map[abi.MethodNum]bool)
for _, m := range cctx.IntSlice("method") {
if m < 0 {
return fmt.Errorf("expected method numbers to be non-negative")
}
methods[abi.MethodNum(m)] = true
}

target := abi.ChainEpoch(cctx.Int("start"))
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
totalEpochs := head.Height() - target

for target <= head.Height() {
select {
case throttle <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}

go func(ts *types.TipSet) {
defer func() {
<-throttle
}()

type addrNonce struct {
s address.Address
n uint64
}
anonce := func(m *types.Message) addrNonce {
return addrNonce{
s: m.From,
n: m.Nonce,
}
}

msgs := map[addrNonce]map[cid.Cid]*types.Message{}

encoder := json.NewEncoder(os.Stdout)

for _, bh := range ts.Blocks() {
bms, err := api.ChainGetBlockMessages(ctx, bh.Cid())
if err != nil {
fmt.Fprintln(os.Stderr, "ERROR: ", err)
return
}

for i, m := range bms.BlsMessages {
if len(methods) > 0 && !methods[m.Method] {
continue
}
c, ok := msgs[anonce(m)]
if !ok {
c = make(map[cid.Cid]*types.Message, 1)
msgs[anonce(m)] = c
}
c[bms.Cids[i]] = m
}

for i, m := range bms.SecpkMessages {
if len(methods) > 0 && !methods[m.Message.Method] {
continue
}
c, ok := msgs[anonce(&m.Message)]
if !ok {
c = make(map[cid.Cid]*types.Message, 1)
msgs[anonce(&m.Message)] = c
}
c[bms.Cids[len(bms.BlsMessages)+i]] = &m.Message
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}
}
for _, ms := range msgs {
if len(ms) == 1 {
continue
}
type Msg struct {
Cid string
Value string
Method uint64
}
grouped := map[string][]Msg{}
for c, m := range ms {
addr := m.To.String()
grouped[addr] = append(grouped[addr], Msg{
Cid: c.String(),
Value: types.FIL(m.Value).String(),
Method: uint64(m.Method),
})
}
printLk.Lock()
err := encoder.Encode(grouped)
if err != nil {
fmt.Fprintln(os.Stderr, "ERROR: ", err)
}
printLk.Unlock()
}
}(head)

if head.Parents().IsEmpty() {
break
}

head, err = api.ChainGetTipSet(ctx, head.Parents())
if err != nil {
return err
}

if head.Height()%2880 == 0 {
printLk.Lock()
fmt.Fprintf(os.Stderr, "H: %s (%d%%)\n", head.Height(), (100*(head.Height()-target))/totalEpochs)
printLk.Unlock()
}
}

for i := 0; i < threads; i++ {
select {
case throttle <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}

}

printLk.Lock()
fmt.Fprintf(os.Stderr, "H: %s (100%%)\n", head.Height())
printLk.Unlock()

return nil
},
}

Expand Down