company_banner

Airflow Workshop: сложные DAG’и без костылей


    Привет, Хабр! Меня зовут Дина, и я занимаюсь разработкой игрового хранилища данных для решения задач аналитики в Mail.Ru Group. Наша команда для разработки batch-процессов обработки данных использует Apache Airflow (далее Airflow), об этом yuryemeliyanov писал в недавней статье. Airflow — это opensource-библиотека для разработки ETL/ELT-процессов. Отдельные задачи объединяются в периодически выполняемые цепочки задач — даги (DAG — Directed Acyclic Graph).


    Как правило, 80 % проекта на Airflow — это стандартные DAG’и. В моей статье речь пойдёт об оставшихся 20 %, которые требуют сложных ветвлений, коммуникации между задачами — словом, о DAG’ах, нуждающихся в нетривиальных алгоритмах.


    Управление потоком


    Условие перехода


    Представьте, что перед нами стоит задача ежедневно забирать данные с нескольких шардов. Мы параллельно записываем их в стейджинговую область, а потом строим на них целевую таблицу в хранилище. Если в процессе работы по какой-то причине произошла ошибка — например, часть шардов оказалась недоступна, — DAG будет выглядеть так:



    Для того чтобы перейти к выполнению следующей задачи, нужно обработать ошибки в предшествующих. За это отвечает один из параметров оператора — trigger_rule. Его значение по умолчанию — all_success — говорит о том, что задача запустится тогда и только тогда, когда успешно завершены все предыдущие.


    Также trigger_rule может принимать следующие значения:


    • all_failed — если все предыдущие задачи закончились неуспешно;
    • all_done — если все предыдущие задачи завершились, неважно, успешно или нет;
    • one_failed — если любая из предыдущих задач упала, завершения остальных не требуется;
    • one_success — если любая из предыдущих задач закончилась успешно, завершения остальных не требуется.

    Ветвление


    Для реализации логики if-then-else можно использовать оператор ветвления BranchPythonOperator. Вызываемая функция должна реализовывать алгоритм выбора задачи, который запустится следующим. Можно ничего не возвращать, тогда все последующие задачи будут помечены как не нуждающиеся в исполнении.


    В нашем примере выяснилось, что недоступность шардов связана с периодическим отключением игровых серверов, соответственно, при их отключении никаких данных за нужный нам период мы не теряем. Правда, и витрины нужно строить с учётом количества включённых серверов.


    Вот как выглядит этот же DAG со связкой из двух задач с параметром trigger_rule, принимающим значения one_success (хотя бы одна из предыдущих задач успешна) и all_done (все предыдущие задачи завершились), и оператором ветвления select_next_task вместо единого PythonOperator’а.


    # Запускается, когда все предыдущие задачи завершены
    all_done = DummyOperator(task_id='all_done', trigger_rule='all_done', dag=dag)
    # Запускается, как только любая из предыдущих задач успешно отработал
    one_success = DummyOperator(task_id='one_success', trigger_rule='one_success', dag=dag)
    
    # Возвращает название одной из трёх последующих задач
    def select_next_task():
       success_shard_count = get_success_shard_count()
       if success_shard_count == 0:
           return 'no_data_action'
       elif success_shard_count == 6:
           return 'all_shards_action'
       else:
           return 'several_shards_action'
    
    select_next_task = BranchPythonOperator(task_id='select_next_task',
                                            python_callable=select_next_task,
                                            dag=dag)


    Документация по параметру оператора trigger_rule
    Документация по оператору BranchPythonOperator


    Макросы Airflow


    Операторы Airflow также поддерживают рендеринг передаваемых параметров с помощью Jinja. Это мощный шаблонизатор, подробно о нём можно почитать в документации, я же расскажу только о тех его аспектах, которые мы применяем в работе с Airflow.


    Шаблонизатор обрабатывает:


    • строковые параметры оператора, указанные в кортеже template_field;
    • файлы, переданные в параметрах оператора, с расширением, указанным в template_ext;
    • любые строки, обработанные функцией task.render_template сущности task, переданной через контекст. Пример функции PythonOperator’а с переданным контекстом (provide_context=True):

    def index_finder(conn_id, task, **kwargs):
       sql = "SELECT MAX(idtransaction) FROM {{ params.billing }}"
       max_id_sql = task.render_template("", sql, kwargs)
       ...

    Вот как мы применяем Jinja в Airflow:


    1. Конечно же, это работа с датами. {{ ds }}, {{ yesterday_ds }}, {{ tomorrow_ds }} — после препроцессинга эти шаблоны заменяются датой запуска, днём до него и следующим днём в формате YYYY-MM-DD. То же самое, но только цифры, без дефисов: {{ ds_nodash }}, {{ yesterday_ds_nodash }}, {{ tomorrow_ds_nodash }}
    2. Использование встроенных функций. Например, {{ macros.ds_add(ds, -5) }} — это способ отнять или добавить несколько дней; {{ macros.ds_format(ds, “%Y-%m-%d”, “%Y”) }} — форматирование даты.
    3. Передача параметров. Они передаются в виде словаря в аргументе params, а получаются так: {{ params.name_of_our_param }}
    4. Использование пользовательских функций, точно так же переданных в параметрах. {{ params.some_func(ds) }}
    5. Использование встроенных библиотек Python:
      {{ (macros.dateutil.relativedelta.relativedelta(day=1, months=-params.retention_shift)).strftime("%Y-%m-%d") }}
    6. Использование конструкции if-else:
      {{ dag_run.conf[“message”] if dag_run else “” }}
    7. Организация циклов:
      {% for idx in range(params.days_to_load,-1,-1) %}
      {{ macros.ds_add(ds, -idx) }}
      {% endfor %}

    Приведу несколько примеров рендеринга параметров в интерфейсе Airflow. В первом мы удаляем записи старше количества дней, передаваемого параметром cut_days. Так выглядит sql c использованием шаблонов jinja в Airflow:



    В обработанном sql вместо выражения уже подставляется конкретная дата:



    Второй пример посложнее. В нём используется преобразование даты в unixtime для упрощения фильтрации данных на источнике. Конструкция "{:.0f}" используется, чтобы избавиться от вывода знаков после запятой:



    Jinja заменяет выражения между двойными фигурными скобками на unixtime, соответствующий дате исполнения DAG’а и следующей за ней дате:



    Ну и в последнем примере мы используем функцию truncshift, переданную в виде параметра:



    Вместо этого выражения шаблонизатор подставляет результат работы функции:



    Документация по шаблонизатору jinja


    Коммуникация между задачами


    В одном из наших источников интересная система хранения логов. Каждые пять дней источник создаёт новую таблицу такого вида: squads_02122017. В её названии присутствует дата, поэтому возник вопрос, как именно её высчитывать. Какое-то время мы использовали таблицы с названиями из всех пяти дней. Четыре запроса падали, но trigger_rule=’one_success’ спасал нас (как раз тот случай, когда выполнение всех пяти задач необязательно).


    Спустя какое-то время мы стали использовать вместо trigger_rule встроенную в Airflow технологию для обмена сообщениями между задачами в одном DAG’е — XCom (сокращение от cross-communication). XCom’ы определяются парой ключ-значение и названием задачи, из которой его отправили.



    XCom создаётся в PythonOperator’е на основании возвращаемого им значения. Можно создать XCom вручную с помощью функции xcom_push. После выполнения задачи значение сохраняется в контексте, и любая последующая задача может принять XCom функцией xcom_pull в другом PythonOperator’е или из шаблона jinja внутри любой предобработанной строки.


    Вот как выглядит получение названия таблицы сейчас:


    def get_table_from_mysql(**kwargs):
      """
      Выбирает существующую из пяти таблиц и пушит значение
      """
      hook = MySqlHook(conn_name)
      cursor = hook.get_conn().cursor()
      cursor.execute(kwargs['templates_dict']['sql'])
      table_name = cursor.fetchall()
      # Посылаем XCom с названием ‘table_name’
      kwargs['ti'].xcom_push(key='table_name', value=table_name[0][1])
      # Второй вариант отправления XCom’а:
      # return table_name[0][1]
      # Можно получить по названию задачи-отправителя без ключа
    
    # Запрос, вынимающий из метаданных PostgreSQL название нужной таблицы
    select_table_from_mysql_sql = '''
    SELECT table_name
      FROM information_schema.TABLES
     WHERE table_schema = 'jungle_logs'
       AND table_name IN
        ('squads_{{ macros.ds_format(ds, "%Y-%m-%d", "%d%m%Y") }}',
         'squads_{{ macros.ds_format( macros.ds_add(ds, -1), "%Y-%m-%d", "%d%m%Y") }}',
         'squads_{{ macros.ds_format( macros.ds_add(ds, -2), "%Y-%m-%d", "%d%m%Y") }}',
         'squads_{{ macros.ds_format( macros.ds_add(ds, -3), "%Y-%m-%d", "%d%m%Y") }}',
         'squads_{{ macros.ds_format( macros.ds_add(ds, -4), "%Y-%m-%d", "%d%m%Y") }}')
    '''
    
    select_table_from_mysql = PythonOperator(
       task_id='select_table_from_mysql',
       python_callable=get_table_from_mysql,
       provide_context=True,
       templates_dict={'sql': select_table_from_mysql_sql},
       dag=dag
    )
    
    # Получаем XCom из задачи 'select_table_from_mysql' по ключу 'table_name'
    sensor_jh_squad_sql = '''
    SELECT 1
      FROM jungle_logs.{{ task_instance.xcom_pull(task_ids='select_table_from_mysql',
                                                  key='table_name') }}
     LIMIT 1
    '''

    Ещё один пример использования технологии XCom — рассылка email-уведомлений с текстом, отправленным из PythonOperator’а:


    kwargs['ti'].xcom_push(key='mail_body', value=mail_body)

    А вот получение текста письма внутри оператора EmailOperator:


    email_notification_lost_keys = EmailOperator(
       task_id='email_notification_lost_keys',
       to=alert_mails,
       subject='[airflow] Lost keys',
       html_content='''{{ task_instance.xcom_pull(task_ids='find_lost_keys',
                                                  key='mail_body') }}''',
       dag=dag
    )

    Документация по технологии XCom


    Заключение


    Я рассказала о способах ветвления, коммуникации между задачами и шаблонах подстановки. С помощью встроенных механизмов Airflow можно решать самые разные задачи, не отходя от общей концепции реализации DAG’ов. На этом интересные нюансы Airflow не заканчиваются. У нас с коллегами есть идеи для следующих статей на эту тему. Если вас заинтересовал этот инструмент, пишите, о чём именно вам хотелось бы прочитать в следующий раз.

    Mail.Ru Group 1 101,74
    Строим Интернет
    Поделиться публикацией
    Комментарии 22
    • +1

      Мы используем у себя AirFlow для запуска периодических задач. И с ним постоянно какие-то проблемы. Регулярно видим "ядерный гриб". Сабдаги просто не работают. Старые issue закрываются, новые открываются. Давно уже хотим съехать с AirFlow на какой-то другой сервис. Но вот на какой — непонятно. Может кто-то посоветует?

      • 0
        Согласна с seidzi, airflow очень гибкий. Хотя и без недочётов.
        У нас гриб появляется в трёх случаях:
        — когда в даге что-то не в порядке и тогда мы используем консольную команду list_dags, чтобы выяснить, что именно;
        — при маркировке mark success за большое количество дней;
        — при большом или неправильном запросе при работе с источниками напрямую, через интерфейс Ad Hoc Query.
        Про сабдаги я сказать ничего не могу, мы ими ещё не пользовались.

        Что касается альтернативы, из предложенных open source продуктов слышала хорошие отзывы о Luigi. Он попроще, чем airflow, но для некоторых задач подходит лучше.
        • 0

          Смотрели и на Luigi и на Oozie, но они выглядят почти такими же. Не хочется менять шило на мыло.

      • +1
        а мы используем Oozie и хотим перейти с него на Airflow, сейчас тестирую его возможности и по моим наблюдениям он очень гибкий и нужно уметь его «готовить»
        • +1

          Сабдаги отлично работают. Мы их используем для:


          • объединения несколько задач в один с целью улучшения читаемости всего ДАГа.
          • переиспользования кода, когда один набор задач используется в нескольких сабдагов.

          У них есть свои минусы в том, что появляется дополнительная сущность сабдага, которая одновременно является задачей в главном ДАГе и также является ДАГом для внутренних тасков. Т.е. при чистке и повторном прогоне например, надо не забывать и то и другое вычищать из базы.

        • 0
          Подскажите, как вы решаете задачу версионирования дагов/тасков? И как сбрасываете кеш дагов?
          • 0
            У нас код дагов лежат в гите, мы их разрабатываем локально и после внесения изменений обновляем на сервере.

            Что касается сбрасывания кеша, я не поняла, что имеется ввиду. Если обновление дага по свежему коду, то пользуемся кнопкой «Обновить» в веб-интерфейсе.
            • +1
              Я так понимаю, что речь идёт об устаревших данных. Например когда ДАГ переименовывается, по факту файла уже нет, а в вебсервесе он ещё висит и в базе есть. Мы в таком случае идём в базу Postgres airflow и там запросами вручную чистим устаревшие ДАГи.
              • 0
                где то читал что в новой версии должны для этого сделать кнопку в web интерфейсе, но пока что да другого решения кроме как удалять руками нет, можно так же сделать свою тулзу по чистке старых дагов
          • 0
            А можно ли вообще не делать сложной логики внутри Airflow, а вместо этого делать параллельную загрузку и все проверки внутри Python скрипта?

            DAG при этом будет выглядеть так:
            START -> export_from_all_shards_and_action -> FINISH

            Внутри скрипта запускаем 3-6-10-400-1000 потоков и загружаем данные. Сразу после загрузки проверяем результат и, если были фатальные ошибки, то отправляем на рестарт. Иначе делаем какое-то полезное действие.

            Со временем образуется 5-7 стандартных загрузчиков из принципиально разных источников. И можно эту логику в отдельные классы вынести, и просто запускать с разными параметрами.

            Логика ETL склонна усложнятся со временем. Есть риск, что через годик во всех хитросплетениях и костыликах уже сложно будет разобраться.
            • +1
              Это вполне себе вариант, но тогда Airflow вырождается в планировщик задач на Питоне.

              Основное преимущество использования всех встроенных возможностей Airflow на мой взгляд — это простая локализация ошибки. Можно с одного взгляда понять, что пропал доступ к источнику или задача в соседнем таске никак не может просчитаться и после починки перезапустить только эту часть.

              Что же касается костыликов и хитросплетений, во-первых, у нас есть регламент, согласно которому мы называем задачи и используем те или иные сенсоры и операторы, а во-вторых, тяжёлую повторяющуюся логику мы инкапсулируем в самописные операторы.
              • 0
                Было б интересно про этот регламент и прочие know how почитать, позволяющие не утонуть.
                • +1
                  На понимание темы на уровне рекомендаций в разных ситуациях я пока не претендую.
                  А если рассматривать конкретно наш регламент, это достаточно скучное чтиво вроде чтения ГОСТ'ов. Два главных момента — удобство и последовательное их соблюдения — кажутся очевидными. Можно при случае обсудить.
                  • 0
                    Ну вот за счёт чего достигаются удобство и непотопляемость — вопрос кажется интересный
            • 0
              Вот бы жаргонизмы типа «шарды», «стеджинговый сервер» и т.п. были б вкратце раскрыты через более широко известные словечки при первом упоминании…
              • 0

                У нас недавно возникла небольшая проблема, может подскажите как из неё лучше выйти?


                Есть DAG, где находится 120 задач, 60 из них на первом уровне, т.е. зависимостей никаких нет. Выполняются, в основном, довольно быстро (от 5 секунд до 120 секунд). Когда мы делаем backfill на этот DAG на 30 дней, то получается 30*120 = 3600 задач, из которых сразу в очередь встаёт 1800. При этом, если попадаются лёгкие задачи, airflow worker быстро его отрабатывает и потом долго перебирает все задачи из пула. В итоге вместо 15 worker, которые положены по конфигу, успевает отработать только 1-2. Это можно решить дополнительной оберткой bash скриптом, где будут задаваться начальные и конечные даты, но, кажется, должен быть вариант получше.


                Под airflow у нас отдельная машина (24 CPU, 64Gb памяти), работает с помощью LocalExecutor. Celery пока не прикуритили. Конфиги airflow.cfg у нас следующие:


                [core]
                parallelism = 16
                dag_concurrency = 15
                max_active_runs_per_dag = 16


                [scheduler]
                max_threads = 10

                • 0
                  У нас одно из слабых мест airflow — это шедулер. Время от времени он не справляется с несовершенством некоторых дагов и падает. Поэтому у нас на Кроне стоит проверка его состояния и перезапуск при необходимости (с несовершенствами мы тоже работаем, конечно :) ). Если бы я кого и подозревала в ситуации, которая у вас сложилась, так это его. Может в его логах можно что-нибудь раскопать?

                  Ещё я бы посмотрела на параметры вроде этого:
                  # The scheduler constantly tries to trigger new tasks (look at the
                  # scheduler section in the docs for more information). This defines
                  # how often the scheduler should run (in seconds).
                  scheduler_heartbeat_sec = 5

                  Мы, кстати, довольно много внимания уделяем pool'ам, в которых запускаются задачи и priority_weight, которые помогают ранжировать таски в очереди по приоритетам. Это не совсем в тему данной проблемы, но, например, выделение тяжёлых тасков в отдельный pool с ограничением максимального количества одновременно-запущенных задач может в будущем уберечь от пиков загрузки сервера.

                  Варианты решения вашей проблемы, которые мне пришли в голову:
                  * Нужно ли вам, чтобы все дни стартовали одновременно? Если нет, можно выставить параметр дага depends_on_past в True, тогда таски последующих дней не будут становиться в очередь, пока эти же таски в предыдущем дне не выполнятся.
                  * Параметризация дагов. У нас есть несколько проектов, по которым данные на источнике меняются задним числом. Иногда нам нужно забирать данные за довольно большой период времени. Тогда мы вешаем переключение логики на параметр airflow: обычно это или флаг «пересчитать всю историю» или дата, начиная с которой нужно забрать все данные. И в даге зашиваемся на этот параметр. Но это как раз вариант с баш-скриптом, наверное.
                  • 0

                    Спасибо за ответ.
                    У нас шедулер работает через systemd, и он вроде сам следит за процессом шедулера.


                    Про prioriy_weight думали менять, а вот про pool спасибо, попробуем.


                    По поводу предлагаемых вариантов решения:


                    • depends_on_past может здесь помочь, хотя явно они получаются независимы, но это хотя бы ограничит одним днейм.
                    • в качестве параметризации дагов мы решили использовать файлы конфигурации в формате YAML и соответственно там держать все ключевые параметры дагов и всех возможных изменяемых параметров сабдагов и тасков. В частности у нас в переменной airflow содержится ключевое слово production или staging и по нему уже парсится соотвествующая ветка YAML конфига.

                    P.S. Еще при scheduler_heartbeat_sec = 5 у нас довольно много логов сыпится, почти под 1Гб в сутки, при этом в основном ненужная инфа. Вот думаем либо величину этого параметра увеличить, либо логирование настроить на другой уровень информативности.

                    • 0
                      Кстати, у нас есть маленький чатик для разработчиков airflow: t.me/ruairflow
                      Присоединяйтесь!
                • 0

                  del

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое