diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 8207a85c4ad1..13ebf6c172ec 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -107,12 +107,11 @@ impl PyStorageNodeClient { .await .map_err(re_grpc_client::TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -156,12 +155,11 @@ impl PyStorageNodeClient { .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -431,9 +429,12 @@ impl PyStorageNodeClient { while let Some(result) = resp.next().await { let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - let tc = decode(EncoderVersion::V0, &response.payload) - .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - + let tc = match decode(EncoderVersion::V0, &response.payload) { + Ok(tc) => tc, + Err(err) => { + return Err(PyRuntimeError::new_err(err.to_string())); + } + }; let chunk = Chunk::from_transport(&tc) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; @@ -443,9 +444,9 @@ impl PyStorageNodeClient { } Ok(store) - })?; + }); - let handle = ChunkStoreHandle::new(store); + let handle = ChunkStoreHandle::new(store?); let cache = re_dataframe::QueryCacheHandle::new(re_dataframe::QueryCache::new(handle.clone()));