Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

test: add more tests for workers #121

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ test-with-async-hooks:
$(CI_JS_SUITES) \
$(CI_NATIVE_SUITES)

test-worker:
$(PYTHON) tools/test.py --mode=release workers

ifneq ("","$(wildcard deps/v8/tools/run-tests.py)")
test-v8: v8
Expand Down
6 changes: 6 additions & 0 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ static struct {
platform_->DrainBackgroundTasks(isolate);
}

void CancelVMTasks(Isolate* isolate) {
platform_->CancelPendingDelayedTasks(isolate);
}

#if HAVE_INSPECTOR
bool StartInspector(Environment *env, const char* script_path,
const node::DebugOptions& options) {
Expand Down Expand Up @@ -321,6 +325,7 @@ static struct {
void Initialize(int thread_pool_size) {}
void Dispose() {}
void DrainVMTasks(Isolate* isolate) {}
void CancelVMTasks(Isolate* isolate) {}
bool StartInspector(Environment *env, const char* script_path,
const node::DebugOptions& options) {
env->ThrowError("Node compiled with NODE_USE_V8_PLATFORM=0");
Expand Down Expand Up @@ -4924,6 +4929,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
uv_key_delete(&thread_local_env);

v8_platform.DrainVMTasks(isolate);
v8_platform.CancelVMTasks(isolate);
WaitForInspectorDisconnect(&env);
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
Expand Down
1 change: 1 addition & 0 deletions src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class MultiIsolatePlatform : public v8::Platform {
public:
virtual ~MultiIsolatePlatform() { }
virtual void DrainBackgroundTasks(v8::Isolate* isolate) = 0;
virtual void CancelPendingDelayedTasks(v8::Isolate* isolate) = 0;

// These will be called by the `IsolateData` creation/destruction functions.
virtual void RegisterIsolate(IsolateData* isolate_data,
Expand Down
54 changes: 39 additions & 15 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "env.h"
#include "env-inl.h"
#include "util.h"
#include <algorithm>

namespace node {

Expand Down Expand Up @@ -45,13 +46,17 @@ void PerIsolatePlatformData::CallOnForegroundThread(Task* task) {

void PerIsolatePlatformData::CallDelayedOnForegroundThread(
Task* task, double delay_in_seconds) {
auto pair = new std::pair<Task*, double>(task, delay_in_seconds);
foreground_delayed_tasks_.Push(pair);
auto delayed = new DelayedTask();
delayed->task = task;
delayed->platform_data = this;
delayed->timeout = delay_in_seconds;
foreground_delayed_tasks_.Push(delayed);
uv_async_send(flush_tasks_);
}

PerIsolatePlatformData::~PerIsolatePlatformData() {
FlushForegroundTasksInternal();
CancelPendingDelayedTasks();

uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
[](uv_handle_t* handle) {
Expand Down Expand Up @@ -120,7 +125,7 @@ size_t NodePlatform::NumberOfAvailableBackgroundThreads() {
return threads_.size();
}

static void RunForegroundTask(Task* task) {
void PerIsolatePlatformData::RunForegroundTask(Task* task) {
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
Environment* env = Environment::GetCurrent(isolate);
Expand All @@ -130,14 +135,29 @@ static void RunForegroundTask(Task* task) {
delete task;
}

static void RunForegroundTask(uv_timer_t* handle) {
Task* task = static_cast<Task*>(handle->data);
RunForegroundTask(task);
uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
delete reinterpret_cast<uv_timer_t*>(handle);
void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
DelayedTask* delayed = static_cast<DelayedTask*>(handle->data);
auto& tasklist = delayed->platform_data->scheduled_delayed_tasks_;
auto it = std::find(tasklist.begin(), tasklist.end(), delayed);
CHECK_NE(it, tasklist.end());
tasklist.erase(it);
RunForegroundTask(delayed->task);
uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
[](uv_handle_t* handle) {
delete static_cast<DelayedTask*>(handle->data);
});
}

void PerIsolatePlatformData::CancelPendingDelayedTasks() {
for (auto delayed : scheduled_delayed_tasks_) {
uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
[](uv_handle_t* handle) {
delete static_cast<DelayedTask*>(handle->data);
});
}
scheduled_delayed_tasks_.clear();
}

void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {
PerIsolatePlatformData* per_isolate = ForIsolate(isolate);

Expand All @@ -152,18 +172,18 @@ void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {

bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
bool did_work = false;

while (auto delayed = foreground_delayed_tasks_.Pop()) {
did_work = true;
uint64_t delay_millis =
static_cast<uint64_t>(delayed->second + 0.5) * 1000;
uv_timer_t* handle = new uv_timer_t();
handle->data = static_cast<void*>(delayed->first);
uv_timer_init(loop_, handle);
static_cast<uint64_t>(delayed->timeout + 0.5) * 1000;
delayed->timer.data = static_cast<void*>(delayed);
uv_timer_init(loop_, &delayed->timer);
// Timers may not guarantee queue ordering of events with the same delay if
// the delay is non-zero. This should not be a problem in practice.
uv_timer_start(handle, RunForegroundTask, delay_millis, 0);
uv_unref(reinterpret_cast<uv_handle_t*>(handle));
delete delayed;
uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
scheduled_delayed_tasks_.push_back(delayed);
}
while (Task* task = foreground_tasks_.Pop()) {
did_work = true;
Expand Down Expand Up @@ -199,6 +219,10 @@ void NodePlatform::FlushForegroundTasks(v8::Isolate* isolate) {
ForIsolate(isolate)->FlushForegroundTasksInternal();
}

void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {
ForIsolate(isolate)->CancelPendingDelayedTasks();
}

bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }

double NodePlatform::MonotonicallyIncreasingTime() {
Expand Down
16 changes: 15 additions & 1 deletion src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace node {

class NodePlatform;
class IsolateData;
class PerIsolatePlatformData;

template <class T>
class TaskQueue {
Expand All @@ -37,6 +38,13 @@ class TaskQueue {
std::queue<T*> task_queue_;
};

struct DelayedTask {
v8::Task* task;
uv_timer_t timer;
double timeout;
PerIsolatePlatformData* platform_data;
};

class PerIsolatePlatformData {
public:
PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop);
Expand All @@ -52,15 +60,20 @@ class PerIsolatePlatformData {

// Returns true iff work was dispatched or executed.
bool FlushForegroundTasksInternal();
void CancelPendingDelayedTasks();

private:
static void FlushTasks(uv_async_t* handle);
static void RunForegroundTask(v8::Task* task);
static void RunForegroundTask(uv_timer_t* timer);

int ref_count_ = 1;
v8::Isolate* isolate_;
uv_loop_t* const loop_;
uv_async_t* flush_tasks_ = nullptr;
TaskQueue<v8::Task> foreground_tasks_;
TaskQueue<std::pair<v8::Task*, double>> foreground_delayed_tasks_;
TaskQueue<DelayedTask> foreground_delayed_tasks_;
std::vector<DelayedTask*> scheduled_delayed_tasks_;
};

class NodePlatform : public MultiIsolatePlatform {
Expand All @@ -69,6 +82,7 @@ class NodePlatform : public MultiIsolatePlatform {
virtual ~NodePlatform() {}

void DrainBackgroundTasks(v8::Isolate* isolate) override;
void CancelPendingDelayedTasks(v8::Isolate* isolate) override;
void Shutdown();

// v8::Platform implementation.
Expand Down
1 change: 1 addition & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ void Worker::Run() {
Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);

platform->DrainBackgroundTasks(isolate_);
platform->CancelPendingDelayedTasks(isolate_);

// Grab the parent-to-child channel and render is unusable.
MessagePort* child_port;
Expand Down
10 changes: 7 additions & 3 deletions test/common/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,13 @@ exports.refreshTmpDir = function() {
fs.mkdirSync(exports.tmpDir);
};

if (process.env.TEST_THREAD_ID) {
exports.PORT += process.env.TEST_THREAD_ID * 100;
exports.tmpDirName += `.${process.env.TEST_THREAD_ID}`;
const { workerData } = require('worker');
if ((workerData && workerData.testThreadId) || process.env.TEST_THREAD_ID) {
const id = +((workerData && workerData.testThreadId) ||
process.env.TEST_THREAD_ID);

exports.PORT += id * 100;
exports.tmpDirName += `.${id}`;
}
exports.tmpDir = path.join(testRoot, exports.tmpDirName);

Expand Down
13 changes: 13 additions & 0 deletions test/parallel/test-worker-termination-2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
'use strict';

const common = require('../common');
const { Worker, isMainThread, postMessage } = require('worker');

if (isMainThread) {
const aWorker = new Worker(__filename);
aWorker.terminate(common.mustCall());
aWorker.on('message', common.mustNotCall());
} else {
while (true)
postMessage({ hello: 'world' });
}
22 changes: 22 additions & 0 deletions test/parallel/test-worker-termination-3.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict';

const common = require('../common');
const { Worker, isMainThread, postMessage } = require('worker');

if (isMainThread) {
const aWorker = new Worker(__filename);
aWorker.on('message', common.mustCallAtLeast(function() {
aWorker.postMessage();
aWorker.postMessage();
aWorker.postMessage();
aWorker.postMessage();
aWorker.terminate(common.mustCall());
}));
} else {
require('worker').on('workerMessage', function() {
while (true)
postMessage({ hello: 'world' });
});

postMessage();
}
30 changes: 30 additions & 0 deletions test/parallel/test-worker-termination-exit-2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Worker, isMainThread, postMessage } = require('worker');

if (isMainThread) {
const aWorker = new Worker(__filename, { keepAlive: false });
aWorker.on('exit', common.mustCall((code) => {
assert.strictEqual(1337, code);
}));
aWorker.on('message', common.mustCall((data) => {
assert.strictEqual(data, 0);
}));
} else {
process.on('beforeExit', () => {
setInterval(function() {
postMessage({ hello: 'world' });
}, 5000);
setImmediate(function f() {
postMessage({ hello: 'world' });
setImmediate(f);
});
process.exit(1337);
});
let emits = 0;
process.on('exit', function() {
postMessage(emits++);
});
}
26 changes: 26 additions & 0 deletions test/parallel/test-worker-termination-exit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Worker, isMainThread, postMessage } = require('worker');

if (isMainThread) {
const aWorker = new Worker(__filename);
aWorker.on('exit', common.mustCall((code) => {
assert.strictEqual(1337, code);
}));
aWorker.on('message', common.mustNotCall());
} else {
setInterval(function() {
postMessage({ hello: 'world' });
}, 5000);
setImmediate(function f() {
postMessage({ hello: 'world' });
setImmediate(f);
});
(function() {
[1337, 2, 3].map(function(value) {
process.exit(value);
});
})();
}
54 changes: 54 additions & 0 deletions test/parallel/test-worker-termination-grand-parent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Worker, isMainThread, postMessage } = require('worker');
const ids = [];

if (isMainThread) {
const aWorker = new Worker(__filename);
aWorker.postMessage({
init: true,
subWorker: false
});
aWorker.on('message', common.mustCall((data) => {
ids.push(data.id);
if (ids.length === 4) {
// Terminating the main worker should terminate its 4 sub-workers
aWorker.terminate();
}
}, 4));
process.on('beforeExit', function() {
assert.deepStrictEqual([0, 1, 2, 3].sort(), ids.sort());
});
} else {
require('worker').on('workerMessage', function(data) {
if (data.init) {
if (data.subWorker) {
subWorker(data.id);
} else {
mainWorker();
}
}
});
}

function mainWorker() {
let l = 4;
while (l--) {
const worker = new Worker(__filename);
worker.postMessage({
init: true,
subWorker: true,
id: l
});
worker.on('message', function(payload) {
postMessage(payload);
});
}
}

function subWorker(id) {
postMessage({ id: id });
while (true);
}
Loading