You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I was working on a PR to node to improve readline async iterator performance (nodejs/node#41276) and it ended up, after some reviews, changing the function events.on, that transforms an event emitting into an async iterator, almost like this library, but the native function doesn't support closing events to end the iterator.
The changes I did there, though, can bring an improvement up to 50% to the function performance, If the PR is accepted. So, as the new version of the function will do pretty much what this package does, I'm posting the code here if you want to adapt it onto your package. The main ideas behind were not mine, but preexisting in the node code. My main contribution was to use the FixedQueue, adding watermark control and support for close events. Here it is:
/** * Returns an `AsyncIterator` that iterates `event` events. * @param {EventEmitter} emitter * @param {string | symbol} event * @param {{ * signal: AbortSignal; * close?: string[]; * highWatermark?: number, * lowWatermark?: number * }} [options] * @returns {AsyncIterator} */functionon(emitter,event,options={}){// Parameters validationconstsignal=options.signal;validateAbortSignal(signal,'options.signal');if(signal?.aborted)thrownewAbortError(undefined,{cause: signal?.reason});consthighWatermark=options.highWatermark||NumberMAX_SAFE_INTEGER;validateInteger(highWatermark,'options.highWatermark',1);constlowWatermark=options.lowWatermark||1;validateInteger(lowWatermark,'options.lowWatermark',1);// Preparing controlling queues and variablesif(!FixedQueue)FixedQueue=require('internal/fixed_queue');constunconsumedEvents=newFixedQueue();constunconsumedPromises=newFixedQueue();letpaused=false;leterror=null;letfinished=false;letsize=0;constiterator=ObjectSetPrototypeOf({next(){// First, we consume all unread eventsif(size){constvalue=unconsumedEvents.shift();size--;if(paused&&size<lowWatermark){emitter.resume();paused=false;}returnvalue;}// Then we error, if an error happened// This happens one time if at all, because after 'error'// we stop listeningif(error){constp=PromiseReject(error);// Only the first element errorserror=null;returnp;}// If the iterator is finished, resolve to doneif(finished)returncloseHandler();// Wait until an event happensreturnnewPromise(function(resolve,reject){unconsumedPromises.push({ resolve, reject });});},return(){returncloseHandler();},throw(err){if(!err||!(errinstanceofError)){thrownewERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator','Error',err);}errorHandler(err);},[kWatermarkData]: {/** * The current queue size */getsize(){returnsize;},/** * The low watermark. The emitter is resumed every time size is lower than it */getlow(){returnlowWatermark;},/** * The high watermark. The emitter is paused every time size is higher than it */gethigh(){returnhighWatermark;},/** * It checks wether the emitter is paused by the watermark controller or not */getisPaused(){returnpaused;}},},AsyncIteratorPrototype);// Adding event handlersconst{ addEventListener, removeAll }=listenersController();if(!kFirstEventParam)kFirstEventParam=require('internal/events/symbols').kFirstEventParam;addEventListener(emitter,event,options[kFirstEventParam] ? eventHandler : function(...args){returneventHandler(args);});if(event!=='error'&&typeofemitter.on==='function'){addEventListener(emitter,'error',errorHandler);}constcloseEvents=options?.close;if(closeEvents&&closeEvents.length){for(leti=0;i<closeEvents.length;i++){addEventListener(emitter,closeEvents[i],closeHandler);}}if(signal){addEventListener(signal,'abort',abortListener,{once: true});}returniterator;functionabortListener(){errorHandler(newAbortError(undefined,{cause: signal?.reason}));}functioneventHandler(value){constarg=createIterResult(value,false);if(unconsumedPromises.isEmpty()){size++;if(!paused&&size>highWatermark){paused=true;emitter.pause();}unconsumedEvents.push(arg);}elseunconsumedPromises.shift().resolve(arg);}functionerrorHandler(err){if(unconsumedPromises.isEmpty())error=err;elseunconsumedPromises.shift().reject(err);closeHandler();}functioncloseHandler(){removeAll();finished=true;constdoneResult=createIterResult(undefined,true);while(!unconsumedPromises.isEmpty()){unconsumedPromises.shift().resolve(doneResult);}returnPromiseResolve(doneResult);}}
The FixedQueue code isn't exposed on nodejs, but you can find it here. This is a pretty smart implementation of a queue that takes advantage of how v8 manages memory, and it really improved the performance big time!
The text was updated successfully, but these errors were encountered:
I was working on a PR to node to improve readline async iterator performance (nodejs/node#41276) and it ended up, after some reviews, changing the function events.on, that transforms an event emitting into an async iterator, almost like this library, but the native function doesn't support closing events to end the iterator.
The changes I did there, though, can bring an improvement up to 50% to the function performance, If the PR is accepted. So, as the new version of the function will do pretty much what this package does, I'm posting the code here if you want to adapt it onto your package. The main ideas behind were not mine, but preexisting in the node code. My main contribution was to use the FixedQueue, adding watermark control and support for close events. Here it is:
The FixedQueue code isn't exposed on nodejs, but you can find it here. This is a pretty smart implementation of a queue that takes advantage of how v8 manages memory, and it really improved the performance big time!
The text was updated successfully, but these errors were encountered: