Skip to content

Commit

Permalink
starknet_subscribeTransactionStatus websocket method
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowosie committed Dec 23, 2024
1 parent eb1f2ca commit 2d073dd
Show file tree
Hide file tree
Showing 13 changed files with 525 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"revert_error": "This is hand-made transaction used for txStatus endpoint test",
"execution_status": "REJECTED",
"finality_status": "ACCEPTED_ON_L1",
"status": "REVERTED",
"block_hash": "0x111100000000111100000000333300000000444400000000111100000000111",
"block_number": 304740,
"transaction_index": 1,
"transaction_hash": "0x111100000000222200000000333300000000444400000000555500000000fff",
"l2_to_l1_messages": [],
"events": [],
"actual_fee": "0x247aff6e224"
}
2 changes: 1 addition & 1 deletion docs/docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ docker logs -f juno
<details>
<summary>How can I get real-time updates of new blocks?</summary>

The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.
The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.

</details>

Expand Down
64 changes: 58 additions & 6 deletions docs/docs/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,15 @@ Get the most recent accepted block hash and number with the `starknet_blockHashA

## Subscribe to newly created blocks

The WebSocket server provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:
The WebSocket server provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:

<Tabs>
<TabItem value="request" label="Request">

```json
{
"jsonrpc": "2.0",
"method": "juno_subscribeNewHeads",
"params": [],
"method": "starknet_subscribeNewHeads",
"id": 1
}
```
Expand All @@ -129,7 +128,7 @@ When a new block is added, you will receive a message like this:
```json
{
"jsonrpc": "2.0",
"method": "juno_subscribeNewHeads",
"method": "starknet_subscriptionNewHeads",
"params": {
"result": {
"block_hash": "0x840660a07a17ae6a55d39fb6d366698ecda11e02280ca3e9ca4b4f1bad741c",
Expand All @@ -149,12 +148,65 @@ When a new block is added, you will receive a message like this:
"l1_da_mode": "BLOB",
"starknet_version": "0.13.1.1"
},
"subscription": 16570962336122680234
"subscription_id": 16570962336122680234
}
}
```

## Subscribe to transaction status changes

The WebSocket server provides a `starknet_subscribeTransactionStatus` method that emits an event when a transaction status changes:

<Tabs>
<TabItem value="request" label="Request">

```json
{
"jsonrpc": "2.0",
"method": "starknet_subscribeTransactionStatus",
"params": [
{
"transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df"
}
],
"id": 1
}
```

</TabItem>
<TabItem value="response" label="Response">

```json
{
"jsonrpc": "2.0",
"result": 16570962336122680234,
"id": 1
}
```

</TabItem>
</Tabs>

When a transaction get a new status, you will receive a message like this:

```json
{
"jsonrpc": "2.0",
"method": "starknet_subscriptionTransactionsStatus",
"params": {
"result": {
"transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df",
"status": {
"finality_status": "ACCEPTED_ON_L2",
"execution_status": "SUCCEEDED"
}
},
"subscription_id": 16570962336122680234
}
}
```

## Unsubscribe from newly created blocks
## Unsubscribe from previous subscription

Use the `juno_unsubscribe` method with the `result` value from the subscription response or the `subscription` field from any new block event to stop receiving updates for new blocks:

Expand Down
8 changes: 2 additions & 6 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,6 @@ func isBatch(reader *bufio.Reader) bool {
return false
}

func isNil(i any) bool {
return i == nil || reflect.ValueOf(i).IsNil()
}

func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, http.Header, error) {
s.log.Tracew("Received request", "req", req)

Expand Down Expand Up @@ -471,7 +467,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht
header = (tuple[1].Interface()).(http.Header)
}

if errAny := tuple[errorIndex].Interface(); !isNil(errAny) {
if errAny := tuple[errorIndex].Interface(); !utils.IsNil(errAny) {
res.Error = errAny.(*Error)
if res.Error.Code == InternalError {
s.listener.OnRequestFailed(req.Method, res.Error)
Expand All @@ -498,7 +494,7 @@ func (s *Server) buildArguments(ctx context.Context, params any, method Method)
addContext = 1
}

if isNil(params) {
if utils.IsNil(params) {
allParamsAreOptional := utils.All(method.Params, func(p Parameter) bool {
return p.Optional
})
Expand Down
8 changes: 4 additions & 4 deletions rpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type EventsArg struct {
ResultPageRequest
}

type SubscriptionID struct {
ID uint64 `json:"subscription_id"`
}

type EventFilter struct {
FromBlock *BlockID `json:"from_block"`
ToBlock *BlockID `json:"to_block"`
Expand Down Expand Up @@ -44,10 +48,6 @@ type EventsChunk struct {
ContinuationToken string `json:"continuation_token,omitempty"`
}

type SubscriptionID struct {
ID uint64 `json:"subscription_id"`
}

/****************************************************
Events Handlers
*****************************************************/
Expand Down
1 change: 1 addition & 0 deletions rpc/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func TestEvents(t *testing.T) {
})
}

// TODO[pnowosie]: Refactor. fakeConn - this is redefined in subscription test, but also used in NewHeads
type fakeConn struct {
w io.Writer
}
Expand Down
15 changes: 12 additions & 3 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ var (
ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"}
ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"}
ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"}
ErrTooManyAddressesInFilter = &jsonrpc.Error{Code: 67, Message: "Too many addresses in filter sender_address filter"}
ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: fmt.Sprintf("Cannot go back more than %v blocks", maxBlocksBack)}
ErrCallOnPending = &jsonrpc.Error{Code: 69, Message: "This method does not support being called on the pending block"}

Expand Down Expand Up @@ -93,8 +94,9 @@ type Handler struct {
vm vm.VM
log utils.Logger

version string
newHeads *feed.Feed[*core.Header]
version string
newHeads *feed.Feed[*core.Header]
pendingTxs *feed.Feed[[]core.Transaction]

idgen func() uint64
mu stdsync.Mutex // protects subscriptions.
Expand Down Expand Up @@ -135,6 +137,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
},
version: version,
newHeads: feed.New[*core.Header](),
pendingTxs: feed.New[[]core.Transaction](),
subscriptions: make(map[uint64]*subscription),

blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize),
Expand Down Expand Up @@ -177,7 +180,8 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler {
func (h *Handler) Run(ctx context.Context) error {
newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription
defer newHeadsSub.Unsubscribe()
feed.Tee[*core.Header](newHeadsSub, h.newHeads)
feed.Tee(newHeadsSub, h.newHeads)

<-ctx.Done()
for _, sub := range h.subscriptions {
sub.wg.Wait()
Expand Down Expand Up @@ -347,6 +351,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen
Name: "juno_subscribeNewHeads",
Handler: h.SubscribeNewHeads,
},
{
Name: "starknet_subscribeTransactionStatus",
Params: []jsonrpc.Parameter{{Name: "transaction_hash"}, {Name: "block", Optional: true}},
Handler: h.SubscribeTxnStatus,
},
{
Name: "juno_unsubscribe",
Params: []jsonrpc.Parameter{{Name: "id"}},
Expand Down
106 changes: 106 additions & 0 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,85 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
return &SubscriptionID{ID: id}, nil
}

// SubscribeTxnStatus subscribes to status changes of a transaction. It checks for updates each time a new block is added.
// Subsequent updates are sent only when the transaction status changes.
// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent.
func (h *Handler) SubscribeTxnStatus(ctx context.Context, txHash felt.Felt, _ *BlockID) (*SubscriptionID, *jsonrpc.Error) {
var (
lastKnownStatus, lastSendStatus *TransactionStatus
wrapResult = func(s *TransactionStatus) *NewTransactionStatus {
return &NewTransactionStatus{
TransactionHash: &txHash,
Status: s,
}
}
)

w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

Check warning on line 124 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L123-L124

Added lines #L123 - L124 were not covered by tests

id := h.idgen()
subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
sub := &subscription{
cancel: subscriptionCtxCancel,
conn: w,
}

lastKnownStatus, rpcErr := h.TransactionStatus(subscriptionCtx, txHash)
if rpcErr != nil {
h.log.Errorw("Failed to get Tx status", "txHash", &txHash, "rpcErr", rpcErr)
return nil, rpcErr
}

h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()

headerSub := h.newHeads.Subscribe()
sub.wg.Go(func() {
defer func() {
h.unsubscribe(sub, id)
headerSub.Unsubscribe()
}()

if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
return
}

Check warning on line 153 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L151-L153

Added lines #L151 - L153 were not covered by tests
lastSendStatus = lastKnownStatus

for {
select {
case <-subscriptionCtx.Done():
return
case <-headerSub.Recv():
lastKnownStatus, rpcErr = h.TransactionStatus(subscriptionCtx, txHash)
if rpcErr != nil {
h.log.Errorw("Failed to get Tx status", "txHash", txHash, "rpcErr", rpcErr)
return
}

Check warning on line 165 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L163-L165

Added lines #L163 - L165 were not covered by tests

if *lastKnownStatus != *lastSendStatus {
if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
return
}

Check warning on line 171 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L169-L171

Added lines #L169 - L171 were not covered by tests
lastSendStatus = lastKnownStatus
}

// Stop when final status reached and notified
if isFinal(lastSendStatus) {
return
}
}
}
})

return &SubscriptionID{ID: id}, nil
}

func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) {
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
Expand Down Expand Up @@ -182,3 +261,30 @@ func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.Filter
}
return nil
}

type NewTransactionStatus struct {
TransactionHash *felt.Felt `json:"transaction_hash"`
Status *TransactionStatus `json:"status"`
}

// sendTxnStatus creates a response and sends it to the client
func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status *NewTransactionStatus, id uint64) error {
resp, err := json.Marshal(SubscriptionResponse{
Version: "2.0",
Method: "starknet_subscriptionTransactionsStatus",
Params: map[string]any{
"subscription_id": id,
"result": status,
},
})
if err != nil {
return err
}

Check warning on line 282 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L281-L282

Added lines #L281 - L282 were not covered by tests
h.log.Debugw("Sending Txn status", "status", string(resp))
_, err = w.Write(resp)
return err
}

func isFinal(status *TransactionStatus) bool {
return status.Finality == TxnStatusRejected || status.Finality == TxnStatusAcceptedOnL1
}
Loading

0 comments on commit 2d073dd

Please sign in to comment.