Skip to content

Commit

Permalink
Add subscribeEvents and subcriptionEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Nov 22, 2024
1 parent 9b033c5 commit 6ad3741
Show file tree
Hide file tree
Showing 4 changed files with 530 additions and 0 deletions.
4 changes: 4 additions & 0 deletions rpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type EventsChunk struct {
ContinuationToken string `json:"continuation_token,omitempty"`
}

type SubscriptionID struct {
ID uint64 `json:"subscription_id"`
}

/****************************************************
Events Handlers
*****************************************************/
Expand Down
8 changes: 8 additions & 0 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,15 @@ var (
ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"}
ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"}
ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"}
ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: "Cannot go back more than 1024 blocks"}
ErrCallOnPending = &jsonrpc.Error{Code: 69, Message: "This method does not support being called on the pending block"}

// These errors can be only be returned by Juno-specific methods.
ErrSubscriptionNotFound = &jsonrpc.Error{Code: 100, Message: "Subscription not found"}
)

const (
maxBlocksBack = 1024
maxEventChunkSize = 10240
maxEventFilterKeys = 1024
traceCacheSize = 128
Expand Down Expand Up @@ -334,6 +337,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen
Name: "starknet_specVersion",
Handler: h.SpecVersion,
},
{
Name: "starknet_subscribeEvents",
Params: []jsonrpc.Parameter{{Name: "from_address"}, {Name: "keys"}, {Name: "block", Optional: true}},
Handler: h.SubscribeEvents,
},
{
Name: "juno_subscribeNewHeads",
Handler: h.SubscribeNewHeads,
Expand Down
180 changes: 180 additions & 0 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package rpc

import (
"context"
"encoding/json"
"sync"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/jsonrpc"
)

const subscribeEventsChunkSize = 1024

func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt,
blockID *BlockID,
) (*SubscriptionID, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

Check warning on line 22 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L21-L22

Added lines #L21 - L22 were not covered by tests

lenKeys := len(keys)
for _, k := range keys {
lenKeys += len(k)
}
if lenKeys > maxEventFilterKeys {
return nil, ErrTooManyKeysInFilter
}

var requestedHeader *core.Header
headHeader, err := h.bcReader.HeadsHeader()
if err != nil {
return nil, ErrInternal.CloneWithData(err.Error())
}

Check warning on line 36 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L35-L36

Added lines #L35 - L36 were not covered by tests

if blockID == nil {
requestedHeader = headHeader
} else {
if blockID.Pending {
return nil, ErrCallOnPending
}

var rpcErr *jsonrpc.Error
requestedHeader, rpcErr = h.blockHeaderByID(blockID)
if rpcErr != nil {
return nil, rpcErr
}

Check warning on line 49 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L48-L49

Added lines #L48 - L49 were not covered by tests

if headHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack {
return nil, ErrTooManyBlocksBack
}
}

id := h.idgen()
subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
sub := &subscription{
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()

headerSub := h.newHeads.Subscribe()
sub.wg.Go(func() {
defer func() {
h.unsubscribe(sub, id)
headerSub.Unsubscribe()
}()

// The specification doesn't enforce ordering of events therefore events from new blocks can be sent before
// old blocks.
// Todo: see if sub's wg can be used?
wg := sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()

for {
select {
case <-subscriptionCtx.Done():
return
case header := <-headerSub.Recv():

h.processEvents(subscriptionCtx, w, id, header.Number, header.Number, fromAddr, keys)
}
}
}()

h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys)

wg.Wait()
})

return &SubscriptionID{ID: id}, nil
}

func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) {
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
h.log.Warnw("Error creating event filter", "err", err)
return
}

Check warning on line 106 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L104-L106

Added lines #L104 - L106 were not covered by tests

defer func() {
h.callAndLogErr(filter.Close, "Error closing event filter in events subscription")
}()

if err = setEventFilterRange(filter, &BlockID{Number: from}, &BlockID{Number: to}, to); err != nil {
h.log.Warnw("Error setting event filter range", "err", err)
return
}

Check warning on line 115 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L113-L115

Added lines #L113 - L115 were not covered by tests

filteredEvents, cToken, err := filter.Events(nil, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

Check warning on line 121 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L119-L121

Added lines #L119 - L121 were not covered by tests

err = sendEvents(ctx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}

Check warning on line 127 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L125-L127

Added lines #L125 - L127 were not covered by tests

for cToken != nil {
filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

Check warning on line 134 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L132-L134

Added lines #L132 - L134 were not covered by tests

err = sendEvents(ctx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}

Check warning on line 140 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L138-L140

Added lines #L138 - L140 were not covered by tests
}
}

func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error {
for _, event := range events {
select {
case <-ctx.Done():
return ctx.Err()

Check warning on line 148 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L147-L148

Added lines #L147 - L148 were not covered by tests
default:
emittedEvent := &EmittedEvent{
BlockNumber: &event.BlockNumber, // This always be filled as subscribeEvents cannot be called on pending block
BlockHash: event.BlockHash,
TransactionHash: event.TransactionHash,
Event: &Event{
From: event.From,
Keys: event.Keys,
Data: event.Data,
},
}

resp, err := json.Marshal(jsonrpc.Request{
Version: "2.0",
Method: "starknet_subscriptionEvents",
Params: map[string]any{
"subscription_id": id,
"result": emittedEvent,
},
})
if err != nil {
return err
}

Check warning on line 171 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L170-L171

Added lines #L170 - L171 were not covered by tests

_, err = w.Write(resp)
if err != nil {
return err
}

Check warning on line 176 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L175-L176

Added lines #L175 - L176 were not covered by tests
}
}
return nil
}
Loading

0 comments on commit 6ad3741

Please sign in to comment.