Skip to content

Commit

Permalink
feat: Add hyper bindings
Browse files Browse the repository at this point in the history
Co-authered-by: Luca Casonato <[email protected]>
Co-authered-by: Ben Noordhuis <[email protected]>
Co-authered-by: Ryan Dahl <[email protected]>
  • Loading branch information
bartlomieju authored and ry committed Apr 8, 2021
1 parent c4b21fb commit f6d4a4b
Show file tree
Hide file tree
Showing 15 changed files with 996 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions cli/bench/deno_http_native.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.

const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const listener = Deno.listen({ hostname, port: Number(port) });
console.log("Server listening on", addr);

const body = Deno.core.encode("Hello World");

for await (const conn of listener) {
(async () => {
const requests = Deno.startHttp(conn);
for await (const { respondWith } of requests) {
respondWith(new Response(body));
}
})();
}
20 changes: 20 additions & 0 deletions cli/bench/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub(crate) fn benchmark(
res.insert("deno_tcp".to_string(), deno_tcp(deno_exe)?);
// res.insert("deno_udp".to_string(), deno_udp(deno_exe)?);
res.insert("deno_http".to_string(), deno_http(deno_exe)?);
res.insert("deno_http_native".to_string(), deno_http_native(deno_exe)?);
// TODO(ry) deno_proxy disabled to make fetch() standards compliant.
// res.insert("deno_proxy".to_string(), deno_http_proxy(deno_exe) hyper_hello_exe))
res.insert(
Expand Down Expand Up @@ -198,6 +199,25 @@ fn deno_http(deno_exe: &str) -> Result<HttpBenchmarkResult> {
)
}

fn deno_http_native(deno_exe: &str) -> Result<HttpBenchmarkResult> {
let port = get_port();
println!("http_benchmark testing DENO using native bindings.");
run(
&[
deno_exe,
"run",
"--allow-net",
"--reload",
"--unstable",
"cli/bench/deno_http_native.js",
&server_addr(port),
],
port,
None,
None,
)
}

#[allow(dead_code)]
fn deno_http_proxy(
deno_exe: &str,
Expand Down
25 changes: 25 additions & 0 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,31 @@ declare namespace Deno {
bytesSentData: number;
bytesReceived: number;
}

export interface RequestEvent {
readonly request: Request;
respondWith(r: Response | Promise<Response>): void;
}

export interface HttpConn extends AsyncIterableIterator<RequestEvent> {
close(): void;
readonly rid: number;
}

/** **UNSTABLE**: new API, yet to be vetted.
*
* Parse HTTP requests from the given connection
*
* ```ts
* const httpConn = await Deno.startHttp(conn);
* const { request, respondWith } = await httpConn.next();
* respondWith(new Response("Hello World"));
* ```
*
* If `httpConn.next()` encounters an error or returns `done == true` then
* the underlying HttpConn resource is closed automatically.
*/
export function startHttp(conn: Conn): HttpConn;
}

declare function fetch(
Expand Down
198 changes: 198 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
import {
assert,
assertEquals,
assertThrowsAsync,
unitTest,
} from "./test_util.ts";
import { BufReader, BufWriter } from "../../../test_util/std/io/bufio.ts";
import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts";

unitTest({ perms: { net: true } }, async function httpServerBasic() {
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
for await (const conn of listener) {
const httpConn = Deno.startHttp(conn);
for await (const { request, respondWith } of httpConn) {
assertEquals(await request.text(), "");
respondWith(new Response("Hello World"));
}
break;
}
})();

const resp = await fetch("http://127.0.0.1:4501/", {
headers: { "connection": "close" },
});
const text = await resp.text();
assertEquals(text, "Hello World");
await promise;
});

unitTest(
{ perms: { net: true } },
async function httpServerStreamResponse() {
const stream = new TransformStream();
const writer = stream.writable.getWriter();
writer.write(new TextEncoder().encode("hello "));
writer.write(new TextEncoder().encode("world"));
writer.close();

const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
const httpConn = Deno.startHttp(conn);
const { value: { request, respondWith }, done } = await httpConn.next();
assert(!done);
assert(!request.body);
await respondWith(new Response(stream.readable));
httpConn.close();
listener.close();
})();

const resp = await fetch("http://127.0.0.1:4501/");
const respBody = await resp.text();
assertEquals("hello world", respBody);
await promise;
},
);

unitTest(
{ perms: { net: true } },
async function httpServerStreamRequest() {
const stream = new TransformStream();
const writer = stream.writable.getWriter();
writer.write(new TextEncoder().encode("hello "));
writer.write(new TextEncoder().encode("world"));
writer.close();

const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
const httpConn = Deno.startHttp(conn);
const { value: { request, respondWith } } = await httpConn.next();
const reqBody = await request.text();
assertEquals("hello world", reqBody);
await respondWith(new Response(""));

// TODO(ry) If we don't call httpConn.next() here we get "error sending
// request for url (https://localhost:4501/): connection closed before
// message completed".
const { done } = await httpConn.next();
assert(done);

listener.close();
})();

const resp = await fetch("http://127.0.0.1:4501/", {
body: stream.readable,
method: "POST",
headers: { "connection": "close" },
});

await resp.arrayBuffer();
await promise;
},
);

