Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
devp2p snappy compression
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar committed Oct 10, 2017
1 parent 77a2c77 commit 9e65cce
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 6 deletions.
29 changes: 27 additions & 2 deletions util/network/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ use rcrypto::buffer::*;
use tiny_keccak::Keccak;
use bytes::{Buf, BufMut};
use crypto;
use util::snappy;

const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;
const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1;

pub trait GenericSocket : Read + Write {
}
Expand Down Expand Up @@ -295,6 +297,8 @@ pub struct EncryptedConnection {
protocol_id: u16,
/// Payload expected to be received for the last header.
payload_len: usize,
/// Enable snappy compression.
compression: bool,
}

impl EncryptedConnection {
Expand Down Expand Up @@ -345,7 +349,8 @@ impl EncryptedConnection {
ingress_mac: ingress_mac,
read_state: EncryptedConnectionState::Header,
protocol_id: 0,
payload_len: 0
payload_len: 0,
compression: false,
};
enc.connection.expect(ENCRYPTED_HEADER_LEN);
Ok(enc)
Expand All @@ -354,8 +359,17 @@ impl EncryptedConnection {
/// Send a packet
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
let mut header = RlpStream::new();
let mut compressed = Vec::new();
let mut payload = payload; // create a reference with local lifetime
if self.compression {
if payload.len() > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
}
let len = snappy::compress_into(&payload, &mut compressed);
payload = &compressed[0..len];
}
let len = payload.len();
if len >= (1 << 24) {
if len > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
}
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
Expand Down Expand Up @@ -431,6 +445,12 @@ impl EncryptedConnection {
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..self.payload_len]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
let mut pad_buf = [0u8; 16];
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[self.payload_len..(payload.len() - 16)]), &mut RefWriteBuffer::new(&mut pad_buf), false).expect("Invalid length or padding");
if self.compression {
if snappy::decompressed_len(&packet)? > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
}
packet = snappy::decompress(&packet)?;
}
Ok(Packet {
protocol: self.protocol_id,
data: packet
Expand Down Expand Up @@ -477,6 +497,11 @@ impl EncryptedConnection {
self.connection.writable(io)?;
Ok(())
}

/// Enable snappy compression.
pub fn enable_compression(&mut self, enable: bool) {
self.compression = enable;
}
}

#[test]
Expand Down
10 changes: 10 additions & 0 deletions util/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use rlp::*;
use std::fmt;
use ethkey::Error as KeyError;
use crypto::Error as CryptoError;
use util::snappy;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DisconnectReason
Expand Down Expand Up @@ -107,6 +108,8 @@ pub enum NetworkError {
StdIo(::std::io::Error),
/// Packet size is over the protocol limit.
OversizedPacket,
/// Decompression error.
Decompression(snappy::InvalidInput),
}

impl fmt::Display for NetworkError {
Expand All @@ -126,6 +129,7 @@ impl fmt::Display for NetworkError {
StdIo(ref err) => format!("{}", err),
InvalidNodeId => "Invalid node id".into(),
OversizedPacket => "Packet is too large".into(),
Decompression(ref err) => format!("Error decompressing packet: {}", err),
};

f.write_fmt(format_args!("Network error ({})", msg))
Expand Down Expand Up @@ -162,6 +166,12 @@ impl From<CryptoError> for NetworkError {
}
}

impl From<snappy::InvalidInput> for NetworkError {
fn from(err: snappy::InvalidInput) -> NetworkError {
NetworkError::Decompression(err)
}
}

impl From<::std::net::AddrParseError> for NetworkError {
fn from(err: ::std::net::AddrParseError) -> NetworkError {
NetworkError::AddressParse(err)
Expand Down
2 changes: 1 addition & 1 deletion util/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub use node_table::{is_valid_node_url, NodeId};
use ipnetwork::{IpNetwork, IpNetworkError};
use std::str::FromStr;

const PROTOCOL_VERSION: u32 = 4;
const PROTOCOL_VERSION: u32 = 5;

/// Network IO protocol handler. This needs to be implemented for each new subprotocol.
/// All the handler function are called from within IO event loop.
Expand Down
11 changes: 8 additions & 3 deletions util/network/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use time;
// Timeout must be less than (interval - 1).
const PING_TIMEOUT_SEC: u64 = 60;
const PING_INTERVAL_SEC: u64 = 120;
const MIN_PROTOCOL_VERSION: u32 = 4;
const MIN_COMPRESSION_PROTOCOL_VERSION: u32 = 5;

#[derive(Debug, Clone)]
enum ProtocolState {
Expand Down Expand Up @@ -209,9 +211,8 @@ impl Session {
} else {
panic!("Unexpected state");
};
self.state = State::Session(connection);
self.write_hello(io, host)?;
self.send_ping(io)?;
self.state = State::Session(connection);
Ok(())
}

Expand Down Expand Up @@ -520,10 +521,14 @@ impl Session {
trace!(target: "network", "No common capabilities with peer.");
return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer)));
}
if protocol != host.protocol_version {
if protocol < MIN_PROTOCOL_VERSION {
trace!(target: "network", "Peer protocol version mismatch: {}", protocol);
return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer)));
}
if let State::Session(ref mut s) = self.state {
s.enable_compression(protocol >= MIN_COMPRESSION_PROTOCOL_VERSION);
}
self.send_ping(io)?;
self.had_hello = true;
Ok(())
}
Expand Down

0 comments on commit 9e65cce

Please sign in to comment.