Skip to content

Commit

Permalink
add a version numver to to command protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal committed Jan 20, 2017
1 parent 9f5b835 commit 086d08c
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 80 deletions.
16 changes: 8 additions & 8 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,18 +285,18 @@ impl CommandServer {

fn proxy_handle_message(&mut self, token: Token, msg: ServerMessage) {
println!("got answer msg: {:?}", msg);
let answer = ConfigMessageAnswer {
id: msg.id.clone(),
status: match msg.status {
let answer = ConfigMessageAnswer::new(
msg.id.clone(),
match msg.status {
ServerMessageStatus::Error(_) => ConfigMessageStatus::Error,
ServerMessageStatus::Ok => ConfigMessageStatus::Ok,
ServerMessageStatus::Processing => ConfigMessageStatus::Processing,
},
message: match msg.status {
ServerMessageStatus::Error(s) => s.clone(),
_ => String::new(),
},
};
match msg.status {
ServerMessageStatus::Error(s) => s.clone(),
_ => String::new(),
},
);
info!("sending: {:?}", answer);
for client in self.conns.iter_mut() {
if let Some(index) = client.has_message_id(&msg.id) {
Expand Down
52 changes: 26 additions & 26 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use sozu::messages::Order;
use sozu::network::ProxyOrder;
use sozu::network::buffer::Buffer;
use sozu_command::config::Config;
use sozu_command::data::{ConfigCommand,ConfigMessage,ConfigMessageAnswer,ConfigMessageStatus};
use sozu_command::data::{ConfigCommand,ConfigMessage,ConfigMessageAnswer,ConfigMessageStatus,PROTOCOL_VERSION};

use super::{CommandServer,FrontToken,ProxyConfiguration,StoredProxy};
use super::client::parse;
Expand All @@ -35,29 +35,29 @@ impl CommandServer {
let mut counter = 0usize;
for proxy in stored_proxies {
for command in proxy.state.generate_orders() {
let message = ConfigMessage {
id: format!("SAVE-{}", counter),
proxy: Some(proxy.tag.to_string()),
data: ConfigCommand::ProxyConfiguration(command)
};
let message = ConfigMessage::new(
format!("SAVE-{}", counter),
ConfigCommand::ProxyConfiguration(command),
Some(proxy.tag.to_string())
);
f.write_all(&serde_json::to_string(&message).map(|s| s.into_bytes()).unwrap_or(vec!()));
f.write_all(&b"\n\0"[..]);
counter += 1;
}
f.sync_all();
}
self.conns[token].write_message(&ConfigMessageAnswer {
id: message.id.clone(),
status: ConfigMessageStatus::Ok,
message: format!("saved to {}", path)
});
self.conns[token].write_message(&ConfigMessageAnswer::new(
message.id.clone(),
ConfigMessageStatus::Ok,
format!("saved to {}", path)
));
} else {
log!(log::LogLevel::Error, "could not open file: {}", &path);
self.conns[token].write_message(&ConfigMessageAnswer {
id: message.id.clone(),
status: ConfigMessageStatus::Error,
message: "could not open file".to_string()
});
self.conns[token].write_message(&ConfigMessageAnswer::new(
message.id.clone(),
ConfigMessageStatus::Error,
"could not open file".to_string()
));
}
},
ConfigCommand::DumpState => {
Expand All @@ -76,20 +76,20 @@ impl CommandServer {
proxies: stored_proxies,
};
//let encoded = serde_json::to_string(&conf).map(|s| s.into_bytes()).unwrap_or(vec!());
self.conns[token].write_message(&ConfigMessageAnswer {
id: message.id.clone(),
status: ConfigMessageStatus::Ok,
message: serde_json::to_string(&conf).unwrap_or(String::new())
});
self.conns[token].write_message(&ConfigMessageAnswer::new(
message.id.clone(),
ConfigMessageStatus::Ok,
serde_json::to_string(&conf).unwrap_or(String::new())
));
//self.conns[token].write_message(&encoded);
},
ConfigCommand::LoadState(path) => {
self.load_state(&message.id, &path);
self.conns[token].write_message(&ConfigMessageAnswer {
id: message.id.clone(),
status: ConfigMessageStatus::Ok,
message: "loaded the configuration".to_string()
});
self.conns[token].write_message(&ConfigMessageAnswer::new(
message.id.clone(),
ConfigMessageStatus::Ok,
"loaded the configuration".to_string()
));
},
ConfigCommand::ProxyConfiguration(order) => {
if let Some(ref tag) = message.proxy {
Expand Down
16 changes: 9 additions & 7 deletions command/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use toml;

use sozu::messages::{Order,HttpFront,TlsFront,Instance,HttpProxyConfiguration,TlsProxyConfiguration};

use data::{ConfigCommand,ConfigMessage,ProxyType};
use data::{ConfigCommand,ConfigMessage,ProxyType,PROTOCOL_VERSION};

#[derive(Debug,Clone,PartialEq,Eq,Hash,Serialize,Deserialize)]
pub struct ProxyConfig {
Expand Down Expand Up @@ -243,9 +243,10 @@ impl Config {
};

v.push(ConfigMessage {
id: format!("CONFIG-{}", count),
proxy: Some(tag.clone()),
data: ConfigCommand::ProxyConfiguration(frontend_order),
id: format!("CONFIG-{}", count),
version: PROTOCOL_VERSION,
proxy: Some(tag.clone()),
data: ConfigCommand::ProxyConfiguration(frontend_order),
});

count += 1;
Expand All @@ -268,9 +269,10 @@ impl Config {
});

v.push(ConfigMessage {
id: format!("CONFIG-{}", count),
proxy: Some(tag.clone()),
data: ConfigCommand::ProxyConfiguration(backend_order),
id: format!("CONFIG-{}", count),
version: PROTOCOL_VERSION,
proxy: Some(tag.clone()),
data: ConfigCommand::ProxyConfiguration(backend_order),
});

count += 1;
Expand Down
65 changes: 51 additions & 14 deletions command/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use serde_json;
use state::{HttpProxy,TlsProxy,ConfigState};
use sozu::messages::Order;

pub const PROTOCOL_VERSION: u8 = 0;

#[derive(Debug,Clone,Copy,PartialEq,Eq,Hash)]
pub enum ProxyType {
HTTP,
Expand Down Expand Up @@ -156,11 +158,22 @@ pub enum ConfigCommand {

#[derive(Debug,Clone,PartialEq,Eq,Hash)]
pub struct ConfigMessage {
pub id: String,
pub data: ConfigCommand,
pub proxy: Option<String>,
pub id: String,
pub version: u8,
pub data: ConfigCommand,
pub proxy: Option<String>,
}

impl ConfigMessage {
pub fn new(id: String, data: ConfigCommand, proxy: Option<String>) -> ConfigMessage {
ConfigMessage {
id: id,
version: PROTOCOL_VERSION,
data: data,
proxy: proxy,
}
}
}

#[derive(Debug,Clone,PartialEq,Eq,Hash,Serialize,Deserialize)]
pub enum ConfigMessageStatus {
Expand All @@ -172,17 +185,30 @@ pub enum ConfigMessageStatus {
#[derive(Debug,Clone,PartialEq,Eq,Hash,Serialize,Deserialize)]
pub struct ConfigMessageAnswer {
pub id: String,
pub version: u8,
pub status: ConfigMessageStatus,
pub message: String
}

impl ConfigMessageAnswer {
pub fn new(id: String, status: ConfigMessageStatus, message: String) -> ConfigMessageAnswer {
ConfigMessageAnswer {
id: id,
version: PROTOCOL_VERSION,
status: status,
message: message,
}
}
}

#[derive(Deserialize)]
struct SaveStateData {
path : String
}

enum ConfigMessageField {
Id,
Version,
Proxy,
Type,
Data,
Expand All @@ -199,10 +225,11 @@ impl serde::Deserialize for ConfigMessageField {
where E: serde::de::Error {
match value {
"id" => Ok(ConfigMessageField::Id),
"version" => Ok(ConfigMessageField::Version),
"type" => Ok(ConfigMessageField::Type),
"proxy" => Ok(ConfigMessageField::Proxy),
"data" => Ok(ConfigMessageField::Data),
_ => Err(serde::de::Error::custom("expected id, proxy, type or data")),
e => Err(serde::de::Error::custom(format!("expected id, version, proxy, type or data, got: {}", e))),
}
}
}
Expand All @@ -218,16 +245,18 @@ impl serde::de::Visitor for ConfigMessageVisitor {
fn visit_map<V>(&mut self, mut visitor: V) -> Result<ConfigMessage, V::Error>
where V: serde::de::MapVisitor {
let mut id:Option<String> = None;
let mut version:Option<u8> = None;
let mut proxy: Option<String> = None;
let mut config_type:Option<String> = None;
let mut data:Option<serde_json::Value> = None;

loop {
match try!(visitor.visit_key()) {
Some(ConfigMessageField::Type) => { config_type = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Id) => { id = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Proxy) => { proxy = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Data) => { data = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Type) => { config_type = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Id) => { id = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Version) => { version = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Proxy) => { proxy = Some(try!(visitor.visit_value())); }
Some(ConfigMessageField::Data) => { data = Some(try!(visitor.visit_value())); }
None => { break; }
}
}
Expand All @@ -241,6 +270,10 @@ impl serde::de::Visitor for ConfigMessageVisitor {
Some(id) => id,
None => try!(visitor.missing_field("id")),
};
let version = match version {
Some(version) => version,
None => try!(visitor.missing_field("version")),
};

try!(visitor.end());

Expand Down Expand Up @@ -272,17 +305,18 @@ impl serde::de::Visitor for ConfigMessageVisitor {
};

Ok(ConfigMessage {
id: id,
data: data,
proxy: proxy
id: id,
version: PROTOCOL_VERSION,
data: data,
proxy: proxy
})
}
}

impl serde::Deserialize for ConfigMessage {
fn deserialize<D>(deserializer: &mut D) -> Result<ConfigMessage, D::Error>
where D: serde::de::Deserializer {
static FIELDS: &'static [&'static str] = &["id", "proxy", "type", "data"];
static FIELDS: &'static [&'static str] = &["id", "version", "proxy", "type", "data"];
deserializer.deserialize_struct("ConfigMessage", FIELDS, ConfigMessageVisitor)
}
}
Expand All @@ -297,14 +331,17 @@ impl serde::Serialize for ConfigMessage {
where S: serde::Serializer,
{
let mut state = if self.proxy.is_some() {
try!(serializer.serialize_map(Some(4)))
try!(serializer.serialize_map(Some(5)))
} else {
try!(serializer.serialize_map(Some(3)))
try!(serializer.serialize_map(Some(4)))
};

try!(serializer.serialize_map_key(&mut state, "id"));
try!(serializer.serialize_map_value(&mut state, &self.id));

try!(serializer.serialize_map_key(&mut state, "version"));
try!(serializer.serialize_map_value(&mut state, &self.version));

if self.proxy.is_some() {
try!(serializer.serialize_map_key(&mut state, "proxy"));
try!(serializer.serialize_map_value(&mut state, self.proxy.as_ref().unwrap()));
Expand Down
50 changes: 25 additions & 25 deletions ctl/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ fn generate_id() -> String {

pub fn save_state(channel: &mut Channel<ConfigMessage,ConfigMessageAnswer>, path: &str) {
let id = generate_id();
channel.write_message(&ConfigMessage {
id: id.clone(),
data: ConfigCommand::SaveState(path.to_string()),
proxy: None,
});
channel.write_message(&ConfigMessage::new(
id.clone(),
ConfigCommand::SaveState(path.to_string()),
None,
));

match channel.read_message() {
None => println!("the proxy didn't answer"),
Expand Down Expand Up @@ -46,11 +46,11 @@ pub fn save_state(channel: &mut Channel<ConfigMessage,ConfigMessageAnswer>, path

pub fn load_state(channel: &mut Channel<ConfigMessage,ConfigMessageAnswer>, path: &str) {
let id = generate_id();
channel.write_message(&ConfigMessage {
id: id.clone(),
data: ConfigCommand::LoadState(path.to_string()),
proxy: None,
});
channel.write_message(&ConfigMessage::new(
id.clone(),
ConfigCommand::LoadState(path.to_string()),
None,
));

match channel.read_message() {
None => println!("the proxy didn't answer"),
Expand Down Expand Up @@ -78,11 +78,11 @@ pub fn load_state(channel: &mut Channel<ConfigMessage,ConfigMessageAnswer>, path

pub fn dump_state(channel: &mut Channel<ConfigMessage,ConfigMessageAnswer>) {
let id = generate_id();
channel.write_message(&ConfigMessage {
id: id.clone(),
data: ConfigCommand::DumpState,
proxy: None,
});
channel.write_message(&ConfigMessage::new(
id.clone(),
ConfigCommand::DumpState,
None,
));

match channel.read_message() {
None => println!("the proxy didn't answer"),
Expand Down Expand Up @@ -112,11 +112,11 @@ pub fn soft_stop(channel: &mut Channel<ConfigMessage,ConfigMessageAnswer>, confi
let mut tags: HashMap<String,String> = HashMap::from_iter(config.proxies.keys().map(|tag| {
println!("shutting down proxy \"{}\"", tag);
let id = generate_id();
channel.write_message(&ConfigMessage {
id: id.clone(),
data: ConfigCommand::ProxyConfiguration(Order::SoftStop),
proxy: Some(tag.clone()),
});
channel.write_message(&ConfigMessage::new(
id.clone(),
ConfigCommand::ProxyConfiguration(Order::SoftStop),
Some(tag.clone()),
));
(id, tag.clone())
}));

Expand Down Expand Up @@ -155,11 +155,11 @@ pub fn hard_stop(channel: &mut Channel<ConfigMessage,ConfigMessageAnswer>, confi
let mut tags: HashMap<String,String> = HashMap::from_iter(config.proxies.keys().map(|tag| {
println!("shutting down proxy \"{}\"", tag);
let id = generate_id();
channel.write_message(&ConfigMessage {
id: id.clone(),
data: ConfigCommand::ProxyConfiguration(Order::HardStop),
proxy: Some(tag.clone()),
});
channel.write_message(&ConfigMessage::new(
id.clone(),
ConfigCommand::ProxyConfiguration(Order::HardStop),
Some(tag.clone()),
));
(id, tag.clone())
}));

Expand Down

0 comments on commit 086d08c

Please sign in to comment.