Skip to content

Commit

Permalink
Finish subscription tests
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Nov 22, 2024
1 parent ae68ead commit 26ad026
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 17 deletions.
12 changes: 2 additions & 10 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rpc
import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/NethermindEth/juno/blockchain"
Expand Down Expand Up @@ -82,13 +81,12 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
case <-subscriptionCtx.Done():
return
case header := <-headerSub.Recv():
fmt.Println("Calling process events from headerSub go routine")

h.processEvents(subscriptionCtx, w, id, header.Number, header.Number, fromAddr, keys)
}
}
}()

fmt.Println("Calling process events from from main sub go routine.")
h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys)

wg.Wait()
Expand All @@ -98,15 +96,13 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
}

func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) {
fmt.Println("Inside process Events")
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
h.log.Warnw("Error creating event filter", "err", err)
return
}

Check warning on line 103 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L101-L103

Added lines #L101 - L103 were not covered by tests
fmt.Println("After h.bcReader.EventFilter")

defer func() {
fmt.Println("Returning from processEvents")
h.callAndLogErr(filter.Close, "Error closing event filter in events subscription")
}()

Expand All @@ -115,15 +111,11 @@ func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, t
return
}

Check warning on line 112 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L110-L112

Added lines #L110 - L112 were not covered by tests

fmt.Println("Before calling Filter.Events")
filteredEvents, cToken, err := filter.Events(nil, subscribeEventsChunkSize)
fmt.Println("After Filter.Events1")
if err != nil {
fmt.Println("err is not nil--------")
h.log.Warnw("Error filtering events", "err", err)
return
}

Check warning on line 118 in rpc/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

rpc/subscriptions.go#L116-L118

Added lines #L116 - L118 were not covered by tests
fmt.Println("After Filter.Events2")

err = sendEvents(ctx, w, filteredEvents, id)
if err != nil {
Expand Down
79 changes: 72 additions & 7 deletions rpc/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool {
return fc.w == fc2.w
}

func TestSubscribeEventsAndUnsubscribe(t *testing.T) {
func TestSubscribeEvents(t *testing.T) {
log := utils.NewNopZapLogger()

t.Run("Too many keys in filter", func(t *testing.T) {
Expand Down Expand Up @@ -172,7 +172,8 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) {
require.NoError(t, clientConn.Close())
})

subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
ctx, cancel := context.WithCancel(context.Background())
subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, &BlockID{Number: b1.Number})
require.Nil(t, rpcErr)

Expand All @@ -189,9 +190,10 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, string(m), string(got))
}
cancel()
})

t.Run("Events from new blocks", func(t *testing.T) {
t.Run("Events when continuation token is not nil", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
t.Cleanup(mockCtrl.Finish)

Expand All @@ -201,10 +203,15 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) {
handler := New(mockChain, mockSyncer, nil, "", log)

mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: b1.Number}, nil)
mockChain.EXPECT().BlockHeaderByNumber(b1.Number).Return(b1.Header, nil)
mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil)

cToken := new(blockchain.ContinuationToken)
mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return([]*blockchain.FilteredEvent{filteredEvents[0]}, nil, nil)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return(
[]*blockchain.FilteredEvent{filteredEvents[0]}, cToken, nil)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return(
[]*blockchain.FilteredEvent{filteredEvents[1]}, nil, nil)
mockEventFilterer.EXPECT().Close().AnyTimes()

serverConn, clientConn := net.Pipe()
Expand All @@ -213,14 +220,56 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) {
require.NoError(t, clientConn.Close())
})

subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil)
ctx, cancel := context.WithCancel(context.Background())
subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, &BlockID{Number: b1.Number})
require.Nil(t, rpcErr)

var marshalledResponses [][]byte
for _, e := range emittedEvents {
resp, err := marshalSubscriptionResponse(e, id.ID)
require.NoError(t, err)
marshalledResponses = append(marshalledResponses, resp)
}

for _, m := range marshalledResponses {
got := make([]byte, len(m))
_, err := clientConn.Read(got)
require.NoError(t, err)
assert.Equal(t, string(m), string(got))
}
cancel()
})

t.Run("Events from new blocks", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
t.Cleanup(mockCtrl.Finish)

mockChain := mocks.NewMockReader(mockCtrl)
mockSyncer := mocks.NewMockSyncReader(mockCtrl)
mockEventFilterer := mocks.NewMockEventFilterer(mockCtrl)

handler := New(mockChain, mockSyncer, nil, "", log)
headerFeed := feed.New[*core.Header]()
handler.newHeads = headerFeed

// headerFeed.Send()
mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: b1.Number}, nil)
mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil)

mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return([]*blockchain.FilteredEvent{filteredEvents[0]}, nil, nil)
mockEventFilterer.EXPECT().Close().AnyTimes()

serverConn, clientConn := net.Pipe()
t.Cleanup(func() {
require.NoError(t, serverConn.Close())
require.NoError(t, clientConn.Close())
})

ctx, cancel := context.WithCancel(context.Background())
subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil)
require.Nil(t, rpcErr)

resp, err := marshalSubscriptionResponse(emittedEvents[0], id.ID)
require.NoError(t, err)
Expand All @@ -230,6 +279,22 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, string(resp), string(got))

mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil)

mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2)
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return([]*blockchain.FilteredEvent{filteredEvents[1]}, nil, nil)

headerFeed.Send(&core.Header{Number: b1.Number + 1})

resp, err = marshalSubscriptionResponse(emittedEvents[1], id.ID)
require.NoError(t, err)

got = make([]byte, len(resp))
_, err = clientConn.Read(got)
require.NoError(t, err)
assert.Equal(t, string(resp), string(got))

cancel()
time.Sleep(100 * time.Millisecond)
})
}
Expand Down

0 comments on commit 26ad026

Please sign in to comment.