diff --git a/README.md b/README.md index 13a9ac65..cb190c2f 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ A [tokio](https://tokio.rs)-based modbus library. - sync (blocking) - Modbus TCP - Modbus RTU +- Client & Server - Open Source (MIT/Apache-2.0) ## Installation diff --git a/src/lib.rs b/src/lib.rs index 384197e5..c6da93e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ //! - sync (blocking) //! - Modbus TCP //! - Modbus RTU +//! - Client & Server //! - Open Source (MIT/Apache-2.0) //! //! # Installation @@ -143,6 +144,8 @@ mod codec; mod proto; mod service; mod client; +mod server; pub use frame::*; pub use client::*; +pub use server::*; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 00000000..05ce640d --- /dev/null +++ b/src/server.rs @@ -0,0 +1,126 @@ +use frame::*; +use std::io::Error; +use std::net::SocketAddr; +use futures::prelude::*; +use tokio_service::{NewService, Service}; +use tokio_proto::TcpServer; +use proto; + +/// A multithreaded Modbus server. +pub struct Server { + server_type: ServerType, +} + +enum ServerType { + Tcp(SocketAddr), +} + +struct ServiceWrapper { + service: S, +} + +impl ServiceWrapper { + fn new(service: S) -> ServiceWrapper { + ServiceWrapper { service } + } +} + +impl Service for ServiceWrapper +where + S: Service + Send + Sync + 'static, + S::Request: From, + S::Response: Into, + S::Error: Into, +{ + type Request = TcpAdu; + type Response = TcpAdu; + type Error = Error; + type Future = Box>; + + fn call(&self, req: Self::Request) -> Self::Future { + let TcpAdu { header, pdu } = req; + if let Pdu::Request(req) = pdu { + Box::new(self.service.call(req.into()).then(|res| match res { + Ok(res) => { + let pdu = Pdu::Result(Ok(res.into())); + Ok(TcpAdu { header, pdu }) + } + Err(e) => Err(e.into()), + })) + } else { + panic!("Received response instead of a request"); + } + } +} + +impl Server { + /// Create a new Modbus TCP server instance. + #[cfg(feature = "tcp")] + pub fn new_tcp(addr: SocketAddr) -> Server { + Server { + server_type: ServerType::Tcp(addr), + } + } + + pub fn serve(&self, service: S) + where + S: NewService + Send + Sync + 'static, + S::Request: From, + S::Response: Into, + S::Error: Into, + S::Instance: Send + Sync + 'static, + { + match self.server_type { + ServerType::Tcp(addr) => { + TcpServer::new(proto::tcp::Proto, addr) + .serve(move || Ok(ServiceWrapper::new(service.new_service()?))); + } + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use futures::future; + + #[test] + fn service_wrapper() { + #[derive(Clone)] + struct DummyService { + response: Response, + }; + + impl Service for DummyService { + type Request = Request; + type Response = Response; + type Error = Error; + type Future = Box>; + + fn call(&self, _: Self::Request) -> Self::Future { + Box::new(future::ok(self.response.clone())) + } + } + + let s = DummyService { + response: Response::ReadInputRegisters(vec![0x33]), + }; + let service = ServiceWrapper::new(s.clone()); + let pdu = Pdu::Request(Request::ReadInputRegisters(0, 1)); + let header = TcpHeader { + transaction_id: 9, + unit_id: 7, + }; + let adu = TcpAdu { header, pdu }; + let res = service.call(adu).wait().unwrap(); + assert_eq!( + res.header, + TcpHeader { + transaction_id: 9, + unit_id: 7, + } + ); + assert_eq!(res.pdu, Pdu::Result(Ok(s.response))); + } +}