diff --git a/node/node.go b/node/node.go index 002fb242a4..38df2d9d16 100644 --- a/node/node.go +++ b/node/node.go @@ -89,6 +89,8 @@ type Config struct { GatewayAPIKey string `mapstructure:"gw-api-key"` GatewayTimeout time.Duration `mapstructure:"gw-timeout"` + + PluginPath string `mapstructure:"plugin-path"` } type Node struct { diff --git a/rpc/events_test.go b/rpc/events_test.go index b95ac94f86..d30e6d78b1 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -3,15 +3,11 @@ package rpc_test import ( "context" "fmt" - "io" - "net" + "github.com/NethermindEth/juno/db" "net/http/httptest" "testing" "time" - "github.com/NethermindEth/juno/db" - "github.com/NethermindEth/juno/mocks" - "go.uber.org/mock/gomock" "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/core" @@ -19,6 +15,7 @@ import ( "github.com/NethermindEth/juno/db/pebble" "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" "github.com/NethermindEth/juno/sync" @@ -26,14 +23,20 @@ import ( "github.com/coder/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" ) var emptyCommitments = core.BlockCommitments{} const ( - subscribeNewHeads = `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}` - newHeadsResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` - subscribeResponse = `{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}` + unsubscribeMsg = `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}` + unsubscribeNotFoundResponse = `{"jsonrpc":"2.0","error":{"code":100,"message":"Subscription not found"},"id":1}` + subscribeNewHeads = `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}` + newHeadsResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` + subscribeResponse = `{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}` + subscribeTxStatus = `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeTransactionStatus","params":{"transaction_hash":"%s"}}` + txStatusNotFoundResponse = `{"jsonrpc":"2.0","error":{"code":29,"message":"Transaction hash not found"},"id":1}` + txStatusResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%s","status":{"finality_status":"%s","execution_status":"%s"}},"subscription_id":%d}}` ) func TestEvents(t *testing.T) { @@ -627,13 +630,38 @@ func testHeader(t *testing.T) *core.Header { } func setupSubscriptionTest(t *testing.T, ctx context.Context) (*rpc.Handler, *fakeSyncer, *jsonrpc.Server) { + return setupSubscriptionTestWithOptions(t, ctx) +} + +func setupSubscriptionTestWithOptions(t *testing.T, ctx context.Context, srvs ...any) (*rpc.Handler, *fakeSyncer, *jsonrpc.Server) { t.Helper() - log := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + var ( + log utils.Logger + chain blockchain.Reader + ) + + for _, srv := range srvs { + switch srv := srv.(type) { + case utils.Logger: + log = srv + case blockchain.Reader: + chain = srv + default: + t.Fatalf("unexpected option type: %T", srv) + } + } + + // provide good defaults + if log == nil { + log = utils.NewNopZapLogger() + } + if chain == nil { + chain = blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + } syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", log) + handler := rpc.New(chain, syncer, nil, "", log) go func() { require.NoError(t, handler.Run(ctx)) }() @@ -654,33 +682,111 @@ func sendAndReceiveMessage(t *testing.T, ctx context.Context, conn *websocket.Co return string(response) } -func TestSubscribeTxStatusAndUnsubscribe(t *testing.T) { - t.Parallel() - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) +//func TestSubscribeTxStatusAndUnsubscribe(t *testing.T) { +// t.Parallel() +// mockCtrl := gomock.NewController(t) +// t.Cleanup(mockCtrl.Finish) +// +// mockReader := mocks.NewMockReader(mockCtrl) +// +// syncer := newFakeSyncer() +// log, _ := utils.NewZapLogger(utils.INFO, false) +// handler := rpc.New(mockReader, syncer, nil, "", log) +// +// ctx, cancel := context.WithCancel(context.Background()) +// t.Cleanup(cancel) +// +// go func() { +// require.NoError(t, handler.Run(ctx)) +// }() +// // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. +// // Sleep for a moment just in case. +// time.Sleep(50 * time.Millisecond) +// +// serverConn, clientConn := net.Pipe() +// t.Cleanup(func() { +// require.NoError(t, serverConn.Close()) +// require.NoError(t, clientConn.Close()) +// }) +// +// txnHash := utils.HexToFelt(t, "0x4c5772d1914fe6ce891b64eb35bf3522aeae1315647314aac58b01137607f3f") +// txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)} +// receipt := &core.TransactionReceipt{ +// TransactionHash: txnHash, +// Reverted: false, +// } +// +// mockReader.EXPECT().TransactionByHash(txnHash).Return(txn, nil).Times(1) +// mockReader.EXPECT().Receipt(txnHash).Return(receipt, nil, uint64(1), nil).Times(1) +// mockReader.EXPECT().TransactionByHash(gomock.Any()).Return(nil, db.ErrKeyNotFound).AnyTimes() +// +// // Subscribe without setting the connection on the context. +// id, rpcErr := handler.SubscribeTxnStatus(ctx, felt.Zero, nil) +// require.Nil(t, id) +// require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) +// +// // Subscribe correctly but for the unknown transaction +// subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) +// id, rpcErr = handler.SubscribeTxnStatus(subCtx, felt.Zero, nil) +// require.Equal(t, rpc.ErrTxnHashNotFound, rpcErr) +// require.Nil(t, id) +// +// // Subscribe correctly for the known transaction +// subCtx = context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) +// id, rpcErr = handler.SubscribeTxnStatus(subCtx, *txnHash, nil) +// require.Nil(t, rpcErr) +// +// // Receive a block header. +// time.Sleep(100 * time.Millisecond) +// got := make([]byte, 0, 300) +// _, err := clientConn.Read(got) +// require.NoError(t, err) +// require.Equal(t, "", string(got)) +// +// // Unsubscribe without setting the connection on the context. +// ok, rpcErr := handler.Unsubscribe(ctx, id.ID) +// require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) +// require.False(t, ok) +// +// // Unsubscribe on correct connection with the incorrect id. +// ok, rpcErr = handler.Unsubscribe(subCtx, id.ID+1) +// require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) +// require.False(t, ok) +// +// // Unsubscribe on incorrect connection with the correct id. +// subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{}) +// ok, rpcErr = handler.Unsubscribe(subCtx, id.ID) +// require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) +// require.False(t, ok) +// +// // Unsubscribe on correct connection with the correct id. +// subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) +// ok, rpcErr = handler.Unsubscribe(subCtx, id.ID) +// require.Nil(t, rpcErr) +// require.True(t, ok) +//} + +func formatTxStatusResponse(t *testing.T, txnHash *felt.Felt, finality rpc.TxnFinalityStatus, execution rpc.TxnExecutionStatus, id uint64) string { + t.Helper() - mockReader := mocks.NewMockReader(mockCtrl) + finStatusB, err := finality.MarshalText() + require.NoError(t, err) + exeStatusB, err := execution.MarshalText() + require.NoError(t, err) - syncer := newFakeSyncer() - log, _ := utils.NewZapLogger(utils.INFO, false) - handler := rpc.New(mockReader, syncer, nil, "", log) + return fmt.Sprintf(txStatusResponse, txnHash, string(finStatusB), string(exeStatusB), id) +} + +func TestSimpleSubscribeTxStatusAndUnsubscribe(t *testing.T) { + t.Parallel() ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. - time.Sleep(50 * time.Millisecond) - - serverConn, clientConn := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) - }) + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + mockReader := mocks.NewMockReader(mockCtrl) txnHash := utils.HexToFelt(t, "0x4c5772d1914fe6ce891b64eb35bf3522aeae1315647314aac58b01137607f3f") txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)} receipt := &core.TransactionReceipt{ @@ -692,105 +798,113 @@ func TestSubscribeTxStatusAndUnsubscribe(t *testing.T) { mockReader.EXPECT().Receipt(txnHash).Return(receipt, nil, uint64(1), nil).Times(1) mockReader.EXPECT().TransactionByHash(gomock.Any()).Return(nil, db.ErrKeyNotFound).AnyTimes() - // Subscribe without setting the connection on the context. - id, rpcErr := handler.SubscribeTxnStatus(ctx, felt.Zero, nil) - require.Nil(t, id) - require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) + handler, syncer, server := setupSubscriptionTestWithOptions(t, ctx, mockReader) - // Subscribe correctly but for the unknown transaction - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - id, rpcErr = handler.SubscribeTxnStatus(subCtx, felt.Zero, nil) - require.Equal(t, rpc.ErrTxnHashNotFound, rpcErr) - require.Nil(t, id) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribeTransactionStatus", + Params: []jsonrpc.Parameter{{Name: "transaction_hash"}, {Name: "block", Optional: true}}, + Handler: handler.SubscribeTxnStatus, + }, jsonrpc.Method{ + Name: "juno_unsubscribe", + Params: []jsonrpc.Parameter{{Name: "id"}}, + Handler: handler.Unsubscribe, + })) - // Subscribe correctly for the known transaction - subCtx = context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - id, rpcErr = handler.SubscribeTxnStatus(subCtx, *txnHash, nil) - require.Nil(t, rpcErr) + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) + httpSrv := httptest.NewServer(ws) - // Receive a block header. - time.Sleep(100 * time.Millisecond) - got := make([]byte, 0, 300) - _, err := clientConn.Read(got) + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + conn2, _, err := websocket.Dial(ctx, httpSrv.URL, nil) require.NoError(t, err) - require.Equal(t, "", string(got)) - - // Unsubscribe without setting the connection on the context. - ok, rpcErr := handler.Unsubscribe(ctx, id.ID) - require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) - require.False(t, ok) - - // Unsubscribe on correct connection with the incorrect id. - ok, rpcErr = handler.Unsubscribe(subCtx, id.ID+1) - require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) - require.False(t, ok) - - // Unsubscribe on incorrect connection with the correct id. - subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{}) - ok, rpcErr = handler.Unsubscribe(subCtx, id.ID) - require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) - require.False(t, ok) - - // Unsubscribe on correct connection with the correct id. - subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - ok, rpcErr = handler.Unsubscribe(subCtx, id.ID) - require.Nil(t, rpcErr) - require.True(t, ok) -} - -func TestSubscribeTxStatusAndUnsubscribeSimple(t *testing.T) { - t.Parallel() - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - - mockReader := mocks.NewMockReader(mockCtrl) - - syncer := newFakeSyncer() - log, _ := utils.NewZapLogger(utils.INFO, false) - handler := rpc.New(mockReader, syncer, nil, "", log) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. - time.Sleep(50 * time.Millisecond) + firstID := uint64(1) + secondID := uint64(2) - serverConn, clientConn := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) - }) + handler.WithIDGen(func() uint64 { return firstID }) + firstWant := txStatusNotFoundResponse + firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, felt.Zero.String())) + require.NoError(t, err) + require.Equal(t, firstWant, firstGot) - txnHash := utils.HexToFelt(t, "0x4c5772d1914fe6ce891b64eb35bf3522aeae1315647314aac58b01137607f3f") - txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)} - receipt := &core.TransactionReceipt{ - TransactionHash: txnHash, - Reverted: false, - } + handler.WithIDGen(func() uint64 { return secondID }) + secondWant := fmt.Sprintf(subscribeResponse, secondID) + secondGot := sendAndReceiveMessage(t, ctx, conn2, fmt.Sprintf(subscribeTxStatus, txnHash)) + require.NoError(t, err) + require.Equal(t, secondWant, secondGot) - mockReader.EXPECT().TransactionByHash(txnHash).Return(txn, nil).Times(1) - mockReader.EXPECT().Receipt(txnHash).Return(receipt, nil, uint64(1), nil).Times(1) - mockReader.EXPECT().TransactionByHash(gomock.Any()).Return(nil, db.ErrKeyNotFound).AnyTimes() + // We subscribed to not existing tx so the subscription is gone + firstUnsubGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(unsubscribeMsg, firstID)) + require.Equal(t, unsubscribeNotFoundResponse, firstUnsubGot) - // Subscribe correctly for the known transaction - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - id, rpcErr := handler.SubscribeTxnStatus(subCtx, *txnHash, nil) - require.Nil(t, rpcErr) + // Simulate a new block + _ = syncer + //syncer.newHeads.Send(testHeader(t)) // Receive a block header. - time.Sleep(100 * time.Millisecond) - got := make([]byte, 0, 300) - _, err := clientConn.Read(got) + secondWant = formatTxStatusResponse(t, txnHash, rpc.TxnAcceptedOnL2, rpc.TxnSuccess, secondID) + _, secondHeaderGot, err := conn2.Read(ctx) + secondGot = string(secondHeaderGot) require.NoError(t, err) - require.Equal(t, "", string(got)) + require.Equal(t, secondWant, secondGot) - // Unsubscribe on correct connection with the correct id. - subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - ok, rpcErr := handler.Unsubscribe(subCtx, id.ID) - require.Nil(t, rpcErr) - require.True(t, ok) + // Unsubscribe + require.NoError(t, conn2.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubscribeMsg, secondID)))) } + +//func TestSubscribeTxStatusAndUnsubscribeSimple(t *testing.T) { +// t.Parallel() +// mockCtrl := gomock.NewController(t) +// t.Cleanup(mockCtrl.Finish) +// +// mockReader := mocks.NewMockReader(mockCtrl) +// +// syncer := newFakeSyncer() +// log, _ := utils.NewZapLogger(utils.INFO, false) +// handler := rpc.New(mockReader, syncer, nil, "", log) +// +// ctx, cancel := context.WithCancel(context.Background()) +// t.Cleanup(cancel) +// +// go func() { +// require.NoError(t, handler.Run(ctx)) +// }() +// // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. +// // Sleep for a moment just in case. +// time.Sleep(50 * time.Millisecond) +// +// serverConn, clientConn := net.Pipe() +// t.Cleanup(func() { +// require.NoError(t, serverConn.Close()) +// require.NoError(t, clientConn.Close()) +// }) +// +// txnHash := utils.HexToFelt(t, "0x4c5772d1914fe6ce891b64eb35bf3522aeae1315647314aac58b01137607f3f") +// txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)} +// receipt := &core.TransactionReceipt{ +// TransactionHash: txnHash, +// Reverted: false, +// } +// +// mockReader.EXPECT().TransactionByHash(txnHash).Return(txn, nil).Times(1) +// mockReader.EXPECT().Receipt(txnHash).Return(receipt, nil, uint64(1), nil).Times(1) +// mockReader.EXPECT().TransactionByHash(gomock.Any()).Return(nil, db.ErrKeyNotFound).AnyTimes() +// +// // Subscribe correctly for the known transaction +// subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) +// id, rpcErr := handler.SubscribeTxnStatus(subCtx, *txnHash, nil) +// require.Nil(t, rpcErr) +// +// // Receive a block header. +// time.Sleep(100 * time.Millisecond) +// got := make([]byte, 0, 300) +// _, err := clientConn.Read(got) +// require.NoError(t, err) +// require.Equal(t, "", string(got)) +// +// // Unsubscribe on correct connection with the correct id. +// subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) +// ok, rpcErr := handler.Unsubscribe(subCtx, id.ID) +// require.Nil(t, rpcErr) +// require.True(t, ok) +//}