Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renames, moves and polish #260

Merged
merged 7 commits into from
May 25, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
request_stage::serialize_packets -> packet::to_blobs
Good stuff - no need to hide them.
garious committed May 25, 2018
commit 3186512c3b828f7e742888f6323064e8de0fb93e
28 changes: 28 additions & 0 deletions src/packet.rs
Original file line number Diff line number Diff line change
@@ -236,6 +236,34 @@ pub fn to_packets<T: Serialize>(r: &PacketRecycler, xs: Vec<T>) -> Vec<SharedPac
return out;
}

pub fn to_blob<T: Serialize>(
resp: T,
rsp_addr: SocketAddr,
blob_recycler: &BlobRecycler,
) -> Result<SharedBlob> {
let blob = blob_recycler.allocate();
{
let mut b = blob.write().unwrap();
let v = serialize(&resp)?;
let len = v.len();
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
}
Ok(blob)
}

pub fn to_blobs<T: Serialize>(
rsps: Vec<(T, SocketAddr)>,
blob_recycler: &BlobRecycler,
) -> Result<VecDeque<SharedBlob>> {
let mut blobs = VecDeque::new();
for (resp, rsp_addr) in rsps {
blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?);
}
Ok(blobs)
}

const BLOB_INDEX_END: usize = size_of::<u64>();
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<usize>() + size_of::<PublicKey>();

35 changes: 2 additions & 33 deletions src/request_stage.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
//! The `request_stage` processes thin client Request messages.
use bincode::{deserialize, serialize};
use bincode::deserialize;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use request::Request;
use request_processor::RequestProcessor;
use result::Result;
use serde::Serialize;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -36,35 +34,6 @@ impl RequestStage {
.collect()
}

/// Split Request list into verified transactions and the rest
fn serialize_response<T: Serialize>(
resp: T,
rsp_addr: SocketAddr,
blob_recycler: &packet::BlobRecycler,
) -> Result<packet::SharedBlob> {
let blob = blob_recycler.allocate();
{
let mut b = blob.write().unwrap();
let v = serialize(&resp)?;
let len = v.len();
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
}
Ok(blob)
}

fn serialize_responses<T: Serialize>(
rsps: Vec<(T, SocketAddr)>,
blob_recycler: &packet::BlobRecycler,
) -> Result<VecDeque<packet::SharedBlob>> {
let mut blobs = VecDeque::new();
for (resp, rsp_addr) in rsps {
blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
}
Ok(blobs)
}

pub fn process_request_packets(
request_processor: &RequestProcessor,
packet_receiver: &Receiver<SharedPackets>,
@@ -91,7 +60,7 @@ impl RequestStage {

let rsps = request_processor.process_requests(reqs);

let blobs = Self::serialize_responses(rsps, blob_recycler)?;
let blobs = packet::to_blobs(rsps, blob_recycler)?;
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing