Skip to content

Commit

Permalink
Fix: resync on missed blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Feb 4, 2024
1 parent 076e3b5 commit 76c9930
Showing 1 changed file with 41 additions and 41 deletions.
82 changes: 41 additions & 41 deletions cmd/mempool/tzkt/tzkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,33 @@ func (tzkt *TzKT) Connect(ctx context.Context) error {
case <-ctx.Done():
return
case msg := <-tzkt.client.Listen():
switch msg.Channel {
case events.ChannelOperations:
if err := tzkt.handleOperationMessage(msg); err != nil {
log.Err(err).Msg("handleOperationMessage")
switch msg.Type {
case events.MessageTypeData:
switch msg.Channel {
case events.ChannelOperations:
if err := tzkt.handleOperationMessage(msg); err != nil {
log.Err(err).Msg("handleOperationMessage")
}
case events.ChannelBlocks:
if err := tzkt.handleBlockMessage(msg); err != nil {
log.Err(err).Msg("handleBlockMessage")
}
}
case events.ChannelBlocks:
if err := tzkt.handleBlockMessage(msg); err != nil {
log.Err(err).Msg("handleBlockMessage")
case events.MessageTypeState:
if msg.Channel != events.ChannelBlocks {
continue
}

if tzkt.state < msg.State {
// if blocks was missed in some reason we should index missed blocks
log.Warn().Uint64("old_state", tzkt.state).Uint64("new_level", msg.State).Msg("detect missed blocks. resync...")

tzkt.Sync(ctx, msg.State)
}
tzkt.state = msg.State
case events.MessageTypeReorg, events.MessageTypeSubscribed:
}

}
}
})
Expand Down Expand Up @@ -104,49 +121,32 @@ func (tzkt *TzKT) Blocks() <-chan BlockMessage {
}

func (tzkt *TzKT) handleBlockMessage(msg events.Message) error {
switch msg.Type {
case events.MessageTypeData:
if msg.Body == nil {
return nil
}
blocks := msg.Body.([]data.Block)
for i := range blocks {
tzkt.blocks <- BlockMessage{
Hash: blocks[i].Hash,
Level: blocks[i].Level,
Type: msg.Type,
Timestamp: blocks[i].Timestamp.UTC(),
}
}
case events.MessageTypeState, events.MessageTypeReorg, events.MessageTypeSubscribed:
if msg.Body == nil {
return nil
}
blocks := msg.Body.([]data.Block)
for i := range blocks {
tzkt.blocks <- BlockMessage{
Level: msg.State,
Type: msg.Type,
Hash: blocks[i].Hash,
Level: blocks[i].Level,
Type: msg.Type,
Timestamp: blocks[i].Timestamp.UTC(),
}
default:
return errors.Wrapf(ErrUnknownMessageType, "%d", msg.Type)
tzkt.state = blocks[i].Level
}

return nil
}

func (tzkt *TzKT) handleOperationMessage(msg events.Message) error {
switch msg.Type {
case events.MessageTypeData:
if msg.Body == nil {
return nil
}
operations, ok := msg.Body.([]any)
if !ok {
return nil
}
return tzkt.handleUpdateMessage(operations)
case events.MessageTypeState, events.MessageTypeReorg:
default:
return errors.Wrapf(ErrUnknownMessageType, "%d", msg.Type)
if msg.Body == nil {
return nil
}

return nil
operations, ok := msg.Body.([]any)
if !ok {
return nil
}
return tzkt.handleUpdateMessage(operations)
}

func (tzkt *TzKT) handleUpdateMessage(operations []any) error {
Expand Down

0 comments on commit 76c9930

Please sign in to comment.