Skip to content

Commit

Permalink
fix(chainwatch): sync based on height
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Aug 27, 2020
1 parent fccdd70 commit f934ebd
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/lotus-chainwatch/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var runCmd = &cli.Command{
}
db.SetMaxOpenConns(1350)

sync := syncer.NewSyncer(db, api)
sync := syncer.NewSyncer(db, api, 1400)
sync.Start(ctx)

proc := processor.NewProcessor(ctx, db, api, maxBatch)
Expand Down
83 changes: 51 additions & 32 deletions cmd/lotus-chainwatch/syncer/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ var log = logging.Logger("syncer")
type Syncer struct {
db *sql.DB

lookbackLimit uint64

headerLk sync.Mutex
node api.FullNode
}

func NewSyncer(db *sql.DB, node api.FullNode) *Syncer {
func NewSyncer(db *sql.DB, node api.FullNode, lookbackLimit uint64) *Syncer {
return &Syncer{
db: db,
node: node,
db: db,
node: node,
lookbackLimit: lookbackLimit,
}
}

Expand Down Expand Up @@ -148,59 +151,53 @@ create index if not exists state_heights_parentstateroot_index
}

func (s *Syncer) Start(ctx context.Context) {
if err := logging.SetLogLevel("syncer", "info"); err != nil {
log.Fatal(err)
}
log.Debug("Starting Syncer")

if err := s.setupSchemas(); err != nil {
log.Fatal(err)
}

// doing the initial sync here lets us avoid the HCCurrent case in the switch
head, err := s.node.ChainHead(ctx)
if err != nil {
log.Fatalw("Failed to get chain head form lotus", "error", err)
}

unsynced, err := s.unsyncedBlocks(ctx, head, time.Unix(0, 0))
if err != nil {
log.Fatalw("failed to gather unsynced blocks", "error", err)
}

if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
log.Fatalw("failed to store unsynced blocks", "error", err)
}

// continue to keep the block headers table up to date.
notifs, err := s.node.ChainNotify(ctx)
if err != nil {
log.Fatal(err)
}

lastSynced := time.Now()
// we need to ensure that on a restart we don't reprocess the whole flarping chain
blkCID, height, err := s.mostRecentlySyncedBlockHeight()
if err != nil {
log.Fatalw("failed to find most recently synced block", "error", err)
}
log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height)
sinceEpoch := uint64(height)
go func() {
for notif := range notifs {
for _, change := range notif {
switch change.Type {
case store.HCApply:
unsynced, err := s.unsyncedBlocks(ctx, change.Val, lastSynced)
unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch)
if err != nil {
log.Errorw("failed to gather unsynced blocks", "error", err)
}

if err := s.storeCirculatingSupply(ctx, change.Val); err != nil {
log.Errorw("failed to store circulating supply", "error", err)
// TODO do something with me
}

if len(unsynced) == 0 {
continue
}

if err := s.storeHeaders(unsynced, true, lastSynced); err != nil {
if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
// so this is pretty bad, need some kind of retry..
// for now just log an error and the blocks will be attempted again on next notifi
log.Errorw("failed to store unsynced blocks", "error", err)
}

lastSynced = time.Now()
sinceEpoch = uint64(change.Val.Height())
case store.HCRevert:
log.Debug("revert todo")
}
Expand All @@ -209,12 +206,8 @@ func (s *Syncer) Start(ctx context.Context) {
}()
}

func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since time.Time) (map[cid.Cid]*types.BlockHeader, error) {
// get a list of blocks we have already synced in the past 3 mins. This ensures we aren't returning the entire
// table every time.
lookback := since.Add(-(time.Minute * 3))
log.Debugw("Gathering unsynced blocks", "since", lookback.String())
hasList, err := s.syncedBlocks(lookback)
func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) {
hasList, err := s.syncedBlocks(since, s.lookbackLimit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -257,9 +250,8 @@ func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since t
return toSync, nil
}

func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) {
// timestamp is used to return a configurable amount of rows based on when they were last added.
rws, err := s.db.Query(`select cid FROM blocks_synced where synced_at > $1`, timestamp.Unix())
func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) {
rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit)
if err != nil {
return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err)
}
Expand All @@ -281,6 +273,33 @@ func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error)
return out, nil
}

func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) {
rw := s.db.QueryRow(`
select blocks_synced.cid, b.height
from blocks_synced
left join blocks b on blocks_synced.cid = b.cid
where processed_at is not null
order by height desc
limit 1
`)

var c string
var h int64
if err := rw.Scan(&c, &h); err != nil {
if err == sql.ErrNoRows {
return cid.Undef, 0, nil
}
return cid.Undef, -1, err
}

ci, err := cid.Parse(c)
if err != nil {
return cid.Undef, -1, err
}

return ci, h, nil
}

func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error {
supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key())
if err != nil {
Expand Down

0 comments on commit f934ebd

Please sign in to comment.