Skip to content

Commit

Permalink
add initial server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
flosse committed Jan 27, 2018
1 parent 2d060df commit b7a7edb
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ A [tokio](https://tokio.rs)-based modbus library.
- sync (blocking)
- Modbus TCP
- Modbus RTU
- Client & Server
- Open Source (MIT/Apache-2.0)

## Installation
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//! - sync (blocking)
//! - Modbus TCP
//! - Modbus RTU
//! - Client & Server
//! - Open Source (MIT/Apache-2.0)
//!
//! # Installation
Expand Down Expand Up @@ -143,6 +144,8 @@ mod codec;
mod proto;
mod service;
mod client;
mod server;

pub use frame::*;
pub use client::*;
pub use server::*;
126 changes: 126 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use frame::*;
use std::io::Error;
use std::net::SocketAddr;
use futures::prelude::*;
use tokio_service::{NewService, Service};
use tokio_proto::TcpServer;
use proto;

/// A multithreaded Modbus server.
pub struct Server {
server_type: ServerType,
}

enum ServerType {
Tcp(SocketAddr),
}

struct ServiceWrapper<S> {
service: S,
}

impl<S> ServiceWrapper<S> {
fn new(service: S) -> ServiceWrapper<S> {
ServiceWrapper { service }
}
}

impl<S> Service for ServiceWrapper<S>
where
S: Service + Send + Sync + 'static,
S::Request: From<Request>,
S::Response: Into<Response>,
S::Error: Into<Error>,
{
type Request = TcpAdu;
type Response = TcpAdu;
type Error = Error;
type Future = Box<Future<Item = Self::Request, Error = Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
let TcpAdu { header, pdu } = req;
if let Pdu::Request(req) = pdu {
Box::new(self.service.call(req.into()).then(|res| match res {
Ok(res) => {
let pdu = Pdu::Result(Ok(res.into()));
Ok(TcpAdu { header, pdu })
}
Err(e) => Err(e.into()),
}))
} else {
panic!("Received response instead of a request");
}
}
}

impl Server {
/// Create a new Modbus TCP server instance.
#[cfg(feature = "tcp")]
pub fn new_tcp(addr: SocketAddr) -> Server {
Server {
server_type: ServerType::Tcp(addr),
}
}

pub fn serve<S>(&self, service: S)
where
S: NewService + Send + Sync + 'static,
S::Request: From<Request>,
S::Response: Into<Response>,
S::Error: Into<Error>,
S::Instance: Send + Sync + 'static,
{
match self.server_type {
ServerType::Tcp(addr) => {
TcpServer::new(proto::tcp::Proto, addr)
.serve(move || Ok(ServiceWrapper::new(service.new_service()?)));
}
}
}
}

#[cfg(test)]
mod tests {

use super::*;
use futures::future;

#[test]
fn service_wrapper() {
#[derive(Clone)]
struct DummyService {
response: Response,
};

impl Service for DummyService {
type Request = Request;
type Response = Response;
type Error = Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

fn call(&self, _: Self::Request) -> Self::Future {
Box::new(future::ok(self.response.clone()))
}
}

let s = DummyService {
response: Response::ReadInputRegisters(vec![0x33]),
};
let service = ServiceWrapper::new(s.clone());
let pdu = Pdu::Request(Request::ReadInputRegisters(0, 1));
let header = TcpHeader {
transaction_id: 9,
unit_id: 7,
};
let adu = TcpAdu { header, pdu };
let res = service.call(adu).wait().unwrap();
assert_eq!(
res.header,
TcpHeader {
transaction_id: 9,
unit_id: 7,
}
);
assert_eq!(res.pdu, Pdu::Result(Ok(s.response)));
}
}

0 comments on commit b7a7edb

Please sign in to comment.