Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EVM-703]: Saving events in boltdb should have retry mechanism #1652

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions command/bridge/exit/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func run(cmd *cobra.Command, _ []string) {
}

outputter.SetCommandResult(&exitResult{
ID: strconv.FormatUint(exitEvent.ID, 10),
ID: strconv.FormatUint(exitEvent.ID.Uint64(), 10),
Sender: exitEvent.Sender.String(),
Receiver: exitEvent.Receiver.String(),
})
Expand All @@ -186,7 +186,7 @@ func createExitTxn(sender ethgo.Address, proof types.Proof) (*ethgo.Transaction,

var exitEventAPI contractsapi.L2StateSyncedEvent

exitEventEncoded, err := exitEventAPI.Encode(exitEvent)
exitEventEncoded, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent)
if err != nil {
return nil, nil, fmt.Errorf("failed to encode exit event: %w", err)
}
Expand Down
105 changes: 52 additions & 53 deletions consensus/polybft/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,23 @@ type checkpointManager struct {
logger hclog.Logger
// state boltDb instance
state *State
// eventGetter gets exit events (missed or current) from blocks
eventGetter *eventsGetter[*ExitEvent]
}

// newCheckpointManager creates a new instance of checkpointManager
func newCheckpointManager(key ethgo.Key, checkpointOffset uint64,
checkpointManagerSC types.Address, txRelayer txrelayer.TxRelayer,
blockchain blockchainBackend, backend polybftBackend, logger hclog.Logger,
state *State) *checkpointManager {
retry := &eventsGetter[*ExitEvent]{
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
blockchain: blockchain,
isValidLogFn: func(l *types.Log) bool {
return l.Address == contracts.L2StateSenderContract
},
parseEventFn: parseExitEvent,
}

return &checkpointManager{
key: key,
blockchain: blockchain,
Expand All @@ -84,6 +94,7 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64,
checkpointManagerAddr: checkpointManagerSC,
logger: logger,
state: state,
eventGetter: retry,
}
}

Expand Down Expand Up @@ -273,30 +284,28 @@ func (c *checkpointManager) isCheckpointBlock(blockNumber uint64, isEpochEndingB
// PostBlock is called on every insert of finalized block (either from consensus or syncer)
// It will read any exit event that happened in block and insert it to state boltDb
func (c *checkpointManager) PostBlock(req *PostBlockRequest) error {
var (
epoch = req.Epoch
block = req.FullBlock.Block.Number()
)
block := req.FullBlock.Block.Number()

if req.IsEpochEndingBlock {
// exit events that happened in epoch ending blocks,
// should be added to the tree of the next epoch
epoch++
block++
lastBlock, err := c.state.CheckpointStore.getLastSaved()
if err != nil {
return fmt.Errorf("could not get last processed block for exit events. Error: %w", err)
}

// commit exit events only when we finalize a block
events, err := getExitEventsFromReceipts(epoch, block, req.FullBlock.Receipts)
exitEvents, err := c.eventGetter.getFromBlocks(lastBlock, req.FullBlock)
if err != nil {
return err
}

if len(events) > 0 {
c.logger.Debug("Gotten exit events from logs on block",
"eventsNum", len(events), "block", req.FullBlock.Block.Number())
sort.Slice(exitEvents, func(i, j int) bool {
// keep events in sequential order
return exitEvents[i].ID.Cmp(exitEvents[j].ID) < 0
})

if err := c.state.CheckpointStore.insertExitEvents(exitEvents); err != nil {
return err
}

if err := c.state.CheckpointStore.insertExitEvents(events); err != nil {
if err := c.state.CheckpointStore.updateLastSaved(block); err != nil {
return err
}

Expand Down Expand Up @@ -399,7 +408,7 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error

var exitEventAPI contractsapi.L2StateSyncedEvent

e, err := exitEventAPI.Encode(exitEvent)
e, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent)
if err != nil {
return types.Proof{}, err
}
Expand Down Expand Up @@ -436,50 +445,14 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error
}, nil
}

// getExitEventsFromReceipts parses logs from receipts to find exit events
func getExitEventsFromReceipts(epoch, block uint64, receipts []*types.Receipt) ([]*ExitEvent, error) {
events := make([]*ExitEvent, 0)

for i := 0; i < len(receipts); i++ {
if receipts[i].Status == nil || *receipts[i].Status != types.ReceiptSuccess {
continue
}

for _, log := range receipts[i].Logs {
if log.Address != contracts.L2StateSenderContract {
continue
}

event, err := decodeExitEvent(convertLog(log), epoch, block)
if err != nil {
return nil, err
}

if event == nil {
// valid case, not an exit event
continue
}

events = append(events, event)
}
}

// enforce sequential order
sort.Slice(events, func(i, j int) bool {
return events[i].ID < events[j].ID
})

return events, nil
}

// createExitTree creates an exit event merkle tree from provided exit events
func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) {
numOfEvents := len(exitEvents)
data := make([][]byte, numOfEvents)

var exitEventAPI contractsapi.L2StateSyncedEvent
for i := 0; i < numOfEvents; i++ {
b, err := exitEventAPI.Encode(exitEvents[i])
b, err := exitEventAPI.Encode(exitEvents[i].L2StateSyncedEvent)
if err != nil {
return nil, err
}
Expand All @@ -489,3 +462,29 @@ func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) {

return merkle.NewMerkleTree(data)
}

// parseExitEvent parses exit event from provided log
func parseExitEvent(h *types.Header, l *ethgo.Log) (*ExitEvent, bool, error) {
extra, err := GetIbftExtra(h.ExtraData)
if err != nil {
return nil, false,
fmt.Errorf("could not get header extra on exit event parsing. Error: %w", err)
}

epoch := extra.Checkpoint.EpochNumber
block := h.Number

if extra.Validators != nil {
// exit events that happened in epoch ending blocks,
// should be added to the tree of the next epoch
epoch++
block++
}

event, err := decodeExitEvent(l, epoch, block)
if err != nil {
return nil, false, err
}

return event, true, nil
}
95 changes: 83 additions & 12 deletions consensus/polybft/checkpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,46 +296,117 @@ func TestCheckpointManager_PostBlock(t *testing.T) {

state := newTestState(t)

receipts := make([]*types.Receipt, numOfReceipts)
for i := 0; i < numOfReceipts; i++ {
receipts[i] = &types.Receipt{Logs: []*types.Log{
createTestLogForExitEvent(t, uint64(i)),
}}
receipts[i].SetStatus(types.ReceiptSuccess)
createReceipts := func(startID, endID uint64) []*types.Receipt {
receipts := make([]*types.Receipt, endID-startID)
for i := startID; i < endID; i++ {
receipts[i-startID] = &types.Receipt{Logs: []*types.Log{
createTestLogForExitEvent(t, i),
}}
receipts[i-startID].SetStatus(types.ReceiptSuccess)
}

return receipts
}

req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}, Receipts: receipts},
extra := &Extra{
Checkpoint: &CheckpointData{
EpochNumber: epoch,
},
}

req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}},
Epoch: epoch}

req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil)

blockchain := new(blockchainMock)
checkpointManager := newCheckpointManager(wallet.NewEcdsaSigner(createTestKey(t)), 5, types.ZeroAddress,
nil, nil, nil, hclog.NewNullLogger(), state)
nil, blockchain, nil, hclog.NewNullLogger(), state)

t.Run("PostBlock - not epoch ending block", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block-1)) // we got everything till the current block
req.IsEpochEndingBlock = false
req.FullBlock.Receipts = createReceipts(0, 5)
require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block
})

require.NoError(t, err)
require.Len(t, exitEvents, numOfReceipts)
require.Len(t, exitEvents, 5)
require.Equal(t, uint64(epoch), exitEvents[0].EpochNumber)
})

t.Run("PostBlock - epoch ending block (exit events are saved to the next epoch)", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we got everything till the current block
req.IsEpochEndingBlock = true
req.FullBlock.Receipts = createReceipts(5, 10)
extra.Validators = &validator.ValidatorSetDelta{}
req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil)
req.FullBlock.Block.Header.Number = block + 1

require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+1
return exitEvent.BlockNumber == block+2 // they should be saved in the next epoch and its first block
})

require.NoError(t, err)
require.Len(t, exitEvents, numOfReceipts)
require.Equal(t, uint64(block+1), exitEvents[0].BlockNumber)
require.Len(t, exitEvents, 5)
require.Equal(t, uint64(block+2), exitEvents[0].BlockNumber)
require.Equal(t, uint64(epoch+1), exitEvents[0].EpochNumber)
})

