Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker v2 #200

Merged
merged 34 commits into from
Jun 23, 2022
Merged
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
435f9d6
Move messages to a different file.
futursolo Mar 17, 2022
3845128
Remove WorkerLink.
futursolo Mar 17, 2022
ea8e5ec
Move WorkerExt to a different file.
futursolo Mar 17, 2022
7abcaf2
Bridge, Spawner and Registrar.
futursolo Mar 17, 2022
fb8cf16
Implement Spawner.
futursolo Mar 18, 2022
ff774ff
Remove existing implementation.
futursolo Mar 18, 2022
defbbba
Remove more unused code.
futursolo Mar 18, 2022
950060e
Remove more unused code.
futursolo Mar 18, 2022
e3aebf5
Better naming.
futursolo Mar 18, 2022
2845afa
worker_ext -> native_worker.
futursolo Mar 19, 2022
a3eb5b9
Registers the first callback as well.
futursolo Mar 19, 2022
74d5d1a
Fix exports.
futursolo Mar 19, 2022
a2ba75f
Less worker prefixes.
futursolo Mar 19, 2022
d26e873
Restore Worker Prefix, PartialEq on WorkerBridge.
futursolo Mar 20, 2022
19974b7
Introduce a method to delay worker shutdown.
futursolo Mar 22, 2022
f292201
Fix message handling.
futursolo Jun 12, 2022
3130340
Merge branch 'master' into worker-v2
futursolo Jun 12, 2022
64a08e0
Fix URL handling.
futursolo Jun 12, 2022
3af881f
Name WorkerScope scope.
futursolo Jun 12, 2022
8ee5efa
Add an example.
futursolo Jun 13, 2022
891ced8
Add test.
futursolo Jun 13, 2022
1d16320
Cargo fmt.
futursolo Jun 13, 2022
c5a0bd6
Swappable Encoding.
futursolo Jun 14, 2022
a3dbd0e
Adds an encoding setter for Registrar as well.
futursolo Jun 15, 2022
1bbeff4
Adjust the worker signature so it matches Component v2.
futursolo Jun 15, 2022
6843d99
Workers should continue to receive to messages.
futursolo Jun 15, 2022
3952ebb
More reliable destroy behaviour.
futursolo Jun 15, 2022
75402be
Adjust documentation & Destroy handle.
futursolo Jun 16, 2022
ae2435b
Export worker registrar.
futursolo Jun 16, 2022
fa79c2c
Avoid panicking after worker is destroyed.
futursolo Jun 16, 2022
cc6968f
Adjust documentation.
futursolo Jun 16, 2022
23a1e27
Make Registrable and Spawnable type configurable.
futursolo Jun 16, 2022
a2101e1
Merge branch 'master' into worker-v2
futursolo Jun 17, 2022
9a7b422
Instructions.
futursolo Jun 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -49,6 +49,11 @@ jobs:
- name: Install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh

- name: Setup trunk
uses: jetli/trunk-action@v0.1.0
with:
version: 'latest'

- uses: actions/cache@v2
with:
path: |
@@ -71,6 +76,14 @@ jobs:
wasm-pack test --headless --firefox --chrome crates/$x --no-default-features
done

- name: Run tests for gloo worker
run: |
trunk build examples/markdown/index.html
nohup cargo run -p example-markdown --bin example_markdown_test_server -- examples/markdown/dist &

wasm-pack test --headless --firefox --chrome examples/markdown


test-net:
name: Test gloo-net
runs-on: ubuntu-latest
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@
**/*.rs.bk
Cargo.lock

examples/*/dist

# editor configs
.vscode
.idea
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -44,4 +44,7 @@ members = [
"crates/history",
"crates/worker",
"crates/net",

"examples/markdown",
"examples/clock",
]
5 changes: 2 additions & 3 deletions crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -23,9 +23,8 @@ gloo-console = { path = "../console", version = "0.2" }
gloo-utils = { path = "../utils", version = "0.1" }
js-sys = "0.3"
serde = { version = "1", features = ["derive"] }
slab = "0.4"
wasm-bindgen = "0.2"
wasm-bindgen-futures = { version = "0.4", optional = true }
wasm-bindgen-futures = { version = "0.4" }

[dependencies.web-sys]
version = "0.3"
@@ -41,4 +40,4 @@ features = [

[features]
default = []
futures = ["wasm-bindgen-futures"]
futures = []
179 changes: 179 additions & 0 deletions crates/worker/src/bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::rc::Rc;
use std::rc::Weak;

use crate::codec::Codec;
use crate::handler_id::HandlerId;
use crate::messages::ToWorker;
use crate::native_worker::NativeWorkerExt;
use crate::traits::Worker;
use crate::{Callback, Shared};

pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;

struct WorkerBridgeInner<W>
where
W: Worker,
{
// When worker is loaded, queue becomes None.
pending_queue: Shared<Option<ToWorkerQueue<W>>>,
callbacks: Shared<CallbackMap<W>>,
post_msg: Rc<dyn Fn(ToWorker<W>)>,
}

impl<W> fmt::Debug for WorkerBridgeInner<W>
where
W: Worker,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerBridgeInner<_>")
}
}

