Skip to content

Commit

Permalink
Various Subscription improvements (#177)
Browse files Browse the repository at this point in the history
* feat: configurable subscription buffer size in pubsubconnect

* docs: improve

* refactor: simplify

* feat: is_subscription

* lint: clippy

* feat: add is_non_standard_sub to RequestMeta

* feat: frontend configures each Inflight

* feat: getter and setter

* fix: clean is_subscription

* fix: missing_const

* docs: add much more info to pubsub

* feat: pass channel size configuration to rpcclient

* fix: correct name of delegated methods
  • Loading branch information
prestwich authored Feb 5, 2024
1 parent 5d51ee9 commit 32618e9
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 26 deletions.
49 changes: 49 additions & 0 deletions crates/json-rpc/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ pub struct RequestMeta {
pub method: &'static str,
/// The request ID.
pub id: Id,
/// Whether the request is a subscription, other than `eth_subscribe`.
is_subscription: bool,
}

impl RequestMeta {
/// Create a new `RequestMeta`.
pub const fn new(method: &'static str, id: Id) -> Self {
Self { method, id, is_subscription: false }
}

/// Returns `true` if the request is a subscription.
pub const fn is_subscription(&self) -> bool {
self.is_subscription || matches!(self.method.as_bytes(), b"eth_subscribe")
}

/// Indicates that the request is a non-standard subscription (i.e. not
/// "eth_subscribe").
pub fn set_is_subscription(&mut self) {
self.is_subscription = true;
}
}

/// A JSON-RPC 2.0 request object.
Expand All @@ -29,6 +49,24 @@ pub struct Request<Params> {
pub params: Params,
}

impl<Params> Request<Params> {
/// Create a new `Request`.
pub const fn new(method: &'static str, id: Id, params: Params) -> Self {
Self { meta: RequestMeta::new(method, id), params }
}

/// Returns `true` if the request is a subscription.
pub const fn is_subscription(&self) -> bool {
self.meta.is_subscription()
}

/// Indicates that the request is a non-standard subscription (i.e. not
/// "eth_subscribe").
pub fn set_is_subscription(&mut self) {
self.meta.set_is_subscription()
}
}

/// A [`Request`] that has been partially serialized. The request parameters
/// have been serialized, and are represented as a boxed [`RawValue`]. This is
/// useful for collections containing many requests, as it erases the `Param`
Expand Down Expand Up @@ -161,6 +199,17 @@ impl SerializedRequest {
self.meta.method
}

/// Mark the request as a non-standard subscription (i.e. not
/// `eth_subscribe`)
pub fn set_is_subscription(&mut self) {
self.meta.set_is_subscription();
}

/// Returns `true` if the request is a subscription.
pub const fn is_subscription(&self) -> bool {
self.meta.is_subscription()
}

/// Returns the serialized request.
pub const fn serialized(&self) -> &RawValue {
&self.request
Expand Down
102 changes: 99 additions & 3 deletions crates/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,25 @@ bidirectional. They are used to subscribe to events on the server, and
receive notifications when those events occur.

The PubSub system here consists of 3 logical parts:

- The **frontend** is the part of the system that the user interacts with.
It exposes a simple API that allows the user to issue requests and manage
subscriptions.
- The **service** is an intermediate layer that manages request/response
mappings, subscription aliasing, and backend lifecycle events. Running
[`PubSubConnect::into_service`] will spawn a long-lived service task.
[`PubSubConnect::into_service`] will spawn a long-lived service task. The
service exists to manage the lifecycle of requests and subscriptions over
reconnections, and to serve any number of **frontends**.
- The **backend** is an actively running connection to the server. Users
should NEVER instantiate a backend directly. Instead, they should use
[`PubSubConnect::into_service`] for some connection object.
[`PubSubConnect::into_service`] for some connection object. Backends
are responsible for managing the connection to the server,accepting user
requests from the **service** and forwarding server responses to the
**service**.

This crate provides the following:

- [PubSubConnect]: A trait for instantiating a PubSub service by connecting
- [`PubSubConnect`]: A trait for instantiating a PubSub service by connecting
to some **backend**. Implementors of this trait are responsible for
the precise connection details, and for spawning the **backend** task.
Users should ALWAYS call [`PubSubConnect::into_service`] to get a running
Expand All @@ -38,3 +44,93 @@ This crate provides the following:
- [`PubSubFrontend`]: The **frontend**. A handle to a running PubSub
**service**. It is used to issue requests and subscription lifecycle
instructions to the **service**.
- [`RawSubscription`]: A handle to a subscription. This type is yielded by
the **service** when a user issues a `get_subscription()` request. It is a
`tokio::broadcast` channel which receives notifications from the **service**
when the server sends a notification for the subscription.
- [`Subscription`]: A handle to a subscription expecting a specific response
type. A wrapper around [`RawSubscription`] that deserializes notifications
into the expected type, and allows the user to accept or discard unexpected
responses.
- [`SubscriptionItem`]: An item in a typed [`Subscription`]. This type is
yielded by the subscription via the `recv_any()` API a notification is
received and contains the deserialized item. If deserialization fails, it
contains the raw JSON value.

## On Handling Subscriptions

For a normal request, the user sends a request to the **frontend**, and
later receives a response via a tokio oneshot channel. This is straightforward
and easy to reason about. Subscriptions, however, are side-effects of other
requests, and are long-lived. They are managed by the **service** and
identified by a `U256` id. The **service** uses this id to manage the
subscription lifecycle, and to dispatch notifications to the correct
subscribers.

### Server & Local IDs

When a user issues a subscription request, the **frontend** sends a
subscription request to the **service**. The **service** dispatches it to the
RPC server via the **backend**. The **service** then intercepts the RPC server
response containing the serve id, and assigns a `local_id` to the subscription.
This `local_id` is used to identify the subscription in the **service** and in
tasks consuming the subscription, while the `server_id` is used to identify the
subscription to the RPC server, and to associate notifications with specific
active subscriptions.

This allows us to use long-lived `local_id` values to manage subscriptions over
multiple reconnections, without having to notify frontend users of the ID change
when the server connection is lost. It also prevents race conditions when
unsubscribing during or immediately after a reconnection.

### What is a subscription request?

The **service** uses the `is_subscription()` method in the request to determine
whether a given RPC request is a subscription. In general, subscription requests
use the `eth_subscribe` method. However, other methods may also be used to
create subscriptions, such as `admin_peerEvents`. To allow custom subscriptions
on unknown methods, the `Request`, `SerializedRequest` and `RpcCall` expose
`set_is_subscription()`, which can be used to mark any given request as a
subscription.

When marking a request as a subscription, the **service** will intercept the
RPC response, which MUST be a `U256` value. Subscription requests that return
anything other than a `U256` value will not function.

### Subscription Lifecycle

Regular Request Lifecycle

1. The user issues a request to the **frontend**.
1. The **frontend** sends the request to the **service**, with a oneshot channel
to receive the response.
1. The **service** stores the oneshot channel in its `RequestManager`.
1. The **service** sends the request to the **backend**.
1. The **backend** sends the request to the RPC server.
1. The RPC server responds with a JSON RPC response.
1. The **backend** sends the response to the **service**.
1. The **service** sends the response to the waiting task via the oneshot.

Subscription Request Lifecycle:

1. The user issues a subscription request to the **frontend**.
1. The **frontend** sends the request to the **service**, with a oneshot channel
to receive the response.
1. The **service** stores the oneshot channel in its `RequestManager`.
1. The **service** sends the request to the **backend**.
1. The **backend** sends the request to the RPC server.
1. The RPC server responds with a `U256` value (the `server_id`).
1. The **backend** sends the response to the **service**.
1. The **service** assigns a `local_id` to the subscription, creates a
subscription broadcast channel, and stores the relevant information in its
`SubscriptionManager`.
1. The **service** overwrites the JSON RPC response with the `local_id`.
1. The **service** sends the response to the waiting task via the oneshot.

Subscription Notification Lifecycle

1. The RPC server sends a notification to the **backend**.
1. The **backend** sends the notification to the **service**.
1. The **service** looks up the `local_id` i1n its `SubscriptionManager`.
1. If present, the **service** sends the notification to the relevant channel.
1. Otherwise, the **service** ignores the notification.
26 changes: 24 additions & 2 deletions crates/pubsub/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use tokio::sync::{mpsc, oneshot};
#[derive(Debug, Clone)]
pub struct PubSubFrontend {
tx: mpsc::UnboundedSender<PubSubInstruction>,
/// The number of items to buffer in new subscription channels. Defaults to
/// 16. See [`tokio::sync::broadcast::channel`] for a description.
channel_size: usize,
}

impl PubSubFrontend {
/// Create a new frontend.
pub(crate) const fn new(tx: mpsc::UnboundedSender<PubSubInstruction>) -> Self {
Self { tx }
Self { tx, channel_size: 16 }
}

/// Get the subscription ID for a local ID.
Expand Down Expand Up @@ -52,8 +55,10 @@ impl PubSubFrontend {
req: SerializedRequest,
) -> impl Future<Output = Result<Response, TransportError>> + Send + 'static {
let tx = self.tx.clone();
let channel_size = self.channel_size;

async move {
let (in_flight, rx) = InFlight::new(req);
let (in_flight, rx) = InFlight::new(req, channel_size);
tx.send(PubSubInstruction::Request(in_flight))
.map_err(|_| TransportErrorKind::backend_gone())?;
rx.await.map_err(|_| TransportErrorKind::backend_gone())?
Expand All @@ -71,6 +76,23 @@ impl PubSubFrontend {
.boxed(),
}
}

/// Get the currently configured channel size. This is the number of items
/// to buffer in new subscription channels. Defaults to 16. See
/// [`tokio::sync::broadcast`] for a description of relevant
/// behavior.
pub const fn channel_size(&self) -> usize {
self.channel_size
}

/// Set the channel size. This is the number of items to buffer in new
/// subscription channels. Defaults to 16. See
/// [`tokio::sync::broadcast`] for a description of relevant
/// behavior.
pub fn set_channel_size(&mut self, channel_size: usize) {
debug_assert_ne!(channel_size, 0, "channel size must be non-zero");
self.channel_size = channel_size;
}
}

impl tower::Service<RequestPacket> for PubSubFrontend {
Expand Down
6 changes: 3 additions & 3 deletions crates/pubsub/src/managers/active_sub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloy_json_rpc::SerializedRequest;
use alloy_primitives::B256;
use serde_json::value::RawValue;
use std::{fmt, hash::Hash};
use std::{fmt, hash::Hash, usize};
use tokio::sync::broadcast;

use crate::RawSubscription;
Expand Down Expand Up @@ -57,9 +57,9 @@ impl fmt::Debug for ActiveSubscription {

impl ActiveSubscription {
/// Create a new active subscription.
pub(crate) fn new(request: SerializedRequest) -> Self {
pub(crate) fn new(request: SerializedRequest, channel_size: usize) -> Self {
let local_id = request.params_hash();
let (tx, _rx) = broadcast::channel(16);
let (tx, _rx) = broadcast::channel(channel_size);
Self { request, local_id, tx }
}

Expand Down
15 changes: 10 additions & 5 deletions crates/pubsub/src/managers/in_flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub(crate) struct InFlight {
/// The request
pub(crate) request: SerializedRequest,

/// The number of items to buffer in the subscription channel.
pub(crate) channel_size: usize,

/// The channel to send the response on.
pub(crate) tx: oneshot::Sender<Result<Response, TransportError>>,
}
Expand All @@ -20,6 +23,7 @@ impl fmt::Debug for InFlight {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("InFlight")
.field("request", &self.request)
.field("channel_size", &self.channel_size)
.field("tx_is_closed", &self.tx.is_closed())
.finish()
}
Expand All @@ -29,15 +33,16 @@ impl InFlight {
/// Create a new in-flight request.
pub(crate) fn new(
request: SerializedRequest,
channel_size: usize,
) -> (Self, oneshot::Receiver<Result<Response, TransportError>>) {
let (tx, rx) = oneshot::channel();

(Self { request, tx }, rx)
(Self { request, channel_size, tx }, rx)
}

/// Get the method
pub(crate) const fn method(&self) -> &'static str {
self.request.method()
/// Check if the request is a subscription.
pub(crate) const fn is_subscription(&self) -> bool {
self.request.is_subscription()
}

/// Get a reference to the serialized request.
Expand All @@ -51,7 +56,7 @@ impl InFlight {
/// request. If the request is a subscription and the response is not an
/// error, the subscription ID and the in-flight request are returned.
pub(crate) fn fulfill(self, resp: Response) -> Option<(U256, Self)> {
if self.method() == "eth_subscribe" {
if self.is_subscription() {
if let ResponsePayload::Success(val) = resp.payload {
let sub_id: serde_json::Result<U256> = serde_json::from_str(val.get());
match sub_id {
Expand Down
12 changes: 9 additions & 3 deletions crates/pubsub/src/managers/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ impl SubscriptionManager {
}

/// Insert a subscription.
fn insert(&mut self, request: SerializedRequest, server_id: U256) -> RawSubscription {
let active = ActiveSubscription::new(request);
fn insert(
&mut self,
request: SerializedRequest,
server_id: U256,
channel_size: usize,
) -> RawSubscription {
let active = ActiveSubscription::new(request, channel_size);
let sub = active.subscribe();

let local_id = active.local_id;
Expand All @@ -39,6 +44,7 @@ impl SubscriptionManager {
&mut self,
request: SerializedRequest,
server_id: U256,
channel_size: usize,
) -> RawSubscription {
let local_id = request.params_hash();

Expand All @@ -48,7 +54,7 @@ impl SubscriptionManager {
self.change_server_id(local_id, server_id);
self.get_subscription(local_id).expect("checked existence")
} else {
self.insert(request, server_id)
self.insert(request, server_id, channel_size)
}
}

Expand Down
15 changes: 7 additions & 8 deletions crates/pubsub/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
PubSubConnect, PubSubFrontend, RawSubscription,
};

use alloy_json_rpc::{Id, PubSubItem, Request, RequestMeta, Response, ResponsePayload};
use alloy_json_rpc::{Id, PubSubItem, Request, Response, ResponsePayload};
use alloy_primitives::U256;
use alloy_transport::{
utils::{to_json_raw_value, Spawnable},
Expand Down Expand Up @@ -47,7 +47,7 @@ where
handle,
connector,
reqs,
subs: Default::default(),
subs: SubscriptionManager::default(),
in_flights: Default::default(),
};
this.spawn();
Expand Down Expand Up @@ -94,7 +94,9 @@ where
// Dispatch all subscription requests.
for (_, sub) in self.subs.iter() {
let req = sub.request().to_owned();
let (in_flight, _) = InFlight::new(req.clone());
// 0 is a dummy value, we don't care about the channel size here,
// as none of these will result in channel creation.
let (in_flight, _) = InFlight::new(req.clone(), 0);
self.in_flights.insert(in_flight);

let msg = req.into_serialized();
Expand Down Expand Up @@ -142,10 +144,7 @@ where
/// Service an unsubscribe instruction.
fn service_unsubscribe(&mut self, local_id: U256) -> TransportResult<()> {
let local_id = local_id.into();
let req = Request {
meta: RequestMeta { id: Id::None, method: "eth_unsubscribe" },
params: [local_id],
};
let req = Request::new("eth_unsubscribe", Id::None, [local_id]);
let brv = req.serialize().expect("no ser error").take_request();

self.dispatch_request(brv)?;
Expand Down Expand Up @@ -182,7 +181,7 @@ where
let request = in_flight.request;
let id = request.id().clone();

self.subs.upsert(request, server_id);
self.subs.upsert(request, server_id, in_flight.channel_size);

// lie to the client about the sub id.
let local_id = self.subs.local_id_for(server_id).unwrap();
Expand Down
Loading

0 comments on commit 32618e9

Please sign in to comment.