From 8a5950b3e95e1f0014922a5f9142a5fbe161b57f Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Wed, 19 Jul 2023 11:32:53 +0000 Subject: [PATCH 1/2] Add new lotus-shed command for backfillling actor events --- cmd/lotus-shed/indexes.go | 283 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 283 insertions(+) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 24a9a817f00..15eb4f2c0be 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -1,6 +1,7 @@ package main import ( + "context" "database/sql" "fmt" "path" @@ -8,12 +9,17 @@ import ( "strings" "github.com/mitchellh/go-homedir" + "github.com/multiformats/go-varint" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/exitcode" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" ) @@ -31,6 +37,283 @@ var indexesCmd = &cli.Command{ withCategory("msgindex", backfillMsgIndexCmd), withCategory("msgindex", pruneMsgIndexCmd), withCategory("txhash", backfillTxHashCmd), + withCategory("events", backfillEventsCmd), + }, +} + +var backfillEventsCmd = &cli.Command{ + Name: "backfill-events", + Usage: "Backfill the events.db for a number of epochs starting from a specified height", + Flags: []cli.Flag{ + &cli.UintFlag{ + Name: "from", + Value: 0, + Usage: "the tipset height to start backfilling from (0 is head of chain)", + }, + &cli.IntFlag{ + Name: "epochs", + Value: 2000, + Usage: "the number of epochs to backfill", + }, + }, + Action: func(cctx *cli.Context) error { + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return err + } + defer srv.Close() //nolint:errcheck + + api := srv.FullNodeAPI() + ctx := lcli.ReqContext(cctx) + + // currTs will be the tipset where we start backfilling from + currTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + if cctx.IsSet("from") { + // we need to fetch the tipset after the epoch being specified since we will need to advance currTs + currTs, err = api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(cctx.Int("from")+1), currTs.Key()) + if err != nil { + return err + } + } + + // advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API) + prevTs := currTs + currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) + if err != nil { + return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err) + } + + epochs := cctx.Int("epochs") + + basePath, err := homedir.Expand(cctx.String("repo")) + if err != nil { + return err + } + + dbPath := path.Join(basePath, "sqlite", "events.db") + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return err + } + + defer func() { + err := db.Close() + if err != nil { + fmt.Printf("ERROR: closing db: %s", err) + } + }() + + stmtSelectEvent, err := db.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false") + if err != nil { + return err + } + stmtSelectEntry, err := db.Prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)") + if err != nil { + return err + } + stmtEvent, err := db.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)") + if err != nil { + return err + } + stmtEntry, err := db.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)") + if err != nil { + return err + } + + addressLookups := make(map[abi.ActorID]address.Address) + + resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + // we only want to match using f4 addresses + idAddr, err := address.NewIDAddress(uint64(emitter)) + if err != nil { + return address.Undef, false + } + + actor, err := api.StateGetActor(ctx, idAddr, ts.Key()) + if err != nil || actor.Address == nil { + return address.Undef, false + } + + // if robust address is not f4 then we won't match against it so bail early + if actor.Address.Protocol() != address.Delegated { + return address.Undef, false + } + + // we have an f4 address, make sure it's assigned by the EAM + if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { + return address.Undef, false + } + return *actor.Address, true + } + + isIndexedValue := func(b uint8) bool { + // currently we mark the full entry as indexed if either the key + // or the value are indexed; in the future we will need finer-grained + // management of indices + return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 + } + + var eventsAffected int64 + var entriesAffected int64 + for i := 0; i < epochs; i++ { + select { + case <-ctx.Done(): + return nil + default: + } + + log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, currTs.Height(), eventsAffected, entriesAffected) + + blockCid := prevTs.Blocks()[0].Cid() + + // get messages for the parent of the previous tipset (which will be currTs) + msgs, err := api.ChainGetParentMessages(ctx, blockCid) + if err != nil { + return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err) + } + + // get receipts for the parent of the previous tipset (which will be currTs) + receipts, err := api.ChainGetParentReceipts(ctx, blockCid) + if err != nil { + return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err) + } + + if len(msgs) != len(receipts) { + return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts)) + } + + // loop over each message receipt and backfill the events + for idx, receipt := range receipts { + msg := msgs[idx] + + if receipt.ExitCode != exitcode.Ok { + continue + } + + if receipt.EventsRoot == nil { + continue + } + + events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if err != nil { + return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err) + } + + for eventIdx, event := range events { + addr, found := addressLookups[event.Emitter] + if !found { + var ok bool + addr, ok = resolveFn(ctx, event.Emitter, currTs) + if !ok { + // not an address we will be able to match against + continue + } + addressLookups[event.Emitter] = addr + } + + tsKeyCid, err := currTs.Key().Cid() + if err != nil { + return fmt.Errorf("failed to get tipset key cid: %w", err) + } + + // select the highest event id that exists in database, or null if none exists + var entryID sql.NullInt64 + err = stmtSelectEvent.QueryRow( + currTs.Height(), + currTs.Key().Bytes(), + tsKeyCid.Bytes(), + addr.Bytes(), + eventIdx, + msg.Cid.Bytes(), + idx, + ).Scan(&entryID) + if err != nil { + return fmt.Errorf("error checking if event exists: %w", err) + } + + if !entryID.Valid { + // event does not exist, lets backfill it + res, err := stmtEvent.Exec( + currTs.Height(), // height + currTs.Key().Bytes(), // tipset_key + tsKeyCid.Bytes(), // tipset_key_cid + addr.Bytes(), // emitter_addr + eventIdx, // event_index + msg.Cid.Bytes(), // message_cid + idx, // message_index + false, // reverted + ) + + if err != nil { + return fmt.Errorf("error inserting event: %w", err) + } + + entryID.Int64, err = res.LastInsertId() + if err != nil { + return fmt.Errorf("could not get last insert id: %w", err) + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("error getting rows affected: %s", err) + } + + eventsAffected += rowsAffected + } + + for _, entry := range event.Entries { + // check if entry exists + var exists bool + err = stmtSelectEntry.QueryRow( + entryID.Int64, + isIndexedValue(entry.Flags), + []byte{entry.Flags}, + entry.Key, + entry.Codec, + entry.Value, + ).Scan(&exists) + if err != nil { + return fmt.Errorf("error checking if entry exists: %w", err) + } + + if !exists { + // entry does not exist, lets backfill it + res, err := stmtEntry.Exec( + entryID.Int64, // event_id + isIndexedValue(entry.Flags), // indexed + []byte{entry.Flags}, // flags + entry.Key, // key + entry.Codec, // codec + entry.Value, // value + ) + if err != nil { + return fmt.Errorf("error inserting entry: %w", err) + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("error getting rows affected: %s", err) + } + entriesAffected += rowsAffected + } + } + } + } + + // advance prevTs and currTs up the chain + prevTs = currTs + currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) + if err != nil { + return fmt.Errorf("failed to load tipset %s: %w", currTs, err) + } + } + + log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected) + + return nil }, } From 719215122bc3a64720a0d326b9e2fe8bbd5fe1f0 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Mon, 24 Jul 2023 12:53:49 +0000 Subject: [PATCH 2/2] Address PR feedback and move each epoch inside own tx --- cmd/lotus-shed/indexes.go | 197 ++++++++++++++++++++------------------ 1 file changed, 103 insertions(+), 94 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 15eb4f2c0be..be7d43e0513 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" + lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -106,23 +107,6 @@ var backfillEventsCmd = &cli.Command{ } }() - stmtSelectEvent, err := db.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false") - if err != nil { - return err - } - stmtSelectEntry, err := db.Prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)") - if err != nil { - return err - } - stmtEvent, err := db.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)") - if err != nil { - return err - } - stmtEntry, err := db.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)") - if err != nil { - return err - } - addressLookups := make(map[abi.ActorID]address.Address) resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { @@ -156,34 +140,31 @@ var backfillEventsCmd = &cli.Command{ return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 } - var eventsAffected int64 - var entriesAffected int64 - for i := 0; i < epochs; i++ { - select { - case <-ctx.Done(): - return nil - default: - } - - log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, currTs.Height(), eventsAffected, entriesAffected) - - blockCid := prevTs.Blocks()[0].Cid() + var totalEventsAffected int64 + var totalEntriesAffected int64 - // get messages for the parent of the previous tipset (which will be currTs) - msgs, err := api.ChainGetParentMessages(ctx, blockCid) + processHeight := func(ctx context.Context, cnt int, msgs []lapi.Message, receipts []*types.MessageReceipt) error { + tx, err := db.BeginTx(ctx, nil) if err != nil { - return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err) + return fmt.Errorf("failed to start transaction: %w", err) } + defer tx.Rollback() //nolint:errcheck - // get receipts for the parent of the previous tipset (which will be currTs) - receipts, err := api.ChainGetParentReceipts(ctx, blockCid) + stmtSelectEvent, err := tx.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false") if err != nil { - return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err) + return err } - - if len(msgs) != len(receipts) { - return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts)) + stmtEvent, err := tx.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)") + if err != nil { + return err } + stmtEntry, err := tx.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)") + if err != nil { + return err + } + + var eventsAffected int64 + var entriesAffected int64 // loop over each message receipt and backfill the events for idx, receipt := range receipts { @@ -234,75 +215,103 @@ var backfillEventsCmd = &cli.Command{ return fmt.Errorf("error checking if event exists: %w", err) } - if !entryID.Valid { - // event does not exist, lets backfill it - res, err := stmtEvent.Exec( - currTs.Height(), // height - currTs.Key().Bytes(), // tipset_key - tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr - eventIdx, // event_index - msg.Cid.Bytes(), // message_cid - idx, // message_index - false, // reverted - ) - - if err != nil { - return fmt.Errorf("error inserting event: %w", err) - } + // we already have this event + if entryID.Valid { + continue + } - entryID.Int64, err = res.LastInsertId() - if err != nil { - return fmt.Errorf("could not get last insert id: %w", err) - } + // event does not exist, lets backfill it + res, err := tx.Stmt(stmtEvent).Exec( + currTs.Height(), // height + currTs.Key().Bytes(), // tipset_key + tsKeyCid.Bytes(), // tipset_key_cid + addr.Bytes(), // emitter_addr + eventIdx, // event_index + msg.Cid.Bytes(), // message_cid + idx, // message_index + false, // reverted + ) + if err != nil { + return fmt.Errorf("error inserting event: %w", err) + } - rowsAffected, err := res.RowsAffected() - if err != nil { - return fmt.Errorf("error getting rows affected: %s", err) - } + entryID.Int64, err = res.LastInsertId() + if err != nil { + return fmt.Errorf("could not get last insert id: %w", err) + } - eventsAffected += rowsAffected + rowsAffected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("could not get rows affected: %w", err) } + eventsAffected += rowsAffected + // backfill the event entries for _, entry := range event.Entries { - // check if entry exists - var exists bool - err = stmtSelectEntry.QueryRow( - entryID.Int64, - isIndexedValue(entry.Flags), - []byte{entry.Flags}, - entry.Key, - entry.Codec, - entry.Value, - ).Scan(&exists) + _, err := tx.Stmt(stmtEntry).Exec( + entryID.Int64, // event_id + isIndexedValue(entry.Flags), // indexed + []byte{entry.Flags}, // flags + entry.Key, // key + entry.Codec, // codec + entry.Value, // value + ) if err != nil { - return fmt.Errorf("error checking if entry exists: %w", err) + return fmt.Errorf("error inserting entry: %w", err) } - if !exists { - // entry does not exist, lets backfill it - res, err := stmtEntry.Exec( - entryID.Int64, // event_id - isIndexedValue(entry.Flags), // indexed - []byte{entry.Flags}, // flags - entry.Key, // key - entry.Codec, // codec - entry.Value, // value - ) - if err != nil { - return fmt.Errorf("error inserting entry: %w", err) - } - - rowsAffected, err := res.RowsAffected() - if err != nil { - return fmt.Errorf("error getting rows affected: %s", err) - } - entriesAffected += rowsAffected + rowsAffected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("could not get rows affected: %w", err) } + entriesAffected += rowsAffected } } } + err = tx.Commit() + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", cnt, currTs.Height(), eventsAffected, entriesAffected) + + totalEventsAffected += eventsAffected + totalEntriesAffected += entriesAffected + + return nil + } + + for i := 0; i < epochs; i++ { + select { + case <-ctx.Done(): + return nil + default: + } + + blockCid := prevTs.Blocks()[0].Cid() + + // get messages for the parent of the previous tipset (which will be currTs) + msgs, err := api.ChainGetParentMessages(ctx, blockCid) + if err != nil { + return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err) + } + + // get receipts for the parent of the previous tipset (which will be currTs) + receipts, err := api.ChainGetParentReceipts(ctx, blockCid) + if err != nil { + return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err) + } + + if len(msgs) != len(receipts) { + return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts)) + } + + err = processHeight(ctx, i, msgs, receipts) + if err != nil { + return err + } + // advance prevTs and currTs up the chain prevTs = currTs currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) @@ -311,7 +320,7 @@ var backfillEventsCmd = &cli.Command{ } } - log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected) + log.Infof("backfilling events complete, totalEventsAffected:%d, totalEntriesAffected:%d", totalEventsAffected, totalEntriesAffected) return nil },