impl<W> WorkerBridgeInner<W>
where
W: Worker,
{
/// Send a message to the worker, queuing the message if necessary
fn send_message(&self, msg: ToWorker<W>) {
let mut pending_queue = self.pending_queue.borrow_mut();

match pending_queue.as_mut() {
Some(m) => {
m.push(msg);
}
None => {
(self.post_msg)(msg);
}
}
}
}

impl<W> Drop for WorkerBridgeInner<W>
where
W: Worker,
{
fn drop(&mut self) {
let destroy = ToWorker::Destroy;
self.send_message(destroy);
}
}

/// A connection manager for components interaction with workers.
pub struct WorkerBridge<W>
where
W: Worker,
{
inner: Rc<WorkerBridgeInner<W>>,
id: HandlerId,
_worker: PhantomData<W>,
cb: Option<Rc<dyn Fn(W::Output)>>,
}

impl<W> WorkerBridge<W>
where
W: Worker,
{
pub(crate) fn new<CODEC>(
id: HandlerId,
native_worker: web_sys::Worker,
pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
callbacks: Rc<RefCell<CallbackMap<W>>>,
callback: Option<Callback<W::Output>>,
) -> Self
where
CODEC: Codec,
{
let post_msg =
{ move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg) };

Self {
inner: WorkerBridgeInner {
pending_queue,
callbacks,
post_msg: Rc::new(post_msg),
}
.into(),
id,
_worker: PhantomData,
cb: callback,
}
}

/// Send a message to the current worker.
pub fn send(&self, msg: W::Input) {
let msg = ToWorker::ProcessInput(self.id, msg);
self.inner.send_message(msg);
}

/// Forks the bridge with a different callback.
///
/// This creates a new [HandlerID] that helps the worker to differentiate bridges.
pub fn fork<F>(&self, cb: Option<F>) -> Self
where
F: 'static + Fn(W::Output),
{
let cb = cb.map(|m| Rc::new(m) as Rc<dyn Fn(W::Output)>);
let handler_id = HandlerId::new();

if let Some(cb_weak) = cb.as_ref().map(Rc::downgrade) {
self.inner
.callbacks
.borrow_mut()
.insert(handler_id, cb_weak);
}

Self {
inner: self.inner.clone(),
id: handler_id,
_worker: PhantomData,
cb,
}
}
}

impl<W> Clone for WorkerBridge<W>
where
W: Worker,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
id: self.id,
_worker: PhantomData,
cb: self.cb.clone(),
}
}
}

impl<W> Drop for WorkerBridge<W>
where
W: Worker,
{
fn drop(&mut self) {
let disconnected = ToWorker::Disconnected(self.id);
self.inner.send_message(disconnected);
}
}

impl<W> fmt::Debug for WorkerBridge<W>
where
W: Worker,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerBridge<_>")
}
}

impl<W> PartialEq for WorkerBridge<W>
where
W: Worker,
{
fn eq(&self, rhs: &Self) -> bool {
self.id == rhs.id
}
}
38 changes: 38 additions & 0 deletions crates/worker/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use js_sys::Uint8Array;
use serde::{Deserialize, Serialize};
use wasm_bindgen::JsValue;

/// Message Encoding and Decoding Format
pub trait Codec {
/// Encode an input to JsValue
fn encode<I>(input: I) -> JsValue
where
I: Serialize;

/// Decode a message to a type
fn decode<O>(input: JsValue) -> O
where
O: for<'de> Deserialize<'de>;
}

/// Default message encoding with [bincode].
#[derive(Debug)]
pub struct Bincode;

impl Codec for Bincode {
fn encode<I>(input: I) -> JsValue
where
I: Serialize,
{
let buf = bincode::serialize(&input).expect("can't serialize an worker message");
Uint8Array::from(buf.as_slice()).into()
}

fn decode<O>(input: JsValue) -> O
where
O: for<'de> Deserialize<'de>,
{
let data = Uint8Array::from(input).to_vec();
bincode::deserialize(&data).expect("can't deserialize an worker message")
}
}
17 changes: 17 additions & 0 deletions crates/worker/src/handler_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::sync::atomic::{AtomicUsize, Ordering};

use serde::{Deserialize, Serialize};

/// Identifier to send output to bridges.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
pub struct HandlerId(usize);

impl HandlerId {
pub(crate) fn new() -> Self {
static CTR: AtomicUsize = AtomicUsize::new(0);

let id = CTR.fetch_add(1, Ordering::SeqCst);

HandlerId(id)
}
}
Loading