v0.23.0
[v0.23.0] - 2024-06-07
This is a new breaking release. The main changes are described below.
hyper v1.0
jsonrpsee has been upgraded to use hyper v1.0. This mainly impacts users of the low-level API who rely on hyper::service::make_service_fn, which has been removed. From now on you need to manage the socket yourself.
The hyper::service::make_service_fn can be replaced by the following example template:
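// NOTE: `svc_builder`, `rpc_middleware`, `methods` and `stop_handle` are placeholders
// for state that you create yourself; this template assumes they are cheap to clone.
// `use tower::Service;` must be in scope for the `.call(req)` below.
async fn start_server() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();

    loop {
        // Accept the next connection, or leave the loop once the server is stopped.
        let sock = tokio::select! {
            res = listener.accept() => {
                match res {
                    Ok((stream, _remote_addr)) => stream,
                    Err(e) => {
                        tracing::error!("failed to accept v4 connection: {:?}", e);
                        continue;
                    }
                }
            }
            _ = stop_handle.clone().shutdown() => break,
        };

        // Clone the shared state so that it can be moved into the per-connection service.
        let (svc_builder, rpc_middleware) = (svc_builder.clone(), rpc_middleware.clone());
        let (methods, conn_stop_handle) = (methods.clone(), stop_handle.clone());

        let svc = tower::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
            // The closure runs once per request, so clone again before building the service.
            let mut jsonrpsee_svc = svc_builder
                .clone()
                .set_rpc_middleware(rpc_middleware.clone())
                .build(methods.clone(), conn_stop_handle.clone());

            // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
            // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
            // as workaround.
            async move {
                jsonrpsee_svc
                    .call(req)
                    .await
                    .map_err(|e| anyhow::anyhow!("{:?}", e))
            }
        });

        tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
            sock,
            svc,
            stop_handle.clone().shutdown(),
        ));
    }
}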
Also, be aware that tower::service_fn and hyper::service::service_fn are different, and it's recommended to use tower::service_fn from now on.
Extensions
Because it was not possible (or at least not easy) to share state between RPC middleware layers, jsonrpsee has added Extensions to the Request and Response. This allows users to inject arbitrary data that can be accessed in the RPC middleware and RPC handlers.
Please be careful when injecting large amounts of data into the extensions because they are cloned for each RPC call, which can increase memory usage significantly.
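One common way to keep that per-call clone cheap is to store a shared handle rather than the data itself. The following is a minimal std-only sketch of the idea. BigState and share_cheaply are hypothetical names used for illustration and are not part of jsonrpsee. It only assumes that the value placed in the extensions is cloned once per call.

```rust
use std::sync::Arc;

/// Hypothetical large piece of shared state; not part of jsonrpsee.
#[derive(Debug)]
struct BigState {
    table: Vec<u64>,
}

/// Returns two handles so callers can confirm both point at the same allocation.
fn share_cheaply() -> (Arc<BigState>, Arc<BigState>) {
    let original = Arc::new(BigState { table: vec![0; 1_000_000] });
    // Cloning the `Arc` only bumps a reference count; the million-element
    // vector is not copied.
    let per_call = Arc::clone(&original);
    (original, per_call)
}

fn main() {
    let (original, per_call) = share_cheaply();
    println!("same allocation: {}", Arc::ptr_eq(&original, &per_call));
    println!("handles alive: {}", Arc::strong_count(&original));
    println!("entries: {}", per_call.table.len());
}
```

Injecting an Arc<BigState> instead of a BigState would mean each RPC call pays for a reference-count increment rather than a deep copy.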
The connection ID from the jsonrpsee-server is injected in the extensions by default, and it is possible to fetch it as follows:
struct LogConnectionId<S>(S);

impl<'a, S: RpcServiceT<'a>> RpcServiceT<'a> for LogConnectionId<S> {
    type Future = S::Future;

    fn call(&self, request: jsonrpsee::types::Request<'a>) -> Self::Future {
        let conn_id = request.extensions().get::<ConnectionId>().unwrap();
        tracing::info!("Connection ID {}", conn_id.0);

        self.0.call(request)
    }
}
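The lookup by type used above can be pictured as a map keyed by the type of the stored value. The following std-only sketch models that idea. TypeMap and ConnId are hypothetical stand-ins written for this illustration and are not jsonrpsee's actual Extensions or ConnectionId types.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Simplified stand-in for an extensions container; NOT jsonrpsee's real type.
#[derive(Default)]
struct TypeMap {
    slots: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl TypeMap {
    /// Stores `value`, replacing any previous value of the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Looks a value up by its type; `None` if nothing of that type was inserted.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|boxed| (**boxed).downcast_ref::<T>())
    }
}

/// Hypothetical newtype playing the role of a connection ID.
#[derive(Debug, PartialEq)]
struct ConnId(usize);

fn main() {
    let mut ext = TypeMap::default();
    ext.insert(ConnId(7));
    println!("{:?}", ext.get::<ConnId>()); // prints "Some(ConnId(7))"
    println!("{:?}", ext.get::<u32>()); // prints "None"
}
```

Because the key is the type itself, wrapping values in a dedicated newtype (as the connection ID is) avoids collisions between unrelated values that happen to share a primitive type such as usize.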
In addition, Extensions is not added in the proc-macro API by default; one has to enable the with_extensions attribute for it to be available:
#[rpc(client, server)]
pub trait Rpc {
    // legacy
    #[method(name = "foo")]
    async fn async_method(&self) -> u16;

    // with extensions
    #[method(name = "with_ext", with_extensions)]
    async fn f(&self) -> bool;
}

impl RpcServer for () {
    async fn async_method(&self) -> u16 {
        12
    }

    // NOTE: ext is injected just after self in the API
    async fn f(&self, ext: &Extensions) -> bool {
        ext.get::<u32>().is_some()
    }
}
client - TLS certificate store changed
The default TLS certificate store has been changed to rustls-platform-verifier to decide the best certificate store for each platform.
In addition, it's now possible to inject a custom certificate store if one needs a special certificate store.
client - Subscription API modified
The subscription API has been modified:
- The error type has been changed to serde_json::Error to indicate that an error can only occur if the decoding of T fails.
- There has been some confusion about when the subscription is closed, which can occur if the client "lagged" or the connection is closed. Now it's possible to call Subscription::close_reason after the subscription has closed (i.e. has returned None) to know why.
If one wants to replace old messages in case of lagging, it is recommended to write your own adaptor on top of the subscription:
use futures::Stream;
use jsonrpsee::core::client::Subscription;
use serde::de::DeserializeOwned;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;

fn drop_oldest_when_lagging<T: Clone + DeserializeOwned + Send + Sync + 'static>(
    mut sub: Subscription<T>,
    buffer_size: usize,
) -> impl Stream<Item = Result<T, BroadcastStreamRecvError>> {
    let (tx, rx) = tokio::sync::broadcast::channel(buffer_size);

    tokio::spawn(async move {
        // Poll the subscription, logging and skipping messages that fail to decode
        while let Some(n) = sub.next().await {
            let msg = match n {
                Ok(msg) => msg,
                Err(e) => {
                    tracing::error!("Failed to decode the subscription message: {e}");
                    continue;
                }
            };

            // Only fails if the receiver has been dropped
            if tx.send(msg).is_err() {
                return;
            }
        }
    });

    BroadcastStream::new(rx)
}
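The adaptor above relies on the broadcast channel to discard the oldest buffered messages when the consumer falls behind. The eviction policy on its own can be sketched with a std-only bounded queue. DropOldest is a hypothetical type written for this illustration. It models the policy only, not the channel's actual implementation or its lag reporting.

```rust
use std::collections::VecDeque;

/// Bounded queue that evicts the oldest item when full (illustrative only).
struct DropOldest<T> {
    items: VecDeque<T>,
    capacity: usize,
}

impl<T> DropOldest<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { items: VecDeque::with_capacity(capacity), capacity }
    }

    /// Pushes `item`; returns the evicted oldest item if the queue was full.
    fn push(&mut self, item: T) -> Option<T> {
        let evicted = if self.items.len() == self.capacity {
            self.items.pop_front()
        } else {
            None
        };
        self.items.push_back(item);
        evicted
    }

    /// Pops the oldest item that is still buffered.
    fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

fn main() {
    let mut buf = DropOldest::new(2);
    for n in 1..=4 {
        if let Some(old) = buf.push(n) {
            println!("dropped {old}"); // prints "dropped 1", then "dropped 2"
        }
    }
    while let Some(n) = buf.pop() {
        println!("kept {n}"); // prints "kept 3", then "kept 4"
    }
}
```

With a capacity of two and four pushes, only the two newest items survive, which is the behaviour a slow subscriber would observe when old messages are replaced.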
[Added]
- server: add serve and serve_with_graceful_shutdown helpers (#1382)
- server: pass extensions from http layer (#1389)
- macros: add macro attr with_extensions (#1380)
- server: inject connection id in extensions (#1381)
- feat: add Extensions to Request/MethodResponse (#1306)
- proc-macros: rename parameter names (#1365)
- client: add Subscription::close_reason (#1320)
[Changed]
- chore(deps): tokio ^1.23.1 (#1393)
- server: use ConnectionId in subscription APIs (#1392)
- server: add logs when connection closed by ws ping/pong (#1386)
- client: set authorization header from the URL (#1384)
- client: use rustls-platform-verifier cert store (#1373)
- client: remove MaxSlots limit (#1377)
- upgrade to hyper v1.0 (#1368)