std/ws Add Async WebSocket API wrapper #7450

Closed
crowlKats wants to merge 2 commits

Conversation

@crowlKats (Member) commented Sep 13, 2020

This would add an async wrapper around the WebSocket API.
There are enough use cases where this would make more sense than using the WebSocket API directly, which is callback-based.
Would this be something wanted in std? If not, I shall move it to /x/.
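
To illustrate the difference, a rough sketch of the two styles. The URL and the connect helper below are made up for illustration; this is not the interface this PR adds:

// Callback style, using the standard WebSocket API:
const ws = new WebSocket("wss://echo.example.com"); // hypothetical URL
ws.addEventListener("open", () => ws.send("hello"));
ws.addEventListener("message", (e) => console.log(e.data));
ws.addEventListener("close", () => console.log("closed"));

// The async style a wrapper enables (hypothetical shape, not this PR's API):
// const sock = await connect("wss://echo.example.com");
// await sock.send("hello");
// for await (const msg of sock) console.log(msg);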

crowlKats changed the title from "Add Async WebSocket API wrapper" to "std/ws Add Async WebSocket API wrapper" on Sep 13, 2020
@caspervonb (Contributor) commented Sep 14, 2020

I'm very much for an async interface but I'm wondering if we should be pushing for WebSocketStream instead 🤔

It's a Chrome-only interface at the moment, as far as I know.
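
Roughly, from the explainer (there is no spec, and attribute names have shifted between revisions, e.g. connection vs. opened, so treat this as a sketch rather than a reference):

const wss = new WebSocketStream("wss://example.com/socket"); // hypothetical URL

// resolves once the handshake completes, exposing a readable/writable pair
const { readable, writable } = await wss.opened; // "connection" in earlier drafts

const writer = writable.getWriter();
await writer.write("hello");

const reader = readable.getReader();
const { value, done } = await reader.read();
if (!done) console.log(value);

wss.close();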

@kryptish commented Sep 17, 2020

Actually funny, I found this by accident while trying to debug the issue I outlined in the comments of #7457. I did something similar, but without relying on the Streams API like you did (and I like your approach very much, way more elegant =D): basically an async iterable wrapper around a regular WebSocket. Just sharing:

function WebsocketIterable(socket) {
  let done = false;
  const values = [];    // buffered results not yet pulled by next()
  const resolvers = []; // pending next() calls waiting for the next event

  const close = () => {
    done = true;
    while (resolvers.length > 0) {
      (resolvers.shift())({ value: undefined, done: true });
    }
  };

  const push = (data) => {
    if (done) { return; }
    if (resolvers.length > 0) {
      (resolvers.shift())(data);
    } else {
      values.push(data);
    }
  };

  const listen = () => {
    socket.addEventListener("open", (e) => push({ value: e, done: false }));
    socket.addEventListener("message", (e) => push({ value: e, done: false }));
    socket.addEventListener("close", (e) => {
      push({ value: e, done: false });
      close();
    });
    socket.addEventListener("error", (err) => {
      // push a thenable that rejects instead of a rejected Promise, so no
      // unhandled rejection is reported before the consumer awaits it
      push({ then: (resolve, reject) => reject(err) });
      close();
    });
  };

  const iterator = {
    next: () => {
      if (values.length > 0) { return Promise.resolve(values.shift()); }
      if (done) { return Promise.resolve({ value: undefined, done: true }); }

      return new Promise((resolve) => resolvers.push(resolve));
    },
    throw: async (value) => {
      // pushing a thenable works like Promise.reject but won't trigger
      // unhandled Promise rejection errors, so they can be handled later on
      // in the code; we do the same in the error event listener
      push({ then: (resolve, reject) => reject(value) });

      if (socket.readyState === WebSocket.OPEN) {
        socket.close();
      } else {
        close();
      }
      return iterator.next();
    },
    return: async () => {
      if (socket.readyState === WebSocket.OPEN) {
        socket.close();
      } else {
        close();
      }
      return iterator.next();
    },
    [Symbol.asyncIterator]: () => iterator,
  };

  listen();

  return iterator;
}

Pretty simple to use (and it could probably be refactored into a class like yours):

const socket = WebsocketIterable(new WebSocket('<yourwsurl>'));

for await (const ev of socket) {
    // do your thing
}
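
Since errors are delivered as rejecting thenables from next(), they surface inside the for await loop and can be caught with a regular try/catch around it:

try {
  for await (const ev of socket) {
    if (ev.type === "message") console.log(ev.data);
  }
} catch (err) {
  console.error("socket failed:", err);
}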

EDIT: @crowlKats how does your solution deal with backpressure, or rather: how does TransformStream deal with it? I assume my async iterator solution would basically create a memory leak if not consumed, right? The WebSocket keeps generating new events, which are pushed into the queue, but if there is no for await loop consuming the iterator, data would accumulate until it crashes. Would that not happen with your solution (if not consumed/read)? Just learning... thanks for the patience. Oh, and while we're at it: is there a way to monitor the memory consumption of a script executed through deno from within the code?

EDIT2: After some more digging I believe that either solution (i.e. my async iterator thingy and your TransformStream class) will cause memory leaks if not consumed, given the push nature of WebSocket (i.e. there is no way of telling the server to pause sending data). Guess we need to wait for those hypothetical backpressure sockets: https://streams.spec.whatwg.org/#example-rs-push-backpressure =)
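
For reference, the pattern from that spec example, lightly adapted; createBackpressureSocket, readStart and readStop are hypothetical APIs from the spec, which is exactly what a plain WebSocket lacks:

function makeReadableBackpressureSocketStream(host, port) {
  // Hypothetical socket with pause/resume support, as in the spec example;
  // the browser WebSocket API has no equivalent of readStop()/readStart().
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = (event) => {
        controller.enqueue(event.data);
        if (controller.desiredSize <= 0) {
          // Internal queue is full: propagate backpressure to the source.
          socket.readStop();
        }
      };
      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("The socket errored!"));
    },
    pull() {
      // Called whenever the stream's internal queue has room again.
      socket.readStart();
    },
    cancel() {
      socket.close();
    },
  });
}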

@bartlomieju (Member) commented Oct 1, 2020

@kitsonk what's your opinion on this one? Especially on using WebSocketStream instead of a custom solution.

@crowlKats (Member Author)

I'd be in favour of WebSocketStream, it's just that there isn't a proper spec yet. I would do the same as we did with the WS API and have a temporary solution.

@bartlomieju (Member)

@crowlKats I think this module should start as a third-party module in deno.land/x/ - if there's a lot of interest then we can move it to std/.

@crowlKats (Member Author)

@bartlomieju alright.
I have been looking into WebSocketStream as I would rather have that than this; the only trouble I have with it is that there is no spec whatsoever, so implementing it would be just guesswork from the explainer.

crowlKats closed this Nov 9, 2020
crowlKats deleted the asyncwebsocket_wrapper branch November 9, 2020
crowlKats mentioned this pull request Dec 19, 2020