Skip to content

Commit

Permalink
tvx extract: more tipset extraction goodness.
Browse files Browse the repository at this point in the history
- ability to extract a tipset range into individual vectors.
- ability to extract a tipset range and squash into a single multi-tipset vector.
- mark statediff output deterministically, so it can be extracted by tooling.
- ability to execute callbacks between tipsets in the driver.
- implement save-balances callback.
  • Loading branch information
raulk committed Dec 27, 2020
1 parent bb5a92e commit e79a6f2
Show file tree
Hide file tree
Showing 9 changed files with 469 additions and 174 deletions.
155 changes: 129 additions & 26 deletions cmd/tvx/exec.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,48 @@
package main

import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"

"github.com/fatih/color"
"github.com/filecoin-project/go-address"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/urfave/cli/v2"

"github.com/filecoin-project/lotus/conformance"

"github.com/filecoin-project/test-vectors/schema"

"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/conformance"
"github.com/filecoin-project/lotus/lib/blockstore"
)

var execFlags struct {
file string
out string
driverOpts cli.StringSlice
fallbackBlockstore bool
}

const (
optSaveBalances = "save-balances"
)

var execCmd = &cli.Command{
Name: "exec",
Description: "execute one or many test vectors against Lotus; supplied as a single JSON file, or a ndjson stdin stream",
Action: runExecLotus,
Description: "execute one or many test vectors against Lotus; supplied as a single JSON file, a directory, or a ndjson stdin stream",
Action: runExec,
Flags: []cli.Flag{
&repoFlag,
&cli.StringFlag{
Name: "file",
Usage: "input file; if not supplied, the vector will be read from stdin",
Usage: "input file or directory; if not supplied, the vector will be read from stdin",
TakesFile: true,
Destination: &execFlags.file,
},
Expand All @@ -36,10 +51,20 @@ var execCmd = &cli.Command{
Usage: "sets the full node API as a fallback blockstore; use this if you're transplanting vectors and get block not found errors",
Destination: &execFlags.fallbackBlockstore,
},
&cli.StringFlag{
Name: "out",
Usage: "output directory where to save the results, only used when the input is a directory",
Destination: &execFlags.out,
},
&cli.StringSliceFlag{
Name: "driver-opt",
Usage: "comma-separated list of driver options (EXPERIMENTAL; will change), supported: 'save-balances=<dst>', 'pipeline-basefee' (unimplemented); only available in single-file mode",
Destination: &execFlags.driverOpts,
},
},
}

func runExecLotus(c *cli.Context) error {
func runExec(c *cli.Context) error {
if execFlags.fallbackBlockstore {
if err := initialize(c); err != nil {
return fmt.Errorf("fallback blockstore was enabled, but could not resolve lotus API endpoint: %w", err)
Expand All @@ -48,30 +73,97 @@ func runExecLotus(c *cli.Context) error {
conformance.FallbackBlockstoreGetter = FullAPI
}

if file := execFlags.file; file != "" {
// we have a single test vector supplied as a file.
file, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed to open test vector: %w", err)
path := execFlags.file
if path == "" {
return execVectorsStdin()
}

fi, err := os.Stat(path)
if err != nil {
return err
}

if fi.IsDir() {
// we're in directory mode; ensure the out directory exists.
outdir := execFlags.out
if outdir == "" {
return fmt.Errorf("no output directory provided")
}
if err := ensureDir(outdir); err != nil {
return err
}
return execVectorDir(path, outdir)
}

var (
dec = json.NewDecoder(file)
tv schema.TestVector
)
// process tipset vector options.
if err := processTipsetOpts(); err != nil {
return err
}

if err = dec.Decode(&tv); err != nil {
return fmt.Errorf("failed to decode test vector: %w", err)
_, err = execVectorFile(new(conformance.LogReporter), path)
return err
}

func processTipsetOpts() error {
for _, opt := range execFlags.driverOpts.Value() {
switch ss := strings.Split(opt, "="); {
case ss[0] == optSaveBalances:
filename := ss[1]
log.Printf("saving balances after each tipset in: %s", filename)
balancesFile, err := os.Create(filename)
if err != nil {
return err
}
w := bufio.NewWriter(balancesFile)
cb := func(bs blockstore.Blockstore, params *conformance.ExecuteTipsetParams, res *conformance.ExecuteTipsetResult) {
cst := cbornode.NewCborStore(bs)
st, err := state.LoadStateTree(cst, res.PostStateRoot)
if err != nil {
return
}
_ = st.ForEach(func(addr address.Address, actor *types.Actor) error {
_, err := fmt.Fprintln(w, params.ExecEpoch, addr, actor.Balance)
return err
})
_ = w.Flush()
}
conformance.TipsetVectorOpts.OnTipsetApplied = append(conformance.TipsetVectorOpts.OnTipsetApplied, cb)

}

}
return nil
}

func execVectorDir(path string, outdir string) error {
files, err := filepath.Glob(filepath.Join(path, "*"))
if err != nil {
return fmt.Errorf("failed to glob input directory %s: %w", path, err)
}
for _, f := range files {
outfile := strings.TrimSuffix(filepath.Base(f), filepath.Ext(f)) + ".out"
outpath := filepath.Join(outdir, outfile)
outw, err := os.Create(outpath)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", outpath, err)
}

return executeTestVector(tv)
log.Printf("processing vector %s; sending output to %s", f, outpath)
log.SetOutput(io.MultiWriter(os.Stderr, outw)) // tee the output.
_, _ = execVectorFile(new(conformance.LogReporter), f)
log.SetOutput(os.Stderr)
_ = outw.Close()
}
return nil
}

func execVectorsStdin() error {
r := new(conformance.LogReporter)
for dec := json.NewDecoder(os.Stdin); ; {
var tv schema.TestVector
switch err := dec.Decode(&tv); err {
case nil:
if err = executeTestVector(tv); err != nil {
if _, err = executeTestVector(r, tv); err != nil {
return err
}
case io.EOF:
Expand All @@ -84,19 +176,30 @@ func runExecLotus(c *cli.Context) error {
}
}

func executeTestVector(tv schema.TestVector) error {
func execVectorFile(r conformance.Reporter, path string) (diffs []string, error error) {
file, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open test vector: %w", err)
}

var tv schema.TestVector
if err = json.NewDecoder(file).Decode(&tv); err != nil {
return nil, fmt.Errorf("failed to decode test vector: %w", err)
}
return executeTestVector(r, tv)
}

func executeTestVector(r conformance.Reporter, tv schema.TestVector) (diffs []string, err error) {
log.Println("executing test vector:", tv.Meta.ID)

for _, v := range tv.Pre.Variants {
r := new(conformance.LogReporter)

switch class, v := tv.Class, v; class {
case "message":
conformance.ExecuteMessageVector(r, &tv, &v)
err, diffs = conformance.ExecuteMessageVector(r, &tv, &v)
case "tipset":
conformance.ExecuteTipsetVector(r, &tv, &v)
err, diffs = conformance.ExecuteTipsetVector(r, &tv, &v)
default:
return fmt.Errorf("test vector class %s not supported", class)
return nil, fmt.Errorf("test vector class %s not supported", class)
}

if r.Failed() {
Expand All @@ -106,5 +209,5 @@ func executeTestVector(tv schema.TestVector) error {
}
}

return nil
return diffs, err
}
57 changes: 55 additions & 2 deletions cmd/tvx/extract.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package main

import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"

"github.com/filecoin-project/test-vectors/schema"
"github.com/urfave/cli/v2"
)

Expand All @@ -21,6 +27,7 @@ type extractOpts struct {
retain string
precursor string
ignoreSanityChecks bool
squash bool
}

var extractFlags extractOpts
Expand Down Expand Up @@ -62,13 +69,13 @@ var extractCmd = &cli.Command{
},
&cli.StringFlag{
Name: "tsk",
Usage: "tipset key to extract into a vector",
Usage: "tipset key to extract into a vector, or range of tipsets in tsk1..tsk2 form",
Destination: &extractFlags.tsk,
},
&cli.StringFlag{
Name: "out",
Aliases: []string{"o"},
Usage: "file to write test vector to",
Usage: "file to write test vector to, or directory to write the batch to",
Destination: &extractFlags.file,
},
&cli.StringFlag{
Expand All @@ -93,6 +100,12 @@ var extractCmd = &cli.Command{
Value: false,
Destination: &extractFlags.ignoreSanityChecks,
},
&cli.BoolFlag{
Name: "squash",
Usage: "when extracting a tipset range, squash all tipsets into a single vector",
Value: false,
Destination: &extractFlags.squash,
},
},
}

Expand All @@ -106,3 +119,43 @@ func runExtract(_ *cli.Context) error {
return fmt.Errorf("unsupported vector class")
}
}

// writeVector writes the vector into the specified file, or to stdout if
// file is empty.
func writeVector(vector *schema.TestVector, file string) (err error) {
output := io.WriteCloser(os.Stdout)
if file := file; file != "" {
dir := filepath.Dir(file)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("unable to create directory %s: %w", dir, err)
}
output, err = os.Create(file)
if err != nil {
return err
}
defer output.Close() //nolint:errcheck
defer log.Printf("wrote test vector to file: %s", file)
}

enc := json.NewEncoder(output)
enc.SetIndent("", " ")
return enc.Encode(&vector)
}

// writeVectors writes each vector to a different file under the specified
// directory.
func writeVectors(dir string, vectors ...*schema.TestVector) error {
// verify the output directory exists.
if err := ensureDir(dir); err != nil {
return err
}
// write each vector to its file.
for _, v := range vectors {
id := v.Meta.ID
path := filepath.Join(dir, fmt.Sprintf("%s.json", id))
if err := writeVector(v, path); err != nil {
return err
}
}
return nil
}
26 changes: 1 addition & 25 deletions cmd/tvx/extract_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"

"github.com/fatih/color"
"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -316,28 +313,7 @@ func doExtractMessage(opts extractOpts) error {
},
},
}

return writeVector(vector, opts.file)
}

func writeVector(vector schema.TestVector, file string) (err error) {
output := io.WriteCloser(os.Stdout)
if file := file; file != "" {
dir := filepath.Dir(file)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("unable to create directory %s: %w", dir, err)
}
output, err = os.Create(file)
if err != nil {
return err
}
defer output.Close() //nolint:errcheck
defer log.Printf("wrote test vector to file: %s", file)
}

enc := json.NewEncoder(output)
enc.SetIndent("", " ")
return enc.Encode(&vector)
return writeVector(&vector, opts.file)
}

// resolveFromChain queries the chain for the provided message, using the block CID to
Expand Down
Loading

0 comments on commit e79a6f2

Please sign in to comment.