Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add endpoint for submitting onchain events via rpc #105

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/bin/perftest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ use figment::{
use hex;
use hex::FromHex;
use serde::{Deserialize, Serialize};
use snapchain::consensus::proposer::current_time;
use snapchain::proto::msg as message;
use snapchain::proto::rpc::snapchain_service_client::SnapchainServiceClient;
use snapchain::proto::snapchain::Block;
use snapchain::utils::cli::{compose_message, follow_blocks, send_message};
use snapchain::{
consensus::proposer::current_time, proto::admin_rpc::admin_service_client::AdminServiceClient,
utils::cli::compose_rent_event,
};
use snapchain::{proto::msg as message, utils::cli::send_on_chain_event};
use std::collections::HashSet;
use std::error::Error;
use std::path::Path;
Expand Down Expand Up @@ -90,6 +93,18 @@ fn start_submit_messages(
}
};

let mut admin_client = match AdminServiceClient::connect(rpc_addr.clone()).await {
Ok(client) => client,
Err(e) => {
panic!("Error connecting to {}: {}", &rpc_addr, e);
}
};

let rent_event = compose_rent_event(6833);
send_on_chain_event(&mut admin_client, rent_event)
.await
.unwrap();

let mut i = 1;
loop {
submit_message_timer.tick().await;
Expand Down
14 changes: 8 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
Err(e) => Err(format!("invalid statsd address: {}", e)),
}?;

let admin_service = {
let mut admin_service = MyAdminService::new(app_config.rocksdb_dir.clone().as_str());
admin_service.maybe_destroy_databases().unwrap();
admin_service
};

let host = (statsd_host, statsd_port);
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
let sink = cadence::UdpMetricSink::from(host, socket)?;
Expand Down Expand Up @@ -165,6 +159,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

let rpc_block_store = block_store.clone();
tokio::spawn(async move {
let admin_service = {
let mut admin_service = MyAdminService::new(
app_config.rocksdb_dir.clone().as_str(),
rpc_shard_senders.clone(),
);
admin_service.maybe_destroy_databases().unwrap();
admin_service
};
let service = MySnapchainService::new(
rpc_block_store,
rpc_shard_stores,
Expand Down
40 changes: 37 additions & 3 deletions src/network/admin_server.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use crate::proto::admin_rpc;
use crate::proto::admin_rpc::admin_service_server::AdminService;
use crate::proto::onchain_event;
use crate::proto::snapchain::ValidatorMessage;
use crate::proto::{admin_rpc, onchain_event::OnChainEvent};
use crate::storage::store::engine::{MempoolMessage, Senders};
use rocksdb;
use std::collections::HashMap;
use std::{io, path, process};
use thiserror::Error;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use tracing::warn;
use tracing::{info, warn};

pub struct MyAdminService {
db_dir: String,
admin_db_dir: String,
db: Option<rocksdb::TransactionDB>,
message_tx: mpsc::Sender<MempoolMessage>,
}

#[derive(Debug, Error)]
Expand All @@ -24,15 +30,19 @@ pub enum AdminServiceError {
const DB_DESTROY_KEY: &[u8] = b"__destroy_all_databases_on_start__";

impl MyAdminService {
pub fn new(db_dir: &str) -> Self {
pub fn new(db_dir: &str, shard_senders: HashMap<u32, Senders>) -> Self {
let admin_db_dir = path::Path::new(db_dir)
.join("admin")
.to_string_lossy()
.into_owned();

// TODO(aditi): This logic will change once a mempool exists
let message_tx = shard_senders.get(&1u32).unwrap().messages_tx.clone();
Self {
db_dir: db_dir.to_string(),
admin_db_dir,
db: None,
message_tx,
}
}

Expand Down Expand Up @@ -92,4 +102,28 @@ impl AdminService for MyAdminService {
let response = Response::new(admin_rpc::TerminateResponse {});
Ok(response)
}

async fn submit_on_chain_event(
&self,
request: Request<OnChainEvent>,
) -> Result<Response<OnChainEvent>, Status> {
info!("Received call to [submit_on_chain_event] RPC");

let onchain_event = request.into_inner();

let result = self
.message_tx
.send(MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: Some(onchain_event.clone()),
fname_transfer: None,
}))
.await;
match result {
Ok(()) => {
let response = Response::new(onchain_event);
Ok(response)
}
Err(err) => Err(Status::from_error(Box::new(err))),
}
}
}
2 changes: 1 addition & 1 deletion src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::utils::statsd_wrapper::StatsdClientWrapper;
use hex::ToHex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{server, Request, Response, Status};
use tonic::{Request, Response, Status};
use tracing::info;

pub struct MySnapchainService {
Expand Down
3 changes: 3 additions & 0 deletions src/proto/admin_rpc.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
syntax = "proto3";
package admin_rpc;

import "onchain_event.proto";

message TerminateRequest {
bool destroy_database = 1;
}
Expand All @@ -10,4 +12,5 @@ message TerminateResponse {

service AdminService {
rpc Terminate(TerminateRequest) returns (TerminateResponse);
rpc SubmitOnChainEvent(onchain_event.OnChainEvent) returns (onchain_event.OnChainEvent);
}
17 changes: 17 additions & 0 deletions src/utils/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::proto::admin_rpc::admin_service_client::AdminServiceClient;
use crate::proto::msg as message;
use crate::proto::onchain_event::OnChainEvent;
use crate::proto::rpc::snapchain_service_client::SnapchainServiceClient;
use crate::proto::{rpc, snapchain::Block};
use crate::utils::factory::messages_factory;
Expand All @@ -8,6 +10,8 @@ use tokio::sync::mpsc;
use tokio::time;
use tonic::transport::Channel;

use super::factory;

const FETCH_SIZE: u64 = 100;

// compose_message is a proof-of-concept script, is not guaranteed to be correct,
Expand All @@ -22,6 +26,19 @@ pub async fn send_message(
Ok(response.into_inner())
}

pub async fn send_on_chain_event(
client: &mut AdminServiceClient<Channel>,
onchain_event: OnChainEvent,
) -> Result<OnChainEvent, Box<dyn Error>> {
let request = tonic::Request::new(onchain_event.clone());
let response = client.submit_on_chain_event(request).await?;
Ok(response.into_inner())
}

pub fn compose_rent_event(fid: u32) -> OnChainEvent {
factory::events_factory::create_rent_event(fid, Some(10), Some(10), false)
}

pub fn compose_message(
fid: u32,
text: &str,
Expand Down