Skip to content

Commit

Permalink
refactor: rewrite ops to use ResourceTable2 (#8512)
Browse files Browse the repository at this point in the history
This commit migrates all ops to use new resource table
and "AsyncRefCell".

Old implementation of resource table was completely 
removed and all code referencing it was updated to use
new system.
  • Loading branch information
bartlomieju authored Dec 16, 2020
1 parent 9fe26f8 commit 6984b63
Show file tree
Hide file tree
Showing 26 changed files with 1,222 additions and 1,206 deletions.
60 changes: 55 additions & 5 deletions cli/tests/unit/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ unitTest({ perms: { net: true } }, function netTcpListenClose(): void {
unitTest(
{
perms: { net: true },
// TODO:
ignore: Deno.build.os === "windows",
},
function netUdpListenClose(): void {
const socket = Deno.listenDatagram({
Expand Down Expand Up @@ -257,7 +255,7 @@ unitTest(
);

unitTest(
{ ignore: Deno.build.os === "windows", perms: { net: true } },
{ perms: { net: true } },
async function netUdpSendReceive(): Promise<void> {
const alice = Deno.listenDatagram({ port: 3500, transport: "udp" });
assert(alice.addr.transport === "udp");
Expand Down Expand Up @@ -287,7 +285,31 @@ unitTest(
);

unitTest(
{ ignore: Deno.build.os === "windows", perms: { net: true } },
{ perms: { net: true } },
async function netUdpConcurrentSendReceive(): Promise<void> {
const socket = Deno.listenDatagram({ port: 3500, transport: "udp" });
assert(socket.addr.transport === "udp");
assertEquals(socket.addr.port, 3500);
assertEquals(socket.addr.hostname, "127.0.0.1");

const recvPromise = socket.receive();

const sendBuf = new Uint8Array([1, 2, 3]);
const sendLen = await socket.send(sendBuf, socket.addr);
assertEquals(sendLen, 3);

const [recvBuf, recvAddr] = await recvPromise;
assertEquals(recvBuf.length, 3);
assertEquals(1, recvBuf[0]);
assertEquals(2, recvBuf[1]);
assertEquals(3, recvBuf[2]);

socket.close();
},
);

unitTest(
{ perms: { net: true } },
async function netUdpBorrowMutError(): Promise<void> {
const socket = Deno.listenDatagram({
port: 4501,
Expand Down Expand Up @@ -335,6 +357,34 @@ unitTest(
},
);

// TODO(piscisaureus): Enable after Tokio v0.3/v1.0 upgrade.
unitTest(
{ ignore: true, perms: { read: true, write: true } },
async function netUnixPacketConcurrentSendReceive(): Promise<void> {
const filePath = await Deno.makeTempFile();
const socket = Deno.listenDatagram({
path: filePath,
transport: "unixpacket",
});
assert(socket.addr.transport === "unixpacket");
assertEquals(socket.addr.path, filePath);

const recvPromise = socket.receive();

const sendBuf = new Uint8Array([1, 2, 3]);
const sendLen = await socket.send(sendBuf, socket.addr);
assertEquals(sendLen, 3);

const [recvBuf, recvAddr] = await recvPromise;
assertEquals(recvBuf.length, 3);
assertEquals(1, recvBuf[0]);
assertEquals(2, recvBuf[1]);
assertEquals(3, recvBuf[2]);

socket.close();
},
);

unitTest(
{ perms: { net: true } },
async function netTcpListenIteratorBreakClosesResource(): Promise<void> {
Expand Down Expand Up @@ -385,7 +435,7 @@ unitTest(
);

unitTest(
{ ignore: Deno.build.os === "windows", perms: { net: true } },
{ perms: { net: true } },
async function netUdpListenCloseWhileIterating(): Promise<void> {
const socket = Deno.listenDatagram({ port: 8000, transport: "udp" });
const nextWhileClosing = socket[Symbol.asyncIterator]().next();
Expand Down
15 changes: 15 additions & 0 deletions core/async_cell.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.

use std::any::type_name;
use std::any::Any;
use std::borrow::Borrow;
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::ops::Deref;
use std::rc::Rc;

Expand Down Expand Up @@ -45,6 +49,17 @@ impl<T: 'static> AsyncRefCell<T> {
pub fn as_ptr(&self) -> *mut T {
self.value.get()
}

pub fn into_inner(self) -> T {
assert!(self.borrow_count.get().is_empty());
self.value.into_inner()
}
}

impl<T> Debug for AsyncRefCell<T> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "AsyncRefCell<{}>", type_name::<T>())
}
}

impl<T: Default + 'static> Default for AsyncRefCell<T> {
Expand Down
12 changes: 6 additions & 6 deletions core/examples/http_bench_bin_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ fn op_listen(
let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table_2.add(listener);
let rid = state.resource_table.add(listener);
Ok(rid)
}

Expand All @@ -181,7 +181,7 @@ fn op_close(
) -> Result<u32, Error> {
debug!("close rid={}", rid);
state
.resource_table_2
.resource_table
.close(rid)
.map(|_| 0)
.ok_or_else(bad_resource_id)
Expand All @@ -196,11 +196,11 @@ async fn op_accept(

let listener = state
.borrow()
.resource_table_2
.resource_table
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table_2.add(stream);
let rid = state.borrow_mut().resource_table.add(stream);
Ok(rid)
}

Expand All @@ -214,7 +214,7 @@ async fn op_read(

let stream = state
.borrow()
.resource_table_2
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
stream.read(&mut bufs[0]).await
Expand All @@ -230,7 +230,7 @@ async fn op_write(

let stream = state
.borrow()
.resource_table_2
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
stream.write(&bufs[0]).await
Expand Down
12 changes: 6 additions & 6 deletions core/examples/http_bench_json_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ fn op_listen(
let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table_2.add(listener);
let rid = state.resource_table.add(listener);
Ok(serde_json::json!({ "rid": rid }))
}

Expand All @@ -152,7 +152,7 @@ fn op_close(
.unwrap();
debug!("close rid={}", rid);
state
.resource_table_2
.resource_table
.close(rid)
.map(|_| serde_json::json!(()))
.ok_or_else(bad_resource_id)
Expand All @@ -174,11 +174,11 @@ async fn op_accept(

let listener = state
.borrow()
.resource_table_2
.resource_table
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table_2.add(stream);
let rid = state.borrow_mut().resource_table.add(stream);
Ok(serde_json::json!({ "rid": rid }))
}

Expand All @@ -199,7 +199,7 @@ async fn op_read(

let stream = state
.borrow()
.resource_table_2
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nread = stream.read(&mut bufs[0]).await?;
Expand All @@ -223,7 +223,7 @@ async fn op_write(

let stream = state
.borrow()
.resource_table_2
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nwritten = stream.write(&bufs[0]).await?;
Expand Down
6 changes: 2 additions & 4 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ mod normalize_path;
mod ops;
pub mod plugin_api;
mod resources;
mod resources2;
mod runtime;
mod shared_queue;
mod zero_copy_buf;
Expand Down Expand Up @@ -64,10 +63,9 @@ pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpState;
pub use crate::ops::OpTable;
pub use crate::resources::Resource;
pub use crate::resources::ResourceId;
pub use crate::resources::ResourceTable;
pub use crate::resources2::Resource;
pub use crate::resources2::ResourceId;
pub use crate::resources2::ResourceTable2;
pub use crate::runtime::GetErrorClassFn;
pub use crate::runtime::JsErrorCreateFn;
pub use crate::runtime::JsRuntime;
Expand Down
15 changes: 10 additions & 5 deletions core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::error::bad_resource_id;
use crate::error::type_error;
use crate::error::AnyError;
use crate::gotham_state::GothamState;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
use crate::BufVec;
use crate::ZeroCopyBuf;
use futures::Future;
Expand Down Expand Up @@ -33,10 +35,9 @@ pub enum Op {

/// Maintains the resources and ops inside a JS runtime.
pub struct OpState {
pub resource_table: crate::ResourceTable,
pub resource_table_2: crate::resources2::ResourceTable,
pub resource_table: ResourceTable,
pub op_table: OpTable,
pub get_error_class_fn: crate::runtime::GetErrorClassFn,
pub get_error_class_fn: GetErrorClassFn,
gotham_state: GothamState,
}

Expand All @@ -47,7 +48,6 @@ impl Default for OpState {
fn default() -> OpState {
OpState {
resource_table: Default::default(),
resource_table_2: Default::default(),
op_table: OpTable::default(),
get_error_class_fn: &|_| "Error",
gotham_state: Default::default(),
Expand Down Expand Up @@ -279,7 +279,11 @@ pub fn op_resources(
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, AnyError> {
let serialized_resources = state.resource_table.entries();
let serialized_resources: HashMap<u32, String> = state
.resource_table
.names()
.map(|(rid, name)| (rid, name.to_string()))
.collect();
Ok(json!(serialized_resources))
}

Expand All @@ -300,5 +304,6 @@ pub fn op_close(
.resource_table
.close(rid as u32)
.ok_or_else(bad_resource_id)?;

Ok(json!({}))
}
Loading

0 comments on commit 6984b63

Please sign in to comment.