Pull to refresh

Велосипед: Promises в Node.js

Reading time 6 min
Views 33K
Добрый день, Хабрахабр.

Предисловие


Была довольно простая задача: получить набор документов из базы, каждый документ преобразовать и отправить пользователю все преобразованные документы, порядок их менять нельзя, для обработки документа используется асинхронная функция. Если на каком-то документе вылезла ошибка — документы мы не отправляем, только ошибку и заканчиваем обработку документов.
Для решения задачи была выбрана библиотека Q, так как сам поход Promise мне симпатичен. Но возникла одна загвоздка, вроде бы элементарная задача, а выполняется больше секунды, а точнее 1300 мс, вместо ожидаемых 50-80 мс. Дабы разобраться, как все устроено и проникнуться асинхронностью было решено написать специализированный «велосипед» под данную задачу.



Как было устроена работа с Q



В первую очередь, хотелось бы рассказать, как это было реализовано изначально.

1. Процедура, которая проходила последовательно по массиву и возвращала нам promise.
forEachSerial
function forEachSerial(array, func) {
    var tickFunction = function (el, index, callback) {
        if (func.length == 2)
            func(el, callback);
        else if (func.length == 3)
            func(el, index, callback);
    }
    var functions = [];
    array.forEach(function (el, index) {
        functions.push(Q.nfcall(tickFunction, el, index));        
    });
    return Q.all(functions);
}



2. Процедура, которая обрабатывала документ и возвращала нам promise.
documentToJSON
function documentToJSON(el, options) {
    var obj = el.toObject(options);
    var deferred = Q.defer();
    var columns = ['...','...','...'];
    forEachSerial(columns,function (el, next) {
        el.somyAsyncFunction(options, function (err, result) {
            if (err)
                next(err);
            else result.somyAsyncFunction2(options, function (err, result) {
                if (!err)
                    obj[el] = result;
                next(err);
            });
        });
    }).then(function () {
            deferred.resolve(obj);
        }, function (err) {
            deferred.reject(err);
        });
    return deferred.promise;
}



3. И головная процедура, отправляющая результат пользователю
sendResponse
exports.get = function (req, res, next) {
    dbModel.find(search, {}, { skip: from, limit: limit }).sort({column1: -1}).exec(function (err, result) {
        if (err)
            next(new errorsHandlers.internalError());
        else {
            var result = [];
            forEachSerial(result,function (el, index, next) {
                documentToJSON(el,options).then(function (obj) {
                    result[index] = obj;
                    next()
                }, function (err) {
                    next(new errorsHandlers.internalError())
                });
            }).then(function () {
                    res.send({responce: result});
                }, function (err) {
                    next(new errorsHandlers.internalError())
                });
        }
    });
};



Кто-то может заметить, что всевозможных «обещаний» очень много даже там, где они не сильно нужны, и что такое большое зацикливание привело к такой скорости, но процедура отдает всего 20 простых документов, преобразования примитивны и выполнять их такое количество времени уже точно никуда не годиться.

Пишем свою библиотеку promise


Как вообще «это» работает

В сети полно описаний что и как. Опишу вкратце. Promise — это своего рода обещание. Какая-то функция нам обещает результат, получить его мы можем с помощью then(success, error), в свою очередь, при успешной обработке мы может назначить новый promise и так же обработать его. В частном случае это выглядит так:
Promise.then(step1).then(step2).then(step3).then(function () {
    //All OK
}, function (err) {
    //Error in any step
});

Результат каждого этапа передается как параметр в следующий и так последовательно. В итоге мы обрабатываем все ошибки в одном блоке и избавляемся от «лапши».
Внутри выглядит примерно так: создаются события, которые вызываются при успешном завершении или при ошибке:
var promise = fs.stat("foo");
promise.addListener("success", function (value) {
    // ok
})
promise.addListener("error", function (error) {
    // error
});

Все это можно почитать здесь.
Теория закончилась, приступим к практике.

Начнем с простого — Deferred

Задачей этого объекта будет создать нужные нам события и выдать Promise
function deferred() {
    this.events = new EventEmitter(); //Объект события
    this.promise = new promise(this); // Возвращаемый нам Promise
    this.thenDeferred = []; //Последующие обработчики, нужны для того что бы передать ошибку дальше по цепочке
    var self = this;

    //Вызывается в успешном случае
    this.resolve = function () {
        self.events.emit('completed', arguments);
    }
    //Вызывается в случае ошибки
    this.reject = function (error) {
        self.events.emit('error', error);
        //Передаем ошибку дальше по цепочке
        self.thenDeferred.forEach(function (el) {
            el.reject(error);
        });
    }
}

