From 4e564331705c9fc3e158e2b9e2c7e3a4a1334f1e Mon Sep 17 00:00:00 2001 From: Jonathan Otto Date: Fri, 26 Apr 2024 02:53:05 -0400 Subject: [PATCH] Fix new_heads Events Emission on Block Forks (#10072) TL;DR: on a reorg, the common ancestor block is not being published to subscribers of newHeads #### Expected behavior if the reorg's common ancestor is 2, I expect 2 to be republished 1, 2, **2**, **3**, **4** #### Actual behavior 2 is not republished, and 3's parentHash points to a 2 header that was never received 1, 2, **3**, **4** This PR is the same thing as https://github.com/ledgerwatch/erigon/pull/9738 except with a test. Note... the test passes, but **this does not actually work in production** (for Ethereum mainnet with prysm as external CL). Why? Because in production, `h.sync.PrevUnwindPoint()` is always nil: https://github.com/ledgerwatch/erigon/blob/a5270bccf5e69a6beaaab9a0663bdad80e989505/turbo/stages/stageloop.go#L291 which means the initial "if block" is never entered, and thus we have **no control** of increment/decrement `notifyFrom` during reorgs https://github.com/ledgerwatch/erigon/blob/a5270bccf5e69a6beaaab9a0663bdad80e989505/eth/stagedsync/stage_finish.go#L137-L146 I don't know why `h.sync.PrevUnwindPoint()` is seemingly always nil, or how the test can pass if it fails in prod. I'm hoping to pass the baton to someone who might. Thank you @indanielo for original fix. If we can figure this bug out, it closes #8848 and closes #9568 and closes #10056 --------- Co-authored-by: Daniel Gimenez <25278291+indanielo@users.noreply.github.com> --- eth/stagedsync/stage_finish.go | 2 +- turbo/jsonrpc/eth_subscribe_test.go | 61 +++++++++++++++++++++++++---- 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index a90be80bc37..212baa2aedf 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -143,8 +143,8 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS heightSpan = 1024 } notifyFrom = finishStageAfterSync - heightSpan + notifyFrom++ } - notifyFrom++ var notifyTo = notifyFrom var notifyToHash libcommon.Hash diff --git a/turbo/jsonrpc/eth_subscribe_test.go b/turbo/jsonrpc/eth_subscribe_test.go index 1a595fb2718..eee0edb7a44 100644 --- a/turbo/jsonrpc/eth_subscribe_test.go +++ b/turbo/jsonrpc/eth_subscribe_test.go @@ -2,7 +2,7 @@ package jsonrpc import ( "context" - "fmt" + "math/big" "testing" "github.com/stretchr/testify/require" @@ -25,14 +25,22 @@ import ( "github.com/ledgerwatch/erigon/turbo/stages/mock" ) -func TestEthSubscribe(t *testing.T) { - m, require := mock.Mock(t), require.New(t) - chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) { - b.SetCoinbase(libcommon.Address{1}) +func sendBlock(t *testing.T, require *require.Assertions, m *mock.MockSentry, chain *core.ChainPack) { + // Send NewBlock message + b, err := rlp.EncodeToBytes(ð.NewBlockPacket{ + Block: chain.TopBlock, + TD: big.NewInt(1), // This is ignored anyway }) - require.NoError(err) + if err != nil { + t.Fatal(err) + } + m.ReceiveWg.Add(1) + for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) { + require.NoError(err) + } - b, err := rlp.EncodeToBytes(ð.BlockHeadersPacket66{ + // Send all the headers + b, err = rlp.EncodeToBytes(ð.BlockHeadersPacket66{ RequestId: 1, BlockHeadersPacket: chain.Headers, }) @@ -43,6 +51,16 @@ func TestEthSubscribe(t *testing.T) { require.NoError(err) } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed +} + +func TestEthSubscribe(t *testing.T) { + m, require := mock.Mock(t), require.New(t) + chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) { + b.SetCoinbase(libcommon.Address{1}) + }) + require.NoError(err) + + sendBlock(t, require, m, chain) ctx := context.Background() logger := log.New() @@ -64,7 +82,34 @@ func TestEthSubscribe(t *testing.T) { for i := uint64(1); i <= highestSeenHeader; i++ { header := <-newHeads - fmt.Printf("Got header %d\n", header.Number.Uint64()) require.Equal(i, header.Number.Uint64()) + require.Equal(chain.Blocks[i-1].Hash(), header.Hash()) + } + + // create reorg chain starting with common ancestor of 3, 4 will be first block with different coinbase + m2 := mock.Mock(t) + chain, err = core.GenerateChain(m2.ChainConfig, m2.Genesis, m2.Engine, m2.DB, 9, func(i int, b *core.BlockGen) { + // i starts from 0, so this means everything under block 4 will have coinbase 1, and 4 and above will have coinbase 2 + if i < 3 { + b.SetCoinbase(libcommon.Address{1}) + } else { + b.SetCoinbase(libcommon.Address{2}) + } + }) + require.NoError(err) + + sendBlock(t, require, m, chain) + + if err := stages.StageLoopIteration(m.Ctx, m.DB, wrap.TxContainer{}, m.Sync, initialCycle, logger, m.BlockReader, hook, false); err != nil { + t.Fatal(err) + } + + highestSeenHeader = chain.TopBlock.NumberU64() + + // since common ancestor of reorg is 3 the first new header we will see should be 3 + for i := uint64(3); i <= highestSeenHeader; i++ { + header := <-newHeads + require.Equal(i, header.Number.Uint64()) + require.Equal(chain.Blocks[i-1].Hash(), header.Hash()) } }