Skip to content

Commit

Permalink
Basic WebWorker implementation.
Browse files Browse the repository at this point in the history
Only accessible from Rust currently.
  • Loading branch information
ry committed Dec 12, 2018
1 parent 65dd0d5 commit 95c6b40
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 43 deletions.
1 change: 1 addition & 0 deletions BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ ts_sources = [
"js/types.ts",
"js/url_search_params.ts",
"js/util.ts",
"js/workers.ts",
"js/write_file.ts",
"tsconfig.json",

Expand Down
6 changes: 6 additions & 0 deletions js/globals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import * as headers from "./headers";
import * as textEncoding from "./text_encoding";
import * as timers from "./timers";
import * as urlSearchParams from "./url_search_params";
import * as workers from "./workers";

// These imports are not exposed and therefore are fine to just import the
// symbols required.
Expand Down Expand Up @@ -71,3 +72,8 @@ window.TextEncoder = textEncoding.TextEncoder;
export type TextEncoder = textEncoding.TextEncoder;
window.TextDecoder = textEncoding.TextDecoder;
export type TextDecoder = textEncoding.TextDecoder;

window.workerMain = workers.workerMain;
// TODO These shouldn't be available in main isolate.
window.postMessage = workers.postMessage;
window.close = workers.workerClose;
72 changes: 72 additions & 0 deletions js/workers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import * as dispatch from "./dispatch";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { window } from "./globals";
import { libdeno } from "./libdeno";
import { assert, log, setLogDebug } from "./util";

export async function postMessage(data: Uint8Array): Promise<void> {
const builder = flatbuffers.createBuilder();
msg.WorkerPostMessage.startWorkerPostMessage(builder);
const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerPostMessage,
inner,
data
);
assert(baseRes != null);
}

export async function getMessage(): Promise<null | Uint8Array> {
log("getMessage");
// Send CodeFetch message
const builder = flatbuffers.createBuilder();
msg.WorkerGetMessage.startWorkerGetMessage(builder);
const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerGetMessage,
inner
);
assert(baseRes != null);
assert(
msg.Any.WorkerGetMessageRes === baseRes!.innerType(),
`base.innerType() unexpectedly is ${baseRes!.innerType()}`
);
const res = new msg.WorkerGetMessageRes();
assert(baseRes!.inner(res) != null);

const dataArray = res.dataArray();
if (dataArray == null) {
return null;
} else {
return new Uint8Array(dataArray!);
}
}

let isClosing = false;

export function workerClose(): void {
isClosing = true;
}

export async function workerMain() {
libdeno.recv(dispatch.handleAsyncMsgFromRust);
setLogDebug(true);
log("workerMain");

while (!isClosing) {
let data = await getMessage();
if (data == null) {
log("workerMain got null message. quitting.");
break;
}
if (window["onmessage"]) {
let event = { data };
window.onmessage(event);
} else {
break;
}
}
}
21 changes: 17 additions & 4 deletions src/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use js_errors::JSError;
use libdeno;
use permissions::DenoPermissions;

use futures::sync::mpsc as async_mpsc;
use futures::Future;
use libc::c_void;
use std;
Expand All @@ -22,6 +23,7 @@ use std::ffi::CString;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio;
Expand Down Expand Up @@ -52,6 +54,10 @@ pub struct Isolate {
pub state: Arc<IsolateState>,
}

pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);

// Isolate cannot be passed between threads but IsolateState can.
// IsolateState satisfies Send and Sync.
// So any state that needs to be accessed outside the main V8 thread should be
Expand All @@ -63,17 +69,24 @@ pub struct IsolateState {
pub permissions: DenoPermissions,
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
}

impl IsolateState {
pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self {
pub fn new(
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();

Self {
dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(),
argv: argv_rest,
permissions: DenoPermissions::new(&flags),
flags,
metrics: Metrics::default(),
worker_channels: worker_channels.map(|wc| Mutex::new(wc)),
}
}

Expand Down Expand Up @@ -397,7 +410,7 @@ mod tests {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
Expand Down Expand Up @@ -438,7 +451,7 @@ mod tests {
fn test_metrics_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync);
tokio_util::init(|| {
Expand Down Expand Up @@ -474,7 +487,7 @@ mod tests {
fn test_metrics_async() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_async);
tokio_util::init(|| {
Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod snapshot;
mod tokio_util;
mod tokio_write;
pub mod version;
pub mod workers;

#[cfg(unix)]
mod eager_unix;
Expand Down Expand Up @@ -96,7 +97,7 @@ fn main() {
log::LevelFilter::Info
});

let state = Arc::new(isolate::IsolateState::new(flags, rest_argv));
let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
let snapshot = snapshot::deno_snapshot();
let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
tokio_util::init(|| {
Expand Down
15 changes: 15 additions & 0 deletions src/msg.fbs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
union Any {
Start,
StartRes,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
CodeFetch,
CodeFetchRes,
CodeCache,
Expand Down Expand Up @@ -145,6 +148,18 @@ table StartRes {
v8_version: string;
}

table WorkerGetMessage {
unused: int8;
}

table WorkerGetMessageRes {
data: [ubyte];
}

table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}

table CodeFetch {
specifier: string;
referrer: string;
Expand Down
Loading

0 comments on commit 95c6b40

Please sign in to comment.