diff --git a/clients/feeder/testdata/mainnet/transaction/0x111100000000222200000000333300000000444400000000555500000000fff.json b/clients/feeder/testdata/mainnet/transaction/0x111100000000222200000000333300000000444400000000555500000000fff.json
new file mode 100644
index 0000000000..5bd7c674c9
--- /dev/null
+++ b/clients/feeder/testdata/mainnet/transaction/0x111100000000222200000000333300000000444400000000555500000000fff.json
@@ -0,0 +1,13 @@
+{
+ "revert_error": "This is hand-made transaction used for txStatus endpoint test",
+ "execution_status": "REJECTED",
+ "finality_status": "ACCEPTED_ON_L1",
+ "status": "REVERTED",
+ "block_hash": "0x111100000000111100000000333300000000444400000000111100000000111",
+ "block_number": 304740,
+ "transaction_index": 1,
+ "transaction_hash": "0x111100000000222200000000333300000000444400000000555500000000fff",
+ "l2_to_l1_messages": [],
+ "events": [],
+ "actual_fee": "0x247aff6e224"
+}
diff --git a/docs/docs/faq.md b/docs/docs/faq.md
index 78828677ce..1ae6c6a56f 100644
--- a/docs/docs/faq.md
+++ b/docs/docs/faq.md
@@ -74,7 +74,7 @@ docker logs -f juno
How can I get real-time updates of new blocks?
-The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.
+The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.
diff --git a/docs/docs/websocket.md b/docs/docs/websocket.md
index ba55e24db8..8c8282cfbc 100644
--- a/docs/docs/websocket.md
+++ b/docs/docs/websocket.md
@@ -96,7 +96,7 @@ Get the most recent accepted block hash and number with the `starknet_blockHashA
## Subscribe to newly created blocks
-The WebSocket server provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:
+The WebSocket server provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:
@@ -104,8 +104,7 @@ The WebSocket server provides a `juno_subscribeNewHeads` method that emits an ev
```json
{
"jsonrpc": "2.0",
- "method": "juno_subscribeNewHeads",
- "params": [],
+ "method": "starknet_subscribeNewHeads",
"id": 1
}
```
@@ -129,7 +128,7 @@ When a new block is added, you will receive a message like this:
```json
{
"jsonrpc": "2.0",
- "method": "juno_subscribeNewHeads",
+ "method": "starknet_subscriptionNewHeads",
"params": {
"result": {
"block_hash": "0x840660a07a17ae6a55d39fb6d366698ecda11e02280ca3e9ca4b4f1bad741c",
@@ -149,12 +148,65 @@ When a new block is added, you will receive a message like this:
"l1_da_mode": "BLOB",
"starknet_version": "0.13.1.1"
},
- "subscription": 16570962336122680234
+ "subscription_id": 16570962336122680234
+ }
+}
+```
+
+## Subscribe to transaction status changes
+
+The WebSocket server provides a `starknet_subscribeTransactionStatus` method that emits an event when a transaction status changes:
+
+
+
+
+```json
+{
+ "jsonrpc": "2.0",
+ "method": "starknet_subscribeTransactionStatus",
+ "params": [
+ {
+ "transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df"
+ }
+ ],
+ "id": 1
+}
+```
+
+
+
+
+```json
+{
+ "jsonrpc": "2.0",
+ "result": 16570962336122680234,
+ "id": 1
+}
+```
+
+
+
+
+When a transaction get a new status, you will receive a message like this:
+
+```json
+{
+ "jsonrpc": "2.0",
+ "method": "starknet_subscriptionTransactionsStatus",
+ "params": {
+ "result": {
+ "transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df",
+ "status": {
+ "finality_status": "ACCEPTED_ON_L2",
+ "execution_status": "SUCCEEDED"
+ }
+ },
+ "subscription_id": 16570962336122680234
}
}
```
-## Unsubscribe from newly created blocks
+## Unsubscribe from previous subscription
Use the `juno_unsubscribe` method with the `result` value from the subscription response or the `subscription` field from any new block event to stop receiving updates for new blocks:
diff --git a/jsonrpc/server.go b/jsonrpc/server.go
index c63f15c849..16f42f6027 100644
--- a/jsonrpc/server.go
+++ b/jsonrpc/server.go
@@ -422,10 +422,6 @@ func isBatch(reader *bufio.Reader) bool {
return false
}
-func isNil(i any) bool {
- return i == nil || reflect.ValueOf(i).IsNil()
-}
-
func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, http.Header, error) {
s.log.Tracew("Received request", "req", req)
@@ -471,7 +467,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht
header = (tuple[1].Interface()).(http.Header)
}
- if errAny := tuple[errorIndex].Interface(); !isNil(errAny) {
+ if errAny := tuple[errorIndex].Interface(); !utils.IsNil(errAny) {
res.Error = errAny.(*Error)
if res.Error.Code == InternalError {
s.listener.OnRequestFailed(req.Method, res.Error)
@@ -498,7 +494,7 @@ func (s *Server) buildArguments(ctx context.Context, params any, method Method)
addContext = 1
}
- if isNil(params) {
+ if utils.IsNil(params) {
allParamsAreOptional := utils.All(method.Params, func(p Parameter) bool {
return p.Optional
})
diff --git a/rpc/events.go b/rpc/events.go
index a7298486f8..461e0a483c 100644
--- a/rpc/events.go
+++ b/rpc/events.go
@@ -14,6 +14,10 @@ type EventsArg struct {
ResultPageRequest
}
+type SubscriptionID struct {
+ ID uint64 `json:"subscription_id"`
+}
+
type EventFilter struct {
FromBlock *BlockID `json:"from_block"`
ToBlock *BlockID `json:"to_block"`
@@ -44,10 +48,6 @@ type EventsChunk struct {
ContinuationToken string `json:"continuation_token,omitempty"`
}
-type SubscriptionID struct {
- ID uint64 `json:"subscription_id"`
-}
-
/****************************************************
Events Handlers
*****************************************************/
diff --git a/rpc/events_test.go b/rpc/events_test.go
index c2f1417791..9fdb7f4781 100644
--- a/rpc/events_test.go
+++ b/rpc/events_test.go
@@ -215,6 +215,7 @@ func TestEvents(t *testing.T) {
})
}
+// TODO[pnowosie]: Refactor. fakeConn - this is redefined in subscription test, but also used in NewHeads
type fakeConn struct {
w io.Writer
}
diff --git a/rpc/handlers.go b/rpc/handlers.go
index 1cf96b0c21..de1af10bef 100644
--- a/rpc/handlers.go
+++ b/rpc/handlers.go
@@ -66,6 +66,7 @@ var (
ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"}
ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"}
ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"}
+ ErrTooManyAddressesInFilter = &jsonrpc.Error{Code: 67, Message: "Too many addresses in filter sender_address filter"}
ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: fmt.Sprintf("Cannot go back more than %v blocks", maxBlocksBack)}
ErrCallOnPending = &jsonrpc.Error{Code: 69, Message: "This method does not support being called on the pending block"}
@@ -93,8 +94,9 @@ type Handler struct {
vm vm.VM
log utils.Logger
- version string
- newHeads *feed.Feed[*core.Header]
+ version string
+ newHeads *feed.Feed[*core.Header]
+ pendingTxs *feed.Feed[[]core.Transaction]
idgen func() uint64
mu stdsync.Mutex // protects subscriptions.
@@ -135,6 +137,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
},
version: version,
newHeads: feed.New[*core.Header](),
+ pendingTxs: feed.New[[]core.Transaction](),
subscriptions: make(map[uint64]*subscription),
blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize),
@@ -177,7 +180,8 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler {
func (h *Handler) Run(ctx context.Context) error {
newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription
defer newHeadsSub.Unsubscribe()
- feed.Tee[*core.Header](newHeadsSub, h.newHeads)
+ feed.Tee(newHeadsSub, h.newHeads)
+
<-ctx.Done()
for _, sub := range h.subscriptions {
sub.wg.Wait()
@@ -347,6 +351,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen
Name: "juno_subscribeNewHeads",
Handler: h.SubscribeNewHeads,
},
+ {
+ Name: "starknet_subscribeTransactionStatus",
+ Params: []jsonrpc.Parameter{{Name: "transaction_hash"}, {Name: "block", Optional: true}},
+ Handler: h.SubscribeTxnStatus,
+ },
{
Name: "juno_unsubscribe",
Params: []jsonrpc.Parameter{{Name: "id"}},
diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go
index b049c6ce0d..b70881577e 100644
--- a/rpc/subscriptions.go
+++ b/rpc/subscriptions.go
@@ -104,6 +104,85 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
return &SubscriptionID{ID: id}, nil
}
+// SubscribeTxnStatus subscribes to status changes of a transaction. It checks for updates each time a new block is added.
+// Subsequent updates are sent only when the transaction status changes.
+// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent.
+func (h *Handler) SubscribeTxnStatus(ctx context.Context, txHash felt.Felt, _ *BlockID) (*SubscriptionID, *jsonrpc.Error) {
+ var (
+ lastKnownStatus, lastSendStatus *TransactionStatus
+ wrapResult = func(s *TransactionStatus) *NewTransactionStatus {
+ return &NewTransactionStatus{
+ TransactionHash: &txHash,
+ Status: s,
+ }
+ }
+ )
+
+ w, ok := jsonrpc.ConnFromContext(ctx)
+ if !ok {
+ return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
+ }
+
+ id := h.idgen()
+ subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
+ sub := &subscription{
+ cancel: subscriptionCtxCancel,
+ conn: w,
+ }
+
+ lastKnownStatus, rpcErr := h.TransactionStatus(subscriptionCtx, txHash)
+ if rpcErr != nil {
+ h.log.Errorw("Failed to get Tx status", "txHash", &txHash, "rpcErr", rpcErr)
+ return nil, rpcErr
+ }
+
+ h.mu.Lock()
+ h.subscriptions[id] = sub
+ h.mu.Unlock()
+
+ headerSub := h.newHeads.Subscribe()
+ sub.wg.Go(func() {
+ defer func() {
+ h.unsubscribe(sub, id)
+ headerSub.Unsubscribe()
+ }()
+
+ if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
+ h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
+ return
+ }
+ lastSendStatus = lastKnownStatus
+
+ for {
+ select {
+ case <-subscriptionCtx.Done():
+ return
+ case <-headerSub.Recv():
+ lastKnownStatus, rpcErr = h.TransactionStatus(subscriptionCtx, txHash)
+ if rpcErr != nil {
+ h.log.Errorw("Failed to get Tx status", "txHash", txHash, "rpcErr", rpcErr)
+ return
+ }
+
+ if *lastKnownStatus != *lastSendStatus {
+ if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
+ h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
+ return
+ }
+ lastSendStatus = lastKnownStatus
+ }
+
+ // Stop when final status reached and notified
+ if isFinal(lastSendStatus) {
+ return
+ }
+ }
+ }
+ })
+
+ return &SubscriptionID{ID: id}, nil
+}
+
func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) {
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
@@ -182,3 +261,30 @@ func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.Filter
}
return nil
}
+
+type NewTransactionStatus struct {
+ TransactionHash *felt.Felt `json:"transaction_hash"`
+ Status *TransactionStatus `json:"status"`
+}
+
+// sendTxnStatus creates a response and sends it to the client
+func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status *NewTransactionStatus, id uint64) error {
+ resp, err := json.Marshal(SubscriptionResponse{
+ Version: "2.0",
+ Method: "starknet_subscriptionTransactionsStatus",
+ Params: map[string]any{
+ "subscription_id": id,
+ "result": status,
+ },
+ })
+ if err != nil {
+ return err
+ }
+ h.log.Debugw("Sending Txn status", "status", string(resp))
+ _, err = w.Write(resp)
+ return err
+}
+
+func isFinal(status *TransactionStatus) bool {
+ return status.Finality == TxnStatusRejected || status.Finality == TxnStatusAcceptedOnL1
+}
diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go
index a3ab61fa7c..d1b50efe72 100644
--- a/rpc/subscriptions_test.go
+++ b/rpc/subscriptions_test.go
@@ -3,8 +3,11 @@ package rpc
import (
"context"
"encoding/json"
+ "fmt"
"io"
"net"
+ "net/http"
+ "net/http/httptest"
"testing"
"time"
@@ -12,16 +15,31 @@ import (
"github.com/NethermindEth/juno/clients/feeder"
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
+ "github.com/NethermindEth/juno/db"
+ "github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/feed"
"github.com/NethermindEth/juno/jsonrpc"
"github.com/NethermindEth/juno/mocks"
adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder"
+ "github.com/NethermindEth/juno/sync"
"github.com/NethermindEth/juno/utils"
+ "github.com/coder/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
+const (
+ unsubscribeMsg = `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}`
+ unsubscribeNotFoundResponse = `{"jsonrpc":"2.0","error":{"code":100,"message":"Subscription not found"},"id":1}`
+ subscribeResponse = `{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}`
+ subscribeTxStatus = `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeTransactionStatus","params":{"transaction_hash":"%s"}}`
+ txStatusNotFoundResponse = `{"jsonrpc":"2.0","error":{"code":29,"message":"Transaction hash not found"},"id":1}`
+ txStatusResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%s","status":{%s}},"subscription_id":%d}}`
+ txStatusStatusBothStatuses = `"finality_status":"%s","execution_status":"%s"`
+ txStatusStatusRejected = `"finality_status":"%s","failure_reason":"%s"`
+)
+
// Due to the difference in how some test files in rpc use "package rpc" vs "package rpc_test" it was easiest to copy
// the fakeConn here.
// Todo: move all the subscription related test here
@@ -41,6 +59,42 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool {
return fc.w == fc2.w
}
+type fakeSyncer struct {
+ newHeads *feed.Feed[*core.Header]
+ pendingTxs *feed.Feed[[]core.Transaction]
+}
+
+func newFakeSyncer() *fakeSyncer {
+ return &fakeSyncer{
+ newHeads: feed.New[*core.Header](),
+ pendingTxs: feed.New[[]core.Transaction](),
+ }
+}
+
+func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription {
+ return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()}
+}
+
+func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) {
+ return 0, nil
+}
+
+func (fs *fakeSyncer) HighestBlockHeader() *core.Header {
+ return nil
+}
+
+func (fs *fakeSyncer) Pending() (*sync.Pending, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+func (fs *fakeSyncer) PendingBlock() *core.Block {
+ return nil
+}
+
+func (fs *fakeSyncer) PendingState() (core.StateReader, func() error, error) {
+ return nil, nil, fmt.Errorf("not implemented")
+}
+
func TestSubscribeEvents(t *testing.T) {
log := utils.NewNopZapLogger()
@@ -326,6 +380,184 @@ func TestSubscribeEvents(t *testing.T) {
})
}
+func TestSubscribeTxStatusAndUnsubscribe(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+
+ mockCtrl := gomock.NewController(t)
+ t.Cleanup(mockCtrl.Finish)
+
+ mockReader := mocks.NewMockReader(mockCtrl)
+ handler, syncer, server := setupSubscriptionTest(t, ctx, mockReader)
+
+ require.NoError(t, server.RegisterMethods(jsonrpc.Method{
+ Name: "starknet_subscribeTransactionStatus",
+ Params: []jsonrpc.Parameter{{Name: "transaction_hash"}, {Name: "block", Optional: true}},
+ Handler: handler.SubscribeTxnStatus,
+ }, jsonrpc.Method{
+ Name: "juno_unsubscribe",
+ Params: []jsonrpc.Parameter{{Name: "id"}},
+ Handler: handler.Unsubscribe,
+ }))
+
+ ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
+ httpSrv := httptest.NewServer(ws)
+
+ // default returns from mocks
+ txnHash := utils.HexToFelt(t, "0x111100000000111100000000111100000000111100000000111100000000111")
+ txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)}
+ receipt := &core.TransactionReceipt{
+ TransactionHash: txnHash,
+ Reverted: false,
+ }
+ mockReader.EXPECT().TransactionByHash(txnHash).Return(txn, nil).AnyTimes()
+ mockReader.EXPECT().Receipt(txnHash).Return(receipt, nil, uint64(1), nil).AnyTimes()
+ mockReader.EXPECT().TransactionByHash(&felt.Zero).Return(nil, db.ErrKeyNotFound).AnyTimes()
+
+ firstID := uint64(1)
+ secondID := uint64(2)
+
+ t.Run("simple subscribe and unsubscribe", func(t *testing.T) {
+ conn1, resp1, err := websocket.Dial(ctx, httpSrv.URL, nil)
+ require.NoError(t, err)
+ defer bodyCloser(t, resp1)
+
+ conn2, resp2, err := websocket.Dial(ctx, httpSrv.URL, nil)
+ require.NoError(t, err)
+ defer bodyCloser(t, resp2)
+
+ handler.WithIDGen(func() uint64 { return firstID })
+ firstWant := txStatusNotFoundResponse
+ // Notice we subscribe for non-existing tx, we expect automatic unsubscribe
+ firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, felt.Zero.String()))
+ require.NoError(t, err)
+ require.Equal(t, firstWant, firstGot)
+
+ handler.WithIDGen(func() uint64 { return secondID })
+ secondWant := fmt.Sprintf(subscribeResponse, secondID)
+ secondGot := sendAndReceiveMessage(t, ctx, conn2, fmt.Sprintf(subscribeTxStatus, txnHash))
+ require.NoError(t, err)
+ require.Equal(t, secondWant, secondGot)
+
+ // as expected the subscription is gone
+ firstUnsubGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(unsubscribeMsg, firstID))
+ require.Equal(t, unsubscribeNotFoundResponse, firstUnsubGot)
+
+ // Receive a block header.
+ secondWant = formatTxStatusResponse(t, txnHash, TxnStatusAcceptedOnL2, TxnSuccess, secondID)
+ _, secondHeaderGot, err := conn2.Read(ctx)
+ secondGot = string(secondHeaderGot)
+ require.NoError(t, err)
+ require.Equal(t, secondWant, secondGot)
+
+ // Unsubscribe
+ require.NoError(t, conn2.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubscribeMsg, secondID))))
+ })
+
+ t.Run("no update is sent when status has not changed", func(t *testing.T) {
+ conn1, resp1, err := websocket.Dial(ctx, httpSrv.URL, nil)
+ require.NoError(t, err)
+ defer bodyCloser(t, resp1)
+
+ handler.WithIDGen(func() uint64 { return firstID })
+ firstWant := fmt.Sprintf(subscribeResponse, firstID)
+ firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, txnHash))
+ require.NoError(t, err)
+ require.Equal(t, firstWant, firstGot)
+
+ firstStatusWant := formatTxStatusResponse(t, txnHash, TxnStatusAcceptedOnL2, TxnSuccess, firstID)
+ _, firstStatusGot, err := conn1.Read(ctx)
+ require.NoError(t, err)
+ require.Equal(t, firstStatusWant, string(firstStatusGot))
+
+ // Simulate a new block
+ syncer.newHeads.Send(testHeader(t))
+
+ // expected no status is send
+ timeoutCtx, toCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer toCancel()
+ _, _, err = conn1.Read(timeoutCtx)
+ require.Regexp(t, "failed to get reader: ", err.Error())
+
+ // at this time connection is closed
+ require.EqualError(t,
+ conn1.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubscribeMsg, firstID))),
+ "failed to write msg: use of closed network connection")
+ })
+
+ t.Run("update is only sent when new status is different", func(t *testing.T) {
+ conn1, resp1, err := websocket.Dial(ctx, httpSrv.URL, nil)
+ require.NoError(t, err)
+ defer bodyCloser(t, resp1)
+
+ otherTxn := utils.HexToFelt(t, "0x222200000000111100000000222200000000111100000000111100000000222")
+ someBlkHash := utils.HexToFelt(t, "0x333300000000111100000000222200000000333300000000111100000000fff")
+ txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)}
+ receipt := &core.TransactionReceipt{
+ TransactionHash: otherTxn,
+ Reverted: false,
+ }
+ mockReader.EXPECT().TransactionByHash(otherTxn).Return(txn, nil).Times(2)
+ mockReader.EXPECT().Receipt(otherTxn).Return(receipt, someBlkHash, uint64(1), nil).Times(2)
+ mockReader.EXPECT().L1Head().Return(&core.L1Head{BlockNumber: 0}, nil)
+
+ handler.WithIDGen(func() uint64 { return firstID })
+ firstWant := fmt.Sprintf(subscribeResponse, firstID)
+ firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, otherTxn))
+ require.NoError(t, err)
+ require.Equal(t, firstWant, firstGot)
+
+ firstStatusWant := formatTxStatusResponse(t, otherTxn, TxnStatusAcceptedOnL2, TxnSuccess, firstID)
+ _, firstStatusGot, err := conn1.Read(ctx)
+ require.NoError(t, err)
+ require.Equal(t, firstStatusWant, string(firstStatusGot))
+
+ mockReader.EXPECT().L1Head().Return(&core.L1Head{BlockNumber: 5}, nil).Times(1)
+ syncer.newHeads.Send(testHeader(t))
+
+ secondStatusWant := formatTxStatusResponse(t, otherTxn, TxnStatusAcceptedOnL1, TxnSuccess, firstID)
+ _, secondStatusGot, err := conn1.Read(ctx)
+ require.NoError(t, err)
+ require.Equal(t, secondStatusWant, string(secondStatusGot))
+
+ // second status is final - subcription should be automatically removed
+ thirdUnsubGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(unsubscribeMsg, firstID))
+ require.Equal(t, unsubscribeNotFoundResponse, thirdUnsubGot)
+ })
+
+ t.Run("subscription ends when tx reaches final status", func(t *testing.T) {
+ conn1, resp1, err := websocket.Dial(ctx, httpSrv.URL, nil)
+ require.NoError(t, err)
+ defer bodyCloser(t, resp1)
+
+ revertedTxn := utils.HexToFelt(t, "0x111100000000222200000000333300000000444400000000555500000000fff")
+ mockReader.EXPECT().TransactionByHash(revertedTxn).Return(nil, db.ErrKeyNotFound).Times(2)
+
+ handler.WithIDGen(func() uint64 { return firstID })
+ handler.WithFeeder(feeder.NewTestClient(t, &utils.Mainnet))
+ defer handler.WithFeeder(nil)
+
+ firstWant := fmt.Sprintf(subscribeResponse, firstID)
+ firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, revertedTxn))
+ require.NoError(t, err)
+ require.Equal(t, firstWant, firstGot)
+
+ firstStatusWant := formatTxStatusResponse(t, revertedTxn, TxnStatusRejected, TxnFailure, firstID, "This is hand-made transaction used for txStatus endpoint test")
+ _, firstStatusGot, err := conn1.Read(ctx)
+ require.NoError(t, err)
+ require.Equal(t, firstStatusWant, string(firstStatusGot))
+
+ // final status will be discovered after a new head is received
+ syncer.newHeads.Send(testHeader(t))
+ // and wait a bit for the subscription to process the event
+ time.Sleep(50 * time.Millisecond)
+
+ // second status is final - subcription should be automatically removed
+ thirdUnsubGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(unsubscribeMsg, firstID))
+ require.Equal(t, unsubscribeNotFoundResponse, thirdUnsubGot)
+ })
+}
+
func marshalSubscriptionResponse(e *EmittedEvent, id uint64) ([]byte, error) {
return json.Marshal(SubscriptionResponse{
Version: "2.0",
@@ -336,3 +568,96 @@ func marshalSubscriptionResponse(e *EmittedEvent, id uint64) ([]byte, error) {
},
})
}
+
+func setupSubscriptionTest(t *testing.T, ctx context.Context, srvs ...any) (*Handler, *fakeSyncer, *jsonrpc.Server) {
+ t.Helper()
+
+ var (
+ log utils.Logger
+ chain blockchain.Reader
+ )
+
+ for _, srv := range srvs {
+ switch srv := srv.(type) {
+ case utils.Logger:
+ log = srv
+ case blockchain.Reader:
+ chain = srv
+ default:
+ t.Fatalf("unexpected option type: %T", srv)
+ }
+ }
+
+ // provide good defaults
+ if log == nil {
+ log = utils.NewNopZapLogger()
+ }
+ if chain == nil {
+ chain = blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil)
+ }
+ syncer := newFakeSyncer()
+
+ handler := New(chain, syncer, nil, "", log)
+ go func() {
+ require.NoError(t, handler.Run(ctx))
+ }()
+ time.Sleep(50 * time.Millisecond)
+
+ server := jsonrpc.NewServer(1, log)
+
+ return handler, syncer, server
+}
+
+func sendAndReceiveMessage(t *testing.T, ctx context.Context, conn *websocket.Conn, message string) string {
+ t.Helper()
+
+ require.NoError(t, conn.Write(ctx, websocket.MessageText, []byte(message)))
+
+ _, response, err := conn.Read(ctx)
+ require.NoError(t, err)
+ return string(response)
+}
+
+func formatTxStatusResponse(t *testing.T, txnHash *felt.Felt, finality TxnStatus, execution TxnExecutionStatus, id uint64, reason ...string) string {
+ t.Helper()
+
+ finStatusB, err := finality.MarshalText()
+ require.NoError(t, err)
+ exeStatusB, err := execution.MarshalText()
+ require.NoError(t, err)
+
+ statusBody := fmt.Sprintf(txStatusStatusBothStatuses, string(finStatusB), string(exeStatusB))
+ if finality == TxnStatusRejected {
+ statusBody = fmt.Sprintf(txStatusStatusRejected, string(finStatusB), reason[0])
+ }
+ return fmt.Sprintf(txStatusResponse, txnHash, statusBody, id)
+}
+
+func testHeader(t *testing.T) *core.Header {
+ t.Helper()
+
+ header := &core.Header{
+ Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
+ ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"),
+ Number: 2,
+ GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"),
+ Timestamp: 1637084470,
+ SequencerAddress: utils.HexToFelt(t, "0x0"),
+ L1DataGasPrice: &core.GasPrice{
+ PriceInFri: utils.HexToFelt(t, "0x0"),
+ PriceInWei: utils.HexToFelt(t, "0x0"),
+ },
+ GasPrice: utils.HexToFelt(t, "0x0"),
+ GasPriceSTRK: utils.HexToFelt(t, "0x0"),
+ L1DAMode: core.Calldata,
+ ProtocolVersion: "",
+ }
+ return header
+}
+
+// bodyCloser is making linter happy and closes response body
+func bodyCloser(t *testing.T, resp *http.Response) {
+ if resp.Body != nil {
+ require.NoError(t, resp.Body.Close())
+ }
+}
diff --git a/rpc/transaction.go b/rpc/transaction.go
index 0610671299..f9416c8d57 100644
--- a/rpc/transaction.go
+++ b/rpc/transaction.go
@@ -616,8 +616,9 @@ func (h *Handler) TransactionStatus(ctx context.Context, hash felt.Felt) (*Trans
switch txErr {
case nil:
return &TransactionStatus{
- Finality: TxnStatus(receipt.FinalityStatus),
- Execution: receipt.ExecutionStatus,
+ Finality: TxnStatus(receipt.FinalityStatus),
+ Execution: receipt.ExecutionStatus,
+ FailureReason: receipt.RevertReason,
}, nil
case ErrTxnHashNotFound:
if h.feederClient == nil {
diff --git a/sync/sync_test.go b/sync/sync_test.go
index ab97fc322b..6b0498af29 100644
--- a/sync/sync_test.go
+++ b/sync/sync_test.go
@@ -196,103 +196,3 @@ func TestSubscribeNewHeads(t *testing.T) {
require.Equal(t, want.Header, got)
sub.Unsubscribe()
}
-
-func TestPendingSync(t *testing.T) {
- t.Parallel()
-
- client := feeder.NewTestClient(t, &utils.Mainnet)
- gw := adaptfeeder.New(client)
-
- var synchronizer *sync.Synchronizer
- testDB := pebble.NewMemTest(t)
- log := utils.NewNopZapLogger()
- bc := blockchain.New(testDB, &utils.Mainnet, synchronizer.PendingBlock)
- synchronizer = sync.New(bc, gw, log, time.Millisecond*100, false, testDB)
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-
- require.NoError(t, synchronizer.Run(ctx))
- cancel()
-
- head, err := bc.HeadsHeader()
- require.NoError(t, err)
- pending, err := synchronizer.Pending()
- require.NoError(t, err)
- assert.Equal(t, head.Hash, pending.Block.ParentHash)
-}
-
-func TestPending(t *testing.T) {
- client := feeder.NewTestClient(t, &utils.Mainnet)
- gw := adaptfeeder.New(client)
-
- var synchronizer *sync.Synchronizer
- testDB := pebble.NewMemTest(t)
- chain := blockchain.New(testDB, &utils.Mainnet, synchronizer.PendingBlock)
- synchronizer = sync.New(chain, gw, utils.NewNopZapLogger(), 0, false, testDB)
-
- b, err := gw.BlockByNumber(context.Background(), 0)
- require.NoError(t, err)
- su, err := gw.StateUpdate(context.Background(), 0)
- require.NoError(t, err)
-
- t.Run("pending state shouldnt exist if no pending block", func(t *testing.T) {
- _, _, err = synchronizer.PendingState()
- require.Error(t, err)
- })
-
- t.Run("cannot store unsupported pending block version", func(t *testing.T) {
- pending := &sync.Pending{Block: &core.Block{Header: &core.Header{ProtocolVersion: "1.9.0"}}}
- require.Error(t, synchronizer.StorePending(pending))
- })
-
- t.Run("store genesis as pending", func(t *testing.T) {
- pendingGenesis := &sync.Pending{
- Block: b,
- StateUpdate: su,
- }
- require.NoError(t, synchronizer.StorePending(pendingGenesis))
-
- gotPending, pErr := synchronizer.Pending()
- require.NoError(t, pErr)
- assert.Equal(t, pendingGenesis, gotPending)
- })
-
- require.NoError(t, chain.Store(b, &core.BlockCommitments{}, su, nil))
-
- t.Run("storing a pending too far into the future should fail", func(t *testing.T) {
- b, err = gw.BlockByNumber(context.Background(), 2)
- require.NoError(t, err)
- su, err = gw.StateUpdate(context.Background(), 2)
- require.NoError(t, err)
-
- notExpectedPending := sync.Pending{
- Block: b,
- StateUpdate: su,
- }
- require.ErrorIs(t, synchronizer.StorePending(¬ExpectedPending), blockchain.ErrParentDoesNotMatchHead)
- })
-
- t.Run("store expected pending block", func(t *testing.T) {
- b, err = gw.BlockByNumber(context.Background(), 1)
- require.NoError(t, err)
- su, err = gw.StateUpdate(context.Background(), 1)
- require.NoError(t, err)
-
- expectedPending := &sync.Pending{
- Block: b,
- StateUpdate: su,
- }
- require.NoError(t, synchronizer.StorePending(expectedPending))
-
- gotPending, pErr := synchronizer.Pending()
- require.NoError(t, pErr)
- assert.Equal(t, expectedPending, gotPending)
- })
-
- t.Run("get pending state", func(t *testing.T) {
- _, pendingStateCloser, pErr := synchronizer.PendingState()
- t.Cleanup(func() {
- require.NoError(t, pendingStateCloser())
- })
- require.NoError(t, pErr)
- })
-}
diff --git a/utils/check.go b/utils/nil.go
similarity index 100%
rename from utils/check.go
rename to utils/nil.go
diff --git a/utils/check_test.go b/utils/nil_test.go
similarity index 100%
rename from utils/check_test.go
rename to utils/nil_test.go