diff --git a/Cargo.toml b/Cargo.toml index bfc06f6..91b10bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] cryptohelpers = {version = "^2.0.0", features= ["async", "sha256"]} +futures = "^0.3" hex = "^0.4" solana-client = "^1.9" solana-transaction-status = "^1.9" @@ -23,3 +24,4 @@ serde = "^1" serde_json = "^1" reqwest = {version = "^0.11", features = ["multipart", "stream"]} tokio = {version = "1.18.2", features = ["rt", "fs", "io-util"]} +tracing = "^0.1" diff --git a/example/Cargo.toml b/example/Cargo.toml index 44ca660..d9ac75d 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -14,3 +14,7 @@ solana-sdk = "^1.9" solana-client = "^1.9" byte-unit = "^4" tokio = {version = "1.18.2", features = ["full"]} +tokio-stream = {version = "^0.1", features = ["fs"]} +futures = "^0.3" +tracing = "0.1.34" +tracing-subscriber = {version = "0.3.11", features = ["env-filter"]} \ No newline at end of file diff --git a/example/multiple_uploads/0.txt b/example/multiple_uploads/0.txt new file mode 100644 index 0000000..d426ba8 --- /dev/null +++ b/example/multiple_uploads/0.txt @@ -0,0 +1 @@ +V \ No newline at end of file diff --git a/example/multiple_uploads/1.txt b/example/multiple_uploads/1.txt new file mode 100644 index 0000000..9cbe6ea --- /dev/null +++ b/example/multiple_uploads/1.txt @@ -0,0 +1 @@ +e \ No newline at end of file diff --git a/example/multiple_uploads/10.txt b/example/multiple_uploads/10.txt new file mode 100644 index 0000000..60a89ed --- /dev/null +++ b/example/multiple_uploads/10.txt @@ -0,0 +1 @@ +O \ No newline at end of file diff --git a/example/multiple_uploads/11.txt b/example/multiple_uploads/11.txt new file mode 100644 index 0000000..1d2f014 --- /dev/null +++ b/example/multiple_uploads/11.txt @@ -0,0 +1 @@ +r \ No newline at end of file diff --git a/example/multiple_uploads/12.txt b/example/multiple_uploads/12.txt new file mode 100644 index 0000000..3410062 --- /dev/null +++ 
b/example/multiple_uploads/12.txt @@ -0,0 +1 @@ +c \ No newline at end of file diff --git a/example/multiple_uploads/2.txt b/example/multiple_uploads/2.txt new file mode 100644 index 0000000..7937c68 --- /dev/null +++ b/example/multiple_uploads/2.txt @@ -0,0 +1 @@ +g \ No newline at end of file diff --git a/example/multiple_uploads/3.txt b/example/multiple_uploads/3.txt new file mode 100644 index 0000000..9cbe6ea --- /dev/null +++ b/example/multiple_uploads/3.txt @@ -0,0 +1 @@ +e \ No newline at end of file diff --git a/example/multiple_uploads/4.txt b/example/multiple_uploads/4.txt new file mode 100644 index 0000000..32f64f4 --- /dev/null +++ b/example/multiple_uploads/4.txt @@ -0,0 +1 @@ +t \ No newline at end of file diff --git a/example/multiple_uploads/5.txt b/example/multiple_uploads/5.txt new file mode 100644 index 0000000..2e65efe --- /dev/null +++ b/example/multiple_uploads/5.txt @@ -0,0 +1 @@ +a \ No newline at end of file diff --git a/example/multiple_uploads/6.txt b/example/multiple_uploads/6.txt new file mode 100644 index 0000000..1d2f014 --- /dev/null +++ b/example/multiple_uploads/6.txt @@ -0,0 +1 @@ +r \ No newline at end of file diff --git a/example/multiple_uploads/7.txt b/example/multiple_uploads/7.txt new file mode 100644 index 0000000..597a6db --- /dev/null +++ b/example/multiple_uploads/7.txt @@ -0,0 +1 @@ +i \ No newline at end of file diff --git a/example/multiple_uploads/8.txt b/example/multiple_uploads/8.txt new file mode 100644 index 0000000..2e65efe --- /dev/null +++ b/example/multiple_uploads/8.txt @@ -0,0 +1 @@ +a \ No newline at end of file diff --git a/example/multiple_uploads/9.txt b/example/multiple_uploads/9.txt new file mode 100644 index 0000000..ef073cc --- /dev/null +++ b/example/multiple_uploads/9.txt @@ -0,0 +1 @@ +n \ No newline at end of file diff --git a/example/src/bin/cancel_delete_file.rs b/example/src/bin/cancel_delete_file.rs index a3d4eb3..c7d8618 100644 --- a/example/src/bin/cancel_delete_file.rs +++ 
b/example/src/bin/cancel_delete_file.rs @@ -26,7 +26,7 @@ async fn main() { //cancel delete file let cancel_delete_file_response = shdw_drive_client - .cancel_delete_file(storage_account_key, url) + .cancel_delete_file(&storage_account_key, url) .await .expect("failed to cancel file deletion"); diff --git a/example/src/bin/cancel_delete_storage_account.rs b/example/src/bin/cancel_delete_storage_account.rs index 2ec0b8f..668cdb9 100644 --- a/example/src/bin/cancel_delete_storage_account.rs +++ b/example/src/bin/cancel_delete_storage_account.rs @@ -17,7 +17,7 @@ async fn main() { let shdw_drive_client = Client::new(keypair, solana_rpc); let response = shdw_drive_client - .cancel_delete_storage_account(storage_account_key) + .cancel_delete_storage_account(&storage_account_key) .await .expect("failed to cancel storage account deletion"); diff --git a/example/src/bin/delete_file.rs b/example/src/bin/delete_file.rs index 1eb8c19..1a919bc 100644 --- a/example/src/bin/delete_file.rs +++ b/example/src/bin/delete_file.rs @@ -26,7 +26,7 @@ async fn main() { //delete file let delete_file_response = shdw_drive_client - .delete_file(storage_account_key, url) + .delete_file(&storage_account_key, url) .await .expect("failed to request storage account deletion"); diff --git a/example/src/bin/delete_storage_account.rs b/example/src/bin/delete_storage_account.rs index eec2866..a4af922 100644 --- a/example/src/bin/delete_storage_account.rs +++ b/example/src/bin/delete_storage_account.rs @@ -17,7 +17,7 @@ async fn main() { let shdw_drive_client = Client::new(keypair, solana_rpc); let response = shdw_drive_client - .delete_storage_account(storage_account_key) + .delete_storage_account(&storage_account_key) .await .expect("failed to request storage account deletion"); diff --git a/example/src/bin/main.rs b/example/src/bin/main.rs index 1663432..065a2ce 100644 --- a/example/src/bin/main.rs +++ b/example/src/bin/main.rs @@ -30,12 +30,12 @@ async fn main() { ); let shdw_drive_client = 
Client::new(keypair, solana_rpc); - list_objects_test(shdw_drive_client, storage_account_key).await; + list_objects_test(shdw_drive_client, &storage_account_key).await; } async fn list_objects_test( shdw_drive_client: Client, - storage_account_key: Pubkey, + storage_account_key: &Pubkey, ) { let objects = shdw_drive_client .list_objects(storage_account_key) @@ -47,7 +47,7 @@ async fn list_objects_test( async fn make_storage_immutable_test( shdw_drive_client: Client, - storage_account_key: Pubkey, + storage_account_key: &Pubkey, ) { let storage_account = shdw_drive_client .get_storage_account(storage_account_key) @@ -59,14 +59,14 @@ async fn make_storage_immutable_test( ); let make_immutable_response = shdw_drive_client - .make_storage_immutable(storage_account_key) + .make_storage_immutable(&storage_account_key) .await .expect("failed to make storage immutable"); println!("txn id: {:?}", make_immutable_response.txid); let storage_account = shdw_drive_client - .get_storage_account(storage_account_key) + .get_storage_account(&storage_account_key) .await .expect("failed to get storage account"); println!( @@ -77,10 +77,10 @@ async fn make_storage_immutable_test( async fn add_storage_test( shdw_drive_client: Client, - storage_account_key: Pubkey, + storage_account_key: &Pubkey, ) { let storage_account = shdw_drive_client - .get_storage_account(storage_account_key) + .get_storage_account(&storage_account_key) .await .expect("failed to get storage account"); @@ -95,7 +95,7 @@ async fn add_storage_test( println!("txn id: {:?}", add_storage_response.txid); let storage_account = shdw_drive_client - .get_storage_account(storage_account_key) + .get_storage_account(&storage_account_key) .await .expect("failed to get storage account"); @@ -104,7 +104,7 @@ async fn add_storage_test( async fn reduce_storage_test( shdw_drive_client: Client, - storage_account_key: Pubkey, + storage_account_key: &Pubkey, ) { let storage_account = shdw_drive_client 
.get_storage_account(storage_account_key) @@ -133,7 +133,7 @@ async fn reduce_storage_test( async fn upload_file_test( shdw_drive_client: Client, - storage_account_key: Pubkey, + storage_account_key: &Pubkey, ) { let file = tokio::fs::File::open("example.png") .await @@ -143,7 +143,7 @@ async fn upload_file_test( .upload_file( storage_account_key, ShdwFile { - name: Some(String::from("example.png")), + name: String::from("example.png"), file, }, ) diff --git a/example/src/bin/upload_multiple_files.rs b/example/src/bin/upload_multiple_files.rs new file mode 100644 index 0000000..44b7490 --- /dev/null +++ b/example/src/bin/upload_multiple_files.rs @@ -0,0 +1,72 @@ +use byte_unit::{Byte, ByteUnit}; +use futures::{future::join_all, TryStreamExt}; +use shadow_drive_rust::{models::ShdwFile, Client}; +use solana_client::rpc_client::RpcClient; +use solana_sdk::{ + pubkey::Pubkey, + signer::{keypair::read_keypair_file, Signer}, +}; +use std::str::FromStr; +use tokio::fs::File; +use tokio_stream::StreamExt; +use tracing::Level; + +const KEYPAIR_PATH: &str = "keypair.json"; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt() + .with_env_filter("off,shadow_drive_rust=debug") + .init(); + + //load keypair from file + let keypair = read_keypair_file(KEYPAIR_PATH).expect("failed to load keypair at path"); + let pubkey = keypair.pubkey(); + let (storage_account_key, _) = + shadow_drive_rust::derived_addresses::storage_account(&pubkey, 4); + + //create shdw drive client + let solana_rpc = RpcClient::new("https://ssc-dao.genesysgo.net"); + let shdw_drive_client = Client::new(keypair, solana_rpc); + + //ensure storage account + if let Err(_) = shdw_drive_client + .get_storage_account(&storage_account_key) + .await + { + println!("Error finding storage account, assuming it's not created yet"); + shdw_drive_client + .create_storage_account( + "shadow-drive-rust-test-2", + Byte::from_str("1MB").expect("failed to parse byte string"), + ) + .await + .expect("failed to create 
storage account"); + } + + let dir = tokio::fs::read_dir("multiple_uploads") + .await + .expect("failed to read multiple uploads dir"); + + let files = tokio_stream::wrappers::ReadDirStream::new(dir) + .filter(Result::is_ok) + .and_then(|entry| async move { + Ok(ShdwFile { + name: entry + .file_name() + .into_string() + .expect("failed to convert os string to regular string"), + file: File::open(entry.path()).await.expect("failed to open file"), + }) + }) + .collect::, _>>() + .await + .expect("failed to create shdw files for dir"); + + let upload_results = shdw_drive_client + .upload_multiple_files(&storage_account_key, files) + .await + .expect("failed to upload files"); + + println!("upload results: {:#?}", upload_results); +} diff --git a/src/client_impl.rs b/src/client_impl.rs index 352ca6e..3d1b729 100644 --- a/src/client_impl.rs +++ b/src/client_impl.rs @@ -15,6 +15,7 @@ mod list_objects; mod make_storage_immutable; mod reduce_storage; mod upload_file; +mod upload_multiple_files; pub use add_storage::*; pub use cancel_delete_file::*; @@ -29,6 +30,7 @@ pub use list_objects::*; pub use make_storage_immutable::*; pub use reduce_storage::*; pub use upload_file::*; +pub use upload_multiple_files::*; use crate::{ constants::SHDW_DRIVE_ENDPOINT, diff --git a/src/client_impl/edit_file.rs b/src/client_impl/edit_file.rs index 0bdb4dd..47e156b 100644 --- a/src/client_impl/edit_file.rs +++ b/src/client_impl/edit_file.rs @@ -67,15 +67,13 @@ where //construct file part and create form let mut file_part = Part::stream(data.file); - if let Some(name) = data.name.as_ref() { - if name.as_bytes().len() > 32 { - errors.push(FileError { - file: Some(name.to_string()), - error: String::from("Exceed the 1GB limit."), - }); - } else { - file_part = file_part.file_name(name.to_string()); - } + if data.name.as_bytes().len() > 32 { + errors.push(FileError { + file: data.name.clone(), + error: String::from("File name too long. 
Reduce to 32 bytes long."), + }); + } else { + file_part = file_part.file_name(data.name.clone()); } if errors.len() > 0 { diff --git a/src/client_impl/upload_file.rs b/src/client_impl/upload_file.rs index 89f77b7..7379dcc 100644 --- a/src/client_impl/upload_file.rs +++ b/src/client_impl/upload_file.rs @@ -58,15 +58,13 @@ where //construct file part and create form let mut file_part = Part::stream(data.file); - if let Some(name) = data.name.as_ref() { - if name.as_bytes().len() > 32 { - errors.push(FileError { - file: Some(name.to_string()), - error: String::from("Exceed the 1GB limit."), - }); - } else { - file_part = file_part.file_name(name.to_string()); - } + if data.name.as_bytes().len() > 32 { + errors.push(FileError { + file: data.name.clone(), + error: String::from("File name too long. Reduce to 32 bytes long."), + }); + } else { + file_part = file_part.file_name(data.name.clone()); } if errors.len() > 0 { @@ -90,7 +88,7 @@ where system_program: system_program::ID, }; let args = shdw_drive_instructions::StoreFile { - filename: data.name.unwrap_or_default().to_string(), + filename: data.name, sha256_hash: hex::encode(sha256_hash.into_bytes()), size: file_size as u64, }; diff --git a/src/client_impl/upload_multiple_files.rs b/src/client_impl/upload_multiple_files.rs new file mode 100644 index 0000000..d4b6247 --- /dev/null +++ b/src/client_impl/upload_multiple_files.rs @@ -0,0 +1,420 @@ +use anchor_lang::{system_program, InstructionData, ToAccountMetas}; +use cryptohelpers::sha256; +use futures::future::join_all; +use reqwest::multipart::{Form, Part}; +use serde_json::Value; +use shadow_drive_user_staking::accounts as shdw_drive_accounts; +use shadow_drive_user_staking::instruction as shdw_drive_instructions; +use solana_client::rpc_client::serialize_and_encode; +use solana_sdk::{ + instruction::Instruction, pubkey::Pubkey, signer::Signer, transaction::Transaction, +}; +use solana_transaction_status::UiTransactionEncoding; +use std::collections::HashSet; +use 
std::fs::Metadata; +use std::io::SeekFrom; +use std::time::Duration; +use tokio::fs::File; +use tokio::io::AsyncSeekExt; + +use super::Client; +use crate::{ + constants::{PROGRAM_ADDRESS, SHDW_DRIVE_ENDPOINT, STORAGE_CONFIG_PDA, TOKEN_MINT, UPLOADER}, + derived_addresses, + error::{Error, FileError}, + models::*, +}; + +/// UploadingData is a collection of info required for uploading a file +/// to Shadow Drive. Fields are generally derived from a given [`ShdwFile`] during the upload process. +#[derive(Debug)] +struct UploadingData { + name: String, + size: u64, + sha256_hash: sha256::Sha256Hash, + url: String, + file: File, +} + +impl Client +where + T: Signer + Send + Sync, +{ + /// upload_multiple_files uploads a list of [`ShdwFile`]s to Shadow Drive. + /// The multiple upload process is done in 4 steps: + /// 1. Validate & prepare all files into [`UploadingData`]. If a file there are validation errors, the process is aborted. + /// 2. Filter files that have the same name as a previously uploaded file. Uploads are not attempted for duplicates. + /// 3. Divide files to be uploaded into batches of 5 or less to reduce calls but keep transaction size below the limit. + /// 4. For each batch: + /// a. confirm file account seed + /// b. derive file account pubkey for each file + /// c. construct & partial sign transaction + /// d. 
submit transaction and files to Shadow Drive as multipart form data + pub async fn upload_multiple_files( + &self, + storage_account_key: &Pubkey, + data: Vec, + ) -> ShadowDriveResult> { + let wallet_pubkey = self.wallet.pubkey(); + let (user_info, _) = derived_addresses::user_info(&wallet_pubkey); + let selected_account = self.get_storage_account(storage_account_key).await?; + + //collect upload data for each file + let upload_data_futures = + data.into_iter() + .map(|shdw_file| async move { + self.prepare_upload(shdw_file, storage_account_key).await + }) + .collect::>(); + + let file_data = join_all(upload_data_futures).await; + + let (succeeded_files, errored_files): (Vec<_>, Vec<_>) = + file_data.into_iter().partition(Result::is_ok); + //it's safe to unwrap after the above partition + let errored_files: Vec> = + errored_files.into_iter().map(Result::unwrap_err).collect(); + if errored_files.len() > 0 { + return Err(Error::FileValidationError( + errored_files.into_iter().flatten().collect(), + )); + } + let succeeded_files = succeeded_files.into_iter().map(Result::unwrap); + + //filter out any existing files + let all_objects: HashSet = self + .list_objects(&storage_account_key) + .await? 
+ .into_iter() + .collect(); + let (to_upload, existing_uploads): (Vec<_>, Vec<_>) = succeeded_files + .into_iter() + .partition(|file| !all_objects.contains(&file.name)); + + //pre-fill results w/ existing files + let mut upload_results = existing_uploads + .into_iter() + .map(|file| ShadowBatchUploadResponse { + file_name: file.name, + status: BatchUploadStatus::AlreadyExists, + location: Some(file.url), + transaction_signature: None, + }) + .collect::>(); + + if upload_results.len() > 0 { + tracing::debug!(existing_uploads = ?upload_results, "found existing files, will not attempt re-upload for existing files"); + } + + let mut batches = Vec::default(); + let mut current_batch: Vec<UploadingData> = Vec::default(); + let mut batch_total_name_length = 0; + + for file_data in to_upload { + if current_batch.is_empty() { + batch_total_name_length += file_data.name.as_bytes().len(); + current_batch.push(file_data); + continue; + } + + //if the current batch has 5 or less + if current_batch.len() < 5 && + //our current name buffer is under the limit + batch_total_name_length < 154 && + //the name buffer will be under size with the new file + batch_total_name_length + file_data.name.as_bytes().len() < 154 + { + //add to current batch + batch_total_name_length += file_data.name.as_bytes().len(); + current_batch.push(file_data); + } else { + //create new batch and start the name buffer from this file's name length + batches.push(current_batch); + current_batch = Vec::default(); + batch_total_name_length = file_data.name.as_bytes().len(); + current_batch.push(file_data); + } + } + //if the final batch has something, push it to batches + if !current_batch.is_empty() { + batches.push(current_batch); + } + + let mut new_file_seed = selected_account.init_counter; + + //send each batch to shdw drive + for batch in batches { + //confirm file seed before sending + new_file_seed = self + .confirm_storage_account_seed(new_file_seed, storage_account_key) + .await?; + + let mut num_retries = 0; + loop { + match self + .send_batch(storage_account_key, 
user_info, &mut new_file_seed, &batch) + .await + { + Ok(response) => { + upload_results.extend(response.into_iter()); + //break loop to move to next batch + break; + } + Err(error) => { + tracing::error!( + retries = num_retries, + ?error, + "error uploading batch to shdw drive" + ); + num_retries += 1; + //after 5 attempts bail on the batch + if num_retries == 5 { + //reset file seed + new_file_seed = self + .confirm_storage_account_seed( + selected_account.init_counter, + storage_account_key, + ) + .await?; + + //save failed entries + let failed = batch.into_iter().map(|file| ShadowBatchUploadResponse { + file_name: file.name, + status: BatchUploadStatus::Error(format!("{:?}", error)), + location: None, + transaction_signature: None, + }); + upload_results.extend(failed); + //break batch retry loop to move to next + break; + } + } + } + } + + //wait 1/2 second before going to next batch + //without this the latest blockhash doesn't align with the server's recent blockhash + tokio::time::sleep(Duration::from_millis(500)).await; + } + + Ok(upload_results) + } + + /// prepare_upload takes a [`ShdwFile`] and creates an [`UploadingData`] by: + /// 1. retrieve file size & validate that it is below the 1GB limit + /// 2. derive the expected upload URL from the `storage_account_key` and the file name, + /// 3. calculate sha256 hash of the data in streaming fashion + /// 4. 
validate that the file name is less than 32 bytes + async fn prepare_upload( + &self, + mut shdw_file: ShdwFile, + storage_account_key: &Pubkey, + ) -> Result<UploadingData, Vec<FileError>> { + let mut errors = Vec::new(); + let file_meta: Metadata; + match shdw_file.file.metadata().await { + Ok(meta) => file_meta = meta, + Err(err) => { + errors.push(FileError { + file: shdw_file.name.clone(), + error: format!("error opening file metadata: {:?}", err), + }); + return Err(errors); + } + } + let file_size = file_meta.len(); + if file_size > 1_073_741_824 { + errors.push(FileError { + file: shdw_file.name.clone(), + error: String::from("Exceed the 1GB limit."), + }); + } + + //this may need to be url encoded + let url = format!( + "https://shdw-drive.genesysgo.net/{}/{}", + storage_account_key.to_string(), + &shdw_file.name + ); + + //store any info about file bytes before moving into form + let sha256_hash = match sha256::compute(&mut shdw_file.file).await { + Ok(hash) => hash, + Err(err) => { + errors.push(FileError { + file: shdw_file.name.clone(), + error: format!("error hashing file: {:?}", err), + }); + return Err(errors); + } + }; + + if shdw_file.name.as_bytes().len() > 32 { + errors.push(FileError { + file: shdw_file.name.clone(), + error: String::from("File name too long. Reduce to 32 bytes long."), + }); + } + + if errors.len() > 0 { + return Err(errors); + } + + Ok(UploadingData { + name: shdw_file.name, + size: file_size, + sha256_hash, + url, + file: shdw_file.file, + }) + } + + /// confirm_storage_account_seed performs a retry loop to ensure that the file seed + /// used to derive file account pubkeys is valid before creating a batch upload transaction. + /// In the event that the chain has a more up to date file seed, the on-chain seed is used for the transaction. 
+ async fn confirm_storage_account_seed( + &self, + expected_seed: u32, + storage_account_key: &Pubkey, + ) -> ShadowDriveResult { + let mut num_tries = 0; + loop { + let storage_account = self.get_storage_account(storage_account_key).await?; + if expected_seed == storage_account.init_counter { + tracing::debug!( + expected_seed, + actual_seed = storage_account.init_counter, + "Chain has up to date info. Moving onto the next batch." + ); + return Ok(expected_seed); + } else if expected_seed < storage_account.init_counter { + tracing::debug!( + expected_seed, + actual_seed = storage_account.init_counter, + "Chain has higher seed. Fast forwarding to new start." + ); + return Ok(storage_account.init_counter); + } else { + num_tries += 1; + if num_tries == 300 { + // if we've tried for 5 minutes, give up + return Err(Error::InvalidStorage); + } + + tracing::debug!( + expected_seed, + actual_seed = storage_account.init_counter, + "Chain does not have up to date info. Waiting 1s to check again." + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + + /// send_batch constructs and partially signs a transaction for a given slice of [`UploadingData`]. + /// The transcation is then sent via HTTP POST to Shadow Drive servers as multipart form data alongside file contents. 
+ async fn send_batch( + &self, + storage_account_key: &Pubkey, + user_info: Pubkey, + new_file_seed: &mut u32, + batch: &[UploadingData], + ) -> ShadowDriveResult> { + //derive file account pubkeys using new_file_seed + let mut files_with_pubkeys: Vec<(Pubkey, &UploadingData)> = Vec::with_capacity(batch.len()); + for file in batch { + files_with_pubkeys.push(( + derived_addresses::file_account(&storage_account_key, *new_file_seed).0, + file, + )); + *new_file_seed += 1; + } + + //build txn + let instructions = files_with_pubkeys + .iter() + .map(|(file_account, file)| { + let accounts = shdw_drive_accounts::StoreFile { + storage_config: *STORAGE_CONFIG_PDA, + storage_account: *storage_account_key, + user_info, + owner: self.wallet.pubkey(), + uploader: UPLOADER, + token_mint: TOKEN_MINT, + system_program: system_program::ID, + file: *file_account, + }; + let args = shdw_drive_instructions::StoreFile { + filename: file.name.clone(), + sha256_hash: hex::encode(file.sha256_hash.into_bytes()), + size: file.size, + }; + Instruction { + program_id: PROGRAM_ADDRESS, + accounts: accounts.to_account_metas(None), + data: args.data(), + } + }) + .collect::>(); + + let mut txn = + Transaction::new_with_payer(instructions.as_slice(), Some(&self.wallet.pubkey())); + + txn.try_partial_sign(&[&self.wallet], self.rpc_client.get_latest_blockhash()?)?; + let txn_encoded = serialize_and_encode(&txn, UiTransactionEncoding::Base64)?; + + //construct HTTP form data + let mut form = Form::new(); + for (_, file) in files_with_pubkeys { + //because file is borrowed, we have to obtain a new instance. 
+ // `try_clone` uses the underlying file handle as the original + let mut file_data = file + .file + .try_clone() + .await + .map_err(Error::FileSystemError)?; + + //seek to front of file + file_data + .seek(SeekFrom::Start(0)) + .await + .map_err(Error::FileSystemError)?; + + form = form.part( + "file", + Part::stream_with_length(file_data, file.size).file_name(file.name.clone()), + ); + } + + let form = form.part("transaction", Part::text(txn_encoded)); + + //submit files to Shadow Drive + let response = self + .http_client + .post(format!("{}/upload-batch", SHDW_DRIVE_ENDPOINT)) + .multipart(form) + .send() + .await?; + + if !response.status().is_success() { + return Err(Error::ShadowDriveServerError { + status: response.status().as_u16(), + message: response.json::().await?, + }); + } + + //deserialize the response from ShadowDrive and return the upload details + let response = response.json::().await?; + let response = batch + .iter() + .map(|file| ShadowBatchUploadResponse { + file_name: file.name.clone(), + status: BatchUploadStatus::Uploaded, + location: Some(file.url.clone()), + transaction_signature: Some(response.transaction_signature.clone()), + }) + .collect(); + + Ok(response) + } +} diff --git a/src/error.rs b/src/error.rs index c2b655c..26180e9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -28,7 +28,7 @@ pub enum Error { #[derive(Debug)] pub struct FileError { - pub file: Option, + pub file: String, pub error: String, } diff --git a/src/models.rs b/src/models.rs index ed5a7dc..cc5bf8f 100644 --- a/src/models.rs +++ b/src/models.rs @@ -19,7 +19,7 @@ pub struct CreateStorageAccountResponse { /// A ShdwFile is the pairing of a filename w/ bytes to be uploaded #[derive(Debug)] pub struct ShdwFile { - pub name: Option, + pub name: String, pub file: fs::File, } @@ -29,6 +29,26 @@ pub struct ShadowUploadResponse { pub transaction_signature: String, } +#[derive(Clone, Debug, Deserialize)] +pub(crate) struct ShdwDriveBatchServerResponse { + pub 
_finalized_locations: Option>, + pub transaction_signature: String, +} + +#[derive(Clone, Debug, Deserialize)] +pub enum BatchUploadStatus { + Uploaded, + AlreadyExists, + Error(String), +} +#[derive(Clone, Debug, Deserialize)] +pub struct ShadowBatchUploadResponse { + pub file_name: String, + pub status: BatchUploadStatus, + pub location: Option, + pub transaction_signature: Option, +} + #[derive(Clone, Debug, Deserialize)] pub struct FileDataResponse { pub file_data: FileData,