From 30a6a9f96d88fd84d548e5f5f30eb5818e638211 Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Sat, 30 Nov 2024 20:10:36 -0500 Subject: [PATCH] add endpoint for submitting onchain events --- src/bin/perftest.rs | 19 ++++++++++++++++-- src/main.rs | 14 +++++++------ src/network/admin_server.rs | 40 ++++++++++++++++++++++++++++++++++--- src/network/server.rs | 2 +- src/proto/admin_rpc.proto | 3 +++ src/utils/cli.rs | 17 ++++++++++++++++ 6 files changed, 83 insertions(+), 12 deletions(-) diff --git a/src/bin/perftest.rs b/src/bin/perftest.rs index 8c43de1..1a91937 100644 --- a/src/bin/perftest.rs +++ b/src/bin/perftest.rs @@ -7,11 +7,14 @@ use figment::{ use hex; use hex::FromHex; use serde::{Deserialize, Serialize}; -use snapchain::consensus::proposer::current_time; -use snapchain::proto::msg as message; use snapchain::proto::rpc::snapchain_service_client::SnapchainServiceClient; use snapchain::proto::snapchain::Block; use snapchain::utils::cli::{compose_message, follow_blocks, send_message}; +use snapchain::{ + consensus::proposer::current_time, proto::admin_rpc::admin_service_client::AdminServiceClient, + utils::cli::compose_rent_event, +}; +use snapchain::{proto::msg as message, utils::cli::send_on_chain_event}; use std::collections::HashSet; use std::error::Error; use std::path::Path; @@ -90,6 +93,18 @@ fn start_submit_messages( } }; + let mut admin_client = match AdminServiceClient::connect(rpc_addr.clone()).await { + Ok(client) => client, + Err(e) => { + panic!("Error connecting to {}: {}", &rpc_addr, e); + } + }; + + let rent_event = compose_rent_event(6833); + send_on_chain_event(&mut admin_client, rent_event) + .await + .unwrap(); + let mut i = 1; loop { submit_message_timer.tick().await; diff --git a/src/main.rs b/src/main.rs index 3a8903f..953772c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -67,12 +67,6 @@ async fn main() -> Result<(), Box> { Err(e) => Err(format!("invalid statsd address: {}", e)), }?; - let admin_service = { - let mut admin_service = MyAdminService::new(app_config.rocksdb_dir.clone().as_str()); - admin_service.maybe_destroy_databases().unwrap(); - admin_service - }; - let host = (statsd_host, statsd_port); let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap(); let sink = cadence::UdpMetricSink::from(host, socket)?; @@ -165,6 +159,14 @@ async fn main() -> Result<(), Box> { let rpc_block_store = block_store.clone(); tokio::spawn(async move { + let admin_service = { + let mut admin_service = MyAdminService::new( + app_config.rocksdb_dir.clone().as_str(), + rpc_shard_senders.clone(), + ); + admin_service.maybe_destroy_databases().unwrap(); + admin_service + }; let service = MySnapchainService::new( rpc_block_store, rpc_shard_stores, diff --git a/src/network/admin_server.rs b/src/network/admin_server.rs index d891ade..4f73eb7 100644 --- a/src/network/admin_server.rs +++ b/src/network/admin_server.rs @@ -1,15 +1,21 @@ -use crate::proto::admin_rpc; use crate::proto::admin_rpc::admin_service_server::AdminService; +use crate::proto::onchain_event; +use crate::proto::snapchain::ValidatorMessage; +use crate::proto::{admin_rpc, onchain_event::OnChainEvent}; +use crate::storage::store::engine::{MempoolMessage, Senders}; use rocksdb; +use std::collections::HashMap; use std::{io, path, process}; use thiserror::Error; +use tokio::sync::mpsc; use tonic::{Request, Response, Status}; -use tracing::warn; +use tracing::{info, warn}; pub struct MyAdminService { db_dir: String, admin_db_dir: String, db: Option, + message_tx: mpsc::Sender, } #[derive(Debug, Error)] @@ -24,15 +30,19 @@ pub enum AdminServiceError { const DB_DESTROY_KEY: &[u8] = b"__destroy_all_databases_on_start__"; impl MyAdminService { - pub fn new(db_dir: &str) -> Self { + pub fn new(db_dir: &str, shard_senders: HashMap) -> Self { let admin_db_dir = path::Path::new(db_dir) .join("admin") .to_string_lossy() .into_owned(); + + // TODO(aditi): This logic will change once a mempool exists + let message_tx = shard_senders.get(&1u32).unwrap().messages_tx.clone(); Self { db_dir: db_dir.to_string(), admin_db_dir, db: None, + message_tx, } } @@ -92,4 +102,28 @@ impl AdminService for MyAdminService { let response = Response::new(admin_rpc::TerminateResponse {}); Ok(response) } + + async fn submit_on_chain_event( + &self, + request: Request, + ) -> Result, Status> { + info!("Received call to [submit_on_chain_event] RPC"); + + let onchain_event = request.into_inner(); + + let result = self + .message_tx + .send(MempoolMessage::ValidatorMessage(ValidatorMessage { + on_chain_event: Some(onchain_event.clone()), + fname_transfer: None, + })) + .await; + match result { + Ok(()) => { + let response = Response::new(onchain_event); + Ok(response) + } + Err(err) => Err(Status::from_error(Box::new(err))), + } + } } diff --git a/src/network/server.rs b/src/network/server.rs index 0a5bd0a..8fe1684 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -14,7 +14,7 @@ use crate::utils::statsd_wrapper::StatsdClientWrapper; use hex::ToHex; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tonic::{server, Request, Response, Status}; +use tonic::{Request, Response, Status}; use tracing::info; pub struct MySnapchainService { diff --git a/src/proto/admin_rpc.proto b/src/proto/admin_rpc.proto index 7edefd0..a01aaa8 100644 --- a/src/proto/admin_rpc.proto +++ b/src/proto/admin_rpc.proto @@ -1,6 +1,8 @@ syntax = "proto3"; package admin_rpc; +import "onchain_event.proto"; + message TerminateRequest { bool destroy_database = 1; } @@ -10,4 +12,5 @@ message TerminateResponse { service AdminService { rpc Terminate(TerminateRequest) returns (TerminateResponse); + rpc SubmitOnChainEvent(onchain_event.OnChainEvent) returns (onchain_event.OnChainEvent); } diff --git a/src/utils/cli.rs b/src/utils/cli.rs index cda52e0..b78cffa 100644 --- a/src/utils/cli.rs +++ b/src/utils/cli.rs @@ -1,4 +1,6 @@ +use crate::proto::admin_rpc::admin_service_client::AdminServiceClient; use crate::proto::msg as message; +use crate::proto::onchain_event::OnChainEvent; use crate::proto::rpc::snapchain_service_client::SnapchainServiceClient; use crate::proto::{rpc, snapchain::Block}; use crate::utils::factory::messages_factory; @@ -8,6 +10,8 @@ use tokio::sync::mpsc; use tokio::time; use tonic::transport::Channel; +use super::factory; + const FETCH_SIZE: u64 = 100; // compose_message is a proof-of-concept script, is not guaranteed to be correct, @@ -22,6 +26,19 @@ pub async fn send_message( Ok(response.into_inner()) } +pub async fn send_on_chain_event( + client: &mut AdminServiceClient, + onchain_event: OnChainEvent, +) -> Result> { + let request = tonic::Request::new(onchain_event.clone()); + let response = client.submit_on_chain_event(request).await?; + Ok(response.into_inner()) +} + +pub fn compose_rent_event(fid: u32) -> OnChainEvent { + factory::events_factory::create_rent_event(fid, Some(10), Some(10), false) +} + pub fn compose_message( fid: u32, text: &str,