diff --git a/lib/timers.js b/lib/timers.js index de0c2689c5ccc2..5a9073756fc1fe 100644 --- a/lib/timers.js +++ b/lib/timers.js @@ -50,7 +50,11 @@ const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants; const async_id_symbol = Symbol('asyncId'); const trigger_async_id_symbol = Symbol('triggerAsyncId'); -const [activateImmediateCheck, scheduledImmediateCountArray] = +// *Must* match Environment::ImmediateInfo::Fields in src/env.h. +const kCount = 0; +const kHasOutstanding = 1; + +const [activateImmediateCheck, immediateInfo] = setImmediateCallback(processImmediate); // Timeout values > TIMEOUT_MAX are set to 1. @@ -674,16 +678,23 @@ ImmediateList.prototype.remove = function(item) { }; // Create a single linked list instance only once at startup -var immediateQueue = new ImmediateList(); +const immediateQueue = new ImmediateList(); + +// If an uncaught exception was thrown during execution of immediateQueue, +// this queue will store all remaining Immediates that need to run upon +// resolution of all error handling (if process is still alive). +const outstandingQueue = new ImmediateList(); function processImmediate() { - var immediate = immediateQueue.head; - var tail = immediateQueue.tail; + const queue = outstandingQueue.head !== null ? + outstandingQueue : immediateQueue; + var immediate = queue.head; + var tail = queue.tail; // Clear the linked list early in case new `setImmediate()` calls occur while // immediate callbacks are executed - immediateQueue.head = immediateQueue.tail = null; + queue.head = queue.tail = null; while (immediate !== null) { if (!immediate._onImmediate) { @@ -692,9 +703,14 @@ function processImmediate() { } // Save next in case `clearImmediate(immediate)` is called from callback - var next = immediate._idleNext; + const next = immediate._idleNext; + + const asyncId = immediate[async_id_symbol]; + emitBefore(asyncId, immediate[trigger_async_id_symbol]); - tryOnImmediate(immediate, tail); + tryOnImmediate(immediate, next, tail); + + emitAfter(asyncId); // If `clearImmediate(immediate)` wasn't called from the callback, use the // `immediate`'s next item @@ -703,45 +719,36 @@ function processImmediate() { else immediate = next; } + + immediateInfo[kHasOutstanding] = 0; } // An optimization so that the try/finally only de-optimizes (since at least v8 // 4.7) what is in this smaller function. -function tryOnImmediate(immediate, oldTail) { +function tryOnImmediate(immediate, next, oldTail) { var threw = true; - emitBefore(immediate[async_id_symbol], immediate[trigger_async_id_symbol]); try { // make the actual call outside the try/finally to allow it to be optimized runCallback(immediate); threw = false; } finally { immediate._onImmediate = null; - if (!threw) - emitAfter(immediate[async_id_symbol]); if (!immediate._destroyed) { immediate._destroyed = true; - scheduledImmediateCountArray[0]--; + immediateInfo[kCount]--; if (async_hook_fields[kDestroy] > 0) { emitDestroy(immediate[async_id_symbol]); } } - if (threw && immediate._idleNext !== null) { - // Handle any remaining on next tick, assuming we're still alive to do so. - const curHead = immediateQueue.head; - const next = immediate._idleNext; - if (curHead !== null) { - curHead._idlePrev = oldTail; - oldTail._idleNext = curHead; - next._idlePrev = null; - immediateQueue.head = next; - } else { - immediateQueue.head = next; - immediateQueue.tail = oldTail; - } - process.nextTick(processImmediate); + if (threw && (immediate._idleNext !== null || next !== null)) { + // Handle any remaining Immediates after error handling has resolved, + // assuming we're still alive to do so. + outstandingQueue.head = immediate._idleNext || next; + outstandingQueue.tail = oldTail; + immediateInfo[kHasOutstanding] = 1; } } } @@ -775,9 +782,9 @@ function Immediate(callback, args) { this); } - if (scheduledImmediateCountArray[0] === 0) + if (immediateInfo[kCount] === 0) activateImmediateCheck(); - scheduledImmediateCountArray[0]++; + immediateInfo[kCount]++; immediateQueue.append(this); } @@ -823,7 +830,7 @@ exports.clearImmediate = function(immediate) { if (!immediate) return; if (!immediate._destroyed) { - scheduledImmediateCountArray[0]--; + immediateInfo[kCount]--; immediate._destroyed = true; if (async_hook_fields[kDestroy] > 0) { diff --git a/src/env-inl.h b/src/env-inl.h index a4a27cbfe69ff8..0328f058de3ab0 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -217,6 +217,30 @@ inline bool Environment::AsyncCallbackScope::in_makecallback() const { return env_->makecallback_cntr_ > 1; } +inline Environment::ImmediateInfo::ImmediateInfo(v8::Isolate* isolate) + : fields_(isolate, kFieldsCount) {} + +inline AliasedBuffer& + Environment::ImmediateInfo::fields() { + return fields_; +} + +inline uint32_t Environment::ImmediateInfo::count() const { + return fields_[kCount]; +} + +inline bool Environment::ImmediateInfo::has_outstanding() const { + return fields_[kHasOutstanding] == 1; +} + +inline void Environment::ImmediateInfo::count_inc(uint32_t increment) { + fields_[kCount] = fields_[kCount] + increment; +} + +inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) { + fields_[kCount] = fields_[kCount] - decrement; +} + inline Environment::TickInfo::TickInfo(v8::Isolate* isolate) : fields_(isolate, kFieldsCount) {} @@ -263,6 +287,7 @@ inline Environment::Environment(IsolateData* isolate_data, v8::Local context) : isolate_(context->GetIsolate()), isolate_data_(isolate_data), + immediate_info_(context->GetIsolate()), tick_info_(context->GetIsolate()), timer_base_(uv_now(isolate_data->event_loop())), printed_error_(false), @@ -270,7 +295,6 @@ inline Environment::Environment(IsolateData* isolate_data, abort_on_uncaught_exception_(false), emit_napi_warning_(true), makecallback_cntr_(0), - scheduled_immediate_count_(isolate_, 1), should_abort_on_uncaught_toggle_(isolate_, 1), #if HAVE_INSPECTOR inspector_agent_(new inspector::Agent(this)), @@ -357,6 +381,10 @@ inline Environment::AsyncHooks* Environment::async_hooks() { return &async_hooks_; } +inline Environment::ImmediateInfo* Environment::immediate_info() { + return &immediate_info_; +} + inline Environment::TickInfo* Environment::tick_info() { return &tick_info_; } @@ -486,11 +514,6 @@ inline void Environment::set_fs_stats_field_array(double* fields) { fs_stats_field_array_ = fields; } -inline AliasedBuffer& -Environment::scheduled_immediate_count() { - return scheduled_immediate_count_; -} - void Environment::SetImmediate(native_immediate_callback cb, void* data, v8::Local obj) { @@ -500,9 +523,9 @@ void Environment::SetImmediate(native_immediate_callback cb, std::unique_ptr>( obj.IsEmpty() ? nullptr : new v8::Persistent(isolate_, obj)) }); - if (scheduled_immediate_count_[0] == 0) + if (immediate_info()->count() == 0) ActivateImmediateCheck(); - scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1; + immediate_info()->count_inc(1); } inline performance::performance_state* Environment::performance_state() { diff --git a/src/env.cc b/src/env.cc index 67a6f2c3d46fd3..e4afb35786e2af 100644 --- a/src/env.cc +++ b/src/env.cc @@ -281,14 +281,14 @@ void Environment::RunAndClearNativeImmediates() { } #ifdef DEBUG - CHECK_GE(scheduled_immediate_count_[0], count); + CHECK_GE(immediate_info()->count(), count); #endif - scheduled_immediate_count_[0] = scheduled_immediate_count_[0] - count; + immediate_info()->count_dec(count); } } static bool MaybeStopImmediate(Environment* env) { - if (env->scheduled_immediate_count()[0] == 0) { + if (env->immediate_info()->count() == 0) { uv_check_stop(env->immediate_check_handle()); uv_idle_stop(env->immediate_idle_handle()); return true; @@ -307,12 +307,14 @@ void Environment::CheckImmediate(uv_check_t* handle) { env->RunAndClearNativeImmediates(); - MakeCallback(env->isolate(), - env->process_object(), - env->immediate_callback_function(), - 0, - nullptr, - {0, 0}).ToLocalChecked(); + do { + MakeCallback(env->isolate(), + env->process_object(), + env->immediate_callback_function(), + 0, + nullptr, + {0, 0}).ToLocalChecked(); + } while (env->immediate_info()->has_outstanding()); MaybeStopImmediate(env); } diff --git a/src/env.h b/src/env.h index d8012f7bcba762..b295347a9ae334 100644 --- a/src/env.h +++ b/src/env.h @@ -451,6 +451,30 @@ class Environment { DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope); }; + class ImmediateInfo { + public: + inline AliasedBuffer& fields(); + inline uint32_t count() const; + inline bool has_outstanding() const; + + inline void count_inc(uint32_t increment); + inline void count_dec(uint32_t decrement); + + private: + friend class Environment; // So we can call the constructor. + inline explicit ImmediateInfo(v8::Isolate* isolate); + + enum Fields { + kCount, + kHasOutstanding, + kFieldsCount + }; + + AliasedBuffer fields_; + + DISALLOW_COPY_AND_ASSIGN(ImmediateInfo); + }; + class TickInfo { public: inline AliasedBuffer& fields(); @@ -530,6 +554,7 @@ class Environment { inline void FinishHandleCleanup(uv_handle_t* handle); inline AsyncHooks* async_hooks(); + inline ImmediateInfo* immediate_info(); inline TickInfo* tick_info(); inline uint64_t timer_base() const; @@ -577,8 +602,6 @@ class Environment { inline double* fs_stats_field_array() const; inline void set_fs_stats_field_array(double* fields); - inline AliasedBuffer& scheduled_immediate_count(); - inline performance::performance_state* performance_state(); inline std::map* performance_marks(); @@ -686,6 +709,7 @@ class Environment { uv_check_t idle_check_handle_; AsyncHooks async_hooks_; + ImmediateInfo immediate_info_; TickInfo tick_info_; const uint64_t timer_base_; bool printed_error_; @@ -695,7 +719,6 @@ class Environment { size_t makecallback_cntr_; std::vector destroy_async_id_list_; - AliasedBuffer scheduled_immediate_count_; AliasedBuffer should_abort_on_uncaught_toggle_; int should_not_abort_scope_counter_ = 0; diff --git a/src/timer_wrap.cc b/src/timer_wrap.cc index 5c3f499d163005..ab450fcb3e7346 100644 --- a/src/timer_wrap.cc +++ b/src/timer_wrap.cc @@ -90,8 +90,9 @@ class TimerWrap : public HandleWrap { env->NewFunctionTemplate(activate_cb)->GetFunction(env->context()) .ToLocalChecked(); auto result = Array::New(env->isolate(), 2); - result->Set(0, activate_function); - result->Set(1, env->scheduled_immediate_count().GetJSArray()); + result->Set(env->context(), 0, activate_function).FromJust(); + result->Set(env->context(), 1, + env->immediate_info()->fields().GetJSArray()).FromJust(); args.GetReturnValue().Set(result); } diff --git a/test/parallel/test-timers-immediate-queue-throw.js b/test/parallel/test-timers-immediate-queue-throw.js new file mode 100644 index 00000000000000..9929b27ab2fea3 --- /dev/null +++ b/test/parallel/test-timers-immediate-queue-throw.js @@ -0,0 +1,53 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const domain = require('domain'); + +// setImmediate should run clear its queued cbs once per event loop turn +// but immediates queued while processing the current queue should happen +// on the next turn of the event loop. + +// In addition, if any setImmediate throws, the rest of the queue should +// be processed after all error handling is resolved, but that queue +// should not include any setImmediate calls scheduled after the +// processing of the queue started. + +let threw = false; +let stage = -1; + +const QUEUE = 10; + +const errObj = { + type: Error, + message: 'setImmediate Err' +}; + +process.once('uncaughtException', common.expectsError(errObj)); +process.once('uncaughtException', () => assert.strictEqual(stage, 0)); + +const d1 = domain.create(); +d1.once('error', common.expectsError(errObj)); +d1.once('error', () => assert.strictEqual(stage, 0)); + +const run = common.mustCall((callStage) => { + assert(callStage >= stage); + stage = callStage; + if (threw) + return; + + setImmediate(run, 2); +}, QUEUE * 3); + +for (let i = 0; i < QUEUE; i++) + setImmediate(run, 0); +setImmediate(() => { + threw = true; + process.nextTick(() => assert.strictEqual(stage, 1)); + throw new Error('setImmediate Err'); +}); +d1.run(() => setImmediate(() => { + throw new Error('setImmediate Err'); +})); +for (let i = 0; i < QUEUE; i++) + setImmediate(run, 1);