Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async2.6.1源码分析之parallel #27

Open
msforest opened this issue Aug 5, 2018 · 1 comment
Open

async2.6.1源码分析之parallel #27

msforest opened this issue Aug 5, 2018 · 1 comment

Comments

@msforest
Copy link
Owner

msforest commented Aug 5, 2018

分析参数为数组的情况

// entry
function parallel(tasks, callback) {  // parallelLimit -> parallel
    _parallel(eachOf, tasks, callback);
}

function _parallel(eachfn, tasks, callback) {
    callback = callback || noop;
    var results = isArrayLike(tasks) ? [] : {};

    eachfn(tasks, (task, key, taskCb) => {  // task如果是异步的,会被wrapAsync同步化
        wrapAsync(task)((err, ...result) => {
            if (result.length < 2) {
                [result] = result;
            }
            results[key] = result;
            taskCb(err);
        });
    }, err => callback(err, results));
}

function eachOf(coll, iteratee, callback) {
    var eachOfImplementation = isArrayLike(coll) ? eachOfArrayLike : eachOfGeneric;
    eachOfImplementation(coll, wrapAsync(iteratee), callback);
}

function eachOfArrayLike(coll, iteratee, callback) {
    callback = once(callback || noop);
    var index = 0,
        completed = 0,
        {length} = coll,
        canceled = false;  // 表示任务队列是否撤销,嗯,用撤销更好理解
    if (length === 0) {
        callback(null);
    }

    function iteratorCallback(err, value) {
        if (err === false) {
            canceled = true
        }
        if (canceled === true) return   // 退出执行流程,这里的退出不是指执行async.parallel([], callback)里的callback,而是撤销async.parallel()
        if (err) {
            callback(err);  // 若某个task中间有错误,提前退出parallel,已经发起的iteratee的异步任务可能会被执行,但是最后的result只保存了已经执行过的结果
        } else if ((++completed === length) || value === breakLoop) {
            callback(null);
        }
    }

    for (; index < length; index++) {
        iteratee(coll[index], index, onlyOnce(iteratorCallback));   // 重点,只是利用js的异步机制,如果函数没有异步操作,执行还是串行的
    }
}

并行和并发的区别

看过代码之后,才发现不是真正意义上的parallel

parallel功能和Promise.all方法一样

@msforest
Copy link
Owner Author

参数为Object类型

主要区别在于eachOf的处理不同

var eachOfGeneric = doLimit(eachOfLimit, Infinity);  // 控制并发的数量

function doLimit(fn, limit) {
    return (iterable, iteratee, cb) => fn(iterable, limit, iteratee, cb)
}

function eachOfLimit(coll, limit, iteratee, callback) {
    _eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
}

function _eachOfLimit(limit) => {
    return (obj, iteratee, callback) => {
        callback = once(callback || noop);
        if (limit <= 0) {
            throw new RangeError('concurrency limit cannot be less than 1')
        }
        if (!obj) {
            return callback(null);
        }
        var nextElem = _iterator(obj);  // 获取要执行的task
        var done = false;   //  任务队列是否退出
        var canceled = false;   // 如果其中一个err===false,就停止后面的task,已经在执行栈中的task会继续执行,未加入的task不会再执行了,且已经执行的result无效了,因为callback
        var running = 0;    // 并行运行的标志,当达到limit限制的时候,就停下来。当前一个task执行完一个,就执行下一个task,并不是说:limit=3时,就等待三个全部执行完再接着执行下一轮循环的三个
        var looping = false;    // 是否继续再次执行replenish函数,当limit===Infinity,此标志无效

        function iterateeCallback(err, value) {
            if (canceled) return
            running -= 1;   // task执行完,执行栈减1
            if (err) {
                done = true;
                callback(err);
            }
            else if (err === false) {
                done = true;
                canceled = true;
            }
            else if (value === breakLoop || (done && running <= 0)) {
                done = true;
                return callback(null);
            }
            else if (!looping) {
                replenish();
            }
        }

        function replenish () {
            looping = true;
            while (running < limit && !done) {
                var elem = nextElem();
                if (elem === null) {
                    done = true;
                    if (running <= 0) {
                        callback(null);
                    }
                    return;
                }
                running += 1;   // task加入任务队列,执行栈加1
                iteratee(elem.value, elem.key, onlyOnce(iterateeCallback));
            }
            looping = false;
        }

        replenish();
    };
}

// 模拟实现一个iterator遍历器
function _iterator(coll) {
    if (isArrayLike(coll)) {
        return createArrayIterator(coll);
    }

    var iterator = getIterator(coll);
    return iterator ? createES2015Iterator(iterator) : createObjectIterator(coll);
}

function createArrayIterator(coll) {
    var i = -1;
    var len = coll.length;
    return function next() {
        return ++i < len ? {value: coll[i], key: i} : null;
    }
}

function createES2015Iterator(iterator) {
    var i = -1;
    return function next() {
        var item = iterator.next();
        if (item.done)
            return null;
        i++;
        return {value: item.value, key: i};
    }
}

function createObjectIterator(obj) {
    var okeys = obj ? Object.keys(obj) : [];
    var i = -1;
    var len = okeys.length;
    return function next() {
        var key = okeys[++i];
        return i < len ? {value: obj[key], key} : null;
    };
}

function isArrayLike(value) {
    return value &&
        typeof value.length === 'number' &&
        value.length >= 0 &&
        value.length % 1 === 0;
}

function getIterator(coll) {
    return coll[Symbol.iterator] && coll[Symbol.iterator]();    // coll instanceof Set/Map
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant