diff --git a/massa-grpc/src/api.rs b/massa-grpc/src/api.rs index 0bc456d1b22..78f70bfb585 100644 --- a/massa-grpc/src/api.rs +++ b/massa-grpc/src/api.rs @@ -6,6 +6,7 @@ use crate::config::GrpcConfig; use massa_consensus_exports::{ConsensusChannels, ConsensusController}; use massa_models::{ block::{BlockDeserializer, BlockDeserializerArgs, SecureShareBlock}, + endorsement::{EndorsementDeserializer, SecureShareEndorsement}, error::ModelsError, operation::{OperationDeserializer, SecureShareOperation}, secure_share::SecureShareDeserializer, @@ -231,7 +232,7 @@ impl grpc::grpc_server::Grpc for MassaService { "failed to propagate block: {}", e ); - let _res = sendblocks_notify_error( + let _res = send_blocks_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::Internal, @@ -260,7 +261,7 @@ impl grpc::grpc_server::Grpc for MassaService { "wrong signature: {}", res_block.signature ); - let _res = sendblocks_notify_error( + let _res = send_blocks_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::InvalidArgument, @@ -269,7 +270,7 @@ impl grpc::grpc_server::Grpc for MassaService { .await; }; } else { - let _res = sendblocks_notify_error( + let _res = send_blocks_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::InvalidArgument, @@ -281,7 +282,7 @@ impl grpc::grpc_server::Grpc for MassaService { } Err(e) => { let error = format!("failed to deserialize block: {}", e); - let _res = sendblocks_notify_error( + let _res = send_blocks_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::InvalidArgument, @@ -292,7 +293,7 @@ impl grpc::grpc_server::Grpc for MassaService { } }; } else { - let _res = sendblocks_notify_error( + let _res = send_blocks_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::InvalidArgument, @@ -337,9 +338,166 @@ impl grpc::grpc_server::Grpc for MassaService { async fn send_endorsements( &self, - _request: tonic::Request>, + request: tonic::Request>, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("not implemented")) + let mut cmd_sender = self.pool_command_sender.clone(); + let mut protocol_sender = self.protocol_command_sender.clone(); + let config = self.grpc_config.clone(); + let storage = self.storage.clone_without_refs(); + + let (tx, rx) = tokio::sync::mpsc::channel(config.max_channel_size); + let mut in_stream = request.into_inner(); + + tokio::spawn(async move { + while let Some(result) = in_stream.next().await { + match result { + Ok(req_content) => { + if req_content.endorsements.is_empty() { + let _res = send_endorsements_notify_error( + req_content.id.clone(), + tx.clone(), + tonic::Code::InvalidArgument, + "the request payload is empty".to_owned(), + ) + .await; + } else { + let proto_endorsement = req_content.endorsements; + if proto_endorsement.len() as u32 > config.max_endorsements_per_message + { + let _res = send_endorsements_notify_error( + req_content.id.clone(), + tx.clone(), + tonic::Code::InvalidArgument, + "too many endorsements".to_owned(), + ) + .await; + } else { + let endorsement_deserializer = + SecureShareDeserializer::new(EndorsementDeserializer::new( + config.thread_count, + config.endorsement_count, + )); + let verified_eds_res: Result, ModelsError> = proto_endorsement + .into_iter() + .map(|proto_endorsement| { + let mut ed_serialized = Vec::new(); + ed_serialized.extend(proto_endorsement.signature.as_bytes()); + ed_serialized.extend(proto_endorsement.creator_public_key.as_bytes()); + ed_serialized.extend(proto_endorsement.serialized_content); + let verified_op = match endorsement_deserializer.deserialize::(&ed_serialized) { + Ok(tuple) => { + let (rest, res_endorsement): (&[u8], SecureShareEndorsement) = tuple; + if rest.is_empty() { + if let Ok(_verify_signature) = res_endorsement.verify_signature() { + Ok((res_endorsement.id.to_string(), res_endorsement)) + } else { + Err(ModelsError::MassaSignatureError(massa_signature::MassaSignatureError::SignatureError( + format!("wrong signature: {}", res_endorsement.signature).to_owned()) + )) + } + } else { + Err(ModelsError::DeserializeError( + "there is data left after endorsement deserialization".to_owned() + )) + } + }, + Err(e) => { + Err(ModelsError::DeserializeError(format!("failed to deserialize endorsement: {}", e).to_owned() + )) + } + }; + verified_op + }) + .collect(); + + match verified_eds_res { + Ok(verified_eds) => { + let mut endorsement_storage = storage.clone_without_refs(); + endorsement_storage.store_endorsements( + verified_eds.values().cloned().collect(), + ); + cmd_sender.add_endorsements(endorsement_storage.clone()); + + let _res = match protocol_sender + .propagate_endorsements(endorsement_storage) + { + Ok(()) => (), + Err(e) => { + let error = format!( + "failed to propagate endorsement: {}", + e + ); + let _res = send_endorsements_notify_error( + req_content.id.clone(), + tx.clone(), + tonic::Code::Internal, + error.to_owned(), + ) + .await; + } + }; + + let result = grpc::EndorsementResult { + ids: verified_eds.keys().cloned().collect(), + }; + let _res = match tx + .send(Ok(grpc::SendEndorsementsResponse { + id: req_content.id.clone(), + message: Some( + grpc::send_endorsements_response::Message::Result( + result, + ), + ), + })) + .await + { + Ok(()) => (), + Err(e) => { + error!( + "failed to send back endorsement response: {}", + e + ) + } + }; + } + Err(e) => { + let error = format!("invalid endorsement(s): {}", e); + let _res = send_endorsements_notify_error( + req_content.id.clone(), + tx.clone(), + tonic::Code::InvalidArgument, + error.to_owned(), + ) + .await; + } + } + } + } + } + Err(err) => { + if let Some(io_err) = match_for_io_error(&err) { + if io_err.kind() == ErrorKind::BrokenPipe { + warn!("client disconnected, broken pipe: {}", io_err); + break; + } + } + match tx.send(Err(err)).await { + Ok(_) => (), + Err(e) => { + error!("failed to send back send_endorsements error response: {}", e); + break; + } + } + } + } + } + }); + + let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + + Ok(tonic::Response::new( + Box::pin(out_stream) as Self::SendEndorsementsStream + )) } type SendOperationsStream = Pin< @@ -367,7 +525,7 @@ impl grpc::grpc_server::Grpc for MassaService { match result { Ok(req_content) => { if req_content.operations.is_empty() { - let _res = sendoperations_notify_error( + let _res = send_operations_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::InvalidArgument, @@ -377,7 +535,7 @@ impl grpc::grpc_server::Grpc for MassaService { } else { let proto_operations = req_content.operations; if proto_operations.len() as u32 > config.max_operations_per_message { - let _res = sendoperations_notify_error( + let _res = send_operations_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::InvalidArgument, @@ -440,9 +598,11 @@ impl grpc::grpc_server::Grpc for MassaService { { Ok(()) => (), Err(e) => { - let error = - format!("failed to propagate operations: {}", e); - let _res = sendoperations_notify_error( + let error = format!( + "failed to propagate operations: {}", + e + ); + let _res = send_operations_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::Internal, @@ -476,8 +636,8 @@ impl grpc::grpc_server::Grpc for MassaService { }; } Err(e) => { - let error = format!("invalid operations:{}", e); - let _res = sendoperations_notify_error( + let error = format!("invalid operation(s): {}", e); + let _res = send_operations_notify_error( req_content.id.clone(), tx.clone(), tonic::Code::InvalidArgument, @@ -499,7 +659,7 @@ impl grpc::grpc_server::Grpc for MassaService { match tx.send(Err(err)).await { Ok(_) => (), Err(e) => { - error!("failed to send back sendblocks error response: {}", e); + error!("failed to send back send_operations error response: {}", e); break; } } @@ -516,7 +676,7 @@ impl grpc::grpc_server::Grpc for MassaService { } } -async fn sendblocks_notify_error( +async fn send_blocks_notify_error( id: String, sender: tokio::sync::mpsc::Sender>, code: tonic::Code, @@ -538,13 +698,41 @@ async fn sendblocks_notify_error( { Ok(()) => Ok(()), Err(e) => { - error!("failed to send back sendblocks error response: {}", e); + error!("failed to send back send_blocks error response: {}", e); + Ok(()) + } + } +} + +async fn send_endorsements_notify_error( + id: String, + sender: tokio::sync::mpsc::Sender>, + code: tonic::Code, + error: String, +) -> Result<(), Box> { + error!("{}", error); + match sender + .send(Ok(grpc::SendEndorsementsResponse { + id, + message: Some(grpc::send_endorsements_response::Message::Error( + massa_proto::google::rpc::Status { + code: code.into(), + message: error, + details: Vec::new(), + }, + )), + })) + .await + { + Ok(()) => Ok(()), + Err(e) => { + error!("failed to send back send_enorsements error response: {}", e); Ok(()) } } } -async fn sendoperations_notify_error( +async fn send_operations_notify_error( id: String, sender: tokio::sync::mpsc::Sender>, code: tonic::Code, @@ -566,7 +754,7 @@ async fn sendoperations_notify_error( { Ok(()) => Ok(()), Err(e) => { - error!("failed to send back sendoperations error response: {}", e); + error!("failed to send back send_operations error response: {}", e); Ok(()) } } diff --git a/massa-proto/proto/massa/api.proto b/massa-proto/proto/massa/api.proto index 46962abec87..e12c3c15994 100644 --- a/massa-proto/proto/massa/api.proto +++ b/massa-proto/proto/massa/api.proto @@ -3,7 +3,6 @@ syntax = "proto3"; package massa.api.v1; import "common.proto"; -import "endorsement.proto"; import "google/api/annotations.proto"; import "google/rpc/status.proto"; @@ -76,7 +75,7 @@ message SendEndorsementsRequest { // string field string id = 1; // object field - Endorsement endorsement = 2; + repeated SecureSharePayload endorsements = 2; } // SendEndorsementsResponse holds response from SendEndorsements @@ -93,7 +92,7 @@ message SendEndorsementsResponse { // Holds Endorsement response message EndorsementResult { // string field - string id = 1; + repeated string ids = 1; } // SendOperationsRequest holds parameters to SendOperations diff --git a/massa-proto/src/api.bin b/massa-proto/src/api.bin index 03bc113dcf9..ade50ebb139 100644 Binary files a/massa-proto/src/api.bin and b/massa-proto/src/api.bin differ diff --git a/massa-proto/src/massa.api.v1.rs b/massa-proto/src/massa.api.v1.rs index 4b175957b6a..4af0dbc724a 100644 --- a/massa-proto/src/massa.api.v1.rs +++ b/massa-proto/src/massa.api.v1.rs @@ -21,97 +21,6 @@ pub struct SecureSharePayload { #[prost(bytes = "vec", tag = "3")] pub serialized_content: ::prost::alloc::vec::Vec, } -/// message struct -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Slot { - /// fixed64 field - #[prost(uint64, tag = "1")] - pub period: u64, - /// float field - #[prost(float, tag = "2")] - pub thread: f32, -} -/// region Endorsement -/// message struct -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct EndorsementInfo { - /// string field - #[prost(string, tag = "1")] - pub id: ::prost::alloc::string::String, - /// bool field - #[prost(bool, tag = "2")] - pub in_pool: bool, - /// string field - #[prost(string, repeated, tag = "3")] - pub in_blocks: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - /// bool field - #[prost(bool, tag = "4")] - pub is_final: bool, - /// object field - #[prost(message, optional, tag = "5")] - pub endorsement: ::core::option::Option, -} -/// message struct -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Endorsement { - /// object field - #[prost(message, optional, tag = "1")] - pub slot: ::core::option::Option, - /// string field - #[prost(uint32, tag = "2")] - pub index: u32, - /// string field - #[prost(string, tag = "3")] - pub endorsed_block: ::prost::alloc::string::String, -} -/// message struct -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct EndorsementContent { - /// string field - #[prost(string, tag = "1")] - pub sender_public_key: ::prost::alloc::string::String, - /// object field - #[prost(message, optional, tag = "2")] - pub slot: ::core::option::Option, - /// float field - #[prost(uint32, tag = "3")] - pub index: u32, - /// string field - #[prost(string, tag = "4")] - pub endorsed_block: ::prost::alloc::string::String, -} -/// message struct -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct EndorsementId { - /// string field - #[prost(string, tag = "1")] - pub value: ::prost::alloc::string::String, -} -/// message struct -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct SecureShareEndorsement { - /// object field - #[prost(message, optional, tag = "1")] - pub content: ::core::option::Option, - /// string field - #[prost(string, tag = "2")] - pub signature: ::prost::alloc::string::String, - /// string field - #[prost(string, tag = "3")] - pub content_creator_pub_key: ::prost::alloc::string::String, - /// string field - #[prost(string, tag = "4")] - pub content_creator_address: ::prost::alloc::string::String, - /// string field - #[prost(string, tag = "5")] - pub id: ::prost::alloc::string::String, -} /// GetVersionRequest holds request from GetVersion #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -184,8 +93,8 @@ pub struct SendEndorsementsRequest { #[prost(string, tag = "1")] pub id: ::prost::alloc::string::String, /// object field - #[prost(message, optional, tag = "2")] - pub endorsement: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub endorsements: ::prost::alloc::vec::Vec, } /// SendEndorsementsResponse holds response from SendEndorsements #[allow(clippy::derive_partial_eq_without_eq)] @@ -215,8 +124,8 @@ pub mod send_endorsements_response { #[derive(Clone, PartialEq, ::prost::Message)] pub struct EndorsementResult { /// string field - #[prost(string, tag = "1")] - pub id: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "1")] + pub ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } /// SendOperationsRequest holds parameters to SendOperations #[allow(clippy::derive_partial_eq_without_eq)] @@ -802,6 +711,97 @@ pub mod grpc_server { const NAME: &'static str = "massa.api.v1.Grpc"; } } +/// message struct +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Slot { + /// fixed64 field + #[prost(uint64, tag = "1")] + pub period: u64, + /// float field + #[prost(float, tag = "2")] + pub thread: f32, +} +/// region Endorsement +/// message struct +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EndorsementInfo { + /// string field + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + /// bool field + #[prost(bool, tag = "2")] + pub in_pool: bool, + /// string field + #[prost(string, repeated, tag = "3")] + pub in_blocks: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// bool field + #[prost(bool, tag = "4")] + pub is_final: bool, + /// object field + #[prost(message, optional, tag = "5")] + pub endorsement: ::core::option::Option, +} +/// message struct +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Endorsement { + /// object field + #[prost(message, optional, tag = "1")] + pub slot: ::core::option::Option, + /// string field + #[prost(uint32, tag = "2")] + pub index: u32, + /// string field + #[prost(string, tag = "3")] + pub endorsed_block: ::prost::alloc::string::String, +} +/// message struct +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EndorsementContent { + /// string field + #[prost(string, tag = "1")] + pub sender_public_key: ::prost::alloc::string::String, + /// object field + #[prost(message, optional, tag = "2")] + pub slot: ::core::option::Option, + /// float field + #[prost(uint32, tag = "3")] + pub index: u32, + /// string field + #[prost(string, tag = "4")] + pub endorsed_block: ::prost::alloc::string::String, +} +/// message struct +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EndorsementId { + /// string field + #[prost(string, tag = "1")] + pub value: ::prost::alloc::string::String, +} +/// message struct +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SecureShareEndorsement { + /// object field + #[prost(message, optional, tag = "1")] + pub content: ::core::option::Option, + /// string field + #[prost(string, tag = "2")] + pub signature: ::prost::alloc::string::String, + /// string field + #[prost(string, tag = "3")] + pub content_creator_pub_key: ::prost::alloc::string::String, + /// string field + #[prost(string, tag = "4")] + pub content_creator_address: ::prost::alloc::string::String, + /// string field + #[prost(string, tag = "5")] + pub id: ::prost::alloc::string::String, +} /// region Operation /// message struct #[allow(clippy::derive_partial_eq_without_eq)]