Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use flex-error for tendermint-rs errors #923

Merged
merged 30 commits into from
Aug 7, 2021
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8bd987c
Use flex-error for tendermint
soareschen Jun 30, 2021
0df6c59
Use flex-error for p2p
soareschen Jun 30, 2021
f9edb66
Use flex-error for light-client
soareschen Jul 1, 2021
d042c04
Use flex-error for light_client::predicates
soareschen Jul 1, 2021
c21edce
Fix lint
soareschen Jul 1, 2021
90a26cd
Use flex-error for builder and io errors
soareschen Jul 1, 2021
c4a41df
Use flex-error for rpc errors
soareschen Jul 1, 2021
668beb3
Use flex-error for protobuf errors
soareschen Jul 2, 2021
98a161a
Use flex-error for abci
soareschen Jul 2, 2021
266e0cc
Fix test_bisection_no_witness_left
soareschen Jul 6, 2021
a9bb713
Fix build errors in all-features
soareschen Jul 6, 2021
45f95ad
Fix failing tests
soareschen Jul 9, 2021
f79236f
Fix more failures
soareschen Jul 9, 2021
cdee9e1
Merge branch 'master' into soares/flex-error
soareschen Jul 9, 2021
67e275b
Fix tungstenite error under wasm target
soareschen Jul 9, 2021
5682f66
Merge remote-tracking branch 'origin/master' into soares/flex-error
soareschen Jul 14, 2021
5763a28
Fix incoming_fixtures test
soareschen Jul 14, 2021
08115e0
Merge branch 'master' into soares/flex-error
soareschen Jul 20, 2021
17506fa
Fix conflict
soareschen Jul 20, 2021
515f31e
Update flex-error to v0.4.0
soareschen Jul 23, 2021
2095a70
Merge remote-tracking branch 'origin/master' into soares/flex-error
soareschen Jul 23, 2021
bef6287
set std feature in flex-error instead of individual crates
soareschen Jul 26, 2021
da2ed80
Add flex-error patch to tools/Cargo.toml
soareschen Jul 26, 2021
bb30f30
Use published version of flex-error v0.4.1
soareschen Jul 26, 2021
90a0df2
Enable flex-error/eyre_tracer feature by default
soareschen Jul 26, 2021
2ea6da4
Add .changelog entry (#940)
thanethomson Aug 3, 2021
bc2f0cb
flex-error: resolve conflicts with `master` (#945)
thanethomson Aug 7, 2021
c699ba1
Fix master merge
thanethomson Aug 7, 2021
ab1aae4
Remove unnecessary macros
thanethomson Aug 7, 2021
afd7185
Correct error messages
thanethomson Aug 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/unreleased/breaking-changes/923-flex-error.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- All crates' error handling has been refactored to make use of
[`flex-error`](https://github.com/informalsystems/flex-error/). This gives
users greater flexibility in terms of the error handling/reporting systems
they want to use and is a critical step towards `no_std` support.
([#923](https://github.com/informalsystems/tendermint-rs/pull/923))
8 changes: 6 additions & 2 deletions abci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ path = "src/application/kvstore/main.rs"
required-features = [ "binary", "kvstore-app" ]

[features]
default = ["std", "eyre_tracer"]
eyre_tracer = ["flex-error/eyre_tracer"]
client = []
echo-app = []
kvstore-app = []
binary = [ "structopt", "tracing-subscriber" ]
std = [
"flex-error/std"
]

[dependencies]
bytes = "1.0"
eyre = "0.6"
prost = "0.7"
tendermint-proto = { version = "0.21.0", path = "../proto" }
thiserror = "1.0"
tracing = "0.1"
flex-error = { version = "0.4.1", default-features = false }

structopt = { version = "0.3", optional = true }
tracing-subscriber = { version = "0.2", optional = true }
25 changes: 10 additions & 15 deletions abci/src/application/kvstore.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! In-memory key/value store ABCI application.

use crate::codec::{encode_varint, MAX_VARINT_LENGTH};
use crate::{Application, Error, Result};
use crate::{Application, Error};
use bytes::BytesMut;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
Expand All @@ -28,7 +28,7 @@ impl KeyValueStoreApp {
}

/// Attempt to retrieve the value associated with the given key.
pub fn get<K: AsRef<str>>(&self, key: K) -> Result<(i64, Option<String>)> {
pub fn get<K: AsRef<str>>(&self, key: K) -> Result<(i64, Option<String>), Error> {
let (result_tx, result_rx) = channel();
channel_send(
&self.cmd_tx,
Expand All @@ -44,7 +44,7 @@ impl KeyValueStoreApp {
///
/// Optionally returns any pre-existing value associated with the given
/// key.
pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<String>>
pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<String>, Error>
where
K: AsRef<str>,
V: AsRef<str>,
Expand Down Expand Up @@ -202,12 +202,9 @@ impl KeyValueStoreDriver {
}

/// Run the driver in the current thread (blocking).
pub fn run(mut self) -> Result<()> {
pub fn run(mut self) -> Result<(), Error> {
loop {
let cmd = self
.cmd_rx
.recv()
.map_err(|e| Error::ChannelRecv(e.to_string()))?;
let cmd = self.cmd_rx.recv().map_err(Error::channel_recv)?;
match cmd {
Command::GetInfo { result_tx } => {
channel_send(&result_tx, (self.height, self.app_hash.clone()))?
Expand All @@ -232,7 +229,7 @@ impl KeyValueStoreDriver {
}
}

fn commit(&mut self, result_tx: Sender<(i64, Vec<u8>)>) -> Result<()> {
fn commit(&mut self, result_tx: Sender<(i64, Vec<u8>)>) -> Result<(), Error> {
// As in the Go-based key/value store, simply encode the number of
// items as the "app hash"
let mut app_hash = BytesMut::with_capacity(MAX_VARINT_LENGTH);
Expand Down Expand Up @@ -263,12 +260,10 @@ enum Command {
Commit { result_tx: Sender<(i64, Vec<u8>)> },
}

fn channel_send<T>(tx: &Sender<T>, value: T) -> Result<()> {
tx.send(value)
.map_err(|e| Error::ChannelSend(e.to_string()).into())
fn channel_send<T>(tx: &Sender<T>, value: T) -> Result<(), Error> {
tx.send(value).map_err(Error::send)
}

fn channel_recv<T>(rx: &Receiver<T>) -> Result<T> {
rx.recv()
.map_err(|e| Error::ChannelRecv(e.to_string()).into())
fn channel_recv<T>(rx: &Receiver<T>) -> Result<T, Error> {
rx.recv().map_err(Error::channel_recv)
}
52 changes: 27 additions & 25 deletions abci/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Blocking ABCI client.

use crate::codec::ClientCodec;
use crate::{Error, Result};
use crate::Error;
use std::net::{TcpStream, ToSocketAddrs};
use tendermint_proto::abci::{
request, response, RequestApplySnapshotChunk, RequestBeginBlock, RequestCheckTx, RequestCommit,
Expand Down Expand Up @@ -31,8 +31,8 @@ impl ClientBuilder {

/// Client constructor that attempts to connect to the given network
/// address.
pub fn connect<A: ToSocketAddrs>(self, addr: A) -> Result<Client> {
let stream = TcpStream::connect(addr)?;
pub fn connect<A: ToSocketAddrs>(self, addr: A) -> Result<Client, Error> {
let stream = TcpStream::connect(addr).map_err(Error::io)?;
Ok(Client {
codec: ClientCodec::new(stream, self.read_buf_size),
})
Expand All @@ -56,101 +56,103 @@ macro_rules! perform {
($self:expr, $type:ident, $req:expr) => {
match $self.perform(request::Value::$type($req))? {
response::Value::$type(r) => Ok(r),
r => Err(Error::UnexpectedServerResponseType(stringify!($type).to_string(), r).into()),
r => {
Err(Error::unexpected_server_response_type(stringify!($type).to_string(), r).into())
}
}
};
}

impl Client {
/// Ask the ABCI server to echo back a message.
pub fn echo(&mut self, req: RequestEcho) -> Result<ResponseEcho> {
pub fn echo(&mut self, req: RequestEcho) -> Result<ResponseEcho, Error> {
perform!(self, Echo, req)
}

/// Request information about the ABCI application.
pub fn info(&mut self, req: RequestInfo) -> Result<ResponseInfo> {
pub fn info(&mut self, req: RequestInfo) -> Result<ResponseInfo, Error> {
perform!(self, Info, req)
}

/// To be called once upon genesis.
pub fn init_chain(&mut self, req: RequestInitChain) -> Result<ResponseInitChain> {
pub fn init_chain(&mut self, req: RequestInitChain) -> Result<ResponseInitChain, Error> {
perform!(self, InitChain, req)
}

/// Query the application for data at the current or past height.
pub fn query(&mut self, req: RequestQuery) -> Result<ResponseQuery> {
pub fn query(&mut self, req: RequestQuery) -> Result<ResponseQuery, Error> {
perform!(self, Query, req)
}

/// Check the given transaction before putting it into the local mempool.
pub fn check_tx(&mut self, req: RequestCheckTx) -> Result<ResponseCheckTx> {
pub fn check_tx(&mut self, req: RequestCheckTx) -> Result<ResponseCheckTx, Error> {
perform!(self, CheckTx, req)
}

/// Signal the beginning of a new block, prior to any `DeliverTx` calls.
pub fn begin_block(&mut self, req: RequestBeginBlock) -> Result<ResponseBeginBlock> {
pub fn begin_block(&mut self, req: RequestBeginBlock) -> Result<ResponseBeginBlock, Error> {
perform!(self, BeginBlock, req)
}

/// Apply a transaction to the application's state.
pub fn deliver_tx(&mut self, req: RequestDeliverTx) -> Result<ResponseDeliverTx> {
pub fn deliver_tx(&mut self, req: RequestDeliverTx) -> Result<ResponseDeliverTx, Error> {
perform!(self, DeliverTx, req)
}

/// Signal the end of a block.
pub fn end_block(&mut self, req: RequestEndBlock) -> Result<ResponseEndBlock> {
pub fn end_block(&mut self, req: RequestEndBlock) -> Result<ResponseEndBlock, Error> {
perform!(self, EndBlock, req)
}

pub fn flush(&mut self) -> Result<ResponseFlush> {
pub fn flush(&mut self) -> Result<ResponseFlush, Error> {
perform!(self, Flush, RequestFlush {})
}

/// Commit the current state at the current height.
pub fn commit(&mut self) -> Result<ResponseCommit> {
pub fn commit(&mut self) -> Result<ResponseCommit, Error> {
perform!(self, Commit, RequestCommit {})
}

/// Request that the application set an option to a particular value.
pub fn set_option(&mut self, req: RequestSetOption) -> Result<ResponseSetOption> {
pub fn set_option(&mut self, req: RequestSetOption) -> Result<ResponseSetOption, Error> {
perform!(self, SetOption, req)
}

/// Used during state sync to discover available snapshots on peers.
pub fn list_snapshots(&mut self) -> Result<ResponseListSnapshots> {
pub fn list_snapshots(&mut self) -> Result<ResponseListSnapshots, Error> {
perform!(self, ListSnapshots, RequestListSnapshots {})
}

/// Called when bootstrapping the node using state sync.
pub fn offer_snapshot(&mut self, req: RequestOfferSnapshot) -> Result<ResponseOfferSnapshot> {
pub fn offer_snapshot(
&mut self,
req: RequestOfferSnapshot,
) -> Result<ResponseOfferSnapshot, Error> {
perform!(self, OfferSnapshot, req)
}

/// Used during state sync to retrieve chunks of snapshots from peers.
pub fn load_snapshot_chunk(
&mut self,
req: RequestLoadSnapshotChunk,
) -> Result<ResponseLoadSnapshotChunk> {
) -> Result<ResponseLoadSnapshotChunk, Error> {
perform!(self, LoadSnapshotChunk, req)
}

/// Apply the given snapshot chunk to the application's state.
pub fn apply_snapshot_chunk(
&mut self,
req: RequestApplySnapshotChunk,
) -> Result<ResponseApplySnapshotChunk> {
) -> Result<ResponseApplySnapshotChunk, Error> {
perform!(self, ApplySnapshotChunk, req)
}

fn perform(&mut self, req: request::Value) -> Result<response::Value> {
fn perform(&mut self, req: request::Value) -> Result<response::Value, Error> {
self.codec.send(Request { value: Some(req) })?;
let res = self
.codec
.next()
.ok_or(Error::ServerConnectionTerminated)??;
match res.value {
Some(value) => Ok(value),
None => Err(Error::MalformedServerResponse.into()),
}
.ok_or_else(Error::server_connection_terminated)??;
res.value.ok_or_else(Error::malformed_server_response)
}
}
40 changes: 25 additions & 15 deletions abci/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
//!
//! [tsp]: https://docs.tendermint.com/master/spec/abci/client-server.html#tsp

use crate::Result;
use bytes::{Buf, BufMut, BytesMut};
use prost::Message;
use std::io::{Read, Write};
use std::marker::PhantomData;
use tendermint_proto::abci::{Request, Response};

use crate::error::Error;

/// The maximum number of bytes we expect in a varint. We use this to check if
/// we're encountering a decoding error for a varint.
pub const MAX_VARINT_LENGTH: usize = 16;
Expand Down Expand Up @@ -60,7 +61,7 @@ where
S: Read,
I: Message + Default,
{
type Item = Result<I>;
type Item = Result<I, Error>;

fn next(&mut self) -> Option<Self::Item> {
loop {
Expand All @@ -75,7 +76,7 @@ where
// more
let bytes_read = match self.stream.read(self.read_window.as_mut()) {
Ok(br) => br,
Err(e) => return Some(Err(e.into())),
Err(e) => return Some(Err(Error::io(e))),
};
if bytes_read == 0 {
// The underlying stream terminated
Expand All @@ -93,39 +94,46 @@ where
O: Message,
{
/// Send a message using this codec.
pub fn send(&mut self, message: O) -> Result<()> {
pub fn send(&mut self, message: O) -> Result<(), Error> {
encode_length_delimited(message, &mut self.write_buf)?;
while !self.write_buf.is_empty() {
let bytes_written = self.stream.write(self.write_buf.as_ref())?;
let bytes_written = self
.stream
.write(self.write_buf.as_ref())
.map_err(Error::io)?;

if bytes_written == 0 {
return Err(std::io::Error::new(
return Err(Error::io(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write to underlying stream",
)
.into());
)));
}
self.write_buf.advance(bytes_written);
}
Ok(self.stream.flush()?)

self.stream.flush().map_err(Error::io)?;

Ok(())
}
}

/// Encode the given message with a length prefix.
pub fn encode_length_delimited<M, B>(message: M, mut dst: &mut B) -> Result<()>
pub fn encode_length_delimited<M, B>(message: M, mut dst: &mut B) -> Result<(), Error>
where
M: Message,
B: BufMut,
{
let mut buf = BytesMut::new();
message.encode(&mut buf)?;
message.encode(&mut buf).map_err(Error::encode)?;

let buf = buf.freeze();
encode_varint(buf.len() as u64, &mut dst);
dst.put(buf);
Ok(())
}

/// Attempt to decode a message of type `M` from the given source buffer.
pub fn decode_length_delimited<M>(src: &mut BytesMut) -> Result<Option<M>>
pub fn decode_length_delimited<M>(src: &mut BytesMut) -> Result<Option<M>, Error>
where
M: Message + Default,
{
Expand All @@ -148,7 +156,9 @@ where
src.advance(delim_len + (encoded_len as usize));

let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref());
Ok(Some(M::decode(&mut result_bytes)?))
let res = M::decode(&mut result_bytes).map_err(Error::decode)?;

Ok(Some(res))
}
}

Expand All @@ -158,7 +168,7 @@ pub fn encode_varint<B: BufMut>(val: u64, mut buf: &mut B) {
prost::encoding::encode_varint(val << 1, &mut buf);
}

pub fn decode_varint<B: Buf>(mut buf: &mut B) -> Result<u64> {
let len = prost::encoding::decode_varint(&mut buf)?;
pub fn decode_varint<B: Buf>(mut buf: &mut B) -> Result<u64, Error> {
let len = prost::encoding::decode_varint(&mut buf).map_err(Error::decode)?;
Ok(len >> 1)
}
Loading