Skip to content

Commit

Permalink
Add L1Feed in blockchain
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Dec 25, 2024
1 parent 7b638db commit 20c0587
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 92 deletions.
22 changes: 20 additions & 2 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ import (
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/encoder"
"github.com/NethermindEth/juno/feed"
"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/common"
)

type L1HeadSubscription struct {
*feed.Subscription[*core.L1Head]
}

//go:generate mockgen -destination=../mocks/mock_blockchain.go -package=mocks github.com/NethermindEth/juno/blockchain Reader
type Reader interface {
Height() (height uint64, err error)

Head() (head *core.Block, err error)
L1Head() (*core.L1Head, error)
SubscribeL1Head() L1HeadSubscription
BlockByNumber(number uint64) (block *core.Block, err error)
BlockByHash(hash *felt.Felt) (block *core.Block, err error)

Expand Down Expand Up @@ -82,6 +88,7 @@ type Blockchain struct {
network *utils.Network
database db.DB
listener EventListener
l1HeadFeed *feed.Feed[*core.L1Head]
pendingBlockFn func() *core.Block
}

Expand All @@ -91,6 +98,7 @@ func New(database db.DB, network *utils.Network, pendingBlockFn func() *core.Blo
database: database,
network: network,
listener: &SelectiveListener{},
l1HeadFeed: feed.New[*core.L1Head](),
pendingBlockFn: pendingBlockFn,
}
}
Expand Down Expand Up @@ -280,6 +288,10 @@ func (b *Blockchain) Receipt(hash *felt.Felt) (*core.TransactionReceipt, *felt.F
})
}

func (b *Blockchain) SubscribeL1Head() L1HeadSubscription {
return L1HeadSubscription{b.l1HeadFeed.Subscribe()}
}

func (b *Blockchain) L1Head() (*core.L1Head, error) {
b.listener.OnRead("L1Head")
var update *core.L1Head
Expand All @@ -306,9 +318,15 @@ func (b *Blockchain) SetL1Head(update *core.L1Head) error {
if err != nil {
return err
}
return b.database.Update(func(txn db.Transaction) error {

if err := b.database.Update(func(txn db.Transaction) error {
return txn.Set(db.L1Height.Key(), updateBytes)
})
}); err != nil {
return err
}

b.l1HeadFeed.Send(update)
return nil
}

// Store takes a block and state update and performs sanity checks before putting in the database.
Expand Down
17 changes: 17 additions & 0 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,3 +690,20 @@ func TestL1Update(t *testing.T) {
})
}
}

func TestSubscribeL1Head(t *testing.T) {
l1Head := &core.L1Head{
BlockNumber: 1,
StateRoot: new(felt.Felt).SetUint64(2),
}

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
sub := chain.SubscribeL1Head()
t.Cleanup(sub.Unsubscribe)

require.NoError(t, chain.SetL1Head(l1Head))

got, ok := <-sub.Recv()
require.True(t, ok)
assert.Equal(t, l1Head, got)
}
14 changes: 14 additions & 0 deletions mocks/mock_blockchain.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions mocks/mock_plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 16 additions & 17 deletions mocks/mock_subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Handler struct {
newHeads *feed.Feed[*core.Header]
reorgs *feed.Feed[*sync.ReorgBlockRange]
pendingTxs *feed.Feed[[]core.Transaction]
l1Heads *feed.Feed[*core.L1Head]

idgen func() uint64
mu stdsync.Mutex // protects subscriptions.
Expand Down Expand Up @@ -138,6 +139,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
newHeads: feed.New[*core.Header](),
reorgs: feed.New[*sync.ReorgBlockRange](),
pendingTxs: feed.New[[]core.Transaction](),
l1Heads: feed.New[*core.L1Head](),
subscriptions: make(map[uint64]*subscription),

blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize),
Expand Down Expand Up @@ -181,12 +183,15 @@ func (h *Handler) Run(ctx context.Context) error {
newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription
reorgsSub := h.syncReader.SubscribeReorg().Subscription
pendingTxsSub := h.syncReader.SubscribePendingTxs().Subscription
l1HeadsSub := h.bcReader.SubscribeL1Head().Subscription
defer newHeadsSub.Unsubscribe()
defer reorgsSub.Unsubscribe()
defer pendingTxsSub.Unsubscribe()
defer l1HeadsSub.Unsubscribe()
feed.Tee(newHeadsSub, h.newHeads)
feed.Tee(reorgsSub, h.reorgs)
feed.Tee(pendingTxsSub, h.pendingTxs)
feed.Tee(l1HeadsSub, h.l1Heads)

<-ctx.Done()
for _, sub := range h.subscriptions {
Expand Down
63 changes: 1 addition & 62 deletions rpc/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ const (

var emptyCommitments = core.BlockCommitments{}

// Due to the difference in how some test files in rpc use "package rpc" vs "package rpc_test" it was easiest to copy
// the fakeConn here.
// Todo: move all the subscription related test here
type fakeConn struct {
w io.Writer
}
Expand All @@ -61,42 +58,6 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool {
return fc.w == fc2.w
}

type fakeSyncer struct {
newHeads *feed.Feed[*core.Header]
pendingTxs *feed.Feed[[]core.Transaction]
}

func newFakeSyncer() *fakeSyncer {
return &fakeSyncer{
newHeads: feed.New[*core.Header](),
pendingTxs: feed.New[[]core.Transaction](),
}
}

func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription {
return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()}
}

func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) {
return 0, nil
}

func (fs *fakeSyncer) HighestBlockHeader() *core.Header {
return nil
}

func (fs *fakeSyncer) Pending() (*sync.Pending, error) {
return nil, fmt.Errorf("not implemented")
}

func (fs *fakeSyncer) PendingBlock() *core.Block {
return nil
}

func (fs *fakeSyncer) PendingState() (core.StateReader, func() error, error) {
return nil, nil, fmt.Errorf("not implemented")
}

func TestSubscribeEvents(t *testing.T) {
log := utils.NewNopZapLogger()

Expand Down Expand Up @@ -848,28 +809,6 @@ func subMsg(method string) string {
return fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":%q}`, method)
}

func testHeader(t *testing.T) *core.Header {
t.Helper()

header := &core.Header{
Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"),
Number: 2,
GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"),
Timestamp: 1637084470,
SequencerAddress: utils.HexToFelt(t, "0x0"),
L1DataGasPrice: &core.GasPrice{
PriceInFri: utils.HexToFelt(t, "0x0"),
PriceInWei: utils.HexToFelt(t, "0x0"),
},
GasPrice: utils.HexToFelt(t, "0x0"),
GasPriceSTRK: utils.HexToFelt(t, "0x0"),
L1DAMode: core.Calldata,
ProtocolVersion: "",
}
return header
}

func newHeadsResponse(id uint64) string {
return fmt.Sprintf(`{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`, id)
}
Expand Down Expand Up @@ -925,7 +864,7 @@ func TestSubscribeTxStatusAndUnsubscribe(t *testing.T) {
Handler: handler.Unsubscribe,
}))

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
ws := jsonrpc.NewWebsocket(server, nil, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

// default returns from mocks
Expand Down
12 changes: 3 additions & 9 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,21 +568,15 @@ func (s *Synchronizer) HighestBlockHeader() *core.Header {
}

func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription {
return HeaderSubscription{
Subscription: s.newHeads.Subscribe(),
}
return HeaderSubscription{s.newHeads.Subscribe()}
}

func (s *Synchronizer) SubscribeReorg() ReorgSubscription {
return ReorgSubscription{
Subscription: s.reorgFeed.Subscribe(),
}
return ReorgSubscription{s.reorgFeed.Subscribe()}
}

func (s *Synchronizer) SubscribePendingTxs() PendingTxSubscription {
return PendingTxSubscription{
Subscription: s.pendingTxsFeed.Subscribe(),
}
return PendingTxSubscription{s.pendingTxsFeed.Subscribe()}
}

// StorePending stores a pending block given that it is for the next height
Expand Down

0 comments on commit 20c0587

Please sign in to comment.