diff --git a/cmd/lotus-chainwatch/run.go b/cmd/lotus-chainwatch/run.go
index 8bdcfcfe3f6..7ff9c540774 100644
--- a/cmd/lotus-chainwatch/run.go
+++ b/cmd/lotus-chainwatch/run.go
@@ -70,7 +70,7 @@ var runCmd = &cli.Command{
 		}
 		db.SetMaxOpenConns(1350)
 
-		sync := syncer.NewSyncer(db, api)
+		sync := syncer.NewSyncer(db, api, 1400)
 		sync.Start(ctx)
 
 		proc := processor.NewProcessor(ctx, db, api, maxBatch)
diff --git a/cmd/lotus-chainwatch/syncer/sync.go b/cmd/lotus-chainwatch/syncer/sync.go
index 69195b536bd..b62f7170bea 100644
--- a/cmd/lotus-chainwatch/syncer/sync.go
+++ b/cmd/lotus-chainwatch/syncer/sync.go
@@ -23,14 +23,17 @@ var log = logging.Logger("syncer")
 type Syncer struct {
 	db *sql.DB
 
+	lookbackLimit uint64
+
 	headerLk sync.Mutex
 	node     api.FullNode
 }
 
-func NewSyncer(db *sql.DB, node api.FullNode) *Syncer {
+func NewSyncer(db *sql.DB, node api.FullNode, lookbackLimit uint64) *Syncer {
 	return &Syncer{
-		db:   db,
-		node: node,
+		db:            db,
+		node:          node,
+		lookbackLimit: lookbackLimit,
 	}
 }
 
@@ -148,59 +151,53 @@ create index if not exists state_heights_parentstateroot_index
 }
 
 func (s *Syncer) Start(ctx context.Context) {
+	if err := logging.SetLogLevel("syncer", "info"); err != nil {
+		log.Fatal(err)
+	}
 	log.Debug("Starting Syncer")
 
 	if err := s.setupSchemas(); err != nil {
 		log.Fatal(err)
 	}
 
-	// doing the initial sync here lets us avoid the HCCurrent case in the switch
-	head, err := s.node.ChainHead(ctx)
-	if err != nil {
-		log.Fatalw("Failed to get chain head form lotus", "error", err)
-	}
-
-	unsynced, err := s.unsyncedBlocks(ctx, head, time.Unix(0, 0))
-	if err != nil {
-		log.Fatalw("failed to gather unsynced blocks", "error", err)
-	}
-
-	if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
-		log.Fatalw("failed to store unsynced blocks", "error", err)
-	}
-
 	// continue to keep the block headers table up to date.
 	notifs, err := s.node.ChainNotify(ctx)
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	lastSynced := time.Now()
+	// we need to ensure that on a restart we don't reprocess the whole flarping chain
+	blkCID, height, err := s.mostRecentlySyncedBlockHeight()
+	if err != nil {
+		log.Fatalw("failed to find most recently synced block", "error", err)
+	}
+	log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height)
+	sinceEpoch := uint64(height)
 	go func() {
 		for notif := range notifs {
 			for _, change := range notif {
 				switch change.Type {
 				case store.HCApply:
-					unsynced, err := s.unsyncedBlocks(ctx, change.Val, lastSynced)
+					unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch)
 					if err != nil {
 						log.Errorw("failed to gather unsynced blocks", "error", err)
 					}
 
 					if err := s.storeCirculatingSupply(ctx, change.Val); err != nil {
-						log.Errorw("failed to store circulating supply", "error", err)
+						// TODO do something with me
 					}
 
 					if len(unsynced) == 0 {
 						continue
 					}
 
-					if err := s.storeHeaders(unsynced, true, lastSynced); err != nil {
+					if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
 						// so this is pretty bad, need some kind of retry..
 						// for now just log an error and the blocks will be attempted again on next notifi
 						log.Errorw("failed to store unsynced blocks", "error", err)
 					}
 
-					lastSynced = time.Now()
+					sinceEpoch = uint64(change.Val.Height())
 				case store.HCRevert:
 					log.Debug("revert todo")
 				}
@@ -209,12 +206,8 @@ func (s *Syncer) Start(ctx context.Context) {
 	}()
 }
 
-func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since time.Time) (map[cid.Cid]*types.BlockHeader, error) {
-	// get a list of blocks we have already synced in the past 3 mins. This ensures we aren't returning the entire
-	// table every time.
-	lookback := since.Add(-(time.Minute * 3))
-	log.Debugw("Gathering unsynced blocks", "since", lookback.String())
-	hasList, err := s.syncedBlocks(lookback)
+func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) {
+	hasList, err := s.syncedBlocks(since, s.lookbackLimit)
 	if err != nil {
 		return nil, err
 	}
@@ -257,9 +250,8 @@ func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since t
 	return toSync, nil
 }
 
-func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) {
-	// timestamp is used to return a configurable amount of rows based on when they were last added.
-	rws, err := s.db.Query(`select cid FROM blocks_synced where synced_at > $1`, timestamp.Unix())
+func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) {
+	rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit)
 	if err != nil {
 		return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err)
 	}
@@ -281,6 +273,33 @@ func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error)
 	return out, nil
 }
 
+func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) {
+	rw := s.db.QueryRow(`
+select blocks_synced.cid, b.height
+from blocks_synced
+left join blocks b on blocks_synced.cid = b.cid
+where processed_at is not null
+order by height desc
+limit 1
+`)
+
+	var c string
+	var h int64
+	if err := rw.Scan(&c, &h); err != nil {
+		if err == sql.ErrNoRows {
+			return cid.Undef, 0, nil
+		}
+		return cid.Undef, -1, err
+	}
+
+	ci, err := cid.Parse(c)
+	if err != nil {
+		return cid.Undef, -1, err
+	}
+
+	return ci, h, nil
+}
+
 func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error {
 	supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key())
 	if err != nil {