Объект — Promise

Задача его будет отслеживать события "completed" и "error" вызывать нужные функции, которые назначены через "then" и отслеживать что там вернула эта функция: если возвратила нам еще один promise то подключаться к нему для того что бы срабатывали следующие then, если просто данные, то выполнять последующие then, таким образом мы сможем строить цепочки из then.
function promise(def) {
    this.def = def;
    this.completed = false;
    this.events = def.events;
    var self = this;
    var thenDeferred;
    self._successListener = null;
    self._errorListener = null;

    //Результатом выполнения then - будет возвращаться новый promise
    this.then = function (success, error) {
        if (success)
            self._successListener = success;
        if (error)
            self._errorListener = error;
        thenDeferred = new deferred();
        self.def.thenDeferred.push(thenDeferred);
        return thenDeferred.promise;
    }

    //Обрабатываем успешное выполнение задачи
    this.events.on('completed', function (result) {
        // объекты, аргументы, массивы приводим к виду массива для передачи их в дальнейшем как атрибуты в функцию
        var args = inputOfFunctionToArray(result);
        //Если вдруг задача была уже выполнена, то дальше не проходим
        if (self.completed) return;
        self.completed = true;

        if (self._successListener) {
            var result;
            try {
                result = self._successListener.apply(self, args);
            } catch (e) {
                self.def.reject(e);
                result;
            }
            //Если результатом функции Promise и есть последующие then, подключаемся к нему
            var promise;
            if (isPromise(result))
                promise = result;
            else if (result instanceof deferred)
                promise = result.promise;
            if (promise && thenDeferred) {
                promise.then(function () {
                    var args = arguments;
                    process.nextTick(function () {
                        thenDeferred.resolve.apply(self, args);
                    });
                }, function (error) {
                    process.nextTick(function () {
                        thenDeferred.reject(error);
                    });
                });
            } else if (thenDeferred)
                           process.nextTick(function () {
                                   //Для скалярных параметров просто запускаем следующие then
                                  thenDeferred.resolve.apply(self, [result]);
                          });
        } else if (thenDeferred)
            process.nextTick(function () {
                thenDeferred.resolve.apply(self, []);
            });
    });

    //Обрабатываем ошибки
    this.events.on('error', function (error) {
        if (self.completed) return;
        self.completed = true;
        if (self._errorListener)
            process.nextTick(function () {
                self._errorListener.apply(self, [error]);
            });
    });
}


Итак, базовая модель готова. Осталось сделать обвязку для функций с callback

PromiseFn

Его задача — сделать обертку для функции с callback с возможностью указания this и аргументов запуска
var promisefn = function (bind, fn) {
    var def = new deferred();
    //bind является не обязательным параметром
    if (typeof bind === 'function' && !fn) {
        fn = bind;
        bind = def;
    }
    //Назначаем наш callback для данной функции
    var callback = function (err) {
        if (err)
            def.reject(err);
        else {
            var args = [];
            for (var key in arguments)
                args.push(arguments[key]);
            args.splice(0, 1);
            def.resolve.apply(bind, args);
        }
    };
    var result = function () {

        var args = [];
        for (var key in arguments)
            args.push(arguments[key]);
        args.push(callback);
        process.nextTick(function () {
            fn.apply(bind, args);
        });
        return def.promise;
    }
    return result;
}


И напоследок ALL — последовательное выполнение функций с callback

Тут все просто: нам передают массив функций, мы их обвязываем через promisefn и, когда они все выполнятся — вызываем resolve
var all = function (functions) {
    var def = new deferred();
    process.nextTick(function () {
        var index = -1;
        var result = [];
        var next = function (err, arguments) {
            if (err) {
                def.reject(err);
                return;
            }
            if (arguments) result.push(inputOfFunctionToArray(arguments));
            index++;
            if (index >= functions.length) {
                def.resolve(result);
            } else process.nextTick(function () {
                promisefn(functions[index])().then(function () {
                    var args = arguments;
                    process.nextTick(function () {
                        next(err, args);
                    });
                }, function (err) {
                    process.nextTick(function () {
                        next(err);
                    });
                });
            });
        }
        process.nextTick(next);

    });
    return def.promise;
}


В заключение


После тестирования старый подход (через библиотеку Q) был переписан заменено пару объявлений и запущен в тех же условиях. Результат положительный — 50-100 мс (вместо прежних 1300 мс).
Все исходники доступны на Github, там же можно найти и примеры. Изобретение «велосипедов» полезно хотя бы тем, что это улучшает понимание.
Спасибо за внимание!
Tags:
Hubs:
0
Comments 4
Comments Comments 4

Articles