From b89a29803be2c22da0ef7d14b1b42949e4f58162 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 8 Nov 2024 06:09:49 +0000 Subject: [PATCH] feat(shed): add commands for importing/exporting datastore snapshots (#12685) This makes it possible to import/export arbitrary datastore snapshots. --- CHANGELOG.md | 1 + cmd/lotus-shed/datastore.go | 277 ++++++++++++++++++++++++++++ cmd/lotus-shed/shedgen/cbor_gen.go | 155 ++++++++++++++++ cmd/lotus-shed/shedgen/datastore.go | 6 + gen/main.go | 1 + 5 files changed, 440 insertions(+) create mode 100644 cmd/lotus-shed/shedgen/datastore.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 97ec6a7ae07..ef512d3f33f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Implement new `lotus f3` CLI commands to list F3 participants, dump manifest, get/list finality certificates and check the F3 status. ([filecoin-project/lotus#12617](https://github.com/filecoin-project/lotus/pull/12617), [filecoin-project/lotus#12627](https://github.com/filecoin-project/lotus/pull/12627)) - Return a `"data"` field on the `"error"` returned from RPC when `eth_call` and `eth_estimateGas` APIs encounter `execution reverted` errors. ([filecoin-project/lotus#12553](https://github.com/filecoin-project/lotus/pull/12553)) - Implement `EthGetTransactionByBlockNumberAndIndex` (`eth_getTransactionByBlockNumberAndIndex`) and `EthGetTransactionByBlockHashAndIndex` (`eth_getTransactionByBlockHashAndIndex`) methods. 
([filecoin-project/lotus#12618](https://github.com/filecoin-project/lotus/pull/12618)) +- Add a set of `lotus-shed datastore` commands for importing, exporting, and clearing parts of the datastore ([filecoin-project/lotus#12685](https://github.com/filecoin-project/lotus/pull/12685)): ## Bug Fixes - Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567)) diff --git a/cmd/lotus-shed/datastore.go b/cmd/lotus-shed/datastore.go index 8e31ccc3c44..ad68145852c 100644 --- a/cmd/lotus-shed/datastore.go +++ b/cmd/lotus-shed/datastore.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -17,10 +18,12 @@ import ( "github.com/mitchellh/go-homedir" "github.com/polydawn/refmt/cbor" "github.com/urfave/cli/v2" + cbg "github.com/whyrusleeping/cbor-gen" "go.uber.org/multierr" "golang.org/x/xerrors" lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen" "github.com/filecoin-project/lotus/lib/backupds" "github.com/filecoin-project/lotus/node/repo" ) @@ -34,6 +37,9 @@ var datastoreCmd = &cli.Command{ datastoreGetCmd, datastoreRewriteCmd, datastoreVlog2CarCmd, + datastoreImportCmd, + datastoreExportCmd, + datastoreClearCmd, }, } @@ -106,6 +112,98 @@ var datastoreListCmd = &cli.Command{ }, } +var datastoreClearCmd = &cli.Command{ + Name: "clear", + Description: "Clear a part or all of the given datastore.", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo-type", + Usage: "node type (FullNode, StorageMiner, Worker, Wallet)", + Value: "FullNode", + }, + &cli.StringFlag{ + Name: "prefix", + Usage: "only delete key/values with the given prefix", + Value: "", + }, + &cli.BoolFlag{ + Name: 
"really-do-it", + Usage: "must be specified for the action to take effect", + }, + }, + ArgsUsage: "[namespace]", + Action: func(cctx *cli.Context) (_err error) { + if cctx.NArg() != 1 { + return xerrors.Errorf("requires 1 argument: the datastore namespace") + } + namespace := cctx.Args().Get(0) + + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return err + } + if !exists { + return xerrors.Errorf("lotus repo doesn't exist") + } + + lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type"))) + if err != nil { + return err + } + defer lr.Close() //nolint:errcheck + + ds, err := lr.Datastore(cctx.Context, namespace) + if err != nil { + return err + } + defer func() { + _err = multierr.Append(_err, ds.Close()) + }() + + dryRun := !cctx.Bool("really-do-it") + + query, err := ds.Query(cctx.Context, dsq.Query{ + Prefix: cctx.String("prefix"), + }) + if err != nil { + return err + } + defer query.Close() //nolint:errcheck + + batch, err := ds.Batch(cctx.Context) + if err != nil { + return xerrors.Errorf("failed to create a datastore batch: %w", err) + } + + for res, ok := query.NextSync(); ok; res, ok = query.NextSync() { + if res.Error != nil { + return xerrors.Errorf("failed to read from datastore: %w", res.Error) + } + _, _ = fmt.Fprintf(cctx.App.Writer, "deleting: %q\n", res.Key) + if !dryRun { + if err := batch.Delete(cctx.Context, datastore.NewKey(res.Key)); err != nil { + return xerrors.Errorf("failed to delete %q: %w", res.Key, err) + } + } + } + + if !dryRun { + if err := batch.Commit(cctx.Context); err != nil { + return xerrors.Errorf("failed to flush the batch: %w", err) + } + } else { + _, _ = fmt.Fprintln(cctx.App.Writer, "NOTE: dry run complete, re-run with --really-do-it to actually delete this state.") + } + + return nil + }, +} + var datastoreGetCmd = &cli.Command{ Name: "get", Description: "list datastore keys", @@ -158,6
+256,185 @@ var datastoreGetCmd = &cli.Command{ }, } +var datastoreExportCmd = &cli.Command{ + Name: "export", + Description: "Export part or all of the specified datastore, appending to the specified datastore snapshot.", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo-type", + Usage: "node type (FullNode, StorageMiner, Worker, Wallet)", + Value: "FullNode", + }, + &cli.StringFlag{ + Name: "prefix", + Usage: "export only keys with the given prefix", + Value: "", + }, + }, + ArgsUsage: "[namespace filename]", + Action: func(cctx *cli.Context) (_err error) { + if cctx.NArg() != 2 { + return xerrors.Errorf("requires 2 arguments: the datastore prefix and the filename to which the snapshot will be written") + } + namespace := cctx.Args().Get(0) + fname := cctx.Args().Get(1) + + snapshot, err := os.OpenFile(fname, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.ModePerm) + if err != nil { + return xerrors.Errorf("failed to open snapshot: %w", err) + } + defer func() { + _err = multierr.Append(_err, snapshot.Close()) + }() + + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return err + } + if !exists { + return xerrors.Errorf("lotus repo doesn't exist") + } + + lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type"))) + if err != nil { + return err + } + defer lr.Close() //nolint:errcheck + + ds, err := lr.Datastore(cctx.Context, namespace) + if err != nil { + return err + } + defer func() { + _err = multierr.Append(_err, ds.Close()) + }() + + query, err := ds.Query(cctx.Context, dsq.Query{ + Prefix: cctx.String("prefix"), + }) + if err != nil { + return err + } + + bufWriter := bufio.NewWriter(snapshot) + snapshotWriter := cbg.NewCborWriter(bufWriter) + for res, ok := query.NextSync(); ok; res, ok = query.NextSync() { + if res.Error != nil { + return xerrors.Errorf("failed to read from datastore: %w", res.Error) + } + + entry := 
shedgen.DatastoreEntry{ + Key: []byte(res.Key), + Value: res.Value, + } + + _, _ = fmt.Fprintf(cctx.App.Writer, "exporting: %q\n", res.Key) + if err := entry.MarshalCBOR(snapshotWriter); err != nil { + return xerrors.Errorf("failed to write %q to snapshot: %w", res.Key, err) + } + } + if err := bufWriter.Flush(); err != nil { + return xerrors.Errorf("failed to flush snapshot: %w", err) + } + + return nil + }, +} + +var datastoreImportCmd = &cli.Command{ + Name: "import", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo-type", + Usage: "node type (FullNode, StorageMiner, Worker, Wallet)", + Value: "FullNode", + }, + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "must be specified for the action to take effect", + }, + }, + Description: "Import the specified datastore snapshot.", + ArgsUsage: "[namespace filename]", + Action: func(cctx *cli.Context) (_err error) { + if cctx.NArg() != 2 { + return xerrors.Errorf("requires 2 arguments: the datastore prefix and the filename of the snapshot to import") + } + namespace := cctx.Args().Get(0) + fname := cctx.Args().Get(1) + + snapshot, err := os.Open(fname) + if err != nil { + return xerrors.Errorf("failed to open snapshot: %w", err) + } + defer snapshot.Close() //nolint:errcheck + + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return err + } + if !exists { + return xerrors.Errorf("lotus repo doesn't exist") + } + + lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type"))) + if err != nil { + return err + } + defer lr.Close() //nolint:errcheck + + ds, err := lr.Datastore(cctx.Context, namespace) + if err != nil { + return err + } + defer func() { + _err = multierr.Append(_err, ds.Close()) + }() + + batch, err := ds.Batch(cctx.Context) + if err != nil { + return err + } + + dryRun := !cctx.Bool("really-do-it") + + snapshotReader := cbg.NewCborReader(bufio.NewReader(snapshot)) + for { + var entry shedgen.DatastoreEntry + if err := entry.UnmarshalCBOR(snapshotReader); err != nil { + if
errors.Is(err, io.EOF) { + break + } + return xerrors.Errorf("failed to read entry from snapshot: %w", err) + } + + _, _ = fmt.Fprintf(cctx.App.Writer, "importing: %q\n", string(entry.Key)) + + if !dryRun { + key := datastore.NewKey(string(entry.Key)) + if err := batch.Put(cctx.Context, key, entry.Value); err != nil { + return xerrors.Errorf("failed to put %q: %w", key, err) + } + } + } + + if !dryRun { + if err := batch.Commit(cctx.Context); err != nil { + return xerrors.Errorf("failed to commit batch: %w", err) + } + } else { + _, _ = fmt.Fprintln(cctx.App.Writer, "NOTE: dry run complete, re-run with --really-do-it to actually import the datastore snapshot, overwriting any conflicting state.") + } + return nil + }, +} + var datastoreBackupCmd = &cli.Command{ Name: "backup", Description: "manage datastore backups", diff --git a/cmd/lotus-shed/shedgen/cbor_gen.go b/cmd/lotus-shed/shedgen/cbor_gen.go index 10b41827ffd..287ddbdaeb0 100644 --- a/cmd/lotus-shed/shedgen/cbor_gen.go +++ b/cmd/lotus-shed/shedgen/cbor_gen.go @@ -148,3 +148,158 @@ func (t *CarbNode) UnmarshalCBOR(r io.Reader) (err error) { return nil } +func (t *DatastoreEntry) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write([]byte{162}); err != nil { + return err + } + + // t.Key ([]uint8) (slice) + if len("Key") > 8192 { + return xerrors.Errorf("Value in field \"Key\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Key"))); err != nil { + return err + } + if _, err := cw.WriteString(string("Key")); err != nil { + return err + } + + if len(t.Key) > 2097152 { + return xerrors.Errorf("Byte array in field t.Key was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Key))); err != nil { + return err + } + + if _, err := cw.Write(t.Key); err != nil { + return err + } + + // t.Value ([]uint8) (slice) + if len("Value") > 8192 { + 
return xerrors.Errorf("Value in field \"Value\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Value"))); err != nil { + return err + } + if _, err := cw.WriteString(string("Value")); err != nil { + return err + } + + if len(t.Value) > 2097152 { + return xerrors.Errorf("Byte array in field t.Value was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Value))); err != nil { + return err + } + + if _, err := cw.Write(t.Value); err != nil { + return err + } + + return nil +} + +func (t *DatastoreEntry) UnmarshalCBOR(r io.Reader) (err error) { + *t = DatastoreEntry{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("DatastoreEntry: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadStringWithMax(cr, 8192) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.Key ([]uint8) (slice) + case "Key": + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > 2097152 { + return fmt.Errorf("t.Key: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Key = make([]uint8, extra) + } + + if _, err := io.ReadFull(cr, t.Key); err != nil { + return err + } + + // t.Value ([]uint8) (slice) + case "Value": + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > 2097152 { + return fmt.Errorf("t.Value: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Value = make([]uint8, extra) + } + + if _, 
err := io.ReadFull(cr, t.Value); err != nil { + return err + } + + default: + // Field doesn't exist on this type, so ignore it + cbg.ScanForLinks(r, func(cid.Cid) {}) + } + } + + return nil +} diff --git a/cmd/lotus-shed/shedgen/datastore.go b/cmd/lotus-shed/shedgen/datastore.go new file mode 100644 index 00000000000..f8915c29376 --- /dev/null +++ b/cmd/lotus-shed/shedgen/datastore.go @@ -0,0 +1,6 @@ +package shedgen + +type DatastoreEntry struct { + Key []byte + Value []byte +} diff --git a/gen/main.go b/gen/main.go index d1d37cb6a9a..261a85bcb00 100644 --- a/gen/main.go +++ b/gen/main.go @@ -56,6 +56,7 @@ func generateBlockstore() error { func generateLotusShed() error { return gen.WriteMapEncodersToFile("./cmd/lotus-shed/shedgen/cbor_gen.go", "shedgen", shedgen.CarbNode{}, + shedgen.DatastoreEntry{}, ) }