diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9d394997 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "cSpell.words": [ + "pdas", + "RUSTC", + "Sablier" + ] +} \ No newline at end of file diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 98e7eb57..d163cb98 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -1,13 +1,22 @@ use std::{fmt::Debug, sync::Arc}; +use anchor_lang::{AccountDeserialize, Discriminator}; use log::info; +use sablier_thread_program::state::{Thread, VersionedThread}; +use solana_account_decoder::UiAccountEncoding; +use solana_client::{ + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_filter::{Memcmp, RpcFilterType}, +}; use solana_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, ReplicaAccountInfoVersions, Result as PluginResult, SlotStatus, }; +use solana_sdk::pubkey::Pubkey; use tokio::runtime::{Builder, Runtime}; use crate::{ config::PluginConfig, + error::PluginError, events::{AccountUpdate, AccountUpdateEvent}, executors::Executors, observers::Observers, @@ -47,6 +56,68 @@ impl GeyserPlugin for SablierPlugin { info!("Loading snapshot..."); let config = PluginConfig::read_from(config_file)?; *self = SablierPlugin::new_from_config(config); + + // Goal of this is to catch up on any existing threads that were created before the plugin was loaded. + { + info!("Loading previously existing Threads..."); + let observers = self.inner.observers.clone(); + self.inner.clone().spawn(|inner| async move { + info!("Fetch existing Thread pdas..."); + let client = inner.executors.client.clone(); + let program_id = sablier_thread_program::ID; + + // Filter to retrieve only Thread PDAs + let account_type_filter = + RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, &Thread::discriminator())); + let config = RpcProgramAccountsConfig { + filters: Some([vec![account_type_filter]].concat()), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }; + + // Fetch Thread pdas + let thread_pdas = client + .get_program_accounts_with_config(&program_id, config) + .await + .map_err(|err| { + info!("Error fetching Thread PDAs: {}", err); + PluginError::from(err) + })?; + info!(" - Fetched {} Thread PDAs", thread_pdas.len()); + + let versioned_thread_pdas: Vec<(Pubkey, VersionedThread)> = thread_pdas + .into_iter() + .filter_map(|(pubkey, account)| { + VersionedThread::try_deserialize(&mut account.data.as_slice()) + .ok() + .map(|thread| (pubkey, thread)) + }) + .collect(); + info!( + " - after deserialization: {} Thread PDAs left", + versioned_thread_pdas.len() + ); + + info!( + "Adding {} fetched Thread pdas to observers...", + versioned_thread_pdas.len() + ); + for (pubkey, thread) in versioned_thread_pdas { + info!("here3"); + observers + .thread + .clone() + .observe_thread(thread, pubkey, 0) + .await + .ok(); + } + Ok(()) + }); + } + Ok(()) } diff --git a/programs/network/src/errors.rs b/programs/network/src/errors.rs index 405e9078..2d3b08cd 100644 --- a/programs/network/src/errors.rs +++ b/programs/network/src/errors.rs @@ -22,4 +22,10 @@ pub enum SablierError { #[msg("The worker cannot rotate into the pool right now")] PoolFull, + + #[msg("The provided authority does not match the thread's authority")] + InvalidThreadAuthority, + + #[msg("The provided thread account is not a valid Thread account")] + InvalidThreadAccount, } diff --git a/programs/thread/src/instructions/mod.rs b/programs/thread/src/instructions/mod.rs index 533d9450..571407d7 100644 --- a/programs/thread/src/instructions/mod.rs +++ b/programs/thread/src/instructions/mod.rs @@ -7,6 +7,7 @@ pub mod thread_instruction_remove; pub mod thread_kickoff; pub mod thread_pause; pub mod thread_reset; +pub mod thread_reset_next; pub mod thread_resume; pub mod thread_update; pub mod thread_withdraw; @@ -20,6 +21,7 @@ pub use thread_instruction_remove::*; pub use thread_kickoff::*; pub use thread_pause::*; pub use thread_reset::*; +pub use thread_reset_next::*; pub use thread_resume::*; pub use thread_update::*; pub use thread_withdraw::*; diff --git a/programs/thread/src/instructions/thread_delete.rs b/programs/thread/src/instructions/thread_delete.rs index 4f468a75..f0cb9a94 100644 --- a/programs/thread/src/instructions/thread_delete.rs +++ b/programs/thread/src/instructions/thread_delete.rs @@ -1,6 +1,7 @@ use { - crate::{constants::*, state::*}, - anchor_lang::prelude::*, + crate::{constants::SEED_THREAD, state::*}, + anchor_lang::{prelude::*, solana_program::system_program}, + sablier_network_program::errors::SablierError, }; /// Accounts required by the `thread_delete` instruction. @@ -8,7 +9,7 @@ use { pub struct ThreadDelete<'info> { /// The authority (owner) of the thread. #[account( - constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key()) + // constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key()) )] pub authority: Signer<'info>, @@ -16,27 +17,91 @@ pub struct ThreadDelete<'info> { #[account(mut)] pub close_to: SystemAccount<'info>, - /// The thread to be delete. - #[account( - mut, - seeds = [ - SEED_THREAD, - thread.authority.as_ref(), - thread.id.as_slice(), - thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice() - ], - bump = thread.bump, - )] - pub thread: Account<'info, Thread>, + /// The thread to be deleted. + /// CHECKS: made during the instruction processing + #[account(mut)] + pub thread: UncheckedAccount<'info>, + // #[account( + // mut, + // seeds = [ + // SEED_THREAD, + // thread.authority.as_ref(), + // thread.id.as_slice(), + // thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice() + // ], + // bump = thread.bump, + // )] + // pub thread: Account<'info, Thread>, } pub fn handler(ctx: Context) -> Result<()> { let thread = &ctx.accounts.thread; let close_to = &ctx.accounts.close_to; - let thread_lamports = thread.get_lamports(); - thread.sub_lamports(thread_lamports)?; - close_to.add_lamports(thread_lamports)?; + // We want this instruction not to fail if the thread is already deleted or inexistent. + // As such, all checks are done in the code that than in anchor (see commented code above) + // First, must try to deserialize the thread. + + // Get either V1 or V2 thread - If the provided thread does not exist, print an error message and return Ok. + let thread = match Thread::try_deserialize_unchecked(&mut thread.data.borrow_mut().as_ref()) { + Ok(t) => t, + Err(_) => { + msg!("Not a thread or account does not exist"); + return Ok(()); + } + }; + + // Preliminary checks + { + // Verify the authority + let authority_key = ctx.accounts.authority.key; + let thread_key = ctx.accounts.thread.key; + + require!( + thread.authority.eq(authority_key) || authority_key.eq(thread_key), + SablierError::InvalidThreadAuthority + ); + + // Verify the account provided + let thread_account = &ctx.accounts.thread; + { + // Verify the account is initialized + require!( + thread_account.owner != &system_program::ID && thread_account.lamports() > 0, + SablierError::InvalidThreadAccount + ); + + // Verify the account is owned by the program + require!( + thread_account.owner == &crate::ID, + SablierError::InvalidThreadAccount + ); + + // Verify the seed derivation + let default_vec = Vec::new(); + let thread_bump = thread.bump.to_le_bytes(); + let seed = [ + SEED_THREAD, + thread.authority.as_ref(), + thread.id.as_slice(), + thread.domain.as_ref().unwrap_or(&default_vec).as_slice(), + thread_bump.as_ref(), + ]; + let expected_thread_key = Pubkey::create_program_address(&seed, &crate::ID) + .map_err(|_| SablierError::InvalidThreadAccount)?; + require!( + expected_thread_key == *thread_key, + SablierError::InvalidThreadAccount + ); + } + } + // Transfer lamports out (implicit close) + { + let thread_account = &ctx.accounts.thread; + let thread_lamports = thread_account.get_lamports(); + thread_account.sub_lamports(thread_lamports)?; + close_to.add_lamports(thread_lamports)?; + } Ok(()) } diff --git a/programs/thread/src/instructions/thread_reset_next.rs b/programs/thread/src/instructions/thread_reset_next.rs new file mode 100644 index 00000000..b3920727 --- /dev/null +++ b/programs/thread/src/instructions/thread_reset_next.rs @@ -0,0 +1,33 @@ +use {crate::state::*, anchor_lang::prelude::*, sablier_network_program::state::Config}; + +/// Accounts required by the `thread_reset` instruction. +#[derive(Accounts)] +pub struct ThreadResetNext<'info> { + #[account(has_one = admin)] + pub config: AccountLoader<'info, Config>, + pub admin: Signer<'info>, + /// The thread to be paused. + #[account(mut)] + pub thread: Account<'info, Thread>, +} + +pub fn handler(ctx: Context, timestamp: i64) -> Result<()> { + // Get accounts + let thread = &mut ctx.accounts.thread; + let clock = Clock::get()?; + + // Full reset the thread state. + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::Periodic { + started_at: timestamp, + }, + }); + thread.trigger = Trigger::Periodic { delay: 21600 }; + thread.next_instruction = None; + + Ok(()) +} diff --git a/programs/thread/src/lib.rs b/programs/thread/src/lib.rs index fa543897..7f05ed54 100644 --- a/programs/thread/src/lib.rs +++ b/programs/thread/src/lib.rs @@ -97,4 +97,8 @@ pub mod thread_program { pub fn thread_withdraw(ctx: Context, amount: u64) -> Result<()> { thread_withdraw::handler(ctx, amount) } + + pub fn thread_reset_next(ctx: Context, timestamp: i64) -> Result<()> { + thread_reset_next::handler(ctx, timestamp) + } }