13 июля 2015 в 12:42

Реал-тайм процессинг данных в AWS Cloud. Часть 2

В первой части статьи мы описали одну из задач, с которой мы столкнулись при работе над публичным сервисом для хранения и анализа результатов биологических исследований. Были рассмотрены требования, предоставленные заказчиком, и несколько возможных вариантов имплементации на основе существующих продуктов.

image

Сегодня речь пойдет о решении, которое было воплощено.

Предложенная архитектура


image

Фронт-энд

Запросы пользователей приходят на фронт-энд, валидируются на соответствие формату и передаются в бэк-энд. Каждый запрос в итоге вернется на фронт-энд изображением или набором точек, если клиент самостоятельно желает строить такое изображение.

В будущем возможна установка LRU-кэша для хранения повторяющихся результатов с коротким сроком жизни элементов — соразмерно продолжительности пользовательской сессии.

Бэк-энд

Для каждого такого запроса бэк-энд
  • валидирует входящий запрос и проверяет его правомерность с точки зрения политики безопастности,
  • определяет, какие данные нужно зачитать из S3 и формирует общую задачу, результат обработки которой должен вернуться на фронт-энд,
  • разбивает задачи на подзадачи, принимая во внимание особенности, расположения данных на S3, чтобы избежать двойных зачитываний и т.п.,
  • ставит задачи в очередь, построенную на основе RabbitMQ,
  • обрабатывает результаты, полученные из очереди, собирая наборы точек воедино,
  • рендерит изображение, если запрос это предполагает,
  • возвращает результаты фронт-энду.


Обработка подзадач происходит посредством параллельной постановки в очередь каждой подзадачи в RPC-стиле (поставил задачу, подождал, получил результат). Для этого используется пул потоков, глобальный для бэк-энд приложения. Каждый поток в этом пуле ответственнен за взаимодействие с брокером: отправка сообщения, ожидание, прием результата.

К тому же, использование пула потоков известного размера позволяет контролировать количество одновременно обрабатываемых сообщений-подзадач. А запуск потоков по обработке известных подзадач дает возможность планировать какие именно общие задачи выполняются в данный момент, прогнозируя сроки готовности каждой общей задачи.

image

Для устойчивости работы требуется следить за тремя вещами:

  1. Время обработки одной подзадачи / количество подзадач, поставленных в очередь на момент времени — при увеличении этого параметра требуется увеличить пропускную спобность очереди.
  2. Приоритизация обработки подзадач с тем, чтобы каждая общая задача обрабатывалась за как можно меньшее время.
  3. Количество общих задач в обработке — чтобы избежать переполнения JVM heap на бэк-энде из-за необходимости держать в памяти промежуточные результаты.


Пункты 2 и 3 достигаются манипуляциями с размером пула потоков и подходом к постановке подзадач в очередь. При изменении среднего времени обработки подазадчи (пункт 1) требуется увеличить или, соответственно, уменьшить количество рабочих узлов по обработке подзадач.

Рабочие узлы-воркеры

Подписчиками на очередь RabbitMQ являются standalone-приложения, которые для определенности назовем воркерами. Каждый из них занимает полностью один из инстансов EC2, наиболее эффективно используя CPU, оперативную память и пропускную способность сети.

Подзадачи, сформированные на бэк-энде, потребляются каким-то из воркеров. Процесс обработки такой подзадачи не подразумевает глобального контекста, потому воркер работает назависимо от себе подобных.

Важным моментом является то, что Amazon S3 предоставляет случайный доступ к любым данным. Это означает, что вместо загрузки файла размером в 500 МБ, большая часть которого не нужна для процессинга данного запроса, мы можетм прочесть только то, что действительно нужно. То есть, разделяя общую задачу на поздачачи правильным образом, всегда можно добиться отсутствия двойных зачитываний одних и тех же данных.

