Skip to content

Commit

Permalink
Merge pull request snapview#65 from vorot93/rust-2018
Browse files Browse the repository at this point in the history
Upgrade to Rust 2018, format the code
  • Loading branch information
daniel-abramov authored Aug 21, 2019
2 parents 9cf7243 + 9bd5f01 commit 59ca2c8
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 279 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ homepage = "https://github.com/snapview/tokio-tungstenite"
documentation = "https://docs.rs/tokio-tungstenite/0.9.0"
repository = "https://github.com/snapview/tokio-tungstenite"
version = "0.9.0"
edition = "2018"

[features]
default = ["connect", "tls"]
Expand Down
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,13 @@ Asynchronous WebSockets for Tokio stack.

## Usage

First, you need to add this in your `Cargo.toml`:
Add this in your `Cargo.toml`:

```toml
[dependencies]
tokio-tungstenite = "*"
```

Next, add this to your crate:

```rust
extern crate tokio_tungstenite;
```

Take a look at the `examples/` directory for client and server examples. You may also want to get familiar with
[tokio](https://tokio.rs/) if you don't have any experience with it.

Expand Down
42 changes: 17 additions & 25 deletions examples/autobahn-client.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,39 @@
#[macro_use] extern crate log;
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate tokio_tungstenite;
extern crate url;

use url::Url;
use futures::{Future, Stream};
use log::*;
use tokio_tungstenite::{
connect_async,
tungstenite::{
connect,
Result,
Error as WsError,
},
tungstenite::{connect, Error as WsError, Result},
};
use url::Url;

const AGENT: &'static str = "Tungstenite";

fn get_case_count() -> Result<u32> {
let (mut socket, _) = connect(
Url::parse("ws://localhost:9001/getCaseCount").unwrap(),
)?;
let (mut socket, _) = connect(Url::parse("ws://localhost:9001/getCaseCount").unwrap())?;
let msg = socket.read_message()?;
socket.close(None)?;
Ok(msg.into_text()?.parse::<u32>().unwrap())
}

fn update_reports() -> Result<()> {
let (mut socket, _) = connect(
Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT)).unwrap(),
Url::parse(&format!(
"ws://localhost:9001/updateReports?agent={}",
AGENT
))
.unwrap(),
)?;
socket.close(None)?;
Ok(())
}

