Skip to content

Commit

Permalink
wip: Print Scylla sharding info when connecting to a node
Browse files Browse the repository at this point in the history
  • Loading branch information
psarna committed Jan 3, 2020
1 parent b036739 commit 1b25aff
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/cluster/tcp_connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::error;
use crate::frame::parser::parse_frame;
use crate::frame::{Frame, IntoBytes, Opcode};
use crate::transport::{CDRSTransport, TransportTcp};
use crate::sharding::ShardingInfo;

/// Shortcut for `r2d2::Pool` type of TCP-based CDRS connections.
pub type TcpConnectionPool<A> = Pool<TcpConnectionsManager<A>>;
Expand Down Expand Up @@ -83,6 +84,21 @@ pub fn startup<'b, T: CDRSTransport + 'static, A: Authenticator + 'static + Size

let start_response = parse_frame(transport, compression)?;

let options_frame = Frame::new_req_options().into_cbytes();
transport.borrow_mut().write(options_frame.as_slice())?;
let options_response = parse_frame(transport, compression)?;

if options_response.opcode == Opcode::Supported {
let body = options_response.get_body()?;
let options = body.into_supported();
let options = match options {
Some(opt) => {
println!("Sharding options: {:?}", ShardingInfo::parse(&opt.data));
},
_ => (),
};
}

if start_response.opcode == Opcode::Ready {
return Ok(());
}
Expand Down
9 changes: 9 additions & 0 deletions src/frame/frame_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ impl ResponseBody {
}
}

/// It unwraps body and returns BodyResSupported.
/// If frame body is not of type `BodyResSupported` this method returns `None`.
pub fn into_supported(self) -> Option<BodyResSupported> {
match self {
ResponseBody::Supported(supported) => Some(supported),
_ => None,
}
}

pub fn get_authenticator<'a>(&'a self) -> Option<&'a str> {
match *self {
ResponseBody::Authenticate(ref auth) => Some(auth.data.as_str()),
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod consistency;
pub mod error;
pub mod events;
pub mod transport;
pub mod sharding;

pub type Error = error::Error;
pub type Result<T> = error::Result<T>;
41 changes: 41 additions & 0 deletions src/sharding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::option::*;
use std::collections::HashMap;

pub const SCYLLA_SHARD: &str = "SCYLLA_SHARD";
pub const SCYLLA_NR_SHARDS: &str = "SCYLLA_NR_SHARDS";
pub const SCYLLA_SHARDING_IGNORE_MSB: &str = "SCYLLA_SHARDING_IGNORE_MSB";
pub const SCYLLA_PARTITIONER: &str = "SCYLLA_PARTITIONER";
pub const SCYLLA_SHARDING_ALGORITHM: &str = "SCYLLA_SHARDING_ALGORITHM";

#[derive(Debug)]
pub struct ShardingInfo {
shard_id: i32,
shards_count: i32,
partitioner: String,
sharding_algorithm: String,
ignore_msb: i32,
}

impl ShardingInfo {
pub fn parse(options : &HashMap<String, Vec<String>>) -> Option<ShardingInfo> {
let parsed_shard_id = match options.get(&SCYLLA_SHARD.to_string())?[0].parse::<i32>() {
Ok(shard_count) => shard_count,
_ => return None,
};
let parsed_shards_count = match options.get(&SCYLLA_NR_SHARDS.to_string())?[0].parse::<i32>() {
Ok(shard_count) => shard_count,
_ => return None,
};
let parsed_ignore_msb = match options.get(&SCYLLA_SHARDING_IGNORE_MSB.to_string())?[0].parse::<i32>() {
Ok(ignore_msb_value) => ignore_msb_value,
_ => return None,
};
Some(ShardingInfo {
shard_id: parsed_shard_id,
shards_count: parsed_shards_count,
partitioner: options.get(&SCYLLA_PARTITIONER.to_string())?[0].clone(),
sharding_algorithm: options.get(&SCYLLA_SHARDING_ALGORITHM.to_string())?[0].clone(),
ignore_msb: parsed_ignore_msb,
})
}
}

0 comments on commit 1b25aff

Please sign in to comment.