В случае рантайм-ошибки (закончилась память, произошел сбой и т.п.) задача просто возвращается назад в очередь, где распределяется на другой узел автоматически. Для устойчивости системы каждый из воркеров периодически рестартуется по cron, чтобы избежать возможных проблем с memory leaks и переполнением JVM heap.

Масштабирование

Причин, ведущих к потребности изменить количество узлов приложения, может быть несколько:

  1. Увеличение среднего времени обработки подзадач, что в конечном итоге приводит к проблемам в доставке конечного результата пользователям в требуемых временных рамках.
  2. Отсутсвие должной нагрузки на узлы-воркеры.
  3. Перегрузка бэк-энда по CPU или по потребляемой памяти.


Для решения проблем 1 и 2 мы использовали API, предоставляемый EC2, и создали отдельный модуль-скейлер, оперирующий инстансами. Каждый новый инстанс создается на основе заранее сконфигурированного образа операционной системы (Amazon Machine Image, AMI) и запускается посредством spot-запросов, что позволяет сэкономить средства на оплату хостинга примерно в пять раз.

Недостатком такого подхода является то, что от момента создания spot-запроса на инстанс до его ввода в строй проходит порядка 4-5 минут. К этому моменту пик нагрузки может быть уже пройден, и необходимость в увеличении количества узлов может исчезнуть сама по себе.

Чтобы попадать в такие ситуации реже, мы используем статистику по количеству запросов, географическому положению пользователей и времени суток. С ее помощью мы увеличиваем или сокращаем количество рабочих узлов “наперед”. Практически все пользователи работают с нашим сервисом исключительно в течение рабочего дня. Поэтому хорошо заметны всплески в начале рабочего дня в Штатах (особенно US West) и в Китае. А если проблемы с перегрузкой очереди все же возникают, то мы успеваем сгладить их за 4-5 минут.

Проблема №3 пока не решена и представляет для нас самое уязвимое место. Текущая связность трех вещей: контроля доступа к данным, знания об их специфике и местоположении и пост-обработки вычисляемых данных (шаг Reduce), — является надуманной и подлежит переработке в отдельные слои.

Справедливости ради нужно сказать, что процесс Reduce сводится к System.arraycopy(...), а общее количество данных в памяти (запросы + части готовых подзадач) на одном инстансе бэк-энда пока ни разу не превышало 1 ГБайта, что легко влезает в JVM heap.

Деплоймент

Любые изменения в существующей системе проходят несколько этапов тестирования:

  • Юнит-тестирование. Этот процесс интегрирован в билд, запускающийся на TeamCity после каждого коммита.
  • Интеграционное тестирование. Раз в сутки (иногда реже) TeamCity запускает несколько билдов, проверяющих взаимодействие модулей. В качестве тестовых данных мы используем заранее подготовленные файлы, результат обработки которых известен. По мере расширения набора функциональных особенностей мы добавляем специфичные случаи в тестовый код.
  • Если изменения касаются пользовательского интерфейса, то иногда требуется вмешательство человека на финальном этапе.


Для описываемой подсистемы изменения, в основой своей массе, касаются производительности и поддержки новых типов исходных данных. Поэтому юнит- и интеграционного тестирования, как правило, достаточно.

После каждого успешного билда из ветки “production” TeamCity публикует артефакты, представляющие собой готовые для использования JAR-ы и скрипты, контролирующие набор параметров для запуска приложения. При старте нового инстанса из предподготовленного AMI (или перезагрузке существующего) стартовый скрипт загружает с TeamCity последний продакшн-билд и запускает приложение с помощью поставляемого с билдом скрипта.

Таким образом, все, что нужно сделать для деплоймента новой версии в продакшн — дождаться конца тестов и нажать на “магическую” кнопку, перезапускающую инстансы. Контролируя набор запущенных инстансов и разделяя поток задач на разные RabbitMQ-очереди, можно проводить A/B-тестирование для групп пользователей.

Хозяйке на заметку


  • Знайте, как устроены ваши данные. Обеспечьте случайный доступ к любой части за минимальное время. [Ключевые слова]: Amazon S3, random access.
  • Используйте spot-запросы для экономии средств. [Ключевые слова]: Amazon EC2, spot requests.
  • Обязательно стройте прототипы на основе имеющихся решений. Как минимум — получите опыт. Как максимум — получите практически готовое решение.


А напоследок я скажу...


В этой обзорной статье мы описали наш подход к решению довольно типичной задачи. Проект продолжает развиваться, становясь с каждым днем все более функциональным. Мы будем рады поделиться с аудиторией накопленным опытом и ответить на возникшие вопросы.
Oleksii Tymchenko @armiol
карма
4,0
рейтинг 0,0
Похожие публикации
Самое читаемое Разработка

Комментарии (4)

  • 0
    Какой суммарный объём исходных данных? Не будет ли эффективнее (в вашем случае — дешевле) работать самопальный MR, быстро считающий нужные цифры на машинах с данными (скажем, можно использовать тот же HDFS, но данные обрабатывать демоном, который постоянно запущен и запускает подсчёт по http запросу), и с recude фазой на вашем бэкэнде?
    • 0
      Это хорошее предложение, на первый взгляд. По сути, мы так и сделали, но вместо HDFS используем общий S3.

      Данные загружаются конечными пользователями — примерно 30-50 ГБайт в день (объем увеличивается с ростом пользовательской базы). К текущему моменту у нас порядка 22 ТБайт исходных данных.

      Кроме визуализации, описанной в статье, данные подвергаются другим видам преобразований для последующего анализа — на отдельных воркерах. Некоторые из этих преобразований задействуют GPU, другие — кастомные библиотеки из мира mass spectrometry. К текущему моменту у нас около 12 ТБайт частично преобразованных данных, использующихся для разных видов анализа. Они хранятся на S3, доступ к ним производится со своих узлов-воркеров в random access-режиме.

      «Сырые» данные, которые мы храним, должны выдерживать проверку временем. Загрузив файлы сегодня, пользователь должен иметь возможность работать с ними и через пять лет. То есть, любые креши узлов HDFS, потенциально приводящие к потерям, для нас являются уязвимостью. Amazon S3, в свою очередь, дает гарантии безотказности (если не использовать reduced redundancy). Из дополнительных бесплатных плюшек — удобный и безопасный аплоад на S3 прямо с клиента (если интересно — могу описать в следующей статье).

      Вдобавок, пользователи всегда могут попросить свои «сырые» данные назад — на download.

      Поэтому мы вынуждены иметь централизованное хранилище для «сырых данных», подходящее для доступа из внешнего мира (download) и со всех инстансов-воркеров (random access, partial download). Это выходит дешевле, чем поддержка HDFS-кластера, имплементация доступа к данным на HDFS из внешнего мира и дополнительные расходы на резервное копирование в условиях постоянно пополняющегося набора файлов от конечных пользователей.

      Кажется, я ответил на Ваш вопрос. Простите, если немного сумбурно — общий контекст приложения сложно описать в нескольких абзацах.
      • 0
        Да, я в целом понял. Единственный момент — я не предлагал HDFS или нечто подобное вместо S3, скорее иметь штук 10 воркеров, на каждом по 1.2ТБ активных данных, и на них уже считать. Обменять стоимость 10 инстансов на стоимость прокачки данных через S3. Минусы сразу видны: эти 10 воркеров будет существенно сложнее сворачивать в часы наименьшей нагрузки, ну и вообще больше кода писать. Из плюсов — можно получить буст по скорости реакции. Ну и потенциально может быть экономия денег, но надо крайне внимательно считать.
        • 0
          С плюсами такого решения согласен полностью.

          Кроме указанных минусов — это действительно дороже. Минимум в два раза (посчитал на 1000 ГБ) — даже если использовать general storage, а не SSD.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.