fn run_test(case: u32) {
info!("Running test case {}", case);
let case_url = Url::parse(
&format!("ws://localhost:9001/runCase?case={}&agent={}", case, AGENT)
).unwrap();
let case_url = Url::parse(&format!(
"ws://localhost:9001/runCase?case={}&agent={}",
case, AGENT
))
.unwrap();

let job = connect_async(case_url)
.map_err(|err| error!("Connect error: {}", err))
Expand All @@ -49,11 +43,9 @@ fn run_test(case: u32) {
.filter(|msg| msg.is_text() || msg.is_binary())
.forward(sink)
.and_then(|(_stream, _sink)| Ok(()))
.map_err(|err| {
match err {
WsError::ConnectionClosed => (),
err => info!("WS error {}", err),
}
.map_err(|err| match err {
WsError::ConnectionClosed => (),
err => info!("WS error {}", err),
})
});

Expand Down
55 changes: 24 additions & 31 deletions examples/autobahn-server.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
#[macro_use] extern crate log;
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate tokio_tungstenite;

use futures::{Future, Stream};
use log::*;
use tokio::net::TcpListener;
use tokio_tungstenite::{
accept_async,
tungstenite::Error as WsError,
};
use tokio_tungstenite::{accept_async, tungstenite::Error as WsError};

fn main() {
env_logger::init();
Expand All @@ -20,30 +12,31 @@ fn main() {
let socket = TcpListener::bind(&addr).unwrap();
info!("Listening on: {}", addr);

let srv = socket.incoming().map_err(Into::into).for_each(move |stream| {

let peer = stream.peer_addr().expect("connected streams should have a peer address");
info!("Peer address: {}", peer);

accept_async(stream).and_then(move |ws_stream| {
info!("New WebSocket connection: {}", peer);
let (sink, stream) = ws_stream.split();
let job = stream
.filter(|msg| msg.is_text() || msg.is_binary())
.forward(sink)
.and_then(|(_stream, _sink)| Ok(()))
.map_err(|err| {
match err {
let srv = socket
.incoming()
.map_err(Into::into)
.for_each(move |stream| {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", peer);

accept_async(stream).and_then(move |ws_stream| {
info!("New WebSocket connection: {}", peer);
let (sink, stream) = ws_stream.split();
let job = stream
.filter(|msg| msg.is_text() || msg.is_binary())
.forward(sink)
.and_then(|(_stream, _sink)| Ok(()))
.map_err(|err| match err {
WsError::ConnectionClosed => (),
err => info!("WS error: {}", err),
}
});

tokio::spawn(job);
Ok(())
})
});
});

tokio::spawn(job);
Ok(())
})
});

runtime.block_on(srv).unwrap();
}
64 changes: 31 additions & 33 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@
//!
//! You can use this example together with the `server` example.
extern crate futures;
extern crate tokio;
extern crate tokio_tungstenite;
extern crate tungstenite;
extern crate url;

use std::env;
use std::io::{self, Read, Write};
use std::thread;
Expand All @@ -29,9 +23,9 @@ use tokio_tungstenite::stream::PeerAddr;

fn main() {
// Specify the server address to which the client will be connecting.
let connect_addr = env::args().nth(1).unwrap_or_else(|| {
panic!("this program requires at least one argument")
});
let connect_addr = env::args()
.nth(1)
.unwrap_or_else(|| panic!("this program requires at least one argument"));

let url = url::Url::parse(&connect_addr).unwrap();

Expand Down Expand Up @@ -59,32 +53,37 @@ fn main() {
// finishes. If we don't have any more data to read or we won't receive any
// more work from the remote then we can exit.
let mut stdout = io::stdout();
let client = connect_async(url).and_then(move |(ws_stream, _)| {
println!("WebSocket handshake has been successfully completed");
let client = connect_async(url)
.and_then(move |(ws_stream, _)| {
println!("WebSocket handshake has been successfully completed");

let addr = ws_stream.peer_addr().expect("connected streams should have a peer address");
println!("Peer address: {}", addr);
let addr = ws_stream
.peer_addr()
.expect("connected streams should have a peer address");
println!("Peer address: {}", addr);

// `sink` is the stream of messages going out.
// `stream` is the stream of incoming messages.
let (sink, stream) = ws_stream.split();
// `sink` is the stream of messages going out.
// `stream` is the stream of incoming messages.
let (sink, stream) = ws_stream.split();

// We forward all messages, composed out of the data, entered to
// the stdin, to the `sink`.
let send_stdin = stdin_rx.forward(sink);
let write_stdout = stream.for_each(move |message| {
stdout.write_all(&message.into_data()).unwrap();
Ok(())
});
// We forward all messages, composed out of the data, entered to
// the stdin, to the `sink`.
let send_stdin = stdin_rx.forward(sink);
let write_stdout = stream.for_each(move |message| {
stdout.write_all(&message.into_data()).unwrap();
Ok(())
});

// Wait for either of futures to complete.
send_stdin.map(|_| ())
.select(write_stdout.map(|_| ()))
.then(|_| Ok(()))
}).map_err(|e| {
println!("Error during the websocket handshake occurred: {}", e);
io::Error::new(io::ErrorKind::Other, e)
});
// Wait for either of futures to complete.
send_stdin
.map(|_| ())
.select(write_stdout.map(|_| ()))
.then(|_| Ok(()))
})
.map_err(|e| {
println!("Error during the websocket handshake occurred: {}", e);
io::Error::new(io::ErrorKind::Other, e)
});

// And now that we've got our client, we execute it in the event loop!
tokio::runtime::run(client.map_err(|_e| ()));
Expand All @@ -97,8 +96,7 @@ fn read_stdin(mut tx: mpsc::Sender<Message>) {
loop {
let mut buf = vec![0; 1024];
let n = match stdin.read(&mut buf) {
Err(_) |
Ok(0) => break,
Err(_) | Ok(0) => break,
Ok(n) => n,
};
buf.truncate(n);
Expand Down
Loading

0 comments on commit 59ca2c8

Please sign in to comment.