From 4a964ef0b166a40009d5431cf41361cc4c4aee9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Fri, 27 Dec 2024 08:19:19 +0100 Subject: [PATCH] iterator: return NextRowError from next() methods --- scylla/src/transport/iterator.rs | 10 +++++----- scylla/src/transport/topology.rs | 7 ++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index cf58822cd2..81e036100b 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -37,7 +37,7 @@ use crate::statement::{prepared_statement::PreparedStatement, query::Query}; use crate::statement::{Consistency, PagingState, SerialConsistency}; use crate::transport::cluster::ClusterData; use crate::transport::connection::{Connection, NonErrorQueryResponse, QueryResponse}; -use crate::transport::errors::{QueryError, UserRequestError}; +use crate::transport::errors::UserRequestError; use crate::transport::load_balancing::{self, RoutingInfo}; use crate::transport::metrics::Metrics; use crate::transport::retry_policy::{RequestInfo, RetryDecision, RetrySession}; @@ -592,11 +592,11 @@ impl QueryPager { /// borrows from self. /// /// This is cancel-safe. - async fn next(&mut self) -> Option> { + async fn next(&mut self) -> Option> { let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await; match res { Some(Ok(())) => {} - Some(Err(err)) => return Some(Err(err.into())), + Some(Err(err)) => return Some(Err(err)), None => return None, } @@ -1043,7 +1043,7 @@ impl Stream for TypedRowStream where RowT: DeserializeOwnedRow, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let next_fut = async { @@ -1183,7 +1183,7 @@ mod legacy { pub enum LegacyNextRowError { /// Query to fetch next page has failed #[error(transparent)] - QueryError(#[from] QueryError), + NextRowError(#[from] NextRowError), /// Parsing values in row as given types failed #[error(transparent)] diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index d134ac6365..2c2951bc2f 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -827,6 +827,7 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result(rows_stream) }) .into_stream() + .map(|result| result.map(|stream| stream.map_err(QueryError::from))) .try_flatten() .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result))); @@ -844,6 +845,7 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result(rows_stream) }) .into_stream() + .map(|result| result.map(|stream| stream.map_err(QueryError::from))) .try_flatten() .and_then(|row_result| future::ok((NodeInfoSource::Local, row_result))); @@ -986,7 +988,9 @@ where pager.rows_stream::().map_err(convert_typecheck_error)?; Ok::<_, QueryError>(stream) }; - fut.into_stream().try_flatten() + fut.into_stream() + .map(|result| result.map(|stream| stream.map_err(QueryError::from))) + .try_flatten() } async fn query_keyspaces( @@ -1740,6 +1744,7 @@ async fn query_table_partitioners( Ok::<_, QueryError>(stream) }) .into_stream() + .map(|result| result.map(|stream| stream.map_err(QueryError::from))) .try_flatten(); let result = rows