unitTest({ perms: { net: true } }, async function httpServerStreamDuplex() {
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
const httpConn = Deno.startHttp(conn);
const { value: { request, respondWith } } = await httpConn.next();
assert(request.body);
await respondWith(new Response(request.body));
httpConn.close();
listener.close();
})();

const ts = new TransformStream();
const writable = ts.writable.getWriter();
const resp = await fetch("http://127.0.0.1:4501/", {
method: "POST",
body: ts.readable,
});
assert(resp.body);
const reader = resp.body.getReader();
await writable.write(new Uint8Array([1]));
const chunk1 = await reader.read();
assert(!chunk1.done);
assertEquals(chunk1.value, new Uint8Array([1]));
await writable.write(new Uint8Array([2]));
const chunk2 = await reader.read();
assert(!chunk2.done);
assertEquals(chunk2.value, new Uint8Array([2]));
await writable.close();
const chunk3 = await reader.read();
assert(chunk3.done);
await promise;
});

unitTest({ perms: { net: true } }, async function httpServerClose() {
const listener = Deno.listen({ port: 4501 });
const client = await Deno.connect({ port: 4501 });
const httpConn = Deno.startHttp(await listener.accept());
client.close();
const { done } = await httpConn.next();
assert(done);
// Note httpConn is automatically closed when "done" is reached.
listener.close();
});

unitTest({ perms: { net: true } }, async function httpServerInvalidMethod() {
const listener = Deno.listen({ port: 4501 });
const client = await Deno.connect({ port: 4501 });
const httpConn = Deno.startHttp(await listener.accept());
await client.write(new Uint8Array([1, 2, 3]));
await assertThrowsAsync(
async () => {
await httpConn.next();
},
Deno.errors.Http,
"invalid HTTP method parsed",
);
// Note httpConn is automatically closed when it errors.
client.close();
listener.close();
});

unitTest(
{ perms: { read: true, net: true } },
async function httpServerWithTls(): Promise<void> {
const hostname = "localhost";
const port = 4501;

const promise = (async () => {
const listener = Deno.listenTls({
hostname,
port,
certFile: "cli/tests/tls/localhost.crt",
keyFile: "cli/tests/tls/localhost.key",
});
const conn = await listener.accept();
const httpConn = Deno.startHttp(conn);
const { value: { request, respondWith } } = await httpConn.next();
await respondWith(new Response("Hello World"));

// TODO(ry) If we don't call httpConn.next() here we get "error sending
// request for url (https://localhost:4501/): connection closed before
// message completed".
const { done } = await httpConn.next();
assert(done);

listener.close();
})();

const caData = Deno.readTextFileSync("cli/tests/tls/RootCA.pem");
const client = Deno.createHttpClient({ caData });
const resp = await fetch(`https://${hostname}:${port}/`, {
client,
headers: { "connection": "close" },
});
const respBody = await resp.text();
assertEquals("Hello World", respBody);
await promise;
client.close();
},
);
1 change: 1 addition & 0 deletions cli/tests/unit/unit_tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import "./fs_events_test.ts";
import "./get_random_values_test.ts";
import "./globals_test.ts";
import "./headers_test.ts";
import "./http_test.ts";
import "./internals_test.ts";
import "./io_test.ts";
import "./link_test.ts";
Expand Down
3 changes: 2 additions & 1 deletion cli/tokio_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ where
F: std::future::Future<Output = R>,
{
let rt = create_basic_runtime();
rt.block_on(future)
let local = tokio::task::LocalSet::new();
local.block_on(&rt, future)
}
4 changes: 3 additions & 1 deletion runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ deno_websocket = { path = "../op_crates/websocket", version = "0.7.1" }
deno_webgpu = { path = "../op_crates/webgpu", version = "0.3.1" }

atty = "0.2.14"
bytes = "1"
dlopen = "0.1.8"
encoding_rs = "0.8.28"
filetime = "0.2.14"
http = "0.2.3"
hyper = { version = "0.14.4", features = ["server"] }
hyper = { version = "0.14.4", features = ["server", "stream", "http1", "http2", "runtime"] }
indexmap = "1.6.1"
lazy_static = "1.4.0"
libc = "0.2.86"
Expand All @@ -63,6 +64,7 @@ serde = { version = "1.0.123", features = ["derive"] }
sys-info = "0.8.0"
termcolor = "1.1.2"
tokio = { version = "1.4.0", features = ["full"] }
tokio-util = { version = "0.6", features = ["io"] }
tokio-rustls = "0.22.0"
uuid = { version = "0.8.2", features = ["v4"] }
webpki = "0.21.4"
Expand Down
5 changes: 5 additions & 0 deletions runtime/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ fn get_url_parse_error_class(_error: &url::ParseError) -> &'static str {
"URIError"
}

fn get_hyper_error_class(_error: &hyper::Error) -> &'static str {
"Http"
}

#[cfg(unix)]
fn get_nix_error_class(error: &nix::Error) -> &'static str {
use nix::errno::Errno::*;
Expand All @@ -156,6 +160,7 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
e.downcast_ref::<dlopen::Error>()
.map(get_dlopen_error_class)
})
.or_else(|| e.downcast_ref::<hyper::Error>().map(get_hyper_error_class))
.or_else(|| {
e.downcast_ref::<deno_core::Canceled>().map(|e| {
let io_err: io::Error = e.to_owned().into();
Expand Down
Loading

0 comments on commit f6d4a4b

Please sign in to comment.