Skip to content

Commit

Permalink
Merge pull request #97 from fegies/buffer-reuse
Browse files Browse the repository at this point in the history
Buffer reuse
  • Loading branch information
sevagh authored Dec 20, 2020
2 parents ab25389 + a5253ac commit 034bf64
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 84 deletions.
18 changes: 12 additions & 6 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl CommandRunner {
Ok(x) => x,
Err(e) => panic!("Couldn't initialize kafka consumer: {}", e),
};
decode_or_convert(consumer, matches, self.descriptors);
decode_or_convert(consumer, matches, self.descriptors).unwrap();
} else {
panic!("Kafka needs broker[s] and topic");
}
Expand All @@ -58,14 +58,15 @@ impl CommandRunner {
matches,
self.descriptors,
)
.unwrap()
}
}

fn decode_or_convert<T: 'static + Send + Iterator<Item = Vec<u8>>>(
fn decode_or_convert<T: Iterator<Item = Vec<u8>> + FramedRead>(
mut consumer: T,
matches: &ArgMatches<'_>,
descriptors: Vec<FileDescriptorSet>,
) {
) -> io::Result<()> {
let count = value_t!(matches, "COUNT", i32).unwrap_or(-1);

let stdout = io::stdout();
Expand All @@ -79,10 +80,11 @@ fn decode_or_convert<T: 'static + Send + Iterator<Item = Vec<u8>>>(
let stdout_ = &mut stdout.lock();
for (ctr, item) in converter.enumerate() {
if count >= 0 && ctr >= count as usize {
return;
break;
}
stdout_.write_all(&item).expect("Couldn't write to stdout");
}
Ok(())
} else {
let msgtype = format!(
".{}",
Expand All @@ -95,11 +97,15 @@ fn decode_or_convert<T: 'static + Send + Iterator<Item = Vec<u8>>>(
let mut formatter = CustomFormatter::new(out_is_tty);
let stdout_ = stdout.lock();
let mut serializer = Serializer::with_formatter(stdout_, &mut formatter);
for (ctr, item) in consumer.enumerate() {
if count >= 0 && ctr >= count as usize {
let mut buffer = Vec::new();
let mut ctr = 0;
while let Some(item) = consumer.read_next_frame(&mut buffer)? {
if count >= 0 && ctr >= count {
break;
}
ctr += 1;
decoder.transcode_message(&item, &mut serializer);
}
Ok(())
}
}
80 changes: 59 additions & 21 deletions stream-delimit/src/byte_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#![deny(missing_docs)]

use crate::i32be::consume_single_i32be;
use crate::stream::*;
use crate::varint::consume_single_varint;
use std::io::Read;
use crate::{error::StreamDelimitError, varint::decode_varint};
use byteorder::{BigEndian, ReadBytesExt};
use std::{
io::{self, Read},
num::NonZeroUsize,
};

/// A consumer for a byte stream
pub struct ByteConsumer<T: Read> {
Expand All @@ -16,30 +19,65 @@ impl<T: Read> ByteConsumer<T> {
pub fn new(read: T, type_: StreamType) -> ByteConsumer<T> {
ByteConsumer { read, type_ }
}

/// Reads the length prefix of the next frame from the underlying stream.
///
/// Returns `Ok(Some(len))` for a non-zero frame length, `Ok(None)` when the
/// stream is exhausted, hit a zero-length prefix, or carries no length prefix
/// at all (`StreamType::Single`), and `Err` for any other I/O or decoding
/// failure.
fn read_next_frame_length(&mut self) -> io::Result<Option<NonZeroUsize>> {
    let r = match self.type_ {
        // Length is a leb128/varint prefix decoded by the sibling module.
        StreamType::Leb128 | StreamType::Varint => decode_varint(&mut self.read)
            .map_err(|e| {
                // For unified error handling we force everything into io::Error
                match e {
                    StreamDelimitError::VarintDecodeError(i) => i,
                    e => io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)),
                }
            })
            .map(|v| NonZeroUsize::new(v as usize)),
        // Length is a big-endian i32 prefix.
        // NOTE(review): a negative i32 cast via `as usize` wraps to a huge
        // length here — presumably never produced by a well-formed stream,
        // but worth confirming.
        StreamType::I32BE => self
            .read
            .read_i32::<BigEndian>()
            .map(|v| NonZeroUsize::new(v as usize)),
        // Single streams have no length prefix; signalled as None and
        // special-cased by the caller.
        StreamType::Single => Ok(None),
    };

    // In the cases where we have hit the end of the stream, read_i32 will return UnexpectedEof
    // we treat this as no more data
    match r {
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
        a => a,
    }
}
}

impl<T: Read> Iterator for ByteConsumer<T> {
type Item = Vec<u8>;

fn next(&mut self) -> Option<Vec<u8>> {
match self.type_ {
StreamType::Leb128 | StreamType::Varint => consume_single_varint(&mut self.read),
StreamType::I32BE => consume_single_i32be(&mut self.read),
StreamType::Single => {
let ret: Option<Vec<u8>>;
let mut buf = Vec::new();
match self.read.read_to_end(&mut buf) {
Ok(x) => {
if x > 0 {
ret = Some(buf);
} else {
ret = None
}
}
Err(_) => ret = None,
fn next(&mut self) -> Option<Self::Item> {
let mut buffer = Vec::new();
self.read_next_frame(&mut buffer).ok()??;
Some(buffer)
}
}

impl<T: Read> FramedRead for ByteConsumer<T> {
fn read_next_frame<'a>(&mut self, buffer: &'a mut Vec<u8>) -> io::Result<Option<&'a [u8]>> {
let r = match self.read_next_frame_length()? {
Some(length) => {
buffer.clear();
let mut take = (&mut self.read).take(length.get() as u64);
take.read_to_end(buffer)?;
Some(&buffer[..])
}
// the single stream type does not have a defined length, so read_next_frame_length will return None
// and we catch that special case here
None if self.type_ == StreamType::Single => {
buffer.clear();
if self.read.read_to_end(buffer)? > 0 {
Some(&buffer[..])
} else {
None
}
ret
}
}
_ => None,
};
Ok(r)
}
}
40 changes: 0 additions & 40 deletions stream-delimit/src/i32be.rs

This file was deleted.

14 changes: 14 additions & 0 deletions stream-delimit/src/kafka_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![deny(missing_docs)]

use crate::error::*;
use crate::stream::FramedRead;
use kafka::consumer::{Consumer, FetchOffset};
use std;
use std::collections::VecDeque;
Expand All @@ -11,6 +12,19 @@ pub struct KafkaConsumer {
messages: VecDeque<Vec<u8>>,
}

impl FramedRead for KafkaConsumer {
    /// Pulls the next Kafka message and hands it to the caller through `buffer`.
    ///
    /// The fetched message is swapped into `buffer` (whose previous contents
    /// are discarded), so no byte copy is performed.
    fn read_next_frame<'a>(
        &mut self,
        buffer: &'a mut Vec<u8>,
    ) -> std::io::Result<Option<&'a [u8]>> {
        match self.next() {
            None => Ok(None),
            Some(mut frame) => {
                std::mem::swap(&mut frame, buffer);
                Ok(Some(&buffer[..]))
            }
        }
    }
}

impl Iterator for KafkaConsumer {
type Item = Vec<u8>;

Expand Down
1 change: 0 additions & 1 deletion stream-delimit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ extern crate byteorder;
#[cfg(feature = "with_kafka")]
extern crate kafka;

mod i32be;
mod varint;

pub mod error;
Expand Down
10 changes: 10 additions & 0 deletions stream-delimit/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#![deny(missing_docs)]

use std::io;

use crate::error::*;

/// An enum type for byte streams
#[derive(PartialEq, Eq)]
pub enum StreamType {
/// Protobuf messages with leading length encoded in leb128
Leb128,
Expand All @@ -27,3 +30,10 @@ pub fn str_to_streamtype(input: &str) -> Result<StreamType> {
)),
}
}

/// A trait for a stream that can be read in clearly defined chunks
pub trait FramedRead {
    /// Reads the next available frame into the provided buffer.
    ///
    /// Implementations are expected to clear `buffer` (or replace its
    /// contents wholesale) before filling it, so the returned slice is
    /// exactly one frame. `Ok(None)` signals that the stream is exhausted.
    fn read_next_frame<'a>(&mut self, buffer: &'a mut Vec<u8>) -> io::Result<Option<&'a [u8]>>;
}
16 changes: 0 additions & 16 deletions stream-delimit/src/varint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,6 @@ use std::io::Read;

const VARINT_MAX_BYTES: usize = 10;

/// Reads a single varint-length-delimited message from `read`.
///
/// Returns `None` when the length prefix cannot be decoded (e.g. end of
/// stream) or when the payload is truncated.
pub fn consume_single_varint(read: &mut dyn Read) -> Option<Vec<u8>> {
    // Decode the length prefix; any failure means no more messages.
    let len = decode_varint(read).ok()?;
    let mut msg_buf = vec![0; len as usize];
    // A short read (truncated payload) is likewise treated as end of stream.
    read.read_exact(&mut msg_buf).ok()?;
    Some(msg_buf)
}

pub fn decode_varint(read: &mut dyn Read) -> Result<u64> {
let mut varint_buf: Vec<u8> = Vec::new();
for i in 0..VARINT_MAX_BYTES {
Expand Down

0 comments on commit 034bf64

Please sign in to comment.