Pull to refresh

node-seq на новый лад (опять про асинхронность)

Reading time 5 min
Views 5.3K
Здравствуй, Хабр! Пишу тебе на правах слоупока. Ведь в то время как космические корабли с вертикальным взлетом и посадкой бороздят просторы мирового океана, а самые нетерпеливые вовсю используют фичи ES6 в своих проектах я принес тебе очередную библиотеку для облегчения жизни асинхронщика.

Конечно, давно уже есть миллион реализаций обещаний, а для желающих — async. Есть, также, широкоизвестная в узких кругах библиотека Seq от небезызвестного товарища Substack. Я начал пользоваться ей практически с первых дней моего яваскрипта и использовал везде где мог. Предлагаемый этой библиотекой подход кажется мне более понятным и логичным для обуздания асинхронной лапши чем подход используемый, например, в async. Смотрите сами:

var fs = require('fs');
var Hash = require('hashish');
var Seq = require('seq');

Seq()
  .seq(function () {
    fs.readdir(__dirname, this);
  })
  .flatten()
  .parEach(function (file) {
    fs.stat(__dirname + '/' + file, this.into(file));
  })
  .seq(function () {
    var sizes = Hash.map(this.vars, function (s) { return s.size })
    console.dir(sizes);
  });

Все настолько просто и понятно, что даже объяснять лень. К сожалению, библиотека давно не развивается и не поддерживается. К тому же, за время использования накопился список багов на которые так или иначе натыкался, список фич которых хотелось, список претензий — потому что не все работало так же хорошо как в моем воображении. Однажды, я в очередной раз я наткнулся на несоврешенство мира и решил, что настал Этот Момент. Пора форкнуть и пофиксить. Я часто так поступаю, но то что делает эта библиотека казалось мне настоящей Магией.

И вот счастливый и преисполненный решимости делаю git clone, cd, gvim, и пытаюсь понять что тут происходит. Не понимаю. Автор использует еще пару своих библиотек и для просветления необходимо разобраться сначала с ними. Через пару часов мне надоедает и я обнаруживаю Фатальный Недостаток. Сказано — сделано. Сажусь и пишу с нуля Библиотеку Мечты. Как ни странно, никакой магии во всем этом не оказалось. Прототип был готов за вечер. Затем, еще какое то время, я допиливал его уже используя в реальном проекте, заменив им Seq полностью. И вот получилось то, что получилось. Давайте познакомимся.

YAFF — Yet Another Flow Framework.

В целом, я старался сделать его совместимым с Seq. И большинство кода мигрировалось просто заменой импорта (было var Seq = require('seq'); стало var Seq = require('yaff');). Разумеется, заменить пришлось кое что еще. Seq использует метод .catch() для ловли блох. Напрмиер, вышепрведенный кусок кода можно изменить вот так:

var fs = require('fs');
var Hash = require('hashish');
var Seq = require('seq');

Seq()
  .seq(function () {
    fs.readdir(__dirname, this);
  })
  .flatten()
  .parEach(function (file) {
    fs.stat(__dirname + '/' + file, this.into(file));
  })
  .catch(function (err)(
    console.error(err);
  ))
  .seq(function () {
    var sizes = Hash.map(this.vars, function (s) { return s.size })
    console.dir(sizes);
  });

Эта конструкция ужасна тем, что после того как мы «поймали» ошибку мы можем продолжать. Так нельзя. Во-первых, не понятно что делать если parEach (или другие подобные методы) выбросят несколько ошибок. Ловить только первую? Ловить все? А если мы уже ушли далеко вниз а в каком-нибудь parEach выше по таймеру выскочила ошибка? А если ниже у нас уже нет никаких catch? А если те catch что ниже не подготовлены для обработки ошибок которые вылезут по таймеру из forEach? Возникает много вопросов без ответов. Поэтому, я решил, что в каждом YAFF должна быть только одна конструкция для обработки ошибок и она должна быть в конце. А чтобы не нарушать традиции nodejs пусть она обрабатывает еще и результат. Получается красота. Убедитесь:

var fs = require('fs');
YAFF(['./', '../'])
  .par(function (path) {
    fs.readdir(path, this);
  })
  .par(function (path) {
    fs.readdir(path, this);
  })
  .flatten()
  .parMap(function (file) {
    fs.stat(__dirname + '/' + file, this);
  })
  .map(function (stat) {
    return stat.size;
  })
  .unflatten()
  .finally(function (e, sizes) {
    if (e)
      throw e;
    log(sizes);
  });

Если предположить, что это все внтури асинхронной функции то в finally мы можем просто кинуть предоставленный нам коллбэк, а-ля:

var listDirs = function (dirs, cb) {
  YAFF(dirs)
    [волшебные пузырьки]
    .finally(cb);
};

Удобно? Мне тоже нравится. Вообще, эта библиотека построена относительно концепции стека-массива аргументов (стоило написать об этом с самого начала). И все методы что здесь есть так или иначе этот стек меняют применяясь к нему или к отдельным его элементам. Скажем, стайка функций обернутых в par возьмет по одному элементу из стека, в той очередности в которой написаны эти самые par и только после того как все par отстреляют коллбэки (а коллбэк внутри всех методов это this) YAFF перейдет к тому что у него дальше на очереди. Пусть дальше у нас несколько функций завернутых в seq. YAFF будет применять к ним весь стек и заменять его на то что вернет в коллбэк обернутая функция. Вот код, чтобы было понятно:

YAFF(['one', 'two', 'three'])
  .par(function (one) {
    this(null, one);
  })
  .par(function (two) {
    this(null, two);
  })
  .par(function (three) {
    this(null, three);
  })
  .seq(function (one, two, three) {
    this(null, one, three);
  })
  .seq(function (one, three) {
    this(null, null);
  })
  .seq(function () {
    this(null, 'and so on and so forth');
  })

Разумеется, если кто-то вызовет свой коллбэек с ненулевым первым аргументом(ошибка) то YAFF тут же плюнет на все функции, что там еще остались, и пойдет выполнять то, что написано в finally. Если вы забыли написать finally или посчитали, что в вашем коде ошибок уж точно не может быть то YAFF, в случае ошибки, бесцеремонно кинет исключение. Так что лучше, чтобы finally был.

Еще, здесь есть всякие синхронные функции для работы со стеком аргументов как с массивом: push, pull, filter, set, reduce, flatten, extend, unflatten, empty, shift, unshift, splice, reverse, save(name), load(fromName). Фуф, вроде все. Названия говорят сами за себя, но если что — не стесняйтесь спрашивать и заглядывать в исходник. Там один файл (main.js) и все очень просто устроено.

Ну и, конечно, то ради чего все затевалось — асинхронные функции для обработки массивов данных: forEach(YAFF не станет ждать когда закончится обработка этого блока, результаты работы этого блока не повлияют на стек. YAFF сразу перейдет к следующему хендлеру в цепочке), seqEach, parEach ( YAFF подождет пока отстреляются все функции, но результаты на стек не повлияют). seqMap, parMap, seqFilter, parFilter — делают то, что вы ожидаете; YAFF ждет пока они отработают а результаты работы этих блоков заменяют те значения что были на стеке раньше. Кроме того, у всех методов с префиксом par после функции можно указать число. Это число — лимит одновременно работающих асинхронных функций. Как то так:

var resizePhotos(photos, cb) {
  YAFF(photos)
    .parMap(function (photo) {
      asyncReisze(photo.image, photo.params, this);
    }, 10)
    .unaflatten()
    .finally(cb);
}

В этом примере мы асинхронно ресайзим пачку фотографий. Чтобы не перегружать сервер мы ресайзим не больше 10 картинок одновременно для каждого клиента. unflatten нужен для того, чтобы размазанные по стеку фотографии собрать в массив который будет одним аргументом для коллбэка.

Еще у YAFF есть методы mseq и mpar — это реверанс в сторону пользователей async. Принимают эти методы массив функций которые будут исполнены последовательно или параллельно. С тем же успехом можно было бы написать кучу seq() и par(), но иногда хочется нагенерировать функций динамически. У нас же все-таки функциональный язык, да?

Чтобы окончательно вас запутать я придумал следующий пример и нарисовал картинку (в отчаянной надежде что она все прояснит):

YAFF(['./'])
  .mseq([
    function (path1) {
      fs.readdir(path1, this);
    },
    function (arg) {
      this(null, arg);
    }
  ])
  .flatten()
  .parMap(function (file) {
    fs.stat(__dirname + '/' + file, this);
  })
  .map(function (stat) {
    return stat.size;
  })
  .unflatten()
  .finally(function (e, sizes) {
    log(sizes);
  });

Большая картинка


Надеюсь, что мне удалось понятно рассказать то, что я хотел; вы прониклись идеей и я не один такой кто не понимает зачем нужен async.

Все конкретные пожелания и предложения лучше оформлять в виде issues (на английском, не стесняйтесь — я тоже плохо его знаю, будем тренировать словесность вместе) на гитхабе или даже в виде пул-реквестов.

Ах да, библиотека есть на npmjs.org.

P.S. Только что в порыве страсти добавил синхронный метод apply — теперь все остальные синхронные методы можно выбросить. Но я оставлю для удобства и совместимости.
Tags:
Hubs:
+11
Comments 9
Comments Comments 9

Articles