Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
Fix IPC handling of jsonrpc errors (#1123)
Browse files Browse the repository at this point in the history
* fixes ipc jsonrpc error handling, use json RawValue where sensible

* ran cargo +nightly fmt

Co-authored-by: Oliver Giersch <[email protected]>
  • Loading branch information
oliver-giersch and Oliver Giersch authored Apr 9, 2022
1 parent 6e004e7 commit db1870c
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 137 deletions.
2 changes: 1 addition & 1 deletion ethers-providers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async-trait = { version = "0.1.50", default-features = false }
hex = { version = "0.4.3", default-features = false, features = ["std"] }
reqwest = { version = "0.11.10", default-features = false, features = ["json"] }
serde = { version = "1.0.124", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.64", default-features = false }
serde_json = { version = "1.0.64", default-features = false, features = ["raw_value"] }
thiserror = { version = "1.0.30", default-features = false }
url = { version = "2.2.2", default-features = false }
auto_impl = { version = "0.5.0", default-features = false }
Expand Down
6 changes: 3 additions & 3 deletions ethers-providers/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ethers_core::types::{TxHash, U256};
use futures_util::stream::Stream;
use pin_project::{pin_project, pinned_drop};
use serde::de::DeserializeOwned;
use serde_json::Value;
use serde_json::value::RawValue;
use std::{
marker::PhantomData,
pin::Pin,
Expand All @@ -15,7 +15,7 @@ use std::{
/// A transport implementation supporting pub sub subscriptions.
pub trait PubsubClient: JsonRpcClient {
/// The type of stream this transport returns
type NotificationStream: futures_core::Stream<Item = Value> + Send + Unpin;
type NotificationStream: futures_core::Stream<Item = Box<RawValue>> + Send + Unpin;

/// Add a subscription to this transport
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, Self::Error>;
Expand Down Expand Up @@ -76,7 +76,7 @@ where
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match futures_util::ready!(this.rx.poll_next(ctx)) {
Some(item) => match serde_json::from_value(item) {
Some(item) => match serde_json::from_str(item.get()) {
Ok(res) => Poll::Ready(Some(res)),
_ => Poll::Pending,
},
Expand Down
194 changes: 148 additions & 46 deletions ethers-providers/src/transports/common.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc
use ethers_core::types::U256;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;

use serde::{
de::{self, MapAccess, Unexpected, Visitor},
Deserialize, Serialize,
};
use serde_json::{value::RawValue, Value};
use thiserror::Error;

#[derive(Serialize, Deserialize, Debug, Clone, Error)]
use ethers_core::types::U256;

#[derive(Deserialize, Debug, Clone, Error)]
/// A JSON-RPC 2.0 error
pub struct JsonRpcError {
/// The error code
Expand Down Expand Up @@ -36,62 +41,142 @@ pub struct Request<'a, T> {
params: T,
}

#[derive(Serialize, Deserialize, Debug)]
/// A JSON-RPC Notifcation
pub struct Notification<R> {
#[serde(alias = "JSONRPC")]
jsonrpc: String,
method: String,
pub params: Subscription<R>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Subscription<R> {
pub subscription: U256,
pub result: R,
}

impl<'a, T> Request<'a, T> {
/// Creates a new JSON RPC request
pub fn new(id: u64, method: &'a str, params: T) -> Self {
Self { id, jsonrpc: "2.0", method, params }
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Response<T> {
pub(crate) id: u64,
jsonrpc: String,
#[serde(flatten)]
pub data: ResponseData<T>,
/// A JSON-RPC Notifcation
#[allow(unused)]
#[derive(Deserialize, Debug)]
pub struct Notification<'a> {
#[serde(alias = "JSONRPC")]
jsonrpc: &'a str,
method: &'a str,
#[serde(borrow)]
pub params: Subscription<'a>,
}

#[derive(Deserialize, Debug)]
pub struct Subscription<'a> {
pub subscription: U256,
#[serde(borrow)]
pub result: &'a RawValue,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum ResponseData<R> {
Error { error: JsonRpcError },
Success { result: R },
#[derive(Debug)]
pub enum Response<'a> {
Success { id: u64, jsonrpc: &'a str, result: &'a RawValue },
Error { id: u64, jsonrpc: &'a str, error: JsonRpcError },
}

impl<R> ResponseData<R> {
/// Consume response and return value
pub fn into_result(self) -> Result<R, JsonRpcError> {
impl Response<'_> {
pub fn id(&self) -> u64 {
match self {
ResponseData::Success { result } => Ok(result),
ResponseData::Error { error } => Err(error),
Self::Success { id, .. } => *id,
Self::Error { id, .. } => *id,
}
}
}

impl ResponseData<serde_json::Value> {
/// Encode the error to json value if it is an error
#[allow(dead_code)]
pub fn into_value(self) -> serde_json::Result<serde_json::Value> {
pub fn as_result(&self) -> Result<&RawValue, &JsonRpcError> {
match self {
ResponseData::Success { result } => Ok(result),
ResponseData::Error { error } => serde_json::to_value(error),
Self::Success { result, .. } => Ok(*result),
Self::Error { error, .. } => Err(error),
}
}

pub fn into_result(self) -> Result<Box<RawValue>, JsonRpcError> {
match self {
Self::Success { result, .. } => Ok(result.to_owned()),
Self::Error { error, .. } => Err(error),
}
}
}

// FIXME: ideally, this could be auto-derived as an untagged enum, but due to
// https://github.com/serde-rs/serde/issues/1183 this currently fails
impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct ResponseVisitor<'a>(&'a ());
impl<'de: 'a, 'a> Visitor<'de> for ResponseVisitor<'a> {
type Value = Response<'a>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid jsonrpc 2.0 response object")
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut id = None;
let mut jsonrpc = None;
let mut result = None;
let mut error = None;

while let Some(key) = map.next_key()? {
match key {
"id" => {
let value: u64 = map.next_value()?;
let prev = id.replace(value);
if prev.is_some() {
return Err(de::Error::duplicate_field("id"))
}
}
"jsonrpc" => {
let value: &'de str = map.next_value()?;
if value != "2.0" {
return Err(de::Error::invalid_value(Unexpected::Str(value), &"2.0"))
}

let prev = jsonrpc.replace(value);
if prev.is_some() {
return Err(de::Error::duplicate_field("jsonrpc"))
}
}
"result" => {
let value: &RawValue = map.next_value()?;
let prev = result.replace(value);
if prev.is_some() {
return Err(de::Error::duplicate_field("result"))
}
}
"error" => {
let value: JsonRpcError = map.next_value()?;
let prev = error.replace(value);
if prev.is_some() {
return Err(de::Error::duplicate_field("error"))
}
}
key => {
return Err(de::Error::unknown_field(
key,
&["id", "jsonrpc", "result", "error"],
))
}
}
}

let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?;

match (result, error) {
(Some(result), None) => Ok(Response::Success { id, jsonrpc, result }),
(None, Some(error)) => Ok(Response::Error { id, jsonrpc, error }),
_ => Err(de::Error::custom(
"response must have either a `result` or `error` field",
)),
}
}
}

deserializer.deserialize_map(ResponseVisitor(&()))
}
}

/// Basic or bearer authentication in http or websocket transport
Expand Down Expand Up @@ -129,10 +214,27 @@ mod tests {

#[test]
fn deser_response() {
let response: Response<u64> =
serde_json::from_str(r#"{"jsonrpc": "2.0", "result": 19, "id": 1}"#).unwrap();
assert_eq!(response.id, 1);
assert_eq!(response.data.into_result().unwrap(), 19);
let _ =
serde_json::from_str::<Response<'_>>(r#"{"jsonrpc":"2.0","result":19}"#).unwrap_err();
let _ = serde_json::from_str::<Response<'_>>(r#"{"jsonrpc":"3.0","result":19,"id":1}"#)
.unwrap_err();

let response: Response<'_> =
serde_json::from_str(r#"{"jsonrpc":"2.0","result":19,"id":1}"#).unwrap();

assert_eq!(response.id(), 1);
let result: u64 = serde_json::from_str(response.into_result().unwrap().get()).unwrap();
assert_eq!(result, 19);

let response: Response<'_> = serde_json::from_str(
r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"error occurred"},"id":2}"#,
)
.unwrap();

assert_eq!(response.id(), 2);
let err = response.into_result().unwrap_err();
assert_eq!(err.code, -32000);
assert_eq!(err.message, "error occurred");
}

#[test]
Expand Down
12 changes: 9 additions & 3 deletions ethers-providers/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,16 @@ impl JsonRpcClient for Provider {

let res = self.client.post(self.url.as_ref()).json(&payload).send().await?;
let text = res.text().await?;
let res: Response<R> =
serde_json::from_str(&text).map_err(|err| ClientError::SerdeJson { err, text })?;
let response: Response<'_> = match serde_json::from_str(&text) {
Ok(response) => response,
Err(err) => return Err(ClientError::SerdeJson { err, text }),
};

Ok(res.data.into_result()?)
let raw = response.as_result().map_err(Clone::clone)?;
let res = serde_json::from_str(raw.get())
.map_err(|err| ClientError::SerdeJson { err, text: raw.to_string() })?;

Ok(res)
}
}

Expand Down
Loading

0 comments on commit db1870c

Please sign in to comment.