diff --git a/.gitignore b/.gitignore index 2011dfb..cc8af7c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ /target .envrc .bin -.env \ No newline at end of file +.env +devhub.near* +events-committee.near* +infrastructure-committee.near* diff --git a/bring-cache-up-to-date.py b/bring-cache-up-to-date.py index 908d23a..9ad293d 100644 --- a/bring-cache-up-to-date.py +++ b/bring-cache-up-to-date.py @@ -1,9 +1,9 @@ import time import requests -local = False -reset_from_zero = True # False to continue from where it left off -fly_app_name = "events-cache-api-rs" +local = True +reset_from_zero = False # False to continue from where it left off +fly_app_name = "devhub-cache-api-rs" # ~120 calls for devhub # ~20 calls for infra # ~40 calls for events @@ -12,11 +12,11 @@ base_url = f"http://localhost:8080/" if local else f"https://{fly_app_name}.fly.dev/" def call_api(count): - url = f"{base_url}proposal/256/snapshots" # Replace with your API URL + url = f"{base_url}proposals" # Replace with your API URL try: response = requests.get(url) if response.status_code == 200: - print(f"{count} API call successful: - response length {len(response.json())}") + print(f"{count} API call successful: - response length {response.json().get('total_records')}") else: print("API call failed with status code:", response.status_code) except requests.exceptions.RequestException as e: diff --git a/scripts/download_nearblocks.js b/scripts/download_nearblocks.js index 9f68fb1..c0eea61 100644 --- a/scripts/download_nearblocks.js +++ b/scripts/download_nearblocks.js @@ -25,10 +25,13 @@ const path = require("path"); const ACCOUNT = "devhub.near"; const BASE_URL = "https://api.nearblocks.io/v1/account"; -const PER_PAGE = 25; -const API_KEY = "API_KEY"; +const PER_PAGE = 50; +const API_KEY = process.env.NEARBLOCKS_API_KEY; +if (!API_KEY) { + throw new Error("NEARBLOCKS_API_KEY environment variable is required"); +} const START_AFTER_BLOCK = 0; -const RECEIPT = false; +const RECEIPT = false; // Can't use receipt because it's not supported by the API after_block only checks after the block async function saveTransactions(blockHeight, transactions) { // Create a Blob containing the JSON data @@ -44,7 +47,7 @@ async function saveTransactions(blockHeight, transactions) { } async function fetchTransactions(afterBlock) { - const url = `${BASE_URL}/${ACCOUNT}/txns?to=${ACCOUNT}&after_block=${afterBlock}&per_page=${PER_PAGE}&order=asc&page=1`; + const url = `${BASE_URL}/${ACCOUNT}/txns?&after_block=${afterBlock}&per_page=${PER_PAGE}&order=asc&page=1`; try { console.log(url); diff --git a/src/db/mod.rs b/src/db/mod.rs index ce807b0..1dcff86 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -908,6 +908,43 @@ impl DB { Ok((rfps, total_count)) } + + pub async fn get_proposal_with_latest_snapshot_view( + &self, + proposal_id: i32, + ) -> Result, sqlx::Error> { + let sql = r#" + SELECT * + FROM proposals_with_latest_snapshot + WHERE proposal_id = $1 + "#; + let proposal = sqlx::query_as::<_, ProposalWithLatestSnapshotView>(sql) + .bind(proposal_id) + .fetch_optional(&self.0) + .await?; + + Ok(proposal) + } + + pub async fn get_latest_rfp_snapshot( + &self, + rfp_id: i32, + ) -> Result, sqlx::Error> { + let sql = r#" + SELECT * + FROM rfp_snapshots + WHERE rfp_id = $1 + ORDER BY ts DESC + LIMIT 1 + "#; + + let snapshot = sqlx::query_as::<_, RfpSnapshotRecord>(sql) + .bind(rfp_id) + .fetch_optional(&self.0) + .await?; + + Ok(snapshot) + } } async fn run_migrations(rocket: Rocket) -> fairing::Result { diff --git a/src/entrypoints/proposal/mod.rs b/src/entrypoints/proposal/mod.rs index 164025c..0a3e332 100644 --- a/src/entrypoints/proposal/mod.rs +++ b/src/entrypoints/proposal/mod.rs @@ -13,7 +13,7 @@ use rocket::{get, http::Status, State}; use std::convert::TryInto; pub mod proposal_types; -// TODO use caching in search +// TODO Use caching of search terms #[utoipa::path(get, path = "/proposals/search?", params( ("input"= &str, Path, description ="The string to search for in proposal name, description, summary, and category fields."), ))] @@ -158,7 +158,7 @@ async fn set_timestamp(block_height: i64, db: &State) -> Result<(), Status> } } -// TODO remove after testing +// TODO Remove this once we go in production or put it behind authentication or a flag #[get("/info/clean")] async fn clean(db: &State) -> Result<(), Status> { let _ = match db.remove_all_snapshots().await { @@ -190,7 +190,7 @@ async fn get_proposal( proposal_id: i32, contract: &State, ) -> Result, rocket::http::Status> { - let rpc_service = RpcService::new(contract.inner().clone()); + let rpc_service = RpcService::new(contract); // We should also add rate limiting to this endpoint match rpc_service.get_proposal(proposal_id).await { Ok(proposal) => Ok(Json(proposal.data)), diff --git a/src/entrypoints/rfp/mod.rs b/src/entrypoints/rfp/mod.rs index a195fc9..e66cade 100644 --- a/src/entrypoints/rfp/mod.rs +++ b/src/entrypoints/rfp/mod.rs @@ -12,7 +12,7 @@ use rocket::{delete, get, http::Status, State}; use std::convert::TryInto; pub mod rfp_types; -// TODO use caching in search +// TODO Use caching of search terms #[utoipa::path(get, path = "/rfps/search/", params( ("input"= &str, Path, description ="The string to search for in rfp name, description, summary, and category fields."), ))] @@ -116,10 +116,7 @@ async fn get_rfps( #[get("/")] async fn get_rfp(rfp_id: i32, contract: &State) -> Result, Status> { // TODO Get cached rfp - match RpcService::new(contract.inner().clone()) - .get_rfp(rfp_id) - .await - { + match RpcService::new(contract).get_rfp(rfp_id).await { Ok(rfp) => Ok(Json(rfp.data)), Err(e) => { eprintln!("In /rfp/rfp_id; Failed to get rfp from RPC: {:?}", e); diff --git a/src/nearblocks_client/mod.rs b/src/nearblocks_client/mod.rs index 7814c26..eab3e09 100644 --- a/src/nearblocks_client/mod.rs +++ b/src/nearblocks_client/mod.rs @@ -1,12 +1,12 @@ use near_sdk::AccountId; use reqwest::Client; use serde::{Deserialize, Serialize}; +pub mod proposal; +pub mod rfp; pub mod transactions; pub mod types; use types::Transaction; -// TODO use nearblocks API KEY - #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ApiResponse { #[serde(default)] @@ -56,13 +56,28 @@ impl ApiClient { ); let endpoint = format!("v1/account/{}/txns", account_id); let url = self.base_url.clone() + &endpoint + &query_params; - println!("Fetching from {}", url); - self.client + + println!("Fetching transactions from {}", url); + + let response = self + .client .get(&url) .header("Authorization", format!("Bearer {}", self.api_key)) .send() - .await? - .json::() - .await + .await?; + + match response.json::().await { + Ok(api_response) => { + println!( + "Successfully fetched {} transactions", + api_response.txns.len() + ); + Ok(api_response) + } + Err(e) => { + eprintln!("Failed to parse API response: {}", e); + Err(e) + } + } } } diff --git a/src/nearblocks_client/proposal.rs b/src/nearblocks_client/proposal.rs new file mode 100644 index 0000000..0bad2d7 --- /dev/null +++ b/src/nearblocks_client/proposal.rs @@ -0,0 +1,316 @@ +use crate::db::db_types::{BlockHeight, ProposalSnapshotRecord, Timestamp}; +use crate::db::DB; +use crate::entrypoints::proposal::proposal_types::{ + FromContractProposal, PartialEditProposalArgs, SetBlockHeightCallbackArgs, +}; +use crate::nearblocks_client::types::{LinkedProposals, Transaction}; +use crate::rpc_service::RpcService; +use devhub_shared::proposal::VersionedProposal; +use near_account_id::AccountId; +use rocket::{http::Status, State}; + +pub async fn handle_set_block_height_callback( + transaction: Transaction, + db: &State, + contract: &AccountId, +) -> Result<(), Status> { + if !transaction.receipt_outcome.status { + eprintln!( + "Proposal receipt outcome status is {:?}", + transaction.receipt_outcome.status + ); + eprintln!("On transaction: {:?}", transaction); + return Ok(()); + } + + let action = transaction + .actions + .as_ref() + .and_then(|actions| actions.first()) + .ok_or(Status::InternalServerError)?; + + let json_args = action.args.clone(); + + let args: SetBlockHeightCallbackArgs = + serde_json::from_str(&json_args.unwrap_or_default()).unwrap(); + + let mut tx = db.begin().await.map_err(|e| { + eprintln!("Failed to begin transaction: {:?}", e); + Status::InternalServerError + })?; + DB::upsert_proposal( + &mut tx, + args.clone().proposal.id, + args.clone().proposal.author_id.to_string(), + ) + .await + .map_err(|e| { + eprintln!("Failed to upsert proposal {}: {:?}", args.proposal.id, e); + Status::InternalServerError + })?; + + let rpc_service = RpcService::new(contract); + let id = args.clone().proposal.id.try_into().unwrap(); + + let versioned_proposal_fallback: VersionedProposal = args.clone().proposal.into(); + let versioned_proposal = match rpc_service.get_proposal(id).await { + Ok(proposal) => proposal.data, + Err(e) => { + eprintln!( + "Failed to get proposal from RPC, using first snapshot as fallback {:?}", + e + ); + versioned_proposal_fallback + } + }; + + let snapshot = ProposalSnapshotRecord::from_contract_proposal( + versioned_proposal.clone().into(), + transaction.receipt_block.block_timestamp, + transaction.receipt_block.block_height, + ); + + DB::insert_proposal_snapshot(&mut tx, &snapshot) + .await + .map_err(|e| { + eprintln!( + "Failed to insert proposal snapshot for proposal {}: {:?}", + id, e + ); + Status::InternalServerError + })?; + + tx.commit().await.map_err(|e| { + eprintln!("Failed to commit transaction: {:?}", e); + Status::InternalServerError + })?; + + // Handle linked proposals update + // let new_linked_rfp = snapshot.linked_rfp; + // update_linked_proposals( + // id, + // new_linked_rfp, + // transaction.receipt_block.block_height, + // transaction.receipt_block.block_timestamp, + // db, + // ) + // .await?; + + Ok(()) +} + +pub async fn handle_edit_proposal( + transaction: Transaction, + db: &State, + contract: &AccountId, +) -> Result<(), rocket::http::Status> { + let rpc_service = RpcService::new(contract); + let id = get_proposal_id(&transaction).map_err(|e| { + eprintln!("Failed to get proposal ID: {}", e); + Status::InternalServerError + })?; + println!("Updating proposal {}", id); + let versioned_proposal = match rpc_service + .get_proposal_on_block(id, transaction.receipt_block.block_height) + .await + { + Ok(proposal) => proposal, + Err(e) => { + eprintln!("Failed to get proposal from RPC: {:?}", e); + return Err(Status::InternalServerError); + } + }; + + let mut tx = db.begin().await.map_err(|e| { + eprintln!("Failed to begin transaction: {:?}", e); + Status::InternalServerError + })?; + + let snapshot = ProposalSnapshotRecord::from_contract_proposal( + versioned_proposal.clone().into(), + transaction.receipt_block.block_timestamp, + transaction.receipt_block.block_height, + ); + + DB::insert_proposal_snapshot(&mut tx, &snapshot) + .await + .map_err(|e| { + eprintln!( + "Failed to insert proposal snapshot for proposal {}: {:?}", + id, e + ); + Status::InternalServerError + })?; + + tx.commit().await.map_err(|e| { + eprintln!("Failed to commit transaction: {:?}", e); + Status::InternalServerError + })?; + + // Handle linked proposals update + // let new_linked_rfp = snapshot.linked_rfp; + // update_linked_proposals( + // id, + // new_linked_rfp, + // transaction.receipt_block.block_height, + // transaction.receipt_block.block_timestamp, + // db, + // ) + // .await?; + + Ok(()) +} + +fn get_proposal_id(transaction: &Transaction) -> Result { + let action = transaction + .actions + .as_ref() + .and_then(|actions| actions.first()) + .ok_or("No actions found in transaction")?; + + let args: PartialEditProposalArgs = serde_json::from_str(action.args.as_ref().unwrap()) + .map_err(|e| { + eprintln!("Failed to parse JSON: {:?}", e); + "Failed to parse proposal arguments" + })?; + + Ok(args.id) +} + +// This might not be needed since we are getting the proposal snapshot from the RPC on the block height. +pub async fn update_linked_proposals( + proposal_id: i32, + new_linked_rfp: Option, + block_height: BlockHeight, + block_timestamp: Timestamp, + db: &State, +) -> Result<(), Status> { + // Get the latest proposal snapshot before the given block timestamp + let last_snapshot = db + .get_proposal_with_latest_snapshot_view(proposal_id) + .await + .map_err(|e| { + eprintln!( + "Error fetching latest proposal snapshot for proposal {}: {:?}", + proposal_id, e + ); + Status::InternalServerError + })?; + + // Extract the latest linked RFP ID from the snapshot + let latest_linked_rfp_id = last_snapshot + .as_ref() + .and_then(|snapshot| snapshot.linked_rfp); + + // Compare the new and old linked RFP IDs + if new_linked_rfp != latest_linked_rfp_id { + let tx = db.begin().await.map_err(|e| { + eprintln!("Failed to begin transaction: {:?}", e); + Status::InternalServerError + })?; + + if let Some(new_linked_rfp_id) = new_linked_rfp { + println!( + "Adding linked_rfp {} to proposal {}", + new_linked_rfp_id, proposal_id + ); + + // Add linked proposal to new RFP snapshot + modify_snapshot_linked_proposal( + new_linked_rfp_id, + proposal_id, + block_height, + block_timestamp, + add_to_linked_proposals, + db, + ) + .await?; + println!("Proposal added to new RFP snapshot"); + } + + if let Some(old_linked_rfp_id) = latest_linked_rfp_id { + println!( + "Removing linked_rfp {} from proposal {}", + old_linked_rfp_id, proposal_id + ); + // Remove linked proposal from old RFP snapshot + modify_snapshot_linked_proposal( + old_linked_rfp_id, + proposal_id, + block_height, + block_timestamp, + remove_from_linked_proposals, + db, + ) + .await?; + println!("Proposal removed from old RFP snapshot"); + } + + tx.commit().await.map_err(|e| { + eprintln!("Failed to commit transaction: {:?}", e); + Status::InternalServerError + })?; + } + + Ok(()) +} + +fn add_to_linked_proposals(mut linked_proposals: Vec, proposal_id: i32) -> Vec { + linked_proposals.push(proposal_id); + linked_proposals +} + +fn remove_from_linked_proposals(linked_proposals: Vec, proposal_id: i32) -> Vec { + linked_proposals + .into_iter() + .filter(|&id| id != proposal_id) + .collect() +} + +async fn modify_snapshot_linked_proposal( + rfp_id: i32, + proposal_id: i32, + block_height: BlockHeight, + block_timestamp: Timestamp, + callback: fn(Vec, i32) -> Vec, + db: &State, +) -> Result<(), Status> { + let latest_rfp_snapshot = db.get_latest_rfp_snapshot(rfp_id).await.map_err(|e| { + eprintln!( + "Failed to get latest RFP snapshot for RFP {}: {:?}", + rfp_id, e + ); + Status::InternalServerError + })?; + + if let Some(mut snapshot) = latest_rfp_snapshot { + // Update the snapshot with new values + snapshot.rfp_id = rfp_id; + let linked_proposals: LinkedProposals = snapshot.linked_proposals.into(); + let updated_proposals = callback(linked_proposals.0, proposal_id); + snapshot.linked_proposals = LinkedProposals(updated_proposals).into(); + snapshot.block_height = block_height; + snapshot.ts = block_timestamp; + + let mut tx = db.begin().await.map_err(|e| { + eprintln!("Failed to begin transaction: {:?}", e); + Status::InternalServerError + })?; + + DB::insert_rfp_snapshot(&mut tx, &snapshot) + .await + .map_err(|e| { + eprintln!("Failed to insert RFP snapshot for RFP {}: {:?}", rfp_id, e); + Status::InternalServerError + })?; + + tx.commit().await.map_err(|e| { + eprintln!("Failed to commit transaction: {:?}", e); + Status::InternalServerError + })?; + } else { + eprintln!("No existing RFP snapshot found for RFP {}", rfp_id); + } + + Ok(()) +} diff --git a/src/nearblocks_client/rfp.rs b/src/nearblocks_client/rfp.rs new file mode 100644 index 0000000..cd5df90 --- /dev/null +++ b/src/nearblocks_client/rfp.rs @@ -0,0 +1,133 @@ +use crate::db::db_types::RfpSnapshotRecord; +use crate::db::DB; +use crate::entrypoints::rfp::rfp_types::*; +use crate::nearblocks_client::types::Transaction; +use crate::rpc_service::RpcService; +use devhub_shared::rfp::VersionedRFP; +use near_account_id::AccountId; +use rocket::{http::Status, State}; + +pub async fn handle_set_rfp_block_height_callback( + transaction: Transaction, + db: &State, + contract: &AccountId, +) -> Result<(), Status> { + if !transaction.receipt_outcome.status { + eprintln!( + "RFP receipt outcome status is {:?}", + transaction.receipt_outcome.status + ); + eprintln!("On transaction: {:?}", transaction); + return Ok(()); + } + + let action = transaction + .actions + .as_ref() + .and_then(|actions| actions.first()) + .ok_or(Status::InternalServerError)?; + let json_args = action.args.clone().unwrap_or_default(); + + let args: SetRfpBlockHeightCallbackArgs = serde_json::from_str(&json_args).unwrap(); + + let mut tx = db.begin().await.map_err(|_e| Status::InternalServerError)?; + DB::upsert_rfp( + &mut tx, + args.clone().rfp.id, + args.clone().rfp.author_id.to_string(), + ) + .await + .unwrap(); + + let rpc_service = RpcService::new(contract); + let id = args.clone().rfp.id.try_into().unwrap(); + + let versioned_rfp_fallback: VersionedRFP = args.clone().rfp.into(); + let versioned_rfp = match rpc_service.get_rfp(id).await { + Ok(rfp) => rfp.data, + Err(e) => { + eprintln!( + "Failed to get RFP from RPC, using first snapshot as fallback {:?}", + e + ); + versioned_rfp_fallback + } + }; + + let snapshot = RfpSnapshotRecord::from_contract_rfp( + versioned_rfp.into(), + transaction.receipt_block.block_timestamp, + transaction.receipt_block.block_height, + ); + + DB::insert_rfp_snapshot(&mut tx, &snapshot).await.unwrap(); + + tx.commit() + .await + .map_err(|_e| Status::InternalServerError)?; + + Ok(()) +} + +fn get_rfp_id(transaction: &Transaction) -> Result { + let action = transaction + .actions + .as_ref() + .and_then(|actions| actions.first()) + .ok_or("No actions found in transaction")?; + + let args: PartialEditRFPArgs = + serde_json::from_str(action.args.as_ref().unwrap()).map_err(|e| { + eprintln!("Failed to parse JSON: {:?}", e); + "Failed to parse proposal arguments" + })?; + + Ok(args.id) +} + +pub async fn handle_edit_rfp( + transaction: Transaction, + db: &State, + contract: &AccountId, +) -> Result<(), Status> { + let rpc_service = RpcService::new(contract); + let id = get_rfp_id(&transaction).map_err(|e| { + eprintln!("Failed to get RFP ID: {}", e); + Status::InternalServerError + })?; + println!("Updating rfp {}", id); + let versioned_rfp = match rpc_service + .get_rfp_on_block(id, transaction.receipt_block.block_height) + .await + { + Ok(rfp) => rfp, + Err(e) => { + eprintln!("Failed to get rfp from RPC: {:?}", e); + return Err(Status::InternalServerError); + } + }; + + let mut tx = db.begin().await.map_err(|_e| Status::InternalServerError)?; + + let contract_rfp: ContractRFP = versioned_rfp.clone().into(); + println!( + "RFP {} timestamp {}", + contract_rfp.id, transaction.receipt_block.block_timestamp + ); + + let snapshot = RfpSnapshotRecord::from_contract_rfp( + versioned_rfp.into(), + transaction.receipt_block.block_timestamp, + transaction.receipt_block.block_height, + ); + + DB::insert_rfp_snapshot(&mut tx, &snapshot) + .await + .map_err(|_e| Status::InternalServerError)?; + + tx.commit() + .await + .map_err(|_e| Status::InternalServerError)?; + + Ok(()) +} diff --git a/src/nearblocks_client/transactions.rs b/src/nearblocks_client/transactions.rs index 3d827bc..eba4925 100644 --- a/src/nearblocks_client/transactions.rs +++ b/src/nearblocks_client/transactions.rs @@ -1,18 +1,10 @@ -use crate::db::db_types::ProposalSnapshotRecord; -use crate::db::db_types::RfpSnapshotRecord; use crate::db::DB; -use crate::entrypoints::proposal::proposal_types::FromContractProposal; -use crate::entrypoints::proposal::proposal_types::PartialEditProposalArgs; -use crate::entrypoints::proposal::proposal_types::SetBlockHeightCallbackArgs; -use crate::entrypoints::rfp::rfp_types::*; use crate::nearblocks_client; +use crate::nearblocks_client::proposal::{handle_edit_proposal, handle_set_block_height_callback}; +use crate::nearblocks_client::rfp::{handle_edit_rfp, handle_set_rfp_block_height_callback}; use crate::nearblocks_client::types::Transaction; -use crate::rpc_service::RpcService; -use devhub_shared::proposal::VersionedProposal; -use devhub_shared::rfp::VersionedRFP; use near_account_id::AccountId; use rocket::{http::Status, State}; -use std::convert::TryInto; pub async fn update_nearblocks_data( db: &DB, @@ -44,17 +36,14 @@ pub async fn update_nearblocks_data( nearblocks_unwrapped.txns.len() ); - let _ = nearblocks_client::transactions::process( - &nearblocks_unwrapped.txns, - db.into(), - contract.clone(), - ) - .await; + let _ = + nearblocks_client::transactions::process(&nearblocks_unwrapped.txns, db.into(), contract) + .await; if let Some(transaction) = nearblocks_unwrapped.txns.last() { - let timestamp_nano = transaction.receipt_block.block_timestamp; + let timestamp_nano = transaction.block_timestamp.parse::().unwrap(); let _ = db - .set_last_updated_info(timestamp_nano, transaction.receipt_block.block_height) + .set_last_updated_info(timestamp_nano, transaction.block.block_height) .await; } } @@ -62,7 +51,7 @@ pub async fn update_nearblocks_data( pub async fn process( transactions: &[Transaction], db: &State, - contract: AccountId, + contract: &AccountId, ) -> Result<(), Status> { for transaction in transactions.iter() { if let Some(action) = transaction @@ -72,56 +61,46 @@ pub async fn process( { let result = match action.method.as_deref().unwrap_or("") { "set_block_height_callback" => { - handle_set_block_height_callback(transaction.to_owned(), db, contract.clone()) - .await - } - "edit_proposal" => { - handle_edit_proposal(transaction.to_owned(), db, contract.clone()).await + handle_set_block_height_callback(transaction.to_owned(), db, contract).await } + "edit_proposal" => handle_edit_proposal(transaction.to_owned(), db, contract).await, "edit_proposal_timeline" => { - handle_edit_proposal(transaction.to_owned(), db, contract.clone()).await + handle_edit_proposal(transaction.to_owned(), db, contract).await } "edit_proposal_versioned_timeline" => { - handle_edit_proposal(transaction.to_owned(), db, contract.clone()).await + handle_edit_proposal(transaction.to_owned(), db, contract).await } "edit_proposal_linked_rfp" => { - handle_edit_proposal(transaction.to_owned(), db, contract.clone()).await + handle_edit_proposal(transaction.to_owned(), db, contract).await } "edit_proposal_internal" => { - handle_edit_proposal(transaction.to_owned(), db, contract.clone()).await + handle_edit_proposal(transaction.to_owned(), db, contract).await } "edit_rfp_timeline" => { println!("edit_rfp_timeline"); - handle_edit_rfp(transaction.to_owned(), db, contract.clone()).await + handle_edit_rfp(transaction.to_owned(), db, contract).await } "edit_rfp" => { println!("edit_rfp"); - handle_edit_rfp(transaction.to_owned(), db, contract.clone()).await + handle_edit_rfp(transaction.to_owned(), db, contract).await } "edit_rfp_internal" => { println!("edit_rfp_internal"); - handle_edit_rfp(transaction.to_owned(), db, contract.clone()).await + handle_edit_rfp(transaction.to_owned(), db, contract).await } "cancel_rfp" => { - // TODO compare notes with indexer repo on how to handle this - // differently than edit_rfp? println!("cancel_rfp"); - handle_edit_rfp(transaction.to_owned(), db, contract.clone()).await + handle_edit_rfp(transaction.to_owned(), db, contract).await } "set_rfp_block_height_callback" => { println!("set_rfp_block_height_callback"); - handle_set_rfp_block_height_callback( - transaction.to_owned(), - db, - contract.clone(), - ) - .await + handle_set_rfp_block_height_callback(transaction.to_owned(), db, contract).await } _ => { if action.action == "FUNCTION_CALL" { - println!("Unhandled method: {:?}", action.method.as_ref().unwrap()); + // println!("Unhandled method: {:?}", action.method.as_ref().unwrap()); } else { - println!("Unhandled action: {:?}", action.action); + // println!("Unhandled action: {:?}", action.action); } continue; } @@ -132,258 +111,3 @@ pub async fn process( Ok(()) } - -async fn handle_set_rfp_block_height_callback( - transaction: Transaction, - db: &State, - contract: AccountId, -) -> Result<(), Status> { - if !transaction.receipt_outcome.status { - eprintln!( - "RFP receipt outcome status is {:?}", - transaction.receipt_outcome.status - ); - eprintln!("On transaction: {:?}", transaction); - return Ok(()); - } - - let action = transaction - .actions - .as_ref() - .and_then(|actions| actions.first()) - .ok_or(Status::InternalServerError)?; - let json_args = action.args.clone().unwrap_or_default(); - - let args: SetRfpBlockHeightCallbackArgs = serde_json::from_str(&json_args).unwrap(); - - let mut tx = db.begin().await.map_err(|_e| Status::InternalServerError)?; - DB::upsert_rfp( - &mut tx, - args.clone().rfp.id, - args.clone().rfp.author_id.to_string(), - ) - .await - .unwrap(); - - let rpc_service = RpcService::new(contract); - let id = args.clone().rfp.id.try_into().unwrap(); - - let versioned_rfp_fallback: VersionedRFP = args.clone().rfp.into(); - let versioned_rfp = match rpc_service.get_rfp(id).await { - Ok(rfp) => rfp.data, - Err(e) => { - eprintln!( - "Failed to get RFP from RPC, using first snapshot as fallback {:?}", - e - ); - versioned_rfp_fallback - } - }; - - let snapshot = RfpSnapshotRecord::from_contract_rfp( - versioned_rfp.into(), - transaction.receipt_block.block_timestamp, - transaction.receipt_block.block_height, - ); - - DB::insert_rfp_snapshot(&mut tx, &snapshot).await.unwrap(); - - // TODO check the function checkAndUpdateLabels in the indexer repo + issue #989 - - tx.commit() - .await - .map_err(|_e| Status::InternalServerError)?; - - Ok(()) -} - -fn get_rfp_id(transaction: &Transaction) -> Result { - let action = transaction - .actions - .as_ref() - .and_then(|actions| actions.first()) - .ok_or("No actions found in transaction")?; - - let args: PartialEditRFPArgs = - serde_json::from_str(action.args.as_ref().unwrap()).map_err(|e| { - eprintln!("Failed to parse JSON: {:?}", e); - "Failed to parse proposal arguments" - })?; - - Ok(args.id) -} - -async fn handle_edit_rfp( - transaction: Transaction, - db: &State, - contract: AccountId, -) -> Result<(), Status> { - let rpc_service = RpcService::new(contract); - let id = get_rfp_id(&transaction).map_err(|e| { - eprintln!("Failed to get RFP ID: {}", e); - Status::InternalServerError - })?; - println!("Updating rfp {}", id); - let versioned_rfp = match rpc_service - .get_rfp_on_block(id, transaction.receipt_block.block_height) - .await - { - Ok(rfp) => rfp, - Err(e) => { - eprintln!("Failed to get rfp from RPC: {:?}", e); - return Err(Status::InternalServerError); - } - }; - - let mut tx = db.begin().await.map_err(|_e| Status::InternalServerError)?; - - let contract_rfp: ContractRFP = versioned_rfp.clone().into(); - println!( - "RFP {} timestamp {}", - contract_rfp.id, transaction.receipt_block.block_timestamp - ); - - let snapshot = RfpSnapshotRecord::from_contract_rfp( - versioned_rfp.into(), - transaction.receipt_block.block_timestamp, - transaction.receipt_block.block_height, - ); - - DB::insert_rfp_snapshot(&mut tx, &snapshot) - .await - .map_err(|_e| Status::InternalServerError)?; - - // TODO check the function checkAndUpdateLabels in the indexer repo + issue #989 - - tx.commit() - .await - .map_err(|_e| Status::InternalServerError)?; - - Ok(()) -} - -async fn handle_set_block_height_callback( - transaction: Transaction, - db: &State, - contract: AccountId, -) -> Result<(), Status> { - if !transaction.receipt_outcome.status { - eprintln!( - "Proposal receipt outcome status is {:?}", - transaction.receipt_outcome.status - ); - eprintln!("On transaction: {:?}", transaction); - return Ok(()); - } - - let action = transaction - .actions - .as_ref() - .and_then(|actions| actions.first()) - .ok_or(Status::InternalServerError)?; - - let json_args = action.args.clone(); - - let args: SetBlockHeightCallbackArgs = - serde_json::from_str(&json_args.unwrap_or_default()).unwrap(); - - let mut tx = db.begin().await.map_err(|_e| Status::InternalServerError)?; - DB::upsert_proposal( - &mut tx, - args.clone().proposal.id, - args.clone().proposal.author_id.to_string(), - ) - .await - .unwrap(); - - let rpc_service = RpcService::new(contract); - let id = args.clone().proposal.id.try_into().unwrap(); - - let versioned_proposal_fallback: VersionedProposal = args.clone().proposal.into(); - let versioned_proposal = match rpc_service.get_proposal(id).await { - Ok(proposal) => proposal.data, - Err(e) => { - eprintln!( - "Failed to get proposal from RPC, using first snapshot as fallback {:?}", - e - ); - versioned_proposal_fallback - } - }; - - let snapshot = ProposalSnapshotRecord::from_contract_proposal( - versioned_proposal.into(), - transaction.receipt_block.block_timestamp, - transaction.receipt_block.block_height, - ); - - DB::insert_proposal_snapshot(&mut tx, &snapshot) - .await - .unwrap(); - // TODO check the function checkAndUpdateLinkedProposals in the indexer repo + issue #989 - - tx.commit() - .await - .map_err(|_e| Status::InternalServerError)?; - - Ok(()) -} - -fn get_proposal_id(transaction: &Transaction) -> Result { - let action = transaction - .actions - .as_ref() - .and_then(|actions| actions.first()) - .ok_or("No actions found in transaction")?; - - let args: PartialEditProposalArgs = serde_json::from_str(action.args.as_ref().unwrap()) - .map_err(|e| { - eprintln!("Failed to parse JSON: {:?}", e); - "Failed to parse proposal arguments" - })?; - - Ok(args.id) -} - -async fn handle_edit_proposal( - transaction: Transaction, - db: &State, - contract: AccountId, -) -> Result<(), rocket::http::Status> { - let rpc_service = RpcService::new(contract); - let id = get_proposal_id(&transaction).map_err(|e| { - eprintln!("Failed to get proposal ID: {}", e); - Status::InternalServerError - })?; - println!("Updating proposal {}", id); - let versioned_proposal = match rpc_service - .get_proposal_on_block(id, transaction.receipt_block.block_height) - .await - { - Ok(proposal) => proposal, - Err(e) => { - eprintln!("Failed to get proposal from RPC: {:?}", e); - return Err(Status::InternalServerError); - } - }; - - let mut tx = db.begin().await.map_err(|_e| Status::InternalServerError)?; - - let snapshot = ProposalSnapshotRecord::from_contract_proposal( - versioned_proposal.into(), - transaction.receipt_block.block_timestamp, - transaction.receipt_block.block_height, - ); - - DB::insert_proposal_snapshot(&mut tx, &snapshot) - .await - .unwrap(); - - // TODO check the function checkAndUpdateLinkedProposals in the indexer repo + issue #989 - - tx.commit() - .await - .map_err(|_e| Status::InternalServerError)?; - - Ok(()) -} diff --git a/src/nearblocks_client/types.rs b/src/nearblocks_client/types.rs index bc929e0..ffe6346 100644 --- a/src/nearblocks_client/types.rs +++ b/src/nearblocks_client/types.rs @@ -1,7 +1,36 @@ use serde::{Deserialize, Serialize}; +use serde_json::Value; use crate::db::db_types::BlockHeight; +pub struct LinkedProposals(pub Vec); + +impl From> for LinkedProposals { + fn from(value: Option) -> Self { + if let Some(Value::Array(arr)) = value { + let vec = arr + .into_iter() + .filter_map(|v| v.as_i64().map(|n| n as i32)) + .collect(); + LinkedProposals(vec) + } else { + LinkedProposals(Vec::new()) + } + } +} + +impl From for Option { + fn from(linked_proposals: LinkedProposals) -> Self { + Some(Value::Array( + linked_proposals + .0 + .into_iter() + .map(|n| Value::Number(n.into())) + .collect(), + )) + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Transaction { #[serde(default)] diff --git a/src/rpc_service.rs b/src/rpc_service.rs index a445e4f..3f452dd 100644 --- a/src/rpc_service.rs +++ b/src/rpc_service.rs @@ -29,9 +29,6 @@ struct QueryResponse { struct QueryResponseResult { // result is an array of bytes, to be specific it is an ASCII code of the string result: Vec, - // block_hash: String, - // block_height: i64, - // logs: Vec, } impl Default for RpcService { @@ -44,10 +41,10 @@ impl Default for RpcService { } impl RpcService { - pub fn new(id: AccountId) -> Self { + pub fn new(id: &AccountId) -> Self { Self { network: NetworkConfig::mainnet(), - contract: Contract(id), + contract: Contract(id.clone()), } }