From 94acfbe202e672819d7482b7806337daf0b0bca7 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Thu, 1 Oct 2020 01:55:22 +0800 Subject: [PATCH] Use protobufs to store confirmed blocks in BigTable (#12526) * Use protobufs to store confirmed blocks in BigTable * Cleanup * Reorganize proto * Clean up use statements * Split out function for unit testing * s/utils/convert Co-authored-by: Tyera Eulberg (cherry picked from commit ce598c5c98e7384c104fe7f5121e32c2c5a2d2eb) --- storage-bigtable/build-proto/src/main.rs | 16 + .../proto/solana.bigtable.confirmed_block.rs | 95 ++++++ storage-bigtable/src/bigtable.rs | 223 +++++++++++++- storage-bigtable/src/confirmed_block.proto | 67 ++++ storage-bigtable/src/convert.rs | 291 ++++++++++++++++++ storage-bigtable/src/lib.rs | 34 +- transaction-status/src/lib.rs | 4 +- 7 files changed, 707 insertions(+), 23 deletions(-) create mode 100644 storage-bigtable/proto/solana.bigtable.confirmed_block.rs create mode 100644 storage-bigtable/src/confirmed_block.proto create mode 100644 storage-bigtable/src/convert.rs diff --git a/storage-bigtable/build-proto/src/main.rs b/storage-bigtable/build-proto/src/main.rs index a61afd1e183484..1afc36df440503 100644 --- a/storage-bigtable/build-proto/src/main.rs +++ b/storage-bigtable/build-proto/src/main.rs @@ -15,5 +15,21 @@ fn main() -> Result<(), std::io::Error> { .compile( &[googleapis.join("google/bigtable/v2/bigtable.proto")], &[googleapis], + )?; + + let out_dir = manifest_dir.join("../proto"); + let proto_files = manifest_dir.join("../src"); + + println!("Protobuf directory: {}", proto_files.display()); + println!("output directory: {}", out_dir.display()); + + tonic_build::configure() + .build_client(true) + .build_server(false) + .format(true) + .out_dir(&out_dir) + .compile( + &[proto_files.join("confirmed_block.proto")], + &[proto_files], ) } diff --git a/storage-bigtable/proto/solana.bigtable.confirmed_block.rs b/storage-bigtable/proto/solana.bigtable.confirmed_block.rs new file mode 100644 index 00000000000000..f49b1a7a40dd90 --- /dev/null +++ b/storage-bigtable/proto/solana.bigtable.confirmed_block.rs @@ -0,0 +1,95 @@ +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfirmedBlock { + #[prost(string, tag = "1")] + pub previous_blockhash: std::string::String, + #[prost(string, tag = "2")] + pub blockhash: std::string::String, + #[prost(uint64, tag = "3")] + pub parent_slot: u64, + #[prost(message, repeated, tag = "4")] + pub transactions: ::std::vec::Vec, + #[prost(message, repeated, tag = "5")] + pub rewards: ::std::vec::Vec, + #[prost(message, optional, tag = "6")] + pub block_time: ::std::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfirmedTransaction { + #[prost(message, optional, tag = "1")] + pub transaction: ::std::option::Option, + #[prost(message, optional, tag = "2")] + pub meta: ::std::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Transaction { + #[prost(bytes, repeated, tag = "1")] + pub signatures: ::std::vec::Vec>, + #[prost(message, optional, tag = "2")] + pub message: ::std::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Message { + #[prost(message, optional, tag = "1")] + pub header: ::std::option::Option, + #[prost(bytes, repeated, tag = "2")] + pub account_keys: ::std::vec::Vec>, + #[prost(bytes, tag = "3")] + pub recent_blockhash: std::vec::Vec, + #[prost(message, repeated, tag = "4")] + pub instructions: ::std::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MessageHeader { + #[prost(uint32, tag = "1")] + pub num_required_signatures: u32, + #[prost(uint32, tag = "2")] + pub num_readonly_signed_accounts: u32, + #[prost(uint32, tag = "3")] + pub num_readonly_unsigned_accounts: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionStatusMeta { + #[prost(message, optional, tag = "1")] + pub err: ::std::option::Option, + #[prost(uint64, tag = "2")] + pub fee: u64, + #[prost(uint64, repeated, tag = "3")] + pub pre_balances: ::std::vec::Vec, + #[prost(uint64, repeated, tag = "4")] + pub post_balances: ::std::vec::Vec, + #[prost(message, repeated, tag = "5")] + pub inner_instructions: ::std::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionError { + #[prost(bytes, tag = "1")] + pub err: std::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InnerInstructions { + #[prost(uint32, tag = "1")] + pub index: u32, + #[prost(message, repeated, tag = "2")] + pub instructions: ::std::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CompiledInstruction { + #[prost(uint32, tag = "1")] + pub program_id_index: u32, + #[prost(bytes, tag = "2")] + pub accounts: std::vec::Vec, + #[prost(bytes, tag = "3")] + pub data: std::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Reward { + #[prost(string, tag = "1")] + pub pubkey: std::string::String, + #[prost(int64, tag = "2")] + pub lamports: i64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct UnixTimestamp { + #[prost(int64, tag = "1")] + pub timestamp: i64, +} diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index 9d2b1d5740aaeb..0cfbd16f43e94d 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -1,8 +1,10 @@ // Primitives for reading/writing BigTable tables -use crate::access_token::{AccessToken, Scope}; -use crate::compression::{compress_best, decompress}; -use crate::root_ca_certificate; +use crate::{ + access_token::{AccessToken, Scope}, + compression::{compress_best, decompress}, + root_ca_certificate, +}; use log::*; use thiserror::Error; use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request}; @@ -26,10 +28,14 @@ mod google { use google::bigtable::v2::*; pub type RowKey = String; -pub type CellName = String; -pub type CellValue = Vec; pub type RowData = Vec<(CellName, CellValue)>; pub type RowDataSlice<'a> = &'a [(CellName, CellValue)]; +pub type CellName = String; +pub type CellValue = Vec; +pub enum CellData { + Bincode(B), + Protobuf(P), +} #[derive(Debug, Error)] pub enum Error { @@ -196,6 +202,23 @@ impl BigTableConnection { .retry(ExponentialBackoff::default()) .await } + + pub async fn put_protobuf_cells_with_retry( + &self, + table: &str, + cells: &[(RowKey, T)], + ) -> Result + where + T: prost::Message, + { + use backoff::{future::FutureOperation as _, ExponentialBackoff}; + (|| async { + let mut client = self.client(); + Ok(client.put_protobuf_cells(table, cells).await?) + }) + .retry(ExponentialBackoff::default()) + .await + } } pub struct BigTable { @@ -484,7 +507,20 @@ impl BigTable { T: serde::de::DeserializeOwned, { let row_data = self.get_single_row_data(table, key.clone()).await?; - deserialize_cell_data(&row_data, table, key.to_string()) + deserialize_bincode_cell_data(&row_data, table, key.to_string()) + } + + pub async fn get_protobuf_or_bincode_cell( + &mut self, + table: &str, + key: RowKey, + ) -> Result> + where + B: serde::de::DeserializeOwned, + P: prost::Message + Default, + { + let row_data = self.get_single_row_data(table, key.clone()).await?; + deserialize_protobuf_or_bincode_cell_data(&row_data, table, key) } pub async fn put_bincode_cells( @@ -506,9 +542,74 @@ impl BigTable { self.put_row_data(table, "x", &new_row_data).await?; Ok(bytes_written) } + + pub async fn put_protobuf_cells( + &mut self, + table: &str, + cells: &[(RowKey, T)], + ) -> Result + where + T: prost::Message, + { + let mut bytes_written = 0; + let mut new_row_data = vec![]; + for (row_key, data) in cells { + let mut buf = Vec::with_capacity(data.encoded_len()); + data.encode(&mut buf).unwrap(); + let data = compress_best(&buf)?; + bytes_written += data.len(); + new_row_data.push((row_key, vec![("proto".to_string(), data)])); + } + + self.put_row_data(table, "x", &new_row_data).await?; + Ok(bytes_written) + } +} + +pub(crate) fn deserialize_protobuf_or_bincode_cell_data( + row_data: RowDataSlice, + table: &str, + key: RowKey, +) -> Result> +where + B: serde::de::DeserializeOwned, + P: prost::Message + Default, +{ + match deserialize_protobuf_cell_data(row_data, table, key.to_string()) { + Ok(result) => return Ok(CellData::Protobuf(result)), + Err(err) => match err { + Error::ObjectNotFound(_) => {} + _ => return Err(err), + }, + } + match deserialize_bincode_cell_data(row_data, table, key) { + Ok(result) => Ok(CellData::Bincode(result)), + Err(err) => Err(err), + } +} + +pub(crate) fn deserialize_protobuf_cell_data( + row_data: RowDataSlice, + table: &str, + key: RowKey, +) -> Result +where + T: prost::Message + Default, +{ + let value = &row_data + .iter() + .find(|(name, _)| name == "proto") + .ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))? + .1; + + let data = decompress(&value)?; + T::decode(&data[..]).map_err(|err| { + warn!("Failed to deserialize {}/{}: {}", table, key, err); + Error::ObjectCorrupt(format!("{}/{}", table, key)) + }) } -pub(crate) fn deserialize_cell_data( +pub(crate) fn deserialize_bincode_cell_data( row_data: RowDataSlice, table: &str, key: RowKey, @@ -528,3 +629,111 @@ where Error::ObjectCorrupt(format!("{}/{}", table, key)) }) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{convert::generated, StoredConfirmedBlock}; + use prost::Message; + use solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair, system_transaction}; + use solana_transaction_status::{ + ConfirmedBlock, TransactionStatusMeta, TransactionWithStatusMeta, + }; + use std::convert::TryInto; + + #[test] + fn test_deserialize_protobuf_or_bincode_cell_data() { + let from = Keypair::new(); + let recipient = Pubkey::new_rand(); + let transaction = system_transaction::transfer(&from, &recipient, 42, Hash::default()); + let with_meta = TransactionWithStatusMeta { + transaction, + meta: Some(TransactionStatusMeta { + status: Ok(()), + fee: 1, + pre_balances: vec![43, 0, 1], + post_balances: vec![0, 42, 1], + inner_instructions: Some(vec![]), + }), + }; + let block = ConfirmedBlock { + transactions: vec![with_meta], + parent_slot: 1, + blockhash: Hash::default().to_string(), + previous_blockhash: Hash::default().to_string(), + rewards: vec![], + block_time: Some(1_234_567_890), + }; + let bincode_block = compress_best( + &bincode::serialize::(&block.clone().into()).unwrap(), + ) + .unwrap(); + + let protobuf_block = generated::ConfirmedBlock::from(block.clone()); + let mut buf = Vec::with_capacity(protobuf_block.encoded_len()); + protobuf_block.encode(&mut buf).unwrap(); + let protobuf_block = compress_best(&buf).unwrap(); + + let deserialized = deserialize_protobuf_or_bincode_cell_data::< + StoredConfirmedBlock, + generated::ConfirmedBlock, + >( + &[("proto".to_string(), protobuf_block.clone())], + "", + "".to_string(), + ) + .unwrap(); + if let CellData::Protobuf(protobuf_block) = deserialized { + assert_eq!(block, protobuf_block.try_into().unwrap()); + } else { + panic!("deserialization should produce CellData::Protobuf"); + } + + let deserialized = deserialize_protobuf_or_bincode_cell_data::< + StoredConfirmedBlock, + generated::ConfirmedBlock, + >( + &[("bin".to_string(), bincode_block.clone())], + "", + "".to_string(), + ) + .unwrap(); + if let CellData::Bincode(bincode_block) = deserialized { + let mut block = block; + if let Some(meta) = &mut block.transactions[0].meta { + meta.inner_instructions = None; // Legacy bincode implementation does not suport inner_instructions + } + assert_eq!(block, bincode_block.into()); + } else { + panic!("deserialization should produce CellData::Bincode"); + } + + let result = deserialize_protobuf_or_bincode_cell_data::< + StoredConfirmedBlock, + generated::ConfirmedBlock, + >(&[("proto".to_string(), bincode_block)], "", "".to_string()); + assert!(result.is_err()); + + let result = deserialize_protobuf_or_bincode_cell_data::< + StoredConfirmedBlock, + generated::ConfirmedBlock, + >( + &[("proto".to_string(), vec![1, 2, 3, 4])], + "", + "".to_string(), + ); + assert!(result.is_err()); + + let result = deserialize_protobuf_or_bincode_cell_data::< + StoredConfirmedBlock, + generated::ConfirmedBlock, + >(&[("bin".to_string(), protobuf_block)], "", "".to_string()); + assert!(result.is_err()); + + let result = deserialize_protobuf_or_bincode_cell_data::< + StoredConfirmedBlock, + generated::ConfirmedBlock, + >(&[("bin".to_string(), vec![1, 2, 3, 4])], "", "".to_string()); + assert!(result.is_err()); + } +} diff --git a/storage-bigtable/src/confirmed_block.proto b/storage-bigtable/src/confirmed_block.proto new file mode 100644 index 00000000000000..762dd1faf4b825 --- /dev/null +++ b/storage-bigtable/src/confirmed_block.proto @@ -0,0 +1,67 @@ +syntax = "proto3"; + +package solana.bigtable.ConfirmedBlock; + +message ConfirmedBlock { + string previous_blockhash = 1; + string blockhash = 2; + uint64 parent_slot = 3; + repeated ConfirmedTransaction transactions = 4; + repeated Reward rewards = 5; + UnixTimestamp block_time = 6; +} + +message ConfirmedTransaction { + Transaction transaction = 1; + TransactionStatusMeta meta = 2; +} + +message Transaction { + repeated bytes signatures = 1; + Message message = 2; +} + +message Message { + MessageHeader header = 1; + repeated bytes account_keys = 2; + bytes recent_blockhash = 3; + repeated CompiledInstruction instructions = 4; +} + +message MessageHeader { + uint32 num_required_signatures = 1; + uint32 num_readonly_signed_accounts = 2; + uint32 num_readonly_unsigned_accounts = 3; +} + +message TransactionStatusMeta { + TransactionError err = 1; + uint64 fee = 2; + repeated uint64 pre_balances = 3; + repeated uint64 post_balances = 4; + repeated InnerInstructions inner_instructions = 5; +} + +message TransactionError { + bytes err = 1; +} + +message InnerInstructions { + uint32 index = 1; + repeated CompiledInstruction instructions = 2; +} + +message CompiledInstruction { + uint32 program_id_index = 1; + bytes accounts = 2; + bytes data = 3; +} + +message Reward { + string pubkey = 1; + int64 lamports = 2; +} + +message UnixTimestamp { + int64 timestamp = 1; +} diff --git a/storage-bigtable/src/convert.rs b/storage-bigtable/src/convert.rs new file mode 100644 index 00000000000000..cafe71552b6b08 --- /dev/null +++ b/storage-bigtable/src/convert.rs @@ -0,0 +1,291 @@ +use solana_sdk::{ + hash::Hash, + instruction::CompiledInstruction, + message::{Message, MessageHeader}, + pubkey::Pubkey, + signature::Signature, + transaction::Transaction, +}; +use solana_transaction_status::{ + ConfirmedBlock, InnerInstructions, Reward, TransactionStatusMeta, TransactionWithStatusMeta, +}; +use std::convert::{TryFrom, TryInto}; + +pub mod generated { + include!(concat!( + env!("CARGO_MANIFEST_DIR"), + concat!("/proto/solana.bigtable.confirmed_block.rs") + )); +} + +impl From for generated::Reward { + fn from(reward: Reward) -> Self { + Self { + pubkey: reward.pubkey, + lamports: reward.lamports, + } + } +} + +impl From for Reward { + fn from(reward: generated::Reward) -> Self { + Self { + pubkey: reward.pubkey, + lamports: reward.lamports, + } + } +} + +impl From for generated::ConfirmedBlock { + fn from(confirmed_block: ConfirmedBlock) -> Self { + let ConfirmedBlock { + previous_blockhash, + blockhash, + parent_slot, + transactions, + rewards, + block_time, + } = confirmed_block; + + Self { + previous_blockhash, + blockhash, + parent_slot, + transactions: transactions.into_iter().map(|tx| tx.into()).collect(), + rewards: rewards.into_iter().map(|r| r.into()).collect(), + block_time: block_time.map(|timestamp| generated::UnixTimestamp { timestamp }), + } + } +} + +impl TryFrom for ConfirmedBlock { + type Error = bincode::Error; + fn try_from( + confirmed_block: generated::ConfirmedBlock, + ) -> std::result::Result { + let generated::ConfirmedBlock { + previous_blockhash, + blockhash, + parent_slot, + transactions, + rewards, + block_time, + } = confirmed_block; + + Ok(Self { + previous_blockhash, + blockhash, + parent_slot, + transactions: transactions + .into_iter() + .map(|tx| tx.try_into()) + .collect::, Self::Error>>()?, + rewards: rewards.into_iter().map(|r| r.into()).collect(), + block_time: block_time.map(|generated::UnixTimestamp { timestamp }| timestamp), + }) + } +} + +impl From for generated::ConfirmedTransaction { + fn from(value: TransactionWithStatusMeta) -> Self { + let meta = if let Some(meta) = value.meta { + Some(meta.into()) + } else { + None + }; + Self { + transaction: Some(value.transaction.into()), + meta, + } + } +} + +impl TryFrom for TransactionWithStatusMeta { + type Error = bincode::Error; + fn try_from(value: generated::ConfirmedTransaction) -> std::result::Result { + let meta = if let Some(meta) = value.meta { + Some(meta.try_into()?) + } else { + None + }; + Ok(Self { + transaction: value.transaction.expect("transaction is required").into(), + meta, + }) + } +} + +impl From for generated::Transaction { + fn from(value: Transaction) -> Self { + Self { + signatures: value + .signatures + .into_iter() + .map(|signature| >::as_ref(&signature).into()) + .collect(), + message: Some(value.message.into()), + } + } +} + +impl From for Transaction { + fn from(value: generated::Transaction) -> Self { + Self { + signatures: value + .signatures + .into_iter() + .map(|x| Signature::new(&x)) + .collect(), + message: value.message.expect("message is required").into(), + } + } +} + +impl From for generated::Message { + fn from(value: Message) -> Self { + Self { + header: Some(value.header.into()), + account_keys: value + .account_keys + .into_iter() + .map(|key| >::as_ref(&key).into()) + .collect(), + recent_blockhash: value.recent_blockhash.to_bytes().into(), + instructions: value.instructions.into_iter().map(|ix| ix.into()).collect(), + } + } +} + +impl From for Message { + fn from(value: generated::Message) -> Self { + Self { + header: value.header.expect("header is required").into(), + account_keys: value + .account_keys + .into_iter() + .map(|key| Pubkey::new(&key)) + .collect(), + recent_blockhash: Hash::new(&value.recent_blockhash), + instructions: value.instructions.into_iter().map(|ix| ix.into()).collect(), + } + } +} + +impl From for generated::MessageHeader { + fn from(value: MessageHeader) -> Self { + Self { + num_required_signatures: value.num_required_signatures as u32, + num_readonly_signed_accounts: value.num_readonly_signed_accounts as u32, + num_readonly_unsigned_accounts: value.num_readonly_unsigned_accounts as u32, + } + } +} + +impl From for MessageHeader { + fn from(value: generated::MessageHeader) -> Self { + Self { + num_required_signatures: value.num_required_signatures as u8, + num_readonly_signed_accounts: value.num_readonly_signed_accounts as u8, + num_readonly_unsigned_accounts: value.num_readonly_unsigned_accounts as u8, + } + } +} + +impl From for generated::TransactionStatusMeta { + fn from(value: TransactionStatusMeta) -> Self { + let TransactionStatusMeta { + status, + fee, + pre_balances, + post_balances, + inner_instructions, + } = value; + let err = match status { + Ok(()) => None, + Err(err) => Some(generated::TransactionError { + err: bincode::serialize(&err).expect("transaction error to serialize to bytes"), + }), + }; + let inner_instructions = inner_instructions + .unwrap_or_default() + .into_iter() + .map(|ii| ii.into()) + .collect(); + Self { + err, + fee, + pre_balances, + post_balances, + inner_instructions, + } + } +} + +impl TryFrom for TransactionStatusMeta { + type Error = bincode::Error; + + fn try_from(value: generated::TransactionStatusMeta) -> std::result::Result { + let generated::TransactionStatusMeta { + err, + fee, + pre_balances, + post_balances, + inner_instructions, + } = value; + let status = match &err { + None => Ok(()), + Some(tx_error) => Err(bincode::deserialize(&tx_error.err)?), + }; + let inner_instructions = Some( + inner_instructions + .into_iter() + .map(|inner| inner.into()) + .collect(), + ); + Ok(Self { + status, + fee, + pre_balances, + post_balances, + inner_instructions, + }) + } +} + +impl From for generated::InnerInstructions { + fn from(value: InnerInstructions) -> Self { + Self { + index: value.index as u32, + instructions: value.instructions.into_iter().map(|i| i.into()).collect(), + } + } +} + +impl From for InnerInstructions { + fn from(value: generated::InnerInstructions) -> Self { + Self { + index: value.index as u8, + instructions: value.instructions.into_iter().map(|i| i.into()).collect(), + } + } +} + +impl From for generated::CompiledInstruction { + fn from(value: CompiledInstruction) -> Self { + Self { + program_id_index: value.program_id_index as u32, + accounts: value.accounts, + data: value.data, + } + } +} + +impl From for CompiledInstruction { + fn from(value: generated::CompiledInstruction) -> Self { + Self { + program_id_index: value.program_id_index as u8, + accounts: value.accounts, + data: value.data, + } + } +} diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 56d7b3abb17cb3..dc62cc712f6e3b 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -8,11 +8,10 @@ use solana_sdk::{ transaction::{Transaction, TransactionError}, }; use solana_transaction_status::{ - ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, - InnerInstructions, Rewards, TransactionStatus, TransactionStatusMeta, - TransactionWithStatusMeta, + ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards, + TransactionStatus, TransactionStatusMeta, TransactionWithStatusMeta, }; -use std::collections::HashMap; +use std::{collections::HashMap, convert::TryInto}; use thiserror::Error; #[macro_use] @@ -21,8 +20,11 @@ extern crate serde_derive; mod access_token; mod bigtable; mod compression; +mod convert; mod root_ca_certificate; +use convert::generated; + #[derive(Debug, Error)] pub enum Error { #[error("BigTable: {0}")] @@ -162,7 +164,6 @@ struct StoredConfirmedBlockTransactionStatusMeta { fee: u64, pre_balances: Vec, post_balances: Vec, - inner_instructions: Option>, } impl From for TransactionStatusMeta { @@ -172,7 +173,6 @@ impl From for TransactionStatusMeta { fee, pre_balances, post_balances, - inner_instructions, } = value; let status = match &err { None => Ok(()), @@ -183,7 +183,7 @@ impl From for TransactionStatusMeta { fee, pre_balances, post_balances, - inner_instructions, + inner_instructions: None, } } } @@ -195,7 +195,6 @@ impl From for StoredConfirmedBlockTransactionStatusMeta { fee, pre_balances, post_balances, - inner_instructions, .. } = value; Self { @@ -203,7 +202,6 @@ impl From for StoredConfirmedBlockTransactionStatusMeta { fee, pre_balances, post_balances, - inner_instructions, } } } @@ -279,10 +277,18 @@ impl LedgerStorage { /// Fetch the confirmed block from the desired slot pub async fn get_confirmed_block(&self, slot: Slot) -> Result { let mut bigtable = self.connection.client(); - let block = bigtable - .get_bincode_cell::("blocks", slot_to_key(slot)) + let block_cell_data = bigtable + .get_protobuf_or_bincode_cell::( + "blocks", + slot_to_key(slot), + ) .await?; - Ok(block.into()) + Ok(match block_cell_data { + bigtable::CellData::Bincode(block) => block.into(), + bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| { + bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_key(slot))) + })?, + }) } pub async fn get_signature_status(&self, signature: &Signature) -> Result { @@ -400,7 +406,7 @@ impl LedgerStorage { )) })?; let mut cell_data: Vec = - bigtable::deserialize_cell_data(&data, "tx-by-addr", row_key)?; + bigtable::deserialize_bincode_cell_data(&data, "tx-by-addr", row_key)?; cell_data.reverse(); for tx_by_addr_info in cell_data.into_iter() { // Filter out records before `before_transaction_index` @@ -503,7 +509,7 @@ impl LedgerStorage { let blocks_cells = [(slot_to_key(slot), confirmed_block.into())]; bytes_written += self .connection - .put_bincode_cells_with_retry::("blocks", &blocks_cells) + .put_protobuf_cells_with_retry::("blocks", &blocks_cells) .await?; info!( "uploaded block for slot {}: {} transactions, {} bytes", diff --git a/transaction-status/src/lib.rs b/transaction-status/src/lib.rs index e7ee0c10bbdb15..95ae4b638e4835 100644 --- a/transaction-status/src/lib.rs +++ b/transaction-status/src/lib.rs @@ -225,7 +225,7 @@ pub struct ConfirmedTransactionStatusWithSignature { pub memo: Option, } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Reward { pub pubkey: String, pub lamports: i64, @@ -233,7 +233,7 @@ pub struct Reward { pub type Rewards = Vec; -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ConfirmedBlock { pub previous_blockhash: String,