Обзор фреймворка Luigi для построения последовательностей выполнения задач

    Доброго времени суток! У нас открылось совершенно новое направление обучения — BigData, а это значит, что немного расширяется горизонт материалов, которыми мы будем делиться с вами. Сегодня рассмотрим Luigi, как часть того, что раскрывается на нашем курсе.

    Luigi — фреймворк на языке Python для построения сложных последовательностей по выполнению зависимых задач. Довольно большая часть фреймворка направлена на преобразования данных из различных источников (MySql, Mongo, redis, hdfs) и с помощью различных инструментов (от запуска процесса до выполнения задач разных типов на кластере Hadoop). Разработан в компании Spotify и открыт в виде open source инструмента в 2012 году.

    Самое главное преимущество фреймворка — возможность выстраивать последовательности зависимых задач. Фреймворк разрешает зависимости, отслеживает граф выполнения, управляет запуском задач, обрабатывает ошибки с возможностью перезапуска нужных задач, распределяет ресурсы рабочих процессов с возможностью параллельной работы независимых частей графа задач.

    Для выполнения всех этих задач существуют и другие инструменты. Это Oozie, Pinball, Airflow (находится в статусе инкубации в Apache — проходит различные проверки, недавно вышел обзор на хабре). В данной статье рассмотрим только Luigi.



    Установка и документация

    Для установки можно воспользоваться командой:

    pip install luigi

    Документация доступна тут

    Задача (Task)

    В файле luigi_demo_tasks.py определяем класс, наследуемый от luigi.Task. Добавляем вызов run для возможности запуска из консоли.

    from luigi import Task, run
    
    class MyTask(Task):
       pass
    
    
    if __name__ == '__main__':
       run()

    Запускаем. Дополнительно указываем опцию --local-scheduler, чтобы пока что не обращаться к центральному планировщику задач.

    python -m luigi_demo_tasks MyTask --local-scheduler

    Примечание. В документации указан другой способ запуска без вызова run и с добавлением директории в PYTHONPATH.

    Видим следующий результат:

    DEBUG: Checking if MyTask() is complete
    /usr/local/lib/python3.4/dist-packages/luigi/worker.py:334: UserWarning: Task MyTask() without outputs has no custom complete() method
      is_complete = task.complete()
    INFO: Informed scheduler that task   MyTask__99914b932b   has status   PENDING
    INFO: Done scheduling tasks
    INFO: Running Worker with 1 processes
    DEBUG: Asking scheduler for work...
    DEBUG: Pending tasks: 1
    INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) running   MyTask()
    INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host username=username, pid=5369) done      MyTask()
    DEBUG: 1 running tasks, waiting for next task to finish
    INFO: Informed scheduler that task   MyTask__99914b932b   has status   DONE
    DEBUG: Asking scheduler for work...
    DEBUG: Done
    DEBUG: There are no more tasks to run at this time
    INFO: Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) was stopped. Shutting down Keep-Alive thread
    INFO: 
    ===== Luigi Execution Summary =====
    
    Scheduled 1 tasks of which:
    * 1 ran successfully:
        - 1 MyTask()
    
    This progress looks :) because there were no failed tasks or missing external dependencies
    
    ===== Luigi Execution Summary =====
    

    В сообщениях видим, что задача MyTask ставится на выполнение и успешно выполняется. Повторный запуск дает точно такой же результат.

    Сделаем теперь так, чтобы MyTask выполнял некоторую работу. Для этого переопределим метод run из базового класса:

    from luigi import Task, run
    
    class MyTask(Task):
       def run(self):
           print("Hello world!")
    
    
    if __name__ == '__main__':
       run()

    В информации о выполнении задачи увидим следующее:

    INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) running   MyTask()
    Hello world!
    INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) done      MyTask()

    Гарантия однократного выполнения задачи

    Часто бывает так, что необходимо выполнить некую задачу единожды. Например из-за того, что её выполнение является ресурсоёмким. В Luigi задача считается сделанной, если сгенерирован некий объект (файл на машине, файл в hdfs, артефакт в MySql, таблица в Hive и другие), и можно проверить его существование. Для указания объекта необходимо переопределить в задаче метод output и в нем вернуть любого наследника или наследников класса Target. Для примера будем использовать LocalTarget — файл в локальной файловой системе.

    from luigi import Task, run, LocalTarget
    
    class MyTask(Task):
    
       filename = "hello_file.txt"
    
       def run(self):
           with open(self.filename, 'w') as f:
               f.write("Hello world!")
    
       def output(self):
           return LocalTarget(self.filename)
    
    
    if __name__ == '__main__':
       run()

    Первый запуск задачи генерирует файл hello_file.txt. Повторный запуск задачи сообщает нам, что все задачи выполнены.

    Зависимые задачи

    В luigi задачи могут зависеть от других задач. Для указания зависимости от другой задачи необходимо переопределить метод requires. В нем вернуть объект класса любой другой задачи. Определим две зависимые задачи, каждая из которых пишет файл.

    from luigi import Task, run, LocalTarget
    
    class MyTaskFirst(Task):
    
       filename = "first.txt"
    
       def run(self):
           with open(self.filename, 'w') as f:
               f.write("first!")
    
       def output(self):
           return LocalTarget(self.filename)
    
    
    class MyTaskSecond(Task):
    
       filename = "second.txt"
    
       def run(self):
           with open(self.filename, 'w') as f:
               f.write("second!")
    
       def requires(self):
           return MyTaskFirst()
    
       def output(self):
           return LocalTarget(self.filename)
    
    
    if __name__ == '__main__':
       run()

    Запуск задачи немного изменился. Указываем для запуска самую последнюю задачу, фреймворк сам определит и выполнит все зависимости.

    python -m luigi_demo_tasks MyTaskSecond --local-scheduler

    Внешние зависимости

    Иногда для выполнения задачи необходимы данные, генерируемые внешними системами. Так как в зависимостях в методе requires можно указывать только другие задачи, то нам понадобится задача-обертка для внешних данных. Для примера рассмотрим задачу подсчета частоты каждого символа в файле hello_file.txt:

    from collections import defaultdict
    
    from luigi import Task, run, LocalTarget, ExternalTask
    
    
    class ExternalData(ExternalTask):
       def output(self):
           return LocalTarget("hello_file.txt")
    
    
    class TaskWithExternalData(Task):
    
       filename = "char_counts.txt"
    
       def run(self):
           frequencies = defaultdict(int)
    
           with open(self.requires().output().path) as f_in:
               for line in f_in:
                   for c in line:
                       frequencies[c] += 1
    
           with open(self.filename, 'w') as f_out:
               for c, count in frequencies.items():
                   f_out.write('{}\t{}\n'.format(c, count))
    
       def requires(self):
           return ExternalData()
    
       def output(self):
           return LocalTarget(self.filename)
    
    
    if __name__ == '__main__':
       run()

    Планировщик

    Для централизованного выполнения задач можно использовать центральный планировщик. Его основные задачи: обеспечить отсутствие одновременного выполнения одинаковых задач, предоставить визуализацию всех запущенных задач с их зависимостями.

    Из документации запуск планировщика:

    $ luigid --background --pidfile <PATH_TO_PIDFILE> --logdir <PATH_TO_LOGDIR> --state-path <PATH_TO_STATEFILE>

    Адрес планировщика по умолчанию:
    localhost:8082/

    Запустим предыдущую задачу с использованием планировщика, предварительно удалив файл char_counts.txt:

    python -m luigi_demo_tasks TaskWithExternalData

    В планировщике увидим обе задачи:



    А так же граф зависимостей:



    Параллельный запуск задач

    Реализуем несколько зависимых задач, зависимость оформим в виде песочных часов: корневая задача типа 1 зависит от десяти одинаковых задач типа 2. Эти 10 задач типа 2 зависят от одной типа 3. она, в свою очередь, зависит от 10 задач типа 4, и все эти 10 зависят от одной задачи типа 5.

    Каждая задача пишет в результате работы файл со своим именем и номером, а так же спит 10 секунд. Это нужно для того, чтобы в планировщике можно было проследить порядок выполнения задач. Обратите внимание на то, что в задачу можно передать параметр. Это позволяет запускать разные по сути задачи с одинаковым кодом.

    Для реализации будем использовать наследование, так как это позволит сократить код. Запустим одновременно 5 процессов указав опцию --workers=5

    python -m luigi_demo_tasks Task1 --Task1-task-index=0 --workers=5

    В планировщике обновляя страницу увидим следующую последовательность:










    Одновременно выполняется не более пяти задач, а иногда только одна, так как от нее зависят все остальные. При этом выделенные воркеры простаивают.

    Запуск задач

    Для запуска задач необходимо использовать какой-либо внешний планировщик, например cron. Соответственно необходимо самостоятельно настраивать получение актуального кода для запуска, логирование и конфигурирование всех задач.

    Дополнительные возможности

    В случае возникновения ошибок в работе luigi может отправить email.

    Для каждого набора задач можно указать внешний файл с настройками, настроив перед запуском переменную LUIGI_CONFIG_PATH.

    Каждая задача может быть запущена с некоторым приоритетом, для указания приоритета необходимо в классе указать поле priority.

    Реализовано довольно много классов задач, связанных с типичными примерами обработки данных на кластере — hadoop streaming задача на Python, hadoop jar задача, spark задача и другие. При этом часто они требуют существенной доработки.

    Возможно выполнение любой задачи в виде запуска консольной команды с отслеживанием процесса выполнения.

    Код самой библиотеки чаще всего довольно прост. Для понимания того, как выстроить зависимые задачи, достаточно посмотреть исходный код базовых классов в luigi. У них довольно простой интерфейс и неплохая документация.

    Разработан крупной компанией. На данный момент стабильно есть несколько коммитов в мастер каждую неделю. Скорее всего будет и дальше поддерживаться.

    Недостатки

    Проверка того, выполнена ли задача, происходит только во время построения графа зависимостей. Из-за этого, если запускать выполнение набора задач чаще, чем он весь успевает выполниться, может возникнуть ситуация, когда планировщик запускает две одинаковые задачи.

    Нет встроенного планировщика.

    Нет способа получить метаинформацию о выполняемых задачах. Нельзя без вспомогательных средств получить документацию по запускаемым задачам, процессам обработки данных. Нет способа, например, получить общий список задач, которые зависят от данной задачи.

    Веб-интерфейс планировщика полезен только для того, чтобы увидеть, почему тот или иной набор задач не выполняется. Обычно можно посмотреть на граф зависимостей и увидеть, каких именно данных не хватает.

    Не совсем очевидна настройка логирования выполнения задач.

    Довольно часто встречается неожиданное поведение.

    Поддержка и развитие менее активное, чем, например у Airflow. Для сравнения в luigi и в airflow

    Вывод

    Luigi хорошо подходит для построения процессов обработки данных. Однако прежде, чем начинать использовать эту библиотеку, есть смысл попробовать реализовать несколько типичных задач на разных фреймворках и выбрать такой, который лучше подойдет именно для ваших задач.

    THE END

    Как всегда рады мнениям, вопросам и тапкам.
    Отус 71,42
    Компания
    Поделиться публикацией
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 3
    • –1
      Чего только не придумают, лишь бы make не использовать…
      • 0
        Хм. А при чём тут тут make? :)
        • 0
          Он и занимается тем, что планирует выполнение задач с учетом их зависимостей.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое