From ed3a5085a7432b2b3979646500279e528f85dc02 Mon Sep 17 00:00:00 2001
From: Fridrik Asmundsson
Date: Wed, 19 Jul 2023 11:32:53 +0000
Subject: [PATCH] Add new lotus-shed command for backfilling actor events

---
 cmd/lotus-shed/indexes.go | 293 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 293 insertions(+)

diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go
index 24a9a817f00..15eb4f2c0be 100644
--- a/cmd/lotus-shed/indexes.go
+++ b/cmd/lotus-shed/indexes.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"database/sql"
 	"fmt"
 	"path"
@@ -8,12 +9,17 @@ import (
 	"strings"
 
 	"github.com/mitchellh/go-homedir"
+	"github.com/multiformats/go-varint"
 	"github.com/urfave/cli/v2"
 	"golang.org/x/xerrors"
 
+	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
+	builtintypes "github.com/filecoin-project/go-state-types/builtin"
 	"github.com/filecoin-project/go-state-types/crypto"
+	"github.com/filecoin-project/go-state-types/exitcode"
 
+	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/chain/types/ethtypes"
 	lcli "github.com/filecoin-project/lotus/cli"
 )
@@ -31,5 +37,292 @@ var indexesCmd = &cli.Command{
 		withCategory("msgindex", backfillMsgIndexCmd),
 		withCategory("msgindex", pruneMsgIndexCmd),
 		withCategory("txhash", backfillTxHashCmd),
+		withCategory("events", backfillEventsCmd),
+	},
+}
+
+// backfillEventsCmd walks the chain backwards from a starting tipset and, for
+// each epoch visited, inserts into the node's sqlite events.db any actor events
+// (and their entries) that are not already present. Only events whose emitter
+// resolves to an EAM-assigned f4 address are indexed. Existing, non-reverted
+// rows are detected and skipped, so re-running does not insert duplicates.
+var backfillEventsCmd = &cli.Command{
+	Name:  "backfill-events",
+	Usage: "Backfill the events.db for a number of epochs starting from a specified height",
+	Flags: []cli.Flag{
+		&cli.UintFlag{
+			Name:  "from",
+			Value: 0,
+			Usage: "the tipset height to start backfilling from (0 is head of chain)",
+		},
+		&cli.IntFlag{
+			Name:  "epochs",
+			Value: 2000,
+			Usage: "the number of epochs to backfill",
+		},
+	},
+	Action: func(cctx *cli.Context) error {
+		srv, err := lcli.GetFullNodeServices(cctx)
+		if err != nil {
+			return err
+		}
+		defer srv.Close() //nolint:errcheck
+
+		api := srv.FullNodeAPI()
+		ctx := lcli.ReqContext(cctx)
+
+		// currTs will be the tipset where we start backfilling from; it defaults to the chain head
+		currTs, err := api.ChainHead(ctx)
+		if err != nil {
+			return err
+		}
+		if from := cctx.Uint("from"); from > 0 {
+			// the loop reads a tipset's messages through its child, so fetch the tipset one epoch above the requested height
+			currTs, err = api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(from+1), currTs.Key())
+			if err != nil {
+				return err
+			}
+		}
+
+		// advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API)
+		prevTs := currTs
+		currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
+		if err != nil {
+			return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err)
+		}
+
+		epochs := cctx.Int("epochs")
+
+		basePath, err := homedir.Expand(cctx.String("repo"))
+		if err != nil {
+			return err
+		}
+
+		dbPath := path.Join(basePath, "sqlite", "events.db")
+		db, err := sql.Open("sqlite3", dbPath)
+		if err != nil {
+			return err
+		}
+
+		defer func() {
+			err := db.Close()
+			if err != nil {
+				fmt.Printf("ERROR: closing db: %s\n", err)
+			}
+		}()
+
+		stmtSelectEvent, err := db.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false")
+		if err != nil {
+			return err
+		}
+		defer stmtSelectEvent.Close() //nolint:errcheck
+		stmtSelectEntry, err := db.Prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)")
+		if err != nil {
+			return err
+		}
+		defer stmtSelectEntry.Close() //nolint:errcheck
+		stmtEvent, err := db.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)")
+		if err != nil {
+			return err
+		}
+		defer stmtEvent.Close() //nolint:errcheck
+		stmtEntry, err := db.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)")
+		if err != nil {
+			return err
+		}
+		defer stmtEntry.Close() //nolint:errcheck
+
+		addressLookups := make(map[abi.ActorID]address.Address)
+
+		resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
+			// we only want to match using f4 addresses
+			idAddr, err := address.NewIDAddress(uint64(emitter))
+			if err != nil {
+				return address.Undef, false
+			}
+
+			actor, err := api.StateGetActor(ctx, idAddr, ts.Key())
+			if err != nil || actor.Address == nil {
+				return address.Undef, false
+			}
+
+			// if robust address is not f4 then we won't match against it so bail early
+			if actor.Address.Protocol() != address.Delegated {
+				return address.Undef, false
+			}
+
+			// we have an f4 address, make sure it's assigned by the EAM
+			if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
+				return address.Undef, false
+			}
+			return *actor.Address, true
+		}
+
+		isIndexedValue := func(b uint8) bool {
+			// currently we mark the full entry as indexed if either the key
+			// or the value are indexed; in the future we will need finer-grained
+			// management of indices
+			return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
+		}
+
+		var eventsAffected int64
+		var entriesAffected int64
+		for i := 0; i < epochs; i++ {
+			select {
+			case <-ctx.Done():
+				return ctx.Err() // report the interruption rather than a successful, partial backfill
+			default:
+			}
+
+			log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, currTs.Height(), eventsAffected, entriesAffected)
+
+			// the tipset key cid is identical for every event in this epoch, so compute it once
+			tsKeyCid, err := currTs.Key().Cid()
+			if err != nil {
+				return fmt.Errorf("failed to get tipset key cid: %w", err)
+			}
+
+			blockCid := prevTs.Blocks()[0].Cid()
+
+			// get messages for the parent of the previous tipset (which will be currTs)
+			msgs, err := api.ChainGetParentMessages(ctx, blockCid)
+			if err != nil {
+				return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err)
+			}
+
+			// get receipts for the parent of the previous tipset (which will be currTs)
+			receipts, err := api.ChainGetParentReceipts(ctx, blockCid)
+			if err != nil {
+				return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err)
+			}
+
+			if len(msgs) != len(receipts) {
+				return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts))
+			}
+
+			// loop over each message receipt and backfill the events
+			for idx, receipt := range receipts {
+				msg := msgs[idx]
+
+				if receipt.ExitCode != exitcode.Ok {
+					continue
+				}
+
+				if receipt.EventsRoot == nil {
+					continue
+				}
+
+				events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
+				if err != nil {
+					return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err)
+				}
+
+				for eventIdx, event := range events {
+					addr, found := addressLookups[event.Emitter]
+					if !found {
+						var ok bool
+						addr, ok = resolveFn(ctx, event.Emitter, currTs)
+						if !ok {
+							// not an address we will be able to match against
+							continue
+						}
+						addressLookups[event.Emitter] = addr
+					}
+
+					// select the highest event id that exists in database, or null if none exists
+					var entryID sql.NullInt64
+					err = stmtSelectEvent.QueryRow(
+						currTs.Height(),
+						currTs.Key().Bytes(),
+						tsKeyCid.Bytes(),
+						addr.Bytes(),
+						eventIdx,
+						msg.Cid.Bytes(),
+						idx,
+					).Scan(&entryID)
+					if err != nil {
+						return fmt.Errorf("error checking if event exists: %w", err)
+					}
+
+					if !entryID.Valid {
+						// event does not exist, lets backfill it
+						res, err := stmtEvent.Exec(
+							currTs.Height(),      // height
+							currTs.Key().Bytes(), // tipset_key
+							tsKeyCid.Bytes(),     // tipset_key_cid
+							addr.Bytes(),         // emitter_addr
+							eventIdx,             // event_index
+							msg.Cid.Bytes(),      // message_cid
+							idx,                  // message_index
+							false,                // reverted
+						)
+
+						if err != nil {
+							return fmt.Errorf("error inserting event: %w", err)
+						}
+
+						entryID.Int64, err = res.LastInsertId()
+						if err != nil {
+							return fmt.Errorf("could not get last insert id: %w", err)
+						}
+
+						rowsAffected, err := res.RowsAffected()
+						if err != nil {
+							return fmt.Errorf("error getting rows affected: %w", err)
+						}
+
+						eventsAffected += rowsAffected
+					}
+
+					for _, entry := range event.Entries {
+						// check if entry exists
+						var exists bool
+						err = stmtSelectEntry.QueryRow(
+							entryID.Int64,
+							isIndexedValue(entry.Flags),
+							[]byte{entry.Flags},
+							entry.Key,
+							entry.Codec,
+							entry.Value,
+						).Scan(&exists)
+						if err != nil {
+							return fmt.Errorf("error checking if entry exists: %w", err)
+						}
+
+						if !exists {
+							// entry does not exist, lets backfill it
+							res, err := stmtEntry.Exec(
+								entryID.Int64,               // event_id
+								isIndexedValue(entry.Flags), // indexed
+								[]byte{entry.Flags},         // flags
+								entry.Key,                   // key
+								entry.Codec,                 // codec
+								entry.Value,                 // value
+							)
+							if err != nil {
+								return fmt.Errorf("error inserting entry: %w", err)
+							}
+
+							rowsAffected, err := res.RowsAffected()
+							if err != nil {
+								return fmt.Errorf("error getting rows affected: %w", err)
+							}
+							entriesAffected += rowsAffected
+						}
+					}
+				}
+			}
+
+			// advance prevTs and currTs up the chain
+			prevTs = currTs
+			currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
+			if err != nil {
+				return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err)
+			}
+		}
+
+		log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected)
+
+		return nil
 	},
 }