Skip to content

Commit

Permalink
starknet_subscribeTransactionStatus websocket method
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored and pnowosie committed Dec 13, 2024
1 parent 4ff174d commit e65a8a4
Show file tree
Hide file tree
Showing 10 changed files with 543 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"revert_error": "This is hand-made transaction used for txStatus endpoint test",
"execution_status": "REJECTED",
"finality_status": "ACCEPTED_ON_L1",
"status": "REVERTED",
"block_hash": "0x111100000000111100000000333300000000444400000000111100000000111",
"block_number": 304740,
"transaction_index": 1,
"transaction_hash": "0x111100000000222200000000333300000000444400000000555500000000fff",
"l2_to_l1_messages": [],
"events": [],
"actual_fee": "0x247aff6e224"
}
2 changes: 1 addition & 1 deletion docs/docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ docker logs -f juno
<details>
<summary>How can I get real-time updates of new blocks?</summary>

The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.
The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.

</details>

Expand Down
64 changes: 58 additions & 6 deletions docs/docs/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,15 @@ Get the most recent accepted block hash and number with the `starknet_blockHashA

## Subscribe to newly created blocks

The WebSocket server provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:
The WebSocket server provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:

<Tabs>
<TabItem value="request" label="Request">

```json
{
"jsonrpc": "2.0",
"method": "juno_subscribeNewHeads",
"params": [],
"method": "starknet_subscribeNewHeads",
"id": 1
}
```
Expand All @@ -129,7 +128,7 @@ When a new block is added, you will receive a message like this:
```json
{
"jsonrpc": "2.0",
"method": "juno_subscribeNewHeads",
"method": "starknet_subscriptionNewHeads",
"params": {
"result": {
"block_hash": "0x840660a07a17ae6a55d39fb6d366698ecda11e02280ca3e9ca4b4f1bad741c",
Expand All @@ -149,12 +148,65 @@ When a new block is added, you will receive a message like this:
"l1_da_mode": "BLOB",
"starknet_version": "0.13.1.1"
},
"subscription": 16570962336122680234
"subscription_id": 16570962336122680234
}
}
```

## Subscribe to transaction status changes

The WebSocket server provides a `starknet_subscribeTransactionStatus` method that emits an event when a transaction status changes:

<Tabs>
<TabItem value="request" label="Request">

```json
{
"jsonrpc": "2.0",
"method": "starknet_subscribeTransactionStatus",
"params": [
{
"transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df"
}
],
"id": 1
}
```

</TabItem>
<TabItem value="response" label="Response">

```json
{
"jsonrpc": "2.0",
"result": 16570962336122680234,
"id": 1
}
```

</TabItem>
</Tabs>

When a transaction get a new status, you will receive a message like this:

```json
{
"jsonrpc": "2.0",
"method": "starknet_subscriptionTransactionsStatus",
"params": {
"result": {
"transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df",
"status": {
"finality_status": "ACCEPTED_ON_L2",
"execution_status": "SUCCEEDED"
}
},
"subscription_id": 16570962336122680234
}
}
```

## Unsubscribe from newly created blocks
## Unsubscribe from previous subscription

Use the `juno_unsubscribe` method with the `result` value from the subscription response or the `subscription` field from any new block event to stop receiving updates for new blocks:

Expand Down
23 changes: 19 additions & 4 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,17 @@ func isBatch(reader *bufio.Reader) bool {
return false
}

func isNil(i any) bool {
return i == nil || reflect.ValueOf(i).IsNil()
func isNilOrEmpty(i any) (bool, error) {
if utils.IsNil(i) {
return true, nil
}

switch reflect.TypeOf(i).Kind() {
case reflect.Slice, reflect.Array, reflect.Map:
return reflect.ValueOf(i).Len() == 0, nil
default:
return false, fmt.Errorf("impossible param type: check request.isSane")

Check warning on line 434 in jsonrpc/server.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/server.go#L433-L434

Added lines #L433 - L434 were not covered by tests
}
}

func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, http.Header, error) {
Expand Down Expand Up @@ -471,7 +480,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht
header = (tuple[1].Interface()).(http.Header)
}

if errAny := tuple[errorIndex].Interface(); !isNil(errAny) {
if errAny := tuple[errorIndex].Interface(); !utils.IsNil(errAny) {
res.Error = errAny.(*Error)
if res.Error.Code == InternalError {
s.listener.OnRequestFailed(req.Method, res.Error)
Expand All @@ -486,6 +495,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht
return res, header, nil
}

//nolint:gocyclo
func (s *Server) buildArguments(ctx context.Context, params any, method Method) ([]reflect.Value, error) {
handlerType := reflect.TypeOf(method.Handler)

Expand All @@ -498,7 +508,12 @@ func (s *Server) buildArguments(ctx context.Context, params any, method Method)
addContext = 1
}

if isNil(params) {
isNilOrEmpty, err := isNilOrEmpty(params)
if err != nil {
return nil, err
}

Check warning on line 514 in jsonrpc/server.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/server.go#L513-L514

Added lines #L513 - L514 were not covered by tests

if isNilOrEmpty {
allParamsAreOptional := utils.All(method.Params, func(p Parameter) bool {
return p.Optional
})
Expand Down
121 changes: 117 additions & 4 deletions rpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@ import (
"github.com/NethermindEth/juno/jsonrpc"
)

const (
MaxBlocksBack = 1024
MaxAddressesInFilter = 1000 // TODO(weiihann): not finalised yet
)

type EventsArg struct {
EventFilter
ResultPageRequest
}

type SubscriptionID struct {
ID uint64 `json:"subscription_id"`
}

type EventFilter struct {
FromBlock *BlockID `json:"from_block"`
ToBlock *BlockID `json:"to_block"`
Expand Down Expand Up @@ -44,10 +53,6 @@ type EventsChunk struct {
ContinuationToken string `json:"continuation_token,omitempty"`
}

type SubscriptionID struct {
ID uint64 `json:"subscription_id"`
}

/****************************************************
Events Handlers
*****************************************************/
Expand Down Expand Up @@ -100,6 +105,87 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error
return id, nil
}

// SubscribeTxnStatus subscribes to status changes of a transaction. It checks for updates each time a new block is added.
// Subsequent updates are sent only when the transaction status changes.
// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent.
func (h *Handler) SubscribeTxnStatus(ctx context.Context, txHash felt.Felt, _ *BlockID) (*SubscriptionID, *jsonrpc.Error) {
var (
lastKnownStatus, lastSendStatus *TransactionStatus
wrapResult = func(s *TransactionStatus) *NewTransactionStatus {
return &NewTransactionStatus{
TransactionHash: &txHash,
Status: s,
}
}
)

w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

Check warning on line 125 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L124-L125

Added lines #L124 - L125 were not covered by tests

id := h.idgen()
subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
sub := &subscription{
cancel: subscriptionCtxCancel,
conn: w,
}

lastKnownStatus, rpcErr := h.TransactionStatus(subscriptionCtx, txHash)
if rpcErr != nil {
h.log.Warnw("Failed to get Tx status", "txHash", &txHash, "rpcErr", rpcErr)
return nil, rpcErr
}

h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()

statusSub := h.txnStatus.Subscribe()
headerSub := h.newHeads.Subscribe()
sub.wg.Go(func() {
defer func() {
h.unsubscribe(sub, id)
statusSub.Unsubscribe()
headerSub.Unsubscribe()
}()

if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
h.log.Warnw("Error while sending Txn status", "txHash", txHash)
return
}

Check warning on line 156 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L154-L156

Added lines #L154 - L156 were not covered by tests
lastSendStatus = lastKnownStatus

for {
select {
case <-subscriptionCtx.Done():
return
case <-headerSub.Recv():
lastKnownStatus, rpcErr = h.TransactionStatus(subscriptionCtx, txHash)
if rpcErr != nil {
h.log.Warnw("Failed to get Tx status", "txHash", txHash, "rpcErr", rpcErr)
return
}

Check warning on line 168 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L166-L168

Added lines #L166 - L168 were not covered by tests

if *lastKnownStatus != *lastSendStatus {
if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
h.log.Warnw("Error while sending Txn status", "txHash", txHash)
return
}

Check warning on line 174 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L172-L174

Added lines #L172 - L174 were not covered by tests
lastSendStatus = lastKnownStatus
}

// Stop when final status reached and notified
if isFinal(lastSendStatus) {
return
}
}
}
})

return &SubscriptionID{ID: id}, nil
}

func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
Expand Down Expand Up @@ -217,3 +303,30 @@ func setEventFilterRange(filter blockchain.EventFilterer, fromID, toID *BlockID,
}
return set(blockchain.EventFilterTo, toID)
}

type NewTransactionStatus struct {
TransactionHash *felt.Felt `json:"transaction_hash"`
Status *TransactionStatus `json:"status"`
}

// sendHeader creates a request and sends it to the client
func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status *NewTransactionStatus, id uint64) error {
resp, err := json.Marshal(jsonrpc.Request{
Version: "2.0",
Method: "starknet_subscriptionTransactionsStatus",
Params: map[string]any{
"subscription_id": id,
"result": status,
},
})
if err != nil {
return err
}

Check warning on line 324 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L323-L324

Added lines #L323 - L324 were not covered by tests
h.log.Debugw("Sending Txn status", "status", string(resp))
_, err = w.Write(resp)
return err
}

func isFinal(status *TransactionStatus) bool {
return status.Finality == TxnStatusRejected || status.Finality == TxnStatusAcceptedOnL1
}
Loading

0 comments on commit e65a8a4

Please sign in to comment.