86,43
рейтинг
22 июня 2015 в 15:41

Разработка → Потоковая обработка данных при помощи Akka

Привет, Хабр! Все привыкли ассоциировать обработку больших данных с Hadoop (или Spark), которые реализуют парадигму MapReduce (или его расширения). В этой статье я расскажу о недостатках MapReduce, о том, почему мы приняли решение отказываться от MapReduce, и как мы приспособили Akka + Akka Cluster на замену MapReduce.



Data Management Platform


Задача, для решения которой нам понадобились инструменты работы с большими данными, – сегментация пользователей. Класс систем, которые решают задачу сегментации пользователей во всём мире принято называть Data Management Platform или сокращённо DMP. На вход DMP поступают данные о действиях пользователей (в первую очередь, это факты посещений тех или иных страниц в интернете), на выходе DMP выдает «профиль пользователя» — его пол, возраст, интересы, намерения и так далее. Этот профиль в дальнейшем используется для таргетирования рекламы, персональных рекомендаций и для персонализации контента в целом. Продробнее о DMP можно почитать тут: http://digitalmarketing-glossary.com/What-is-DMP-definition.

Поскольку DMP работает с данными большого количества пользователей, объёмы данных, которые нужно обрабатывать, могут достигать очень внушительных размеров. Например, наша DMP Facetz.DCA обрабатывает данные 600 миллионов браузеров, ежедневно обрабатывая почти полпетабайта данных.

Архитектура DMP Facetz.DCA


Для того, чтобы обрабатывать такие объёмы данных необходима хорошая масштабируемая архитектура. Изначально мы построили систему на основе стека hadoop. Подробное описание архитектуры вполне заслуживает отдельной статьи, в этом же материале я ограничусь кратким описанием:

  1. Логи действий пользователя складываются на HDFS – распределённую файловую систему, являющуюся одной из базовых компонент экосистемы hadoop

  2. Из HDFS данные складываются в хранилище сырых данных, реализованное на основе Apache HBase – распределенной масштабируемой базы данных, построенной на основе идей Big Table. По сути, HBase – очень удобная для массовой обработки key-value база данных. Все данные пользователей хранятся в одной большой таблице facts. Данные одного пользователя соответствуют одной строке HBase, что позволяет очень быстро и удобно получить всю необходимую информацию о нём.

  3. Один раз в сутки запускается Analytic Engine – большой MapReduce job, который, собственно, и выполняет сегментацию пользователей. По сути, Analytic Engine – контейнер для правил сегментации, которые отдельно готовятся аналитиками. Например, один скрипт может размечать пол пользователя, другой — его интересы и так далее.

  4. Готовые сегменты пользователей складываются в Aerospike – key-value базу данных, которая очень хорошо заточена на быструю отдачу – 99% запросов на чтение отрабатывают менее чем за 1 мс даже при больших нагрузках в десятки тысяч запросов в секунду.



Архитектура Facetz.DCA

Проблемы MapReduce


Разработанная архитектура показала себя хорошо – позволила быстро смасштабироваться до обработки профилей пользователей всего Рунета и размечать их при помощи сотен скриптов (каждый может размечать пользователя по нескольким сегментам). Однако она оказалась не лишённой недостатков. Основная проблема – отсутствие интерактивности при обработке. MapReduce, по своей природе, – парадигма offline–обработки данных. Так, например, если пользователь посмотрел билеты на футбол сегодня, в сегмент «Интересуется футболом» он может попасть только завтра. В некоторых случаях такая задержка является критичной. Типичный пример – ретаргетинг – реклама пользователю товаров, которые он уже посмотрел. На графике приведена вероятность совершения покупки пользователем после просмотра товара по прошествии времени:


График вероятности конверсии после просмотра товара. При отстутсвии real-time движка для нас доступна только зеленая часть, в то время как максимальная вероятность приходится на первые часы.

Видно, что самая большая вероятность покупки – в течение нескольких первых часов. При таком подходе система узнала бы, что пользователь хочет купить товар, только по прошествии суток – когда вероятность покупки практически вышла на плато.

Очевидно, что необходим механизм потоковой real-time обработки данных, который сводит к минимуму задержку. При этом хочется сохранить универсальность обработки – возможность строить сколь угодно сложные правила сегментации пользователей.

Модель Акторов


Поразмыслив, мы пришли к выводу, что лучше всего для решения задачи нам подходит парадигма реактивного программирования и модель акторов. Актор – это примитив параллельного программирования, который умеет:

  • Принимать сообщения
  • Посылать сообщения
  • Создавать новых актор’ов
  • Устанавливать реакцию на сообщения

Модель акторов зародилась в erlang-сообществе, сейчас реализации этой модели существуют для многих языков программирования.

Для языка scala, на котором написана наша DMP, очень хорошим тулкитом является akka. Она лежит в основе нескольких популярных фреймворков, хорошо задокументированна. Кроме того, на Coursera есть прекрасный курс принципы реактивного программирования, в котором эти самые принципы рассказываются как раз на примере akka. Отдельно стоит упомянуть модуль akka cluster, который позволяет масштабировать решения (базирующиеся на акторах) на несколько серверов.

Архитектура Real-Time DMP


Итоговая архитектура выглядит следующим образом:



Поставщик данных складывает информацию о действиях пользователей в RabbitMQ.

  1. Из RabbitMQ сообщения о действиях пользователя вычитывает Dispatcher. Dispatcher-ов может быть несколько, они работают независимо.

  2. Для каждого онлайн-пользователя в системе заводится актор. Dispatcher отправляет сообщение о новом событии (вычитанном из RabbitMQ) соответствующему актору (или заводит новый актор, если это первое действие пользователя и для него ещё нет актора).

  3. Актор, соответствующий пользователю, добавляет информацию о действии в список пользовательских действий и запускает скрипты сегментации (те же, что запускают Analytic Engine при MapReduce-обработке).

  4. Данные о размеченных сегментах складываются в Aerospike. Также данные о сегментах и действиях пользователях доступны по API, подключенным напрямую к акторам.

  5. Если о пользователе не поступало данных в течение часа, сессия считается законченной и актор уничтожается.

Шардированием акторов по кластеру, их жизнью и уничтожением управляет akka, что существенно упростило разработку.

Текущие результаты:


  • Akka-кластер из 6 нод;
  • Поток данных 3000 событий в секунду;
  • 4-6 миллионов пользователей онлайн (в зависимости от дня недели);
  • Среднее время выполнения одного скрипта сегментации пользователей меньше пяти миллисекунд;
  • Среднее время между событием и сегментацией на основе этого события – одна секунда.



Дальнейшее развитие


Наш Real-Time Engine показал себя хорошо и мы планируем развивать его дальше. Список шагов, которые мы планируем предпринять:

  • Персистентность – сейчас Real-Time Engine сегментирует пользователей только на основе последней сессии. Мы планируем добавить подтягивание более старой информации из HBase при появлении нового пользователя.
  • На текущий момент только часть наших данных переведена на realtime-обработку. Мы планируем постепенно перевести все наши источники данных на потоковую обработку, после этого поток обрабатываемых данных возрастёт до 30000 событий в секунду.
  • После завершения перевода на Realtime мы сможем отказаться от ежедневного расчёта MapReduce, что позволит сэкономить на серверах за счёт того, что обрабатываться будут только те пользователи, которые реально сегодня проявили активность в интернете.

Ссылки на похожие решения


В конце хотелось бы привести несколько ссылок на некоторые фреймворки, на основе которых также можно строить потоковую обработку данных:
Apache Storm
Spark Streaming
Apache Samza

Спасибо за внимание, готовы ответить на ваши вопросы.
Автор: @asash

Комментарии (50)

  • 0
    Aerospike сколько серверов используете, если не секрет? Или одного хватает?
    • +3
      Не секрет. на текущий момент 6 серверов.
      • 0
        То есть с каждым акка-сервером крутится инстанс? Или это отдельно?
        • 0
          Отдельно.
  • 0
    Спасибо за статью! Скажите, какое количество сегментов и могут ли пользователи из них удаляться?
    • +3
      Сегментов на текущий момент около 2000 разных(пользователь одновременно находится в нескольких из них, но не во всех). Если хотите удалиться из сегментов -можете например пройти по этой: exebid.ru/optout.html ссылке.
  • +3
    Респект! Я делал похожую штуку, на несколько ином наборе инструментов.
    Если позволите, несколько вопросов :)

    — какой у вас роутинг в раббите? (директ, топик, фанаут?)
    — как оно переживает деплой новой версии программного кода?
    — что происходит в системе, если актор не справляется с обработкой сообщения и падает?
    — данные о пользователях отдаются по явному запросу, или доставляются потребителям так же через очереди?
    • +1
      — какой у вас роутинг в раббите? (директ, топик, фанаут?)
      для этой системы мы не используем routing, все dispatchers читают из одной очереди
      — как оно переживает деплой новой версии программного кода?
      хорошо переживает :) мы не пробовали запускать несколько нод с разными версиями акки, а наш внутренний протокол мы делаем обратно совместимым
      — что происходит в системе, если актор не справляется с обработкой сообщения и падает?
      у акки есть механизм supervisors, в самом простейшем случае этот актор будет перезапущен(но есть варианты)

    • +1
      — данные о пользователях отдаются по явному запросу, или доставляются потребителям так же через очереди?
      текущие сырые данные по пользователю можно достать запросом в кластер, но эта функциональность пока редко используется. а потребителям сырые данные мы доставляем другим способом. этот кластер именно для обработки
  • 0
    Рассматривали ли вы использование akka streams?
    • 0
      Они ж пока в rc
    • 0
      нет, да и вроде как akka streams еще не стабильны
  • +1
    А почему не подошел Spark Streaming? Как минимум есть два преимущества:
    — готовая инфраструктура во всяких облаках
    — нет потерь данных при фейлах из коробки (в akke надо делать руками persistent actors)

    Тем более что вы сами ссылку на него даете.
    • 0
      Мы были уверены что при помощи akka можно решить бизнес-задачу и умеем ею пользоваться. О спарке слышали(и проверили на собственном опыте) много отзывов насчет его глючности.
      • 0
        Изначальная идея была взята из research.microsoft.com/en-us/projects/orleans, ну и да, акку мы тогда уже умели готовить :)

        возможно в будущем посмотрим на возможность переезда на spark streaming, благо кода получилось не очень много
      • +1
        А что скажете про apache storm?
        • 0
          Storm тоже хорошая система. Пробовал его на предыдущем месте работы, вполне себе работало. В общем вполне можно промышленные системы строить и на нем тоже, тут скорее дело вкуса. Нам понравилась именно идеалогия акторов, которая подходит не только для потоковой обработки данных, но и вообще для широкого класса задач распределенных систем.
      • +2
        Да, Spark нужно уметь готовить. Не у многих компаний он в production работает, хотя с версии 1.1.1 все стало хорошо.
  • +1
    Почему используется RabbitMQ, а не, например, Apache Kafka?
    • 0
      Проще в эксплуатации?
      • 0
        Тут скорее дело в производительности, бенчмарк.
        • 0
          Нет, я имел в виду, что кролик проще в эксплуатации :)
          • 0
            Возможно, в kafka приходится работать c offset'ами и partition'ами, но, например, для spark есть модуль spark-streaming-kafka, к хорошим API (в ввиде RDD).
    • 0
      Кролик у нас уже был, а разворачивать в проде еще одну новую систему — это гарантированные грабли. Задача была как можно быстрее это запустить.

      На текущий момент кролик вполне справляется с нагрузкой. Как только перестанет — посмотрим на kafkу.
      • 0
        Это скорее вариант на будущее).
  • 0
    Использова ли вы akka-persistance?
    • 0
      Да зачем он им там?
  • +2
    Большое спасибо за интересную статью! Пользуясь случаем хотелось бы оставить для заинтересованных читателей ссылку на серию из трех статей об Akka Cluster.

    Также, если позволите, несколько вопросов:

    • Как вы решили проблему обратной совместимости сообщений между разными версиями приложения и Akka?
    • Kryo или не Kryo? Kamon или не Kamon?
    • Не в облаках ли вы хоститесь? Не возникает ли у вас проблемы, что кластер время от времени разваливается и нужно поднимать его руками? В AWS такое иногда случается.
    • При падении одной машины остальной кластер детектит это не сразу. В результате акторы продолжают долбиться на упавшую машину, получать ask timeout'ы, вот это все. Из-за падения одной машины страдает весь кластер. Как вы с этим живете?
  • 0
    1. есть ли в акка-скриптах агрегация (самое простое — количество пользователей за день)? Вопрос в контексте того, как вы избегаете повторного учета? В случае MapReduce эта проблема решается иначе.

    2. офф. что за система рисует график «Alive Users»?
    • 0
      1. Aгрегации в скриптах нет, все скрипты выполняются для одного пользователя

      2. Метрики пишем в graphite, из graphite графики рисует grafana
  • +2
    Очень интересная статья!
    Скажите, пожалуйста, а есть ли оценки затрат вычислительных ресурсов? Сейчас же практически всё в облаках, сколько стоила бы обработка N пользователей по первой схеме и сколько по real-time, насколько возрастают затраты? Каков сравнительный объем хранимых данных? создалось ощущение, что во втором случае не хранится большой массив сырых данных.
  • 0
    А сколько серверов понадобилось бы на 1 млрд юзеров и 20 млрд событий в сутки (примерно 500к/с в пике)?
    • 0
      Сложно сказать. На таком потоке вероятно вылезут какие-то дополнительные ограничения, которые не видно сейчас. Например у меня серьезные подозрения что RabbitMQ пришлось бы заменить на что-то другое.
  • 0
    а зачем на каждого юзера создаётся по актору?
    • +1
      Актор на каждого юзера создается для того чтобы инкапсулировать в себя всю информацию про этого юзера, чтобы вся нужная информация нужная для обработки была доступна локально.
      • 0
        Хм, а почему бы всю необходимое информацию не сохранить в message и иметь 1 актор на всех юзеров?
        • +2
          Да в общем зачем, если акторы достаточно легковесны? Размазывать по кластеру, опять же, можно.
          • 0
            Размазывать то, можно но стоит ли. Это опять же лишние затраты на общение между нодами.
            • 0
              Наоборот, цель такого подхода — сократить взаимодействие между нодами в системе, которая на одной ноде работать не может. Актор содержит весь необходимый стейт и в состоянии сам выдать необходимую информацию о себе.
              • 0
                Стоп, а что мне мешает создать 1 actor и засунуть state в message? Тем самым мы не создаем миллион actors и опять же message будет процесситься от и до на одной node, которая успела его поймать.
                • 0
                  Подождите, источником стейта является не сообщение а актор.
        • +1
          Около 400 байт памяти «стоит» один актор
  • 0
    .
  • +2
    На заметку: модель Акторов зародилась не в сообществе эрланга.

    Модели акторов 40 лет!
    • 0
      В документации по Akka постоянно встречаются отсылки к Erlang-реализации
      • 0
        Это не означает, что акторы зародились в сообществе эрланга.
        • +1
          Акторы зародились не в сообществе Erlang, но как раз в Erlang появилась одна из первых реализаций модели акторов, которую можно использовать в реальных системах на больших нагрузках. В целом же, почему в доках по акка идет постоянная отсылка к Erlang? Ответ очень прост, авторы акка в первых версиях пытались просто портировать модель Erlang на Scala. Позже же появились некоторые улучшения, которых нет в Erlang.
          • 0
            Все эти доводы не являются достаточным оправданием для подмены понятия «первая реализация используемая в реальных системах» на «зародились в сообществе».

            Я просто оставил ссылку, чтобы пытливые умы могли более детально посмотреть историю акторов при желании. В интернетах порой бывает трудно найти первоисточник и выяснить истинное положение вещей. Мне кажется надо придерживаться фактов и стараться избегать искажения информация при повествовании.
  • 0
    Кстати итоговая архитектура стала чем-то похожа на так называемые Lambda и Kappa архитектуры; radar.oreilly.com/2014/07/questioning-the-lambda-architecture.html
  • 0
    >Также данные о сегментах и действиях пользователях доступны по API, подключенным напрямую к акторам.

    А можно два слова про это? Что это за запросы, почему это не отдельный сервер, который берет данные из aerospike?

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Самое читаемое Разработка