Skip to content

Commit

Permalink
feat: new filter protocol increment - subscribe request handling (#1584)
Browse files Browse the repository at this point in the history
* feat: new filter protocol service node

* feat: add test and fix compilation errors

* feat: more tests and minor fixes

* chore: update waku/v2/protocol/waku_filter_v2/protocol.nim

Co-authored-by: Lorenzo Delgado <[email protected]>

* chore: add some convenience functions

---------

Co-authored-by: Lorenzo Delgado <[email protected]>
  • Loading branch information
jm-clius and Lorenzo Delgado authored Mar 2, 2023
1 parent a57bea2 commit ef7ce67
Show file tree
Hide file tree
Showing 7 changed files with 593 additions and 0 deletions.
239 changes: 239 additions & 0 deletions tests/v2/waku_filter_v2/test_waku_filter_v2.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
{.used.}

import
std/[options,sets,strutils,tables],
testutils/unittests,
chronos,
chronicles
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/protocol/waku_filter_v2,
../../../waku/v2/protocol/waku_filter_v2/rpc,
../../../waku/v2/protocol/waku_message,
../testlib/common,
../testlib/waku2

proc newTestWakuFilter(switch: Switch): Future[WakuFilter] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilter.new(peerManager)

await proto.start()
switch.mount(proto)

return proto

proc generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)

proc createRequest(filterSubscribeType: FilterSubscribeType, pubsubTopic = none(PubsubTopic), contentTopics = newSeq[ContentTopic]()): FilterSubscribeRequest =
let requestId = generateRequestId(rng)

return FilterSubscribeRequest(
requestId: requestId,
filterSubscribeType: filterSubscribeType,
pubsubTopic: pubsubTopic,
contentTopics: contentTopics
)

proc getSubscribedContentTopics(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
var contentTopics: seq[ContentTopic]
for filterCriterion in wakuFilter.subscriptions[peerId]:
contentTopics.add(filterCriterion[1])

return contentTopics

suite "Waku Filter - handling subscribe requests":

asyncTest "simple subscribe and unsubscribe request":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
peerId = PeerId.random().get()
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
filterUnsubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest.pubsubTopic,
contentTopics = filterSubscribeRequest.contentTopics
)

# When
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)

# Then
check:
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions[peerId].len == 1
response.requestId == filterSubscribeRequest.requestId
response.statusCode == 200
response.statusDesc.get() == "OK"

# When
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest)

# Then
check:
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
response2.requestId == filterUnsubscribeRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"

asyncTest "simple subscribe and unsubscribe all for multiple content topics":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
peerId = PeerId.random().get()
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic, nonDefaultContentTopic]
)
filterUnsubscribeAllRequest = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL
)

# When
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)

# Then
check:
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions[peerId].len == 2
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest.contentTopics
response.requestId == filterSubscribeRequest.requestId
response.statusCode == 200
response.statusDesc.get() == "OK"

# When
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeAllRequest)

# Then
check:
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
response2.requestId == filterUnsubscribeAllRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"

asyncTest "subscribe and unsubscribe to multiple content topics":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
peerId = PeerId.random().get()
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest1 = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
filterSubscribeRequest2 = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
contentTopics = @[nonDefaultContentTopic]
)
filterUnsubscribeRequest1 = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
contentTopics = filterSubscribeRequest1.contentTopics
)
filterUnsubscribeRequest2 = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest2.pubsubTopic,
contentTopics = filterSubscribeRequest2.contentTopics
)

# When
let response1 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest1)

# Then
check:
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions[peerId].len == 1
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest1.contentTopics
response1.requestId == filterSubscribeRequest1.requestId
response1.statusCode == 200
response1.statusDesc.get() == "OK"

# When
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest2)

# Then
check:
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions[peerId].len == 2
wakuFilter.getSubscribedContentTopics(peerId) ==
filterSubscribeRequest1.contentTopics &
filterSubscribeRequest2.contentTopics
response2.requestId == filterSubscribeRequest2.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"

# When
let response3 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest1)

# Then
check:
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions[peerId].len == 1
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest2.contentTopics
response3.requestId == filterUnsubscribeRequest1.requestId
response3.statusCode == 200
response3.statusDesc.get() == "OK"

# When
let response4 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest2)

# Then
check:
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
response4.requestId == filterUnsubscribeRequest2.requestId
response4.statusCode == 200
response4.statusDesc.get() == "OK"

asyncTest "ping subscriber":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
peerId = PeerId.random().get()
pingRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING
)
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)

# When
let response1 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)

# Then
check:
response1.requestId == pingRequest.requestId
response1.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response1.statusDesc.get().contains("peer has no subscriptions")

# When
let
response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
response3 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)

# Then
check:
response2.requestId == filterSubscribeRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
response3.requestId == pingRequest.requestId
response3.statusCode == 200
response3.statusDesc.get() == "OK"

7 changes: 7 additions & 0 deletions waku/v2/protocol/waku_filter_v2.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import
./waku_filter_v2/common,
./waku_filter_v2/protocol

export
common,
protocol
52 changes: 52 additions & 0 deletions waku/v2/protocol/waku_filter_v2/common.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
stew/results

const
WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1"
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"

type
FilterSubscribeErrorKind* {.pure.} = enum
UNKNOWN = uint32(000)
BAD_REQUEST = uint32(400)
NOT_FOUND = uint32(404)
SERVICE_UNAVAILABLE = uint32(503)

FilterSubscribeError* = object
kind*: FilterSubscribeErrorKind
cause*: string

FilterSubscribeResult* = Result[void, FilterSubscribeError]

# Convenience functions

proc badRequest*(T: type FilterSubscribeError, cause = "bad request"): FilterSubscribeError =
FilterSubscribeError(
kind: FilterSubscribeErrorKind.BAD_REQUEST,
cause: cause)

proc notFound*(T: type FilterSubscribeError, cause = "peer has no subscriptions"): FilterSubscribeError =
FilterSubscribeError(
kind: FilterSubscribeErrorKind.NOT_FOUND,
cause: cause)

proc serviceUnavailable*(T: type FilterSubscribeError, cause = "service unavailable"): FilterSubscribeError =
FilterSubscribeError(
kind: FilterSubscribeErrorKind.SERVICE_UNAVAILABLE,
cause: cause)

proc `$`*(err: FilterSubscribeError): string =
case err.kind:
of FilterSubscribeErrorKind.BAD_REQUEST:
"BAD_REQUEST: " & err.cause
of FilterSubscribeErrorKind.NOT_FOUND:
"NOT_FOUND: " & err.cause
of FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
"SERVICE_UNAVAILABLE: " & err.cause
of FilterSubscribeErrorKind.UNKNOWN:
"UNKNOWN"
Loading

0 comments on commit ef7ce67

Please sign in to comment.