Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Catch up on existing Threads at startup and delete not to fail on providing a non existing Thread acc #70

Closed
wants to merge 18 commits into from
Closed
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"cSpell.words": [
"pdas",
"RUSTC",
"Sablier"
]
}
71 changes: 71 additions & 0 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use std::{fmt::Debug, sync::Arc};

use anchor_lang::{AccountDeserialize, Discriminator};
use log::info;
use sablier_thread_program::state::{Thread, VersionedThread};
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, RpcFilterType},
};
use solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, ReplicaAccountInfoVersions, Result as PluginResult, SlotStatus,
};
use solana_sdk::pubkey::Pubkey;
use tokio::runtime::{Builder, Runtime};

use crate::{
config::PluginConfig,
error::PluginError,
events::{AccountUpdate, AccountUpdateEvent},
executors::Executors,
observers::Observers,
Expand Down Expand Up @@ -47,6 +56,68 @@ impl GeyserPlugin for SablierPlugin {
info!("Loading snapshot...");
let config = PluginConfig::read_from(config_file)?;
*self = SablierPlugin::new_from_config(config);

// Goal of this is to catch up on any existing threads that were created before the plugin was loaded.
{
info!("Loading previously existing Threads...");
let observers = self.inner.observers.clone();
self.inner.clone().spawn(|inner| async move {
info!("Fetch existing Thread pdas...");
let client = inner.executors.client.clone();
let program_id = sablier_thread_program::ID;

// Filter to retrieve only Thread PDAs
let account_type_filter =
RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, &Thread::discriminator()));
let config = RpcProgramAccountsConfig {
filters: Some([vec![account_type_filter]].concat()),
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
};

// Fetch Thread pdas
let thread_pdas = client
.get_program_accounts_with_config(&program_id, config)
.await
.map_err(|err| {
info!("Error fetching Thread PDAs: {}", err);
PluginError::from(err)
})?;
info!(" - Fetched {} Thread PDAs", thread_pdas.len());

let versioned_thread_pdas: Vec<(Pubkey, VersionedThread)> = thread_pdas
.into_iter()
.filter_map(|(pubkey, account)| {
VersionedThread::try_deserialize(&mut account.data.as_slice())
.ok()
.map(|thread| (pubkey, thread))
})
.collect();
info!(
" - after deserialization: {} Thread PDAs left",
versioned_thread_pdas.len()
);

info!(
"Adding {} fetched Thread pdas to observers...",
versioned_thread_pdas.len()
);
for (pubkey, thread) in versioned_thread_pdas {
info!("here3");
observers
.thread
.clone()
.observe_thread(thread, pubkey, 0)
.await
.ok();
}
Ok(())
});
}

Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions programs/network/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ pub enum SablierError {

#[msg("The worker cannot rotate into the pool right now")]
PoolFull,

#[msg("The provided authority does not match the thread's authority")]
InvalidThreadAuthority,

#[msg("The provided thread account is not a valid Thread account")]
InvalidThreadAccount,
}
2 changes: 2 additions & 0 deletions programs/thread/src/instructions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod thread_instruction_remove;
pub mod thread_kickoff;
pub mod thread_pause;
pub mod thread_reset;
pub mod thread_reset_next;
pub mod thread_resume;
pub mod thread_update;
pub mod thread_withdraw;
Expand All @@ -20,6 +21,7 @@ pub use thread_instruction_remove::*;
pub use thread_kickoff::*;
pub use thread_pause::*;
pub use thread_reset::*;
pub use thread_reset_next::*;
pub use thread_resume::*;
pub use thread_update::*;
pub use thread_withdraw::*;
101 changes: 83 additions & 18 deletions programs/thread/src/instructions/thread_delete.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,107 @@
use {
crate::{constants::*, state::*},
anchor_lang::prelude::*,
crate::{constants::SEED_THREAD, state::*},
anchor_lang::{prelude::*, solana_program::system_program},
sablier_network_program::errors::SablierError,
};

/// Accounts required by the `thread_delete` instruction.
#[derive(Accounts)]
pub struct ThreadDelete<'info> {
/// The authority (owner) of the thread.
#[account(
constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key())
// constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key())
)]
pub authority: Signer<'info>,

/// The address to return the data rent lamports to.
#[account(mut)]
pub close_to: SystemAccount<'info>,

/// The thread to be delete.
#[account(
mut,
seeds = [
SEED_THREAD,
thread.authority.as_ref(),
thread.id.as_slice(),
thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
],
bump = thread.bump,
)]
pub thread: Account<'info, Thread>,
/// The thread to be deleted.
/// CHECKS: made during the instruction processing
#[account(mut)]
pub thread: UncheckedAccount<'info>,
// #[account(
// mut,
// seeds = [
// SEED_THREAD,
// thread.authority.as_ref(),
// thread.id.as_slice(),
// thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
// ],
// bump = thread.bump,
// )]
// pub thread: Account<'info, Thread>,
}

pub fn handler(ctx: Context<ThreadDelete>) -> Result<()> {
let thread = &ctx.accounts.thread;
let close_to = &ctx.accounts.close_to;

let thread_lamports = thread.get_lamports();
thread.sub_lamports(thread_lamports)?;
close_to.add_lamports(thread_lamports)?;
// We want this instruction not to fail if the thread is already deleted or inexistent.
// As such, all checks are done in the code that than in anchor (see commented code above)
// First, must try to deserialize the thread.

// Get either V1 or V2 thread - If the provided thread does not exist, print an error message and return Ok.
let thread = match Thread::try_deserialize_unchecked(&mut thread.data.borrow_mut().as_ref()) {
Ok(t) => t,
Err(_) => {
msg!("Not a thread or account does not exist");
return Ok(());
}
};

// Preliminary checks
{
// Verify the authority
let authority_key = ctx.accounts.authority.key;
let thread_key = ctx.accounts.thread.key;

require!(
thread.authority.eq(authority_key) || authority_key.eq(thread_key),
SablierError::InvalidThreadAuthority
);

// Verify the account provided
let thread_account = &ctx.accounts.thread;
{
// Verify the account is initialized
require!(
thread_account.owner != &system_program::ID && thread_account.lamports() > 0,
SablierError::InvalidThreadAccount
);

// Verify the account is owned by the program
require!(
thread_account.owner == &crate::ID,
SablierError::InvalidThreadAccount
);

// Verify the seed derivation
let default_vec = Vec::new();
let thread_bump = thread.bump.to_le_bytes();
let seed = [
SEED_THREAD,
thread.authority.as_ref(),
thread.id.as_slice(),
thread.domain.as_ref().unwrap_or(&default_vec).as_slice(),
thread_bump.as_ref(),
];
let expected_thread_key = Pubkey::create_program_address(&seed, &crate::ID)
.map_err(|_| SablierError::InvalidThreadAccount)?;
require!(
expected_thread_key == *thread_key,
SablierError::InvalidThreadAccount
);
}
}

// Transfer lamports out (implicit close)
{
let thread_account = &ctx.accounts.thread;
let thread_lamports = thread_account.get_lamports();
thread_account.sub_lamports(thread_lamports)?;
close_to.add_lamports(thread_lamports)?;
}
Ok(())
}
33 changes: 33 additions & 0 deletions programs/thread/src/instructions/thread_reset_next.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use {crate::state::*, anchor_lang::prelude::*, sablier_network_program::state::Config};

/// Accounts required by the `thread_reset` instruction.
#[derive(Accounts)]
pub struct ThreadResetNext<'info> {
#[account(has_one = admin)]
pub config: AccountLoader<'info, Config>,
pub admin: Signer<'info>,
/// The thread to be paused.
#[account(mut)]
pub thread: Account<'info, Thread>,
}

pub fn handler(ctx: Context<ThreadResetNext>, timestamp: i64) -> Result<()> {
// Get accounts
let thread = &mut ctx.accounts.thread;
let clock = Clock::get()?;

// Full reset the thread state.
thread.exec_context = Some(ExecContext {
exec_index: 0,
execs_since_reimbursement: 0,
execs_since_slot: 0,
last_exec_at: clock.slot,
trigger_context: TriggerContext::Periodic {
started_at: timestamp,
},
});
thread.trigger = Trigger::Periodic { delay: 21600 };
thread.next_instruction = None;

Ok(())
}
4 changes: 4 additions & 0 deletions programs/thread/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ pub mod thread_program {
pub fn thread_withdraw(ctx: Context<ThreadWithdraw>, amount: u64) -> Result<()> {
thread_withdraw::handler(ctx, amount)
}

pub fn thread_reset_next(ctx: Context<ThreadResetNext>, timestamp: i64) -> Result<()> {
thread_reset_next::handler(ctx, timestamp)
}
}