diff --git a/src/cluster/tcp_connection_pool.rs b/src/cluster/tcp_connection_pool.rs index 8934dd3..42bc24b 100644 --- a/src/cluster/tcp_connection_pool.rs +++ b/src/cluster/tcp_connection_pool.rs @@ -11,6 +11,7 @@ use crate::error; use crate::frame::parser::parse_frame; use crate::frame::{Frame, IntoBytes, Opcode}; use crate::transport::{CDRSTransport, TransportTcp}; +use crate::sharding::ShardingInfo; /// Shortcut for `r2d2::Pool` type of TCP-based CDRS connections. pub type TcpConnectionPool = Pool>; @@ -83,6 +84,21 @@ pub fn startup<'b, T: CDRSTransport + 'static, A: Authenticator + 'static + Size let start_response = parse_frame(transport, compression)?; + let options_frame = Frame::new_req_options().into_cbytes(); + transport.borrow_mut().write(options_frame.as_slice())?; + let options_response = parse_frame(transport, compression)?; + + if options_response.opcode == Opcode::Supported { + let body = options_response.get_body()?; + let options = body.into_supported(); + let options = match options { + Some(opt) => { + println!("Sharding options: {:?}", ShardingInfo::parse(&opt.data)); + }, + _ => (), + }; + } + if start_response.opcode == Opcode::Ready { return Ok(()); } diff --git a/src/frame/frame_response.rs b/src/frame/frame_response.rs index 2af5e92..37c9703 100644 --- a/src/frame/frame_response.rs +++ b/src/frame/frame_response.rs @@ -120,6 +120,15 @@ impl ResponseBody { } } + /// It unwraps body and returns BodyResSupported. + /// If frame body is not of type `BodyResSupported` this method returns `None`. + pub fn into_supported(self) -> Option { + match self { + ResponseBody::Supported(supported) => Some(supported), + _ => None, + } + } + pub fn get_authenticator<'a>(&'a self) -> Option<&'a str> { match *self { ResponseBody::Authenticate(ref auth) => Some(auth.data.as_str()), diff --git a/src/lib.rs b/src/lib.rs index 0f0074d..d0e7e5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub mod consistency; pub mod error; pub mod events; pub mod transport; +pub mod sharding; pub type Error = error::Error; pub type Result = error::Result; diff --git a/src/sharding.rs b/src/sharding.rs new file mode 100644 index 0000000..7c81822 --- /dev/null +++ b/src/sharding.rs @@ -0,0 +1,41 @@ +use std::option::*; +use std::collections::HashMap; + +pub const SCYLLA_SHARD: &str = "SCYLLA_SHARD"; +pub const SCYLLA_NR_SHARDS: &str = "SCYLLA_NR_SHARDS"; +pub const SCYLLA_SHARDING_IGNORE_MSB: &str = "SCYLLA_SHARDING_IGNORE_MSB"; +pub const SCYLLA_PARTITIONER: &str = "SCYLLA_PARTITIONER"; +pub const SCYLLA_SHARDING_ALGORITHM: &str = "SCYLLA_SHARDING_ALGORITHM"; + +#[derive(Debug)] +pub struct ShardingInfo { + shard_id: i32, + shards_count: i32, + partitioner: String, + sharding_algorithm: String, + ignore_msb: i32, +} + +impl ShardingInfo { + pub fn parse(options : &HashMap>) -> Option { + let parsed_shard_id = match options.get(&SCYLLA_SHARD.to_string())?[0].parse::() { + Ok(shard_count) => shard_count, + _ => return None, + }; + let parsed_shards_count = match options.get(&SCYLLA_NR_SHARDS.to_string())?[0].parse::() { + Ok(shard_count) => shard_count, + _ => return None, + }; + let parsed_ignore_msb = match options.get(&SCYLLA_SHARDING_IGNORE_MSB.to_string())?[0].parse::() { + Ok(ignore_msb_value) => ignore_msb_value, + _ => return None, + }; + Some(ShardingInfo { + shard_id: parsed_shard_id, + shards_count: parsed_shards_count, + partitioner: options.get(&SCYLLA_PARTITIONER.to_string())?[0].clone(), + sharding_algorithm: options.get(&SCYLLA_SHARDING_ALGORITHM.to_string())?[0].clone(), + ignore_msb: parsed_ignore_msb, + }) + } +}