t.Run("PostBlock - there are missing events", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we are missing one block

missedReceipts := createReceipts(10, 13)
newReceipts := createReceipts(13, 15)

extra := &Extra{
Checkpoint: &CheckpointData{
EpochNumber: epoch + 1,
},
}

blockchain.On("GetHeaderByNumber", uint64(block+1)).Return(&types.Header{
Number: block + 1,
ExtraData: extra.MarshalRLPTo(nil),
Hash: types.BytesToHash([]byte{0, 1, 2, 3}),
}, true)
blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{0, 1, 2, 3})).Return([]*types.Receipt{}, nil)
blockchain.On("GetHeaderByNumber", uint64(block+2)).Return(&types.Header{
Number: block + 2,
ExtraData: extra.MarshalRLPTo(nil),
Hash: types.BytesToHash([]byte{4, 5, 6, 7}),
}, true)
blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{4, 5, 6, 7})).Return(missedReceipts, nil)

req.IsEpochEndingBlock = false
req.FullBlock.Block.Header.Number = block + 3 // new block
req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) // same epoch
req.FullBlock.Receipts = newReceipts
require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+2
})

require.NoError(t, err)
// receipts from missed block + events from previous test case that were saved in the next epoch
// since they were in epoch ending block
require.Len(t, exitEvents, len(missedReceipts)+5)
require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber)

exitEvents, err = state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+3
})

require.NoError(t, err)
require.Len(t, exitEvents, len(newReceipts))
require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber)
})
}

func TestCheckpointManager_BuildEventRoot(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/polybft/consensus_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ func encodeExitEvents(t *testing.T, exitEvents []*ExitEvent) [][]byte {

var exitEventAPI contractsapi.L2StateSyncedEvent
for i, e := range exitEvents {
encodedEvent, err := exitEventAPI.Encode(e)
encodedEvent, err := exitEventAPI.Encode(e.L2StateSyncedEvent)
require.NoError(t, err)

encodedEvents[i] = encodedEvent
Expand Down
11 changes: 11 additions & 0 deletions consensus/polybft/contractsapi/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package contractsapi

import (
"github.com/0xPolygon/polygon-edge/types"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/abi"
)

Expand All @@ -13,6 +14,16 @@ type StateTransactionInput interface {
DecodeAbi(b []byte) error
}

// EventAbi is an interface representing an event generated in contractsapi
type EventAbi interface {
// Sig returns the event ABI signature or ID (which is unique for all event types)
Sig() ethgo.Hash
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
// Encode does abi encoding of given event
Encode(inputs interface{}) ([]byte, error)
// ParseLog parses the provided receipt log to given event type
ParseLog(log *ethgo.Log) (bool, error)
}

var (
// stateSyncABIType is a specific case where we need to encode state sync event as a tuple of tuple
stateSyncABIType = abi.MustNewType(
Expand Down
4 changes: 0 additions & 4 deletions consensus/polybft/extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,5 @@ func GetIbftExtra(extraRaw []byte) (*Extra, error) {
return nil, err
}

if extra.Validators == nil {
extra.Validators = &validator.ValidatorSetDelta{}
}

return extra, nil
}
13 changes: 9 additions & 4 deletions consensus/polybft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ var (
"allowed in an epoch ending block")
errProposalDontMatch = errors.New("failed to insert proposal, because the validated proposal " +
"is either nil or it does not match the received one")
errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch")
errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block")
errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch")
errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block")
errValidatorDeltaNilInEpochEndingBlock = errors.New("validator set delta is nil in epoch ending block")
)

type fsm struct {
Expand Down Expand Up @@ -337,11 +338,15 @@ func (f *fsm) Validate(proposal []byte) error {

// validate validators delta
if f.isEndOfEpoch {
if extra.Validators == nil {
return errValidatorDeltaNilInEpochEndingBlock
}

if !extra.Validators.Equals(f.newValidatorsDelta) {
return errValidatorSetDeltaMismatch
}
} else if !extra.Validators.IsEmpty() {
// delta should be empty in non epoch ending blocks
} else if extra.Validators != nil {
// delta should be nil in non epoch ending blocks
return errValidatorsUpdateInNonEpochEnding
}

Expand Down
Loading