Skip to content

Commit

Permalink
Fixes #179 Nested stream atomic updates
Browse files Browse the repository at this point in the history
We were tracking flushing using one variable while there
were in fact two types of flushing. This commit remedies that
And furthermore adds a check for function streams
whether their listeners need updating at the end of
updateStream

This also has some general code quality improvements 
including variable name changes
  • Loading branch information
Einar Norðfjörð committed Jun 27, 2018
1 parent 190d68a commit 24f0026
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 30 deletions.
50 changes: 30 additions & 20 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ var toUpdate = [];
var inStream;
var order = [];
var orderNextIdx = -1;
var flushing = false;
var flushingUpdateQueue = false;
var flushingStreamValue = false;

function flushing() {
return flushingUpdateQueue || flushingStreamValue;
}


/** @namespace */
var flyd = {}
Expand Down Expand Up @@ -524,7 +530,7 @@ function streamToString() {
function createStream() {
function s(n) {
if (arguments.length === 0) return s.val
updateStreamValue(s, n)
updateStreamValue(n, s)
return s
}
s.hasVal = false;
Expand Down Expand Up @@ -587,21 +593,22 @@ function initialDepsNotMet(stream) {
return !stream.depsMet;
}

function isEnded(stream) {
return stream.end && stream.end.val === true;
}

function listenersNeedUpdating(s) {
return !flushingStreamValue && s.listeners.some(function(s) { return s.shouldUpdate; });
}

/**
* @private
* Update a dependent stream using its dependencies in an atomic way
* @param {stream} stream - the stream to update
*/
function updateStream(s) {
if ((s.depsMet !== true && initialDepsNotMet(s)) ||
(s.end !== undefined && s.end.val === true)) {
if (toUpdate.length > 0 && inStream !== undefined) {
toUpdate.push(function() {
updateStream(s);
});
}
return;
}
if (isEnded(s)) return;
if (s.depsMet !== true && initialDepsNotMet(s)) return;
if (inStream !== undefined) {
toUpdate.push(function() {
updateStream(s);
Expand All @@ -617,15 +624,18 @@ function updateStream(s) {
inStream = undefined;
if (s.depsChanged !== undefined) s.depsChanged = [];
s.shouldUpdate = false;
if (flushing === false) flushUpdate();
if (flushing() === false) flushUpdate();
if (listenersNeedUpdating(s)) {
s(s.val);
}
}

/**
* @private
* Update the dependencies of a stream
* @param {stream} stream
*/
function updateDeps(s) {
function updateListeners(s) {
var i, o, list
var listeners = s.listeners;
for (i = 0; i < listeners.length; ++i) {
Expand Down Expand Up @@ -667,12 +677,12 @@ function findDeps(s) {
* @private
*/
function flushUpdate() {
flushing = true;
flushingUpdateQueue = true;
while (toUpdate.length > 0) {
var updater = toUpdate.shift();
updater();
}
flushing = false;
flushingUpdateQueue = false;
}

/**
Expand All @@ -681,18 +691,18 @@ function flushUpdate() {
* @param {stream} stream
* @param {*} value
*/
function updateStreamValue(s, n) {
function updateStreamValue(n, s) {
s.val = n;
s.hasVal = true;
if (inStream === undefined) {
flushing = true;
updateDeps(s);
if (toUpdate.length > 0) flushUpdate(); else flushing = false;
flushingStreamValue = true;
updateListeners(s);
if (toUpdate.length > 0) flushUpdate(); else flushingStreamValue = false;
} else if (inStream === s) {
markListeners(s, s.listeners);
} else {
toUpdate.push(function() {
updateStreamValue(s, n);
updateStreamValue(n, s);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
},
"scripts": {
"test-lib": "mocha",
"test": "eslint --fix lib/ test/ module/ && mocha -R dot test/*.js module/**/test/*.js",
"test": "eslint --fix lib/ test/ module/ && mocha test/*.js module/**/test/*.js",
"docs": "documentation -f md lib/index.js > API.md",
"perf": "./perf/run-benchmarks",
"coverage": "istanbul cover _mocha -- -R spec",
Expand Down
29 changes: 20 additions & 9 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,19 +279,17 @@ describe('stream', function() {
it('can create multi-level dependent streams inside a stream body', function() {
var result = 0;
var externalStream = stream(0);
function mapper(val) {
++result;
return val + 1;
}
stream(1).map(function() {
externalStream
.map(function() {
result += 1;
return 0;
})
.map(function() {
result += 2;
return 0;
});
.map(mapper)
.map(mapper);
return;
});
assert.equal(result, 3);
assert.equal(result, 2);
});
});

Expand Down Expand Up @@ -1006,6 +1004,19 @@ describe('stream', function() {
[], [1, 3, 2], [2, 8, 7, 6], [3, 5, 4]
]);
});
it('#179 nested streams atomic update', function() {
var invocationCount = 0;
var mapper = function(val) {
invocationCount += 1;
return val + 1;
};
stream(1).map(function() {
stream(0)
.map(mapper)
.map(mapper);
});
assert.equal(invocationCount, 2);
});
});

describe('fantasy-land', function() {
Expand Down

0 comments on commit 24f0026

Please sign in to comment.