company_banner

Работа с потоком логов в реальном времени с помощью Heka. Опыт Яндекс.Денег

    image alt text


    В этом материале я расскажу о том, как в Яндекс.Деньгах организована система сбора и доставки серверных логов платежных сервисов. Доставкой логов я занимаюсь весь последний год, и за это время накопилось достаточно нюансов, чтобы поделиться опытом с общественностью.


    Система построена на базе стека EHK (Elasticsearch/Heka/Kibana), с прицелом на работу практически в реальном времени. Особый упор сделаю на тонкие места и нюансы обработки миллиардов строк текста в сутки.


    Я верю в три вещи: мониторинг, логи и бэкапы.

    Для быстрого доступа к оперативным логам в Яндекс.Деньгах используется ElasticSearch, получающий данные из почти сотни микросервисов, которые и составляют платежный сервис.


    Под «оперативными» я подразумеваю логи, доступные в реальном времени: задержка доставки новых записей от файла лога до кластера Elasticsearch сейчас составляет менее секунды. Служба мониторинга всегда точно знает состояние сервиса в любой момент, а разработчики могут оперативно оценивать и корректировать поведение новой версии каждого микросервиса сразу после релиза.


    Но так было не всегда. Когда пять месяцев назад я пришел в компанию, мне поставили задачу наладить работу по доставке оперативных логов сервиса до кластера ElasticSearch (далее просто ES). На тот момент использовались четыре схемы их разбора и доставки до ES:


    • Heka → TCP output → Heka → ES.


    • Heka → Apache Kafka → Apache Flume → ES.


    • SyslogNg → ES.


    • nxlog → nxlog → ES.

    Почти все они работали неидеально: кластер Kafka был ненадежен, Flume периодически вис, отчего ES впадал в ступор.


    Логи – это на самом деле очень простая вещь: множество файлов, которые на боевых серверах быстро растут в объемах. Поэтому простые решения в этом случае — самые надежные.

    Так как «чем больше в зонтике сочленений — тем выше вероятность его поломки», то я выбросил все неработающие и неоправданно сложные схемы и остановился на одной, в которой роль обработчика и транспорта выполняла бы Heka.


    image alt text


    Heka забирает логи и по протоколу TCP отправляет их в другую Heka для дальнейшей пересылки в ElasticSearch Cluster.


    Идея следующая: на каждом сервере установлена Heka, логи всех сервисов этого сервера упаковываются в бинарный протокол Protobuf и отправляются на приемную Heka для разбора, фильтрации и отправки в ES.


    Почему не классический стек ELK и почему deprecated Heka, а не Logstash


    С точки зрения конфигурации и состава ПО наш кластер ES выглядит следующим образом:


    • ElasticSearch 2.2.


    • Kibana 4.4.2.


    • Две мастер-ноды: Intel Xeon 2x E5-2660, 64 ГБ ОЗУ, 2x 146 ГБ RAID-10.


    • Клиентская нода с установленной Kibana: Intel Xeon 2xE5-2660, 64 ГБ ОЗУ, 2x146 ГБ RAID-10.


    • Четыре дата-ноды: Intel Xeon 2x E5-2640 v3; 512 ГБ ОЗУ, 3x16 ТБ RAID-10.

    Все ноды ES находятся в одной локальной сети и подключены к одному маршрутизатору, что позволяет осуществлять общение внутри кластера по транспортному протоколу на максимальной скорости. Такая организация заметно ускоряет размещение индексов и rebalance кластера.


    В современном мире стек Elasticsearch/Logstash/Kibana стал уже практически стандартом де-факто для работы с логами. И если против Elasticsearch и Kibana нет никаких возражений, то с Logstash есть один нюанс — он создан на jRuby (написанный на Java интерпретатор языка Ruby) и требует наличия в системе JVM. Учитывая, что Яндекс.Деньги – это множество микросервисов, размещенных на сотнях физических серверов и виртуальных контейнеров, ставить в каждый из них тяжеловесные Logstash и JAVA было бы неправильно. На Heka выбор пал из-за ее простоты, надежности, легкости, умения фильтровать проходящие сообщения и отличной буферизации данных.


    Что касается статуса продукта (deprecated) – для нас это не аргумент. Лук и стрелы для военных целей тоже deprecated, но это никак не мешает вам выстрелить кому-нибудь в голову с гарантированным результатом. Если же понадобится, например, нестандартный плагин или обработчик, то доработать продукт поможет целый штат опытных разработчиков.


    Но это всё была теория, а при переходе к практике начались проблемы.


    Дьявол прятался в объеме данных


    Учитывая финансовую специфику нашей работы, почти все сервисы пишут много разной информации в логи. Для примера и понимания масштаба: объем логов некоторых из компонентов системы доходит до 250 тысяч строк в минуту.


    Ни одна Heka, на каком бы мощном железе она ни стояла, в одиночку такой объём не обработает — неизбежны проседания производительности и потеря данных. Ни то, ни другое, конечно же, совершенно недопустимо, поэтому на помощь приходит HAProxy. Итоговая схема получилась следующей:


    image alt text


    На схеме изображено общее направление трафика логов от Heka-источников до связки HAProxy + Heka.


    На каждом сервере стоит одна Heka, собирающая логи микросервисов этого сервера. Данные собираются, пакуются в Protobuf и по TCP уходят на балансировщик нагрузки, обслуживающий дата-центр. Бэкендами работают HAProxy, расположенные непосредственно на дата-нодах ES, за которыми стоят пулы Heka. В свою очередь, они принимают данные, переупаковывают их в ESJson и отправляют в локальную дата-ноду по протоколу HTTP.


    … и в разных форматах лог-файлов


    Несмотря на то, что основным языком в компании является Java и логи выводятся через стандартную библиотеку log4j, не было единого принятого формата на момент постройки «кластера мечты». Каждый микросервис писал логи собственного типа, включая вариации в наборе выводимых полей и форматах даты-времени. Не хотелось ждать, пока разработка приведёт логи к общим форматам, поэтому движение к цели было параллельным. Одновременно с разбором существующих форматов логов заводились задачи на доработку, а по мере релиза новых версий с правильными форматами менялись и настройки сборщиков.


    Перейдем к самому соку — нюансам настройки


    Поведение Heka описывается .toml файлами, которые при запуске собираются в единую конфигурацию, и в зависимости от описанных блоков Heka строит модель обработки данных.


    Каждый блок проходит через несколько этапов:


    • Input — входной поток (это может быть файл, TCP/UDP input, данные, прочитанные из Kafka, событий Docker контейнеров и т.п.).


    • Splitter — тут мы указываем начало и конец каждого блока данных в потоке. Обязательный этап для многострочных данных вроде Java Stacktrace.


    • Decoder — описывает правила декодирования входящих данных. Мы в основном используем Regex декодеры.


    • Filters — этап фильтрации и изменения данных.


    • Encoders — кодирование потока данных под формат получателя.


    • Output — здесь мы описываем, как и куда данные должны отправиться.

    Все эти этапы — просто плагины на Go и Lua. При необходимости можно написать что-нибудь своё. Например, плагин-фильтр на Lua, который будет отсекать отправку в ES записей о мониторинговых запросах к сервису; или вырезать из логов конфиденциальные данные.


    Хека в древнеегипетской мифологии — бог магии. И то, что Heka позволяет делать с логами — просто волшебно.

    Параметры сервера-источника логов


    Разберем конфигурацию Heka на примере сервера-источника логов и файла service.toml.


    [money-service-log]
    type = "LogstreamerInput"
    log_directory = "/var/log/tomcat"
    file_match = "money-service.log"
    splitter = "RegexSplitter"
    decoder = "service_decoder"

    Простейший случай — один файл, ротация происходит системными средствами. Если файлов много и они различаются по форматам, то следует описывать каждый парой input/decoder. За более детальным описанием лучше обратиться к официальной документации. Если что-то остается непонятным — обязательно спрашивайте в комментариях.


    [RegexSplitter]
    delimiter = '\n(\[\d\d\d\d-\d\d-\d\d)'
    delimiter_eol = false

    Так как логи могут быть многострочными (те же stacktraces), не забываем про RegexSplitter, который даёт Heka понять, где заканчивается один блок текста и начинается другой.


    [service_decoder]
    type = "PayloadRegexDecoder"
    match_regex = '^\[(?P<timestamp>\d{4}-\d{2}-\d{2}T[\d:.\+]+\])\s+(?P<level>[A-Z]+)\s+\[(?P<thread>.*)\]\s+\[(?P<context>\S*)\]\s+\[(?P<traceid>\S*)\]\s+\[(?P<unilabel>\S*)\]\s\[(?P<class>\S+)\]\s-\s(?P<msg>.*)'
    
    log_errors = true
        [service_decoder.message_fields]
        @timestamp = "%timestamp%"
        level = "%level%"
        thread = "%thread%"
        context = "%context%"
        traceid = "%traceid%"
        unilabel = "%unilabel%"
        class = "%class%"
        msg = "%msg%"

    В match_regex описываем строки лога регулярным выражением в стандарте языка Go. Регулярные выражения в Go практически совпадают со стандартным PCRE, но есть ряд нюансов, из-за которых Heka может отказаться стартовать. Например, некоторые реализации PCRE простят такой синтаксис:


    (?<groupname>.*)

    А вот GOLANG — не простит.


    С помощью параметра log_errors собираем все ошибки в отдельный лог — они понадобятся позже.


    [ServiceOutput]
    type = "TcpOutput"
    address = "loadbalancer.server.address:port"
    keep_alive = true
    message_matcher = "Logger == 'money-appname'"
    use_buffering = true
    
        [ServiceOutput.buffering]
        max_file_size = 100857600
        max_buffer_size = 1073741824
        full_action = "block"

    Буферизация выходящего потока — одна из отличных возможностей Heka. По умолчанию она хранит выходной буфер по следующему пути:


    /var/cache/hekad/output_queue/OutputName

    В настройках мы ограничиваем размер каждого файла буфера объемом 100 МБ, а также устанавливаем суммарный размер кеша для каждого Output-модуля в 1 ГБ. Параметр full_action может принимать три значения:


    • shutdown — при переполнении буфера Heka останавливается;


    • drop — при переполнении буфера он начинает работать как стек, удаляя старые сообщения в очереди;


    • block — при заполнении буфера Heka приостанавливает все операции и ждёт, пока не появится возможность отправить данные.

    С block вы гарантированно не потеряете ни одной строки лога. Единственный минус в том, что при обрыве соединения или деградации канала вы получите резкий скачок трафика при возобновлении связи. Это связано с тем, что Heka отправляет накопленный буфер, пытаясь вернуться к обработке в реальном времени. Приёмный пул нужно проектировать с запасом, обязательно учитывая возможность таких ситуаций, иначе можно с лёгкостью провернуть DDoS самих себя.


    Кстати, по поводу использования двойных и одинарных кавычек в конфигурации Heka — подразумевается следующее:


    • Значения переменных в одинарных кавычках по умолчанию рассматриваются как raw string. Спецсимволы экранировать не нужно.


    • Значения переменных в двойных кавычках рассматриваются как обычный текст и требуют двойного экранирования при использовании в них регулярных выражений.

    Этот нюанс в свое время попортил мне немало крови.


    Конфигурация бэкендa


    Бэкенд для балансировщика — это связка из HAProxy и трёх экземпляров Heka на каждой дата-ноде ES.


    В HAProxy всё довольно просто и пояснений, как мне кажется, не требует:


    listen pool_502
        bind 0.0.0.0:502
        balance roundrobin
        default-server fall 5 inter 5000 weight 10
        server heka_1502 127.0.0.1:1502 check
        server heka_2502 127.0.0.1:2502 check
        server heka_3502 127.0.0.1:3502 check

    На каждом сервере запущено три инстанса Heka, отличающихся только портами:


    [Input_502_1]
    type = "TcpInput"
    address = "0.0.0.0:1502"
    keep_alive = true
    keep_alive_period = 180
    decoder = "MultiDecoder_502"
    
    [MultiDecoder_502]
    type = "MultiDecoder"
    subs = ['Service1Decoder', 'Service2Decoder']
    cascade_strategy = "first-wins"
    log_sub_errors = true

    В конфигурации используется MultiDecoder, так как через один порт проходят логи многих сервисов. Политика first-wins означает, что после первого совпадения дальнейший перебор декодеров прекращается.


    [Service1Decoder]
    type = "ProtobufDecoder"
    
    [Service2Decoder]
    type = "ProtobufDecoder"
    
    [Service1Encoder]
    type = "ESJsonEncoder"
    index = "service1-logs-%{%Y.%m.%d}"
    es_index_from_timestamp = false
    type_name = "%{Logger}"
    fields = [ "DynamicFields", "Payload", "Hostname" ]
    dynamic_fields = ["@timestamp", "level", "thread", "context", "traceid", "unilabel", "class", "msg"]
            [Service1Encoder.field_mappings]
            Payload = "message"
            Hostname = "MessageSourceAddress"

    Параметр es_index_from_timestamp нужен для указания, что дата и время для формирования имени индекса берутся не из приходящих данных, а из локального времени сервера. Позволяет избежать бардака, когда серверы работают в разных временных зонах и кто-то пишет в логи время в UTC, а кто-то в MSK.


    В параметре index реализуется принцип «один сервис – один индекс», новый индекс создается каждые сутки.


    [Service1Output]
    type = "ElasticSearchOutput"
    message_matcher = "Logger == 'money-service1'"
    server = "http://localhost:9200"
    encoder = "Service1Encoder"
    use_buffering = true

    Плагины Output разбирают поток данных на основании параметра message_matcher, соответствующего имени файла лога. Чтобы не перегружать сеть, Heka отправляет данные в локальную дата-ноду, на которой установлена. А уже дальше ES сам рассылает индексированные данные по транспортному протоколу между дата-нодами кластера.


    Заключение


    Описанная выше схема успешно работает и индексирует по 25-30 тысяч записей в секунду. Запас прочности приемных пулов Heka позволяет выдерживать пики нагрузки до 100 тыс. записей/сек:


    image alt text


    Статистика из Zabbix.


    В ElasticSearch мы храним логи за последний 21 день. Опыт показывает, что к более старым данным оперативный доступ нужен крайне редко. Но если понадобится, их всегда можно запросить с лог-серверов, хранящих данные практически вечно.


    image alt text


    Текущее состояние кластера по данным Kopf.


    Я описал только ту часть системы, которая относится к сбору и доставке логов, поэтому в следующей статье собираюсь рассказать про сам кластер ElasticSearch и его настройку. Думаю рассказать, как мы его виртуализировали, как переезжали с версии 2.2 на 5.3 и перевозили с собой 24 миллиарда записей, не потеряв при этом веру в человечество.


    Может быть, добавить к рассказу что-то еще, какие-то упущенные из виду детали? Поделитесь мнением в комментариях.

    Яндекс.Деньги 105,17
    Об электронных платежах и устройстве Я.Денег
    Поделиться публикацией
    Комментарии 29
    • +2
      Уже много лет проводим тесты по вопросу как быстрее парсить логи регэкспом. И много лет побеждает perl. Он создан для регулярок все таки. У нас объем меньше вашего, но и парсим мы только логи nginx.
      Ansible проследит что бы везде был одинаковый формат. Syslog-ng направит поток из файла логов nginx на stdin парсера.
      150 тысяч записей лога в минуту (многострочные данные мы считаем за одну запись) — в сумме со всех серверов дает <10%CPU нагрузки.
      • +1

        Я смотрю на пример конфигурации Heka, и я считаю что сплиттер строк нужно было назвать не RegexSplitter, а как-нибудь типа JavaLogSplitter.
        А то RegexpSplitter не говорит нам ни о чём. Дла это сплиттер, да он работает на регулярках — но в имя лучше вынести специфику применения, а не специфику реализации.

        • 0
          Каноничная форма описания сплиттера такая:

          [SplitterName]
          type = "RegexSplitter"
          delimiter = ‘here_is_delimeter’
          delimiter_eol = true/false
          


          И в боевых конфигах мы используем имена реальных сервисов в заголовках всех блоков, для удобочитаемости. Но для статьи пришлось их поменять на нейтральные названия.
          • 0

            То, что logstash тормозной и тяжёлый уже и самим разработчикам ELK понятно, и ему есть более шустрые альтернативы.


            В HEKA, всё-же, смущает, что она deprecated и не развивается — вместо неё Мозилла начала новый проект Hindsight, по их бенчмаркам в 7-10 раз шустрее и легче, чем HEKA. Но насколько он готов, стабилен и как долго его будут сопровождать — трудно сказать.


            Рассматривался ли вами во многом похожий на HEKA движок от Триваго — Gollum. Из статьи неясно, когда ваш проект был начат, но по довольно старым бенчмаркам Gollum в сравнении с LogStash и HEKA смотрится вполне достойно и по-прежнему жив, в отличие от HEKA.


            А чем, всё-таки, kafka не устроила? Вроде как многие её активно в production используют и не особо жалуются…

            • 0
              На данный момент Heka отвечает нашим требованиям по простоте администрирования, надежности, функциональности и производительности. Для работы с логами мы выбрали Heka прежде всего потому, что он уже успешно работал у нас в системе для доставки событий из сервисов в Graphite.

              С Gollum мы не экспериментировали, но как видно из описанного в статье исходного зоопарка набора инструментов, который у нас был, процесс не стоит на месте и мы регулярно пробуем новые технологии. Сейчас рассматриваем Filebeat как вариант для замены Heka.

              Kafka мощный инструмент, но его не просто готовить, появляется промежуточный слой, добавляется точка отказа, появляется Zookiper и т.д. Люди регулярно борются с Kafka «из коробки», например, тут Bubuku. Скорее всего, придется бороться и нам, но уже для других целей. У нас основые проблемы возникли с Flume, перестроение кластера останавливало всю систему до полной перезагрузки. Потратив несколько дней, мы поняли, что можно проще, что Kafka не нужен, и наши усилия по налаживанию кластера лучше направить в другое русло. В итоге, получилось проще и дешевле по админским затратам.
              • 0
                Уточню про flume: под нагрузкой зависала связка kafka -> flume. Симптомы: все ядра под 100%, память утекает неизвестно куда, в логах пусто. Как в данных условиях лечить — непонятно, поэтому пришлось ампутировать.
            • 0
              все красиво расписали, спасибо.
              а как логируете и храните сообщения при работе с МПСами?
              как PA\PCI DSS по всем этим проходите?

              • 0
                Хороший и актуальный для нас вопрос! Сертификацию мы проходим каждый год, приложения в периметре контролируют (фильтруют) наличие карточных данных перед записью в лог, а сами логи мы НЕ пишем в «общий» ELS. Сейчас в работе внедрение поверх выделенного ELS инструмента для ограничения прав доступа на основании настроек в LDAP. Плюс думаем вместо уровня приложений использовать Lua в Heka для маскирования чуствительных данных, которые могут попасть в лог. Ну и конечно, если вдруг такое нам понадобится, то отправку даже очищенных логов из периметра PCI DSS в «общий» ELS мы будем делать только после её успешного прохождении сертификационного аудита.
                • 0
                  На сколько помню, на диск или в бд должно писаться только в маскированном виде.
              • 0
                Не смотрели в сторону fluentd (td-agent)?
                Он быстрее logstash (и потребляет меньше ресурсов) и он развивается.
                • 0
                  Не смотрели. Спасибо, записал в книжечку. Пригодится, когда будем думать на что съезжать с хеки, если она перестанет справляться.
                  • 0
                    Из опыта, с ним проблемы:
                    если есть сетевые проблемы (у вас, вроде, с сетью все ok) -> ведет себя плохо
                    проблемы с плагином для elasticsearch (вроде не сложно их исправить, но нужно время) -> ведет себя плохо
                    были проблемы с vmware (баги были в версии 5.0 или около того, не помню точно, хотя и сейчас всплывают, но они глобальные и все касаются сети), но это не про вас, если я не ошибаюсь :)
                    • 0
                      Сам эластик тоже очень чувствителен к проблемам с сетью. Но до этого дойдёт очередь только в третьей статье цикла.
                  • 0
                    Он тоже на Ruby.
                  • 0
                    Параметр es_index_from_timestamp нужен для указания, что дата и время для формирования имени индекса берутся не из приходящих данных, а из локального времени сервера. Позволяет избежать бардака, когда серверы работают в разных временных зонах и кто-то пишет в логи время в UTC, а кто-то в MSK.
                    — «при обрыве соединения или деградации канала», получится, что данные будут писаться в ES с временем, серьёзно отличающимся от реального?
                    • 0
                      Этот параметр влияет только на то, когда создаётся индекс. Позволяет избежать создания индексов, когда, например на сервере эластика ещё сегодня, а логи шлют уже из завтра. Эластик сам вычисляет таймстамп записи из даты и времени пришедших в логах. Поэтому, кстати, очень важно, чтобы в логах были указаны таймзоны. Потому что если их нет — Эластик будет считать по UTC и логи поплывут.
                    • 0
                      А вы не рассматривали Splunk как альтернативу?

                      Если — да, то было бы интересно послушать ваше мнение относительно выбора.
                      • 0
                        У нас есть опыт использования Splunk, отзыва положительные)

                        Выбор в пользу ELS был сделан из-за стоимости, нам нужно обрабатывать большие объемы логов.
                        • 0
                          А сколько примерно в деньгах вам выходит ELS суммарно, если не секрет? И какой объем Гб в день вы обрабатываете?
                          • 0
                            Поскольку хостинг и техподдержку мы осуществляем сами — ES нам не стоит ничего. В день система прокачивает около 3Тб логов. Под такие объёмы нужна лицензия Splunk Enerprise. Посчитайте — сколько это будет стоить за индексирование 3Тб/сутки.
                      • 0
                        Почти все они работали неидеально: кластер Kafka был ненадежен, Flume периодически вис, отчего ES впадал в ступор


                        а можно как-то поподробнее? как другие-то работают?

                        по Kafka — какие именно были проблемы? и пытались ли вы их решить?
                        • 0
                          Есть ли какой-то смысл в разных таймзонах на разных серверах?
                          • 0
                            Абсолютно никакого. Все сервера у нас в UTC, но компания большая, людей много, серверов ещё больше, и если откуда-то вдруг прилетит MSK — надо быть к этому готовым заранее. Индексы старше 21 дня удаляются, поэтому лучше чтобы они сразу создавались за правильную дату.
                          • 0
                            Если добрая часть приложений на Java/log4j, не рассматривали вариант с log4j custom appender?
                            • 0
                              Да, но есть ещё недобрая часть на других языках + логи nodejs + логи nginx + логи почты + много разной экзотики. Нужен был универсальный инструмент. К тому же, логи сливаются не только в эластик, в виде файлов они тоже нужны для долговременного хранения.
                            • 0
                              Интересно было бы посмотреть на метрики по логам и создание инцидентов.
                              • 0
                                Думаю рассказать, как мы его виртуализировали, как переезжали с версии 2.2 на 5.3 и перевозили с собой 24 миллиарда записей, не потеряв при этом веру в человечество.

                                Мне довелось переносить более 20 миллиардов записей с 1.x на 2.x, потом с 2.x на 5.x, но ресурсов было гораздо меньше… будет интересно почитать как это делается «в нормальных условиях» ;)
                                • 0
                                  Обязательно расскажу, с картинками и графиками. Оставайтесь с нами :)
                                • 0
                                  Если память не изменяет, в рассылках Rob и товарищи советовали не использовать RegexSplitter, т.к. он медленный.
                                  А вместо него сделать сплиттер на LPEG.
                                  Но они к производительности подходили очень серьезно — лишнюю инструкцию в цикле в фильтре не вставляли, чтобы скорость не падала.

                                  И еще, возможно вам, или еще кому-то пригодится: https://github.com/timurb/heka_mock

                                  Это мокер, чтобы можно было тесты к фильтрам писать.

                                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                                  Самое читаемое