Pull to refresh

Алертинг состояния выполения DAG`ов Apache Airflow в Telegram за 1 минуту

Level of difficultyEasy
Reading time4 min
Views3.2K

Коллеги, здарова! Вряд ли вы попали на эту статью случайно, по этому не будет никаких лирических отступлений и переходим сразу к делу.

Notifier

В Apache Airflow существует абстрактный класс BaseNotifier , который предоставляет базовую структуру для отправки уведомлений в Airflow с использованием различных  callback методов.

В этом приложении уже реализованы некоторые Нотификаторы, которые позволяют отправлять уведомления во внешние системы:

  • JiraNotifier

  • DiscordNotifier

  • SlackNotofier

  • SmtpNotifier

По названию понятно в какую систему уходят уведомления, более подробно ознакомится можно на странице с документацией - https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/notifications.html

Но вышло так, что нет нотификатора, который отправляет уведомления в Telegram, по этому мы сейчас напишем максимально простой нотификатор, который будет выполнять эту задачу.

Для того чтобы создать свой нотификатор, необходимо унаследовать класс BaseNotifier, для того чтобы получать контекст информацию о выполнении DAG`ов. Для отправки сообщений в Telegram будем использовать пакет python-telegram-bot.

pip install python-telegram-bot==20.4

Далее реализовываем саму функцию для отправки сообщений в телеграм. И переопределяем метод notify, в котором будет происходить отправка уведомлений

from airflow.notifications.basenotifier import BaseNotifier
from telegram import Bot
import asyncio
import os


class TelegramNotification(BaseNotifier):

    @staticmethod
    async def send_tg_notifier(notifier_message):

        telegram_bot_token = os.environ.get("TELEGRAM_NOTIFIER_BOT_TOKEN")
        telegram_chat_id = os.environ.get("TELEGRAM_NOTIFIER_CHAT_ID")

        bot = Bot(token=telegram_bot_token)
        await bot.sendMessage(chat_id=telegram_chat_id, text=notifier_message, parse_mode="Markdown")

    def notify(self, context):

        task_id = context["ti"].task_id
        task_state = context["ti"].state
        task_log_url = context["ti"].log_url
        dag_name = context["ti"].dag_id

        message_template = (f"Dag name: `{dag_name}` \n"
                            f"Task id: `{task_id}` \n"
                            f"Task State: `🔴{task_state}🔴` \n"
                            f"Task Log URL: `{task_log_url}` \n"
                            )

        asyncio.run(self.send_tg_notifier(notifier_message=message_template))

Как вы видите для получения значений с токеном бота и id чата в Telegram используются переменные окружения.

Я составляю шаблон сообщения, используя параметры:

  • dag_name

  • task_id

  • task_state

  • task_log_url

Полный список элементов, полученных от TaskInstance вы можете найти в документации - https://airflow.apache.org/docs/apache-airflow/2.0.1/_api/airflow/models/taskinstance/index.html

Обратите внимание, что я хочу получать сообщение в канал, только если DAG завершается неудачно, по этому я сразу в коде хардкожу красные кружки (🔴)

Для того чтобы Apache Airflow мог сам вызывать этот класс и уведомлять нас об ошибках, то нужноо модифицировать DAG`и.

для начала нужно импортировать пакет с Нотификатором:

from Notifier.Notifier import TelegramNotification

Далее при объявлении дефолтных значений, необходимо вызвать этот метод для on_failure_callback

from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.empty import EmptyOperator
from airflow.decorators import dag, task
from datetime import datetime
from Notifier.Notifier import TelegramNotification
import pendulum


@dag(
	default_args={'owner': 'airflow', 'on_failure_callback': TelegramNotification()},
	schedule_interval="0 6 * * *",
	start_date=pendulum.datetime(2023, 10, 10, tz="Europe/Moscow"),
	catchup=False,
	description='Этот DAG делает что-то явно сложное'
)

Apache Airflow поддерживает еще несколько разных типов callback:

  • on_success_callback - Вызывается при успешном выполнении задачи

  • on_failure_callback - Вызывает при ошибки задачи

  • on_execute_callback - Вызывается перед выполнением задачи

  • on_retry_callback - Вызывается, в случае если задача завершилась неудачно и готова выполнится повторно

  • sla_miss_callback - Вызывается, когда задача когда задача не успевает выполнится в свой выделенный промежуток времени

Для того, чтобы Нотификатор смог отправлять уведомления, нужно добавить ему значения chat id и bot token, которые можно получить при создании бота и канала. Эти значения я передаю через переменные окружения TELEGRAM_NOTIFIER_BOT_TOKEN и TELEGRAM_NOTIFIER_CHAT_ID

Так как Apache Airflow я запускаю в K8S и использую community chart, то описываю переменные окружения в чарте helm.

existingSecret: &basic airflow-secret

extraEnv:
  - name: TELEGRAM_NOTIFIER_BOT_TOKEN
    valueFrom:
      secretKeyRef:
        name: *basic
        key: TELEGRAM_NOTIFIER_BOT_TOKEN
  - name: TELEGRAM_NOTIFIER_CHAT_ID
    valueFrom:
      secretKeyRef:
        name: *basic
        key: TELEGRAM_NOTIFIER_CHAT_ID

Сами же значения этих переменных хранятся в secrets.yaml.

Результат

В случае завершения DAG с ошибкой в Telegram канал приходит уведомление:

Пример сообщения об ошибке
Пример сообщения об ошибке

Обратите внимание на поле Task Log URL. Мы можем получить ссылку именно на лог об ошибке. В целом распарсить его и отправить прямо в Telegram не составит труда, но часто бывает, что логи огромные и в мессенджере это может быть не читаемо.

Итог

В данной микро-статье я хотел предложить вам готовый кусок кода, который реализует отправку сообщений об ошибке выполнения DAG Apache Airflow в ваш канал Telegram, а так же мельком рассказал о разных параметрах Airflow, такие как callback и Notifier, которые вы можете использовать в будущем.

Я использую этот мини-уведомитель, но вы можете легко увеличить его функционал и использовать для других частных случаев.


Бойся своих желаний, они имеют свойство сбываться

Tags:
Hubs:
Total votes 10: ↑8 and ↓2+6
Comments8

Articles