Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: eliminate multicast test FreeBSD flakiness #4042

Closed
wants to merge 12 commits into from
301 changes: 150 additions & 151 deletions test/internet/test-dgram-multicast-multi-process.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,122 @@
'use strict';
var common = require('../common'),
assert = require('assert'),
dgram = require('dgram'),
util = require('util'),
Buffer = require('buffer').Buffer,
fork = require('child_process').fork,
LOCAL_BROADCAST_HOST = '224.0.0.114',
TIMEOUT = common.platformTimeout(5000),
messages = [
new Buffer('First message to send'),
new Buffer('Second message to send'),
new Buffer('Third message to send'),
new Buffer('Fourth message to send')
];
const common = require('../common');
const assert = require('assert');
const dgram = require('dgram');
const fork = require('child_process').fork;
const LOCAL_BROADCAST_HOST = '224.0.0.114';
const TIMEOUT = common.platformTimeout(5000);
const messages = [
new Buffer('First message to send'),
new Buffer('Second message to send'),
new Buffer('Third message to send'),
new Buffer('Fourth message to send')
];
const workers = {};
const listeners = 3;


// Skip test in FreeBSD jails.
if (common.inFreeBSDJail) {
console.log('1..0 # Skipped: In a FreeBSD jail');
return;
}

function launchChildProcess(index) {
const worker = fork(__filename, ['child']);
workers[worker.pid] = worker;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe hoist workers to the top while you're here, it arguably reads easier that way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that does make it easier to follow. I'll also fix up the remaining bits of the related var statement to make it consistent with the current unofficial standard (separate statements for each var, const where possible).


worker.messagesReceived = [];

// Handle the death of workers.
worker.on('exit', function(code, signal) {
// Don't consider this the true death if the worker has finished
// successfully or if the exit code is 0.
if (worker.isDone || code === 0) {
return;
}

dead += 1;
console.error('[PARENT] Worker %d died. %d dead of %d',
worker.pid,
dead,
listeners);

if (dead === listeners) {
console.error('[PARENT] All workers have died.');
console.error('[PARENT] Fail');
process.exit(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a nit, more a stream-of-consciousness remark, but I would have thrown an exception here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really a response but getting all stream-of-consciousness over here too: FWIW, I agree and probably would have chosen to throw an exception too and would have also not included console.error() messages here and elsewhere in the test. But those are just my personal preferences and I only wanted to go so far in rewriting existing code. All this stuff was in the existing test. It only shows up as a change because the indentation changed and/or I moved it to a different location.

}
});

worker.on('message', function(msg) {
if (msg.listening) {
listening += 1;

if (listening === listeners) {
// All child process are listening, so start sending.
sendSocket.sendNext();
}
return;
}
if (msg.message) {
worker.messagesReceived.push(msg.message);

if (worker.messagesReceived.length === messages.length) {
done += 1;
worker.isDone = true;
console.error('[PARENT] %d received %d messages total.',
worker.pid,
worker.messagesReceived.length);
}

if (done === listeners) {
console.error('[PARENT] All workers have received the ' +
'required number of messages. Will now compare.');

Object.keys(workers).forEach(function(pid) {
const worker = workers[pid];

var count = 0;

worker.messagesReceived.forEach(function(buf) {
for (var i = 0; i < messages.length; ++i) {
if (buf.toString() === messages[i].toString()) {
count++;
break;
}
}
});

console.error('[PARENT] %d received %d matching messages.',
worker.pid, count);

assert.strictEqual(count, messages.length,
'A worker received an invalid multicast message');
});

clearTimeout(timer);
console.error('[PARENT] Success');
killChildren(workers);
}
}
});
}

function killChildren(children) {
Object.keys(children).forEach(function(key) {
const child = children[key];
child.kill();
});
}

if (process.argv[2] !== 'child') {
var workers = {},
listeners = 3,
listening = 0,
dead = 0,
i = 0,
done = 0,
timer = null;

//exit the test if it doesn't succeed within TIMEOUT
timer = setTimeout(function() {
var listening = 0;
var dead = 0;
var i = 0;
var done = 0;

// Exit the test if it doesn't succeed within TIMEOUT.
var timer = setTimeout(function() {
console.error('[PARENT] Responses were not received within %d ms.',
TIMEOUT);
console.error('[PARENT] Fail');
Expand All @@ -34,101 +126,18 @@ if (process.argv[2] !== 'child') {
process.exit(1);
}, TIMEOUT);

//launch child processes
// Launch child processes.
for (var x = 0; x < listeners; x++) {
(function() {
var worker = fork(process.argv[1], ['child']);
workers[worker.pid] = worker;

worker.messagesReceived = [];

//handle the death of workers
worker.on('exit', function(code, signal) {
// don't consider this the true death if the
// worker has finished successfully

// or if the exit code is 0
if (worker.isDone || code === 0) {
return;
}

dead += 1;
console.error('[PARENT] Worker %d died. %d dead of %d',
worker.pid,
dead,
listeners);

if (dead === listeners) {
console.error('[PARENT] All workers have died.');
console.error('[PARENT] Fail');

killChildren(workers);

process.exit(1);
}
});

worker.on('message', function(msg) {
if (msg.listening) {
listening += 1;

if (listening === listeners) {
//all child process are listening, so start sending
sendSocket.sendNext();
}
}
else if (msg.message) {
worker.messagesReceived.push(msg.message);

if (worker.messagesReceived.length === messages.length) {
done += 1;
worker.isDone = true;
console.error('[PARENT] %d received %d messages total.',
worker.pid,
worker.messagesReceived.length);
}

if (done === listeners) {
console.error('[PARENT] All workers have received the ' +
'required number of messages. Will now compare.');

Object.keys(workers).forEach(function(pid) {
var worker = workers[pid];

var count = 0;

worker.messagesReceived.forEach(function(buf) {
for (var i = 0; i < messages.length; ++i) {
if (buf.toString() === messages[i].toString()) {
count++;
break;
}
}
});

console.error('[PARENT] %d received %d matching messages.',
worker.pid, count);

assert.equal(count, messages.length,
'A worker received an invalid multicast message');
});

clearTimeout(timer);
console.error('[PARENT] Success');
killChildren(workers);
}
}
});
})(x);
launchChildProcess(x);
}

var sendSocket = dgram.createSocket('udp4');
// FIXME a libuv limitation makes it necessary to bind()
// before calling any of the set*() functions - the bind()
// call is what creates the actual socket...
// FIXME: a libuv limitation makes it necessary to bind()
// before calling any of the set*() functions. The bind()
// call is what creates the actual socket.
sendSocket.bind();

// The socket is actually created async now
// The socket is actually created async now.
sendSocket.on('listening', function() {
sendSocket.setTTL(1);
sendSocket.setBroadcast(true);
Expand All @@ -141,7 +150,7 @@ if (process.argv[2] !== 'child') {
});

sendSocket.sendNext = function() {
var buf = messages[i++];
const buf = messages[i++];

if (!buf) {
try { sendSocket.close(); } catch (e) {}
Expand All @@ -151,61 +160,51 @@ if (process.argv[2] !== 'child') {
sendSocket.send(buf, 0, buf.length,
common.PORT, LOCAL_BROADCAST_HOST, function(err) {
if (err) throw err;
console.error('[PARENT] sent %s to %s:%s',
util.inspect(buf.toString()),
console.error('[PARENT] sent "%s" to %s:%s',
buf.toString(),
LOCAL_BROADCAST_HOST, common.PORT);
process.nextTick(sendSocket.sendNext);
});
};

function killChildren(children) {
Object.keys(children).forEach(function(key) {
var child = children[key];
child.kill();
});
}
}

if (process.argv[2] === 'child') {
var receivedMessages = [];
var listenSocket = dgram.createSocket({
const receivedMessages = [];
const listenSocket = dgram.createSocket({
type: 'udp4',
reuseAddr: true
});

listenSocket.on('message', function(buf, rinfo) {
console.error('[CHILD] %s received %s from %j', process.pid,
util.inspect(buf.toString()), rinfo);
listenSocket.on('listening', function() {
listenSocket.addMembership(LOCAL_BROADCAST_HOST);

receivedMessages.push(buf);
listenSocket.on('message', function(buf, rinfo) {
console.error('[CHILD] %s received "%s" from %j', process.pid,
buf.toString(), rinfo);

process.send({ message: buf.toString() });
receivedMessages.push(buf);

if (receivedMessages.length == messages.length) {
// .dropMembership() not strictly needed but here as a sanity check
listenSocket.dropMembership(LOCAL_BROADCAST_HOST);
process.nextTick(function() {
listenSocket.close();
});
}
});
process.send({ message: buf.toString() });

listenSocket.on('close', function() {
//HACK: Wait to exit the process to ensure that the parent
//process has had time to receive all messages via process.send()
//This may be indicitave of some other issue.
setTimeout(function() {
process.exit();
}, 1000);
});
if (receivedMessages.length == messages.length) {
// .dropMembership() not strictly needed but here as a sanity check.
listenSocket.dropMembership(LOCAL_BROADCAST_HOST);
process.nextTick(function() {
listenSocket.close();
});
}
});

listenSocket.on('listening', function() {
listenSocket.on('close', function() {
// HACK: Wait to exit the process to ensure that the parent
// process has had time to receive all messages via process.send()
// This may be indicative of some other issue.
setTimeout(function() {
process.exit();
}, common.platformTimeout(1000));
});
process.send({ listening: true });
});

listenSocket.bind(common.PORT);

listenSocket.on('listening', function() {
listenSocket.addMembership(LOCAL_BROADCAST_HOST);
});
}