Skip to content

Commit

Permalink
Use protobufs to store confirmed blocks in BigTable (#12526)
Browse files Browse the repository at this point in the history
* Use protobufs to store confirmed blocks in BigTable

* Cleanup

* Reorganize proto

* Clean up use statements

* Split out function for unit testing

* s/utils/convert

Co-authored-by: Tyera Eulberg <[email protected]>
(cherry picked from commit ce598c5)
  • Loading branch information
jstarry authored and mergify-bot committed Sep 30, 2020
1 parent 700c8c1 commit 94acfbe
Show file tree
Hide file tree
Showing 7 changed files with 707 additions and 23 deletions.
16 changes: 16 additions & 0 deletions storage-bigtable/build-proto/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,21 @@ fn main() -> Result<(), std::io::Error> {
.compile(
&[googleapis.join("google/bigtable/v2/bigtable.proto")],
&[googleapis],
)?;

let out_dir = manifest_dir.join("../proto");
let proto_files = manifest_dir.join("../src");

println!("Protobuf directory: {}", proto_files.display());
println!("output directory: {}", out_dir.display());

tonic_build::configure()
.build_client(true)
.build_server(false)
.format(true)
.out_dir(&out_dir)
.compile(
&[proto_files.join("confirmed_block.proto")],
&[proto_files],
)
}
95 changes: 95 additions & 0 deletions storage-bigtable/proto/solana.bigtable.confirmed_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConfirmedBlock {
#[prost(string, tag = "1")]
pub previous_blockhash: std::string::String,
#[prost(string, tag = "2")]
pub blockhash: std::string::String,
#[prost(uint64, tag = "3")]
pub parent_slot: u64,
#[prost(message, repeated, tag = "4")]
pub transactions: ::std::vec::Vec<ConfirmedTransaction>,
#[prost(message, repeated, tag = "5")]
pub rewards: ::std::vec::Vec<Reward>,
#[prost(message, optional, tag = "6")]
pub block_time: ::std::option::Option<UnixTimestamp>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConfirmedTransaction {
#[prost(message, optional, tag = "1")]
pub transaction: ::std::option::Option<Transaction>,
#[prost(message, optional, tag = "2")]
pub meta: ::std::option::Option<TransactionStatusMeta>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transaction {
#[prost(bytes, repeated, tag = "1")]
pub signatures: ::std::vec::Vec<std::vec::Vec<u8>>,
#[prost(message, optional, tag = "2")]
pub message: ::std::option::Option<Message>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Message {
#[prost(message, optional, tag = "1")]
pub header: ::std::option::Option<MessageHeader>,
#[prost(bytes, repeated, tag = "2")]
pub account_keys: ::std::vec::Vec<std::vec::Vec<u8>>,
#[prost(bytes, tag = "3")]
pub recent_blockhash: std::vec::Vec<u8>,
#[prost(message, repeated, tag = "4")]
pub instructions: ::std::vec::Vec<CompiledInstruction>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MessageHeader {
#[prost(uint32, tag = "1")]
pub num_required_signatures: u32,
#[prost(uint32, tag = "2")]
pub num_readonly_signed_accounts: u32,
#[prost(uint32, tag = "3")]
pub num_readonly_unsigned_accounts: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionStatusMeta {
#[prost(message, optional, tag = "1")]
pub err: ::std::option::Option<TransactionError>,
#[prost(uint64, tag = "2")]
pub fee: u64,
#[prost(uint64, repeated, tag = "3")]
pub pre_balances: ::std::vec::Vec<u64>,
#[prost(uint64, repeated, tag = "4")]
pub post_balances: ::std::vec::Vec<u64>,
#[prost(message, repeated, tag = "5")]
pub inner_instructions: ::std::vec::Vec<InnerInstructions>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionError {
#[prost(bytes, tag = "1")]
pub err: std::vec::Vec<u8>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InnerInstructions {
#[prost(uint32, tag = "1")]
pub index: u32,
#[prost(message, repeated, tag = "2")]
pub instructions: ::std::vec::Vec<CompiledInstruction>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompiledInstruction {
#[prost(uint32, tag = "1")]
pub program_id_index: u32,
#[prost(bytes, tag = "2")]
pub accounts: std::vec::Vec<u8>,
#[prost(bytes, tag = "3")]
pub data: std::vec::Vec<u8>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Reward {
#[prost(string, tag = "1")]
pub pubkey: std::string::String,
#[prost(int64, tag = "2")]
pub lamports: i64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnixTimestamp {
#[prost(int64, tag = "1")]
pub timestamp: i64,
}
223 changes: 216 additions & 7 deletions storage-bigtable/src/bigtable.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Primitives for reading/writing BigTable tables

use crate::access_token::{AccessToken, Scope};
use crate::compression::{compress_best, decompress};
use crate::root_ca_certificate;
use crate::{
access_token::{AccessToken, Scope},
compression::{compress_best, decompress},
root_ca_certificate,
};
use log::*;
use thiserror::Error;
use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request};
Expand All @@ -26,10 +28,14 @@ mod google {
use google::bigtable::v2::*;

pub type RowKey = String;
pub type CellName = String;
pub type CellValue = Vec<u8>;
pub type RowData = Vec<(CellName, CellValue)>;
pub type RowDataSlice<'a> = &'a [(CellName, CellValue)];
pub type CellName = String;
pub type CellValue = Vec<u8>;
pub enum CellData<B, P> {
Bincode(B),
Protobuf(P),
}

#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -196,6 +202,23 @@ impl BigTableConnection {
.retry(ExponentialBackoff::default())
.await
}

pub async fn put_protobuf_cells_with_retry<T>(
&self,
table: &str,
cells: &[(RowKey, T)],
) -> Result<usize>
where
T: prost::Message,
{
use backoff::{future::FutureOperation as _, ExponentialBackoff};
(|| async {
let mut client = self.client();
Ok(client.put_protobuf_cells(table, cells).await?)
})
.retry(ExponentialBackoff::default())
.await
}
}

pub struct BigTable {
Expand Down Expand Up @@ -484,7 +507,20 @@ impl BigTable {
T: serde::de::DeserializeOwned,
{
let row_data = self.get_single_row_data(table, key.clone()).await?;
deserialize_cell_data(&row_data, table, key.to_string())
deserialize_bincode_cell_data(&row_data, table, key.to_string())
}

pub async fn get_protobuf_or_bincode_cell<B, P>(
&mut self,
table: &str,
key: RowKey,
) -> Result<CellData<B, P>>
where
B: serde::de::DeserializeOwned,
P: prost::Message + Default,
{
let row_data = self.get_single_row_data(table, key.clone()).await?;
deserialize_protobuf_or_bincode_cell_data(&row_data, table, key)
}

pub async fn put_bincode_cells<T>(
Expand All @@ -506,9 +542,74 @@ impl BigTable {
self.put_row_data(table, "x", &new_row_data).await?;
Ok(bytes_written)
}

pub async fn put_protobuf_cells<T>(
&mut self,
table: &str,
cells: &[(RowKey, T)],
) -> Result<usize>
where
T: prost::Message,
{
let mut bytes_written = 0;
let mut new_row_data = vec![];
for (row_key, data) in cells {
let mut buf = Vec::with_capacity(data.encoded_len());
data.encode(&mut buf).unwrap();
let data = compress_best(&buf)?;
bytes_written += data.len();
new_row_data.push((row_key, vec![("proto".to_string(), data)]));
}

self.put_row_data(table, "x", &new_row_data).await?;
Ok(bytes_written)
}
}

pub(crate) fn deserialize_protobuf_or_bincode_cell_data<B, P>(
row_data: RowDataSlice,
table: &str,
key: RowKey,
) -> Result<CellData<B, P>>
where
B: serde::de::DeserializeOwned,
P: prost::Message + Default,
{
match deserialize_protobuf_cell_data(row_data, table, key.to_string()) {
Ok(result) => return Ok(CellData::Protobuf(result)),
Err(err) => match err {
Error::ObjectNotFound(_) => {}
_ => return Err(err),
},
}
match deserialize_bincode_cell_data(row_data, table, key) {
Ok(result) => Ok(CellData::Bincode(result)),
Err(err) => Err(err),
}
}

pub(crate) fn deserialize_protobuf_cell_data<T>(
row_data: RowDataSlice,
table: &str,
key: RowKey,
) -> Result<T>
where
T: prost::Message + Default,
{
let value = &row_data
.iter()
.find(|(name, _)| name == "proto")
.ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))?
.1;

let data = decompress(&value)?;
T::decode(&data[..]).map_err(|err| {
warn!("Failed to deserialize {}/{}: {}", table, key, err);
Error::ObjectCorrupt(format!("{}/{}", table, key))
})
}

pub(crate) fn deserialize_cell_data<T>(
pub(crate) fn deserialize_bincode_cell_data<T>(
row_data: RowDataSlice,
table: &str,
key: RowKey,
Expand All @@ -528,3 +629,111 @@ where
Error::ObjectCorrupt(format!("{}/{}", table, key))
})
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{convert::generated, StoredConfirmedBlock};
use prost::Message;
use solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair, system_transaction};
use solana_transaction_status::{
ConfirmedBlock, TransactionStatusMeta, TransactionWithStatusMeta,
};
use std::convert::TryInto;

#[test]
fn test_deserialize_protobuf_or_bincode_cell_data() {
let from = Keypair::new();
let recipient = Pubkey::new_rand();
let transaction = system_transaction::transfer(&from, &recipient, 42, Hash::default());
let with_meta = TransactionWithStatusMeta {
transaction,
meta: Some(TransactionStatusMeta {
status: Ok(()),
fee: 1,
pre_balances: vec![43, 0, 1],
post_balances: vec![0, 42, 1],
inner_instructions: Some(vec![]),
}),
};
let block = ConfirmedBlock {
transactions: vec![with_meta],
parent_slot: 1,
blockhash: Hash::default().to_string(),
previous_blockhash: Hash::default().to_string(),
rewards: vec![],
block_time: Some(1_234_567_890),
};
let bincode_block = compress_best(
&bincode::serialize::<StoredConfirmedBlock>(&block.clone().into()).unwrap(),
)
.unwrap();

let protobuf_block = generated::ConfirmedBlock::from(block.clone());
let mut buf = Vec::with_capacity(protobuf_block.encoded_len());
protobuf_block.encode(&mut buf).unwrap();
let protobuf_block = compress_best(&buf).unwrap();

let deserialized = deserialize_protobuf_or_bincode_cell_data::<
StoredConfirmedBlock,
generated::ConfirmedBlock,
>(
&[("proto".to_string(), protobuf_block.clone())],
"",
"".to_string(),
)
.unwrap();
if let CellData::Protobuf(protobuf_block) = deserialized {
assert_eq!(block, protobuf_block.try_into().unwrap());
} else {
panic!("deserialization should produce CellData::Protobuf");
}

let deserialized = deserialize_protobuf_or_bincode_cell_data::<
StoredConfirmedBlock,
generated::ConfirmedBlock,
>(
&[("bin".to_string(), bincode_block.clone())],
"",
"".to_string(),
)
.unwrap();
if let CellData::Bincode(bincode_block) = deserialized {
let mut block = block;
if let Some(meta) = &mut block.transactions[0].meta {
meta.inner_instructions = None; // Legacy bincode implementation does not suport inner_instructions
}
assert_eq!(block, bincode_block.into());
} else {
panic!("deserialization should produce CellData::Bincode");
}

let result = deserialize_protobuf_or_bincode_cell_data::<
StoredConfirmedBlock,
generated::ConfirmedBlock,
>(&[("proto".to_string(), bincode_block)], "", "".to_string());
assert!(result.is_err());

let result = deserialize_protobuf_or_bincode_cell_data::<
StoredConfirmedBlock,
generated::ConfirmedBlock,
>(
&[("proto".to_string(), vec![1, 2, 3, 4])],
"",
"".to_string(),
);
assert!(result.is_err());

let result = deserialize_protobuf_or_bincode_cell_data::<
StoredConfirmedBlock,
generated::ConfirmedBlock,
>(&[("bin".to_string(), protobuf_block)], "", "".to_string());
assert!(result.is_err());

let result = deserialize_protobuf_or_bincode_cell_data::<
StoredConfirmedBlock,
generated::ConfirmedBlock,
>(&[("bin".to_string(), vec![1, 2, 3, 4])], "", "".to_string());
assert!(result.is_err());
}
}
Loading

0 comments on commit 94acfbe

Please sign in to comment.