company_banner
8 августа 2013 в 13:53

Технология Real Time MapReduce в Яндексе. Как ускорить что-то очень большое

Некоторое время назад мы рассказывали на Хабре о том, что поиск Яндекса стал более персонализированным. Он учитывает не только постоянные, но и сиюминутные интересы пользователя, ориентируясь на последние несколько запросов и действий.

Сегодня мы хотим рассказать о технологии Real Time MapReduce, благодаря которой всё это стало возможно. Она обеспечивает передачу и обработку огромных объёмов данных, необходимых для этой задачи, и чтобы сделать это, нам даже не пришлось переписывать код для MapReduce, который у нас уже использовался.



Чтобы персонализировать поисковую выдачу, нужно определить круг интересов пользователя, для чего мы сохраняем информацию о его поведении на странице поиска. Данные о действиях пользователя записываются в логи, а затем обрабатываются при помощи специальных алгоритмов, которые позволяют нам составить наиболее релевантную выдачу по запросу для каждого отдельно взятого пользователя. Сначала обработка логов запускалась раз в сутки, для чего очень хорошо подходила технология распределенных вычислений MapReduce. Она прекрасно справляется с анализом значительных объемов данных.

Входящие данные (в нашем случае это логи) проходят через несколько последовательных шагов Map (данные разбиваются по ключам) и Reduce (происходят вычисления по определенной функции и собирается результат). При этом результат вычислений, происходящих на каждом этапе, одновременно является входящими данными для следующего этапа. Путем прогона через специальные алгоритмы на каждом Reduce-шаге объем данных сокращается. В результате из огромного объема сырых данных мы получаем небольшое количество полезных.

Однако, как уже говорилось в предыдущих постах, у пользователей есть не только постоянные, но и сиюминутные интересы, которые могут сменяться буквально за секунды. И тут данные суточной давности ничем помочь уже не могут. Мы могли бы ускорить этот процесс до получаса, некоторые логи обрабатываются именно с такой периодичностью, но наша задача требует мгновенной реакции. К сожалению, при использовании MapReduce существует вполне определенный потолок реактивности, обусловленный характером обработки.

Этапы обработки производятся в строгой очередности (ведь каждый этап генерирует данные, которые будут обрабатываться на следующих), и для получения финального результата по одному ключу нужно дождаться окончания всей обработки. Даже при незначительных изменениях во входящих данных всю цепочку этапов необходимо проводить заново, так как без пересчета невозможно определить, на какие из промежуточных результатов окажут влияние эти изменения. При постоянном потоке поступающих данных (а с наших поисковых машин идет поток со скоростью около 200 Мбайт/с) такая система эффективно работать неспособна, и добиться реакции на действия пользователя за несколько секунд этим методом невозможно.

Нам же нужна система, которая могла бы при мелких изменениях входящих данных быстро изменять окончательный результат. Для этого она должна уметь определять, какие ключи будут модифицированы в результате изменений, и проводить пересчет только для них. Таким образом, объем обрабатываемых на каждом этапе данных сократится, а вместе с тем увеличится и скорость обработки.

В Яндексе есть несколько проектов, применяющих классические кластерные приложения, где данные разбиваются по нодам, между которыми происходит обмен сообщениями, позволяющими изменять состояния для отдельных ключей, не производя полного пересчета. Однако для сложных вычислений, которые подразумевает обработка логов, такая модель не подходит из-за слишком большой нагрузки на обработчик сообщений.
Кроме того, большая часть накопившегося у нас кода, обеспечивающего качество поиска, написана под MapReduce. И почти весь этот код с небольшими модификациями можно было бы использовать повторно для обработки действий пользователя в реальном времени. Так и родилась идея создать систему c API, идентичным интерфейсу MapReduce, но при этом способную самостоятельно распознавать ключи, затрагиваемые изменениями входящих данных.

Архитектура

Изначально замысел казался достаточно простым: нужно было сделать архитектуру, которая умеет вычислять MapReduce-функции, но также может за секунды инкрементально и эффективно обновлять значение функций при небольших изменениях входных данных. Первый прототип RealTime MapReduce (RTMR) был готов уже через две недели. Однако в процессе тестирования стали открываться слабые места.

По отдельности все проблемы казались пустяковыми, но в системах такого масштаба увязать все друг с другом не так просто. В итоге после устранения всех проблем от изначального прототипа не осталось практически ничего, а объем кода вырос на порядок.

Кроме того, стало ясно, что для обеспечения необходимой скорости работы все данные, участвующие в расчетах, необходимо хранить в памяти, для чего мы реализовали специальную архитектуру.

image

Разберем подробнее последовательность действий, отображенную на схеме:

  1. Начинается все с того, что обработчик входящего запроса разбирает данные и запускает отдельную транзакцию для каждого ключа.
  2. При этом транзакция считается запущенной только после получения подтверждения, что запись о начале операции (PrepareRecord) сохранена на диске.
  3. После запуска транзакции по сети отправляется ответ на входящий запрос, а транзакция ставится в очередь на обработку рабочим потоком.
  4. Далее рабочий поток подхватывает транзакцию и в зависимости от локальности ключа либо отправляет его по сети и ждет подтверждения, либо создает контент обработки, в котором выполняются операции.
  5. В дальнейшем транзакции обрабатываются изолированно друг от друга: после старта от общего состояния системы отводится бранч, по окончании обработки изменения принимаются.
  6. Все данные хранятся в памяти, что крайне важно для оптимизации reduce-операций.
  7. Данные по модифицированным ключам периодически сохраняются в постоянное хранилище в качестве резервных копий.
  8. После обработки транзакции ее результат фиксируется на диске, и при получении подтверждения записи от лога она считается завершенной.
  9. Также транзакция может порождать дочерние транзакции, в этом случае все повторяется, начиная с четвертого пункта.
  10. В процессе восстановления состояния данные извлекаются из постоянного хранилища и воспроизводятся по логу, незавершенные транзакции при этом перезапускаются. Если же возникает необходимость прервать транзакцию, в лог записывается AbortRecord (индикатор того, что при восстановлении состояния эту транзакцию перезапускать не нужно).


Вторая часть схемы демонстрирует принцип работы хранилища логов. Оно поддерживает асинхронную запись с входящей очередью, а также callback для нотификаций. На диске реализована append-only структура и индекс для фильтрации записей по ключу. Адаптивный планировщик использует статистику read-операций для определения размера записи, находя оптимальный баланс между скоростью записи и временем ожидания перед ее началом. Хранилище логов состоит из четырех HDD без RAID с шардированием по ключу. Для записи данных на диски используется Direct I/O.

image

В in-memory storage реализован подход с последовательной записью и параллельным чтением. Шардирование осуществляется по ключу. Данные хранятся на протяжении определенного в настройках промежутка времени, по истечении которого могут быть перезаписаны новыми. Для хранения value-данных применяется lock-free кольцевой буфер. Для записей (key, subkey, table, timestamp + дополнительная информация) используется append-only lock-free Skip List. После заполнения Skip List создается новый, а старый объединяется с имеющимися immutable-данными. Для каждого ключа записи сортируются и дельта-кодируются.

Координация узлов в кластере осуществляется при помощи Zookeeper. Определение мастера/слейва производится через consistent hashing. Исходящие транзакции обрабатываются мастером и дополнительно сохраняются на слейвах. Срезы также размещаются как на мастере, так и на слейвах. При перешардировании новый мастер собирает данные из среза на кластере, а также необработанные транзакции из лога по своему диапазону ключей, догоняет свое состояние и включается в обработку.

Перспективы RealTime MapReduce

Стоит отметить, что кроме персонализации поиска для RTMR можно найти и другие применения. В большинстве случаев применяемые в Поиске алгоритмы можно перестроить на работу в режиме реального времени. Например, с его помощью можно улучшить качество поиска по свежим документам, публикациям в СМИ и блогах. Ведь ранжирование свежих документов зависит не только от скорости их обхода и индексации, это не самый сложный процесс и в большинстве случаев наш робот уже достаточно давно справляется с этим за несколько секунд. Однако значительная часть данных для ранжирования берется из внешних по отношению к документу источников информации, и для агрегации этих данных используется MapReduce. Как уже говорилось выше, ограничения методов батч-обработки не позволяют производить агрегацию быстрее, чем за 20-30 минут. Поэтому без RTMR часть внешних сигналов для свежих документов приходит с запаздыванием.

Подстройку поисковых подсказок тоже можно сделать моментальной. Тогда предлагаемые пользователю варианты запроса будут зависеть от того, что он искал несколько секунд назад.

Так как наша система еще достаточно молода, в ближайших планах у нас расширение парадигмы MapReduce: добавление в нее новых интерфейсов, заточенных специально под работу в реальном времени. Например, операций, способных способных делать предварительную неполную предагрегацию.

Кроме того, мы планируем произвести унифицированное для MapReduce и RTMR декларативное описание потоков данных и графа вычислений. Стадии RTMR-вычислений для разных ключей в отличие от MapReduce работают непоследовательно, а значит, последовательный запуск стадий из кода теряет всякий смысл.
Автор: @elcoyot
Яндекс
рейтинг 548,92
Как мы делаем Яндекс

Комментарии (18)

  • 0
    Изобрели CouchDB?
    • +5
      CouchDB применяется немного для других целей, настолько повысить скорость вычислений она не позволит. Однако похожие решения есть у многих, но нам выгоднее иметь свое, так как для решения наших задач с нужным KPI требуется очень тесная интеграция всех компонентов. Добиться этого путем комбинации сторонних законченных решений крайне трудно. К тому же это не позволило бы нам повторно использовать наработанный код.
  • 0
    > Сначала обработка логов запускалась раз в сутки, для чего очень хорошо подходила технология распределенных вычислений MapReduce.

    У вас в конце предложения ссылка ведёт на этот же топик:) Поправьте, пожалуйста.
    • +2
      Спасибо, поправили.
      • +2
        Нужно ещё убрать слэш в конце этой же ссылки :)
  • +8
    В чем преймущество перед storm? Планируется ли открыть систему как это было сделано как c яндекс.танком например?
    • +3
      У storm есть фатальный недостаток :)
      А если серьезно, то storm же не позволит сохранить тонны map-reduce кода.
    • +1
      Про storm, dremel, impala и прочие подобные решения можно повторить практически все то же, что сказано в этом комментарии. В нашем случае это был оптимальный вариант. А про открытие технологии говорить пока слишком рано.
      • +2
        impala и dremel — это не совсем про то, это скорее попытка сильно ускорить обычный mapreduce методом запихивания всего чего можно в память, кэширования и прочих оптимизаций. Настоящей реалтаймовости на большом потоке запросов они все равно не дадут.
        • 0
          Настоящая риалтаймовость для write intensive задач недостижима без кэширования и предагрегации. Фактически, описанное решение, это интеллектуальная инвалидация кэша промежуточных результатов MapReduce задачи при поступлении новых данных. Сам же MapReduce никто не отменял, хотя бы и по кэшу плюс изменения. Так что, глобально, это туда же, куда и Impala.
  • +1
    мне бы очень хотелось чтобы у яндекса появились Search tools, как у гугла, особенно полезна возможность искать по времени. за неделю, за месяц, за год.
    • +4
      Так вроде бы же есть возможность искать и по времени и по прочим параметрам: yandex.ru/search/advanced
      • 0
        дураку ясно что есть, я говорю о том что надо это вынести в шапку поискового запроса и упростить ввод параметров, как у гугла
  • +2
    Было бы интересно, если бы смогли подробнее остановиться на тех проблемах, которые решались после разработки изначального прототипа.
    • +1
      Если рассматривать все эти проблемы по отдельности, то ничего сверхсложного в них не было. Например, поначалу шина передачи данных не выдерживала потока, потребовалось добавить I/O-буфер. База данных, которую мы взяли изначально тоже работала не так быстро, как нам хотелось бы.
  • +4
    У Яндекса очень специфические задачи и хорошие программисты и пилить свой hadoop и impala вполне можно, но… все кто поменьше пользуются opensource и там жизнь бурлит.

    Даже если у Яндекса выделено 100 программистов только под MapReduce фрэймворк, есть шанс что проекты из экосистемы hadoop все равно обойдут разработку Яндекса по качеству, скорости, удобству. Даже в таких мелочах как документированность и наличие обученных спецов на рынке: любой может развернуть себе hadoop и играться, а вот с фрэймворком Яндекса — не уверен что порог вхождения такой низкий.

    Я к чему — в мечтах вместо того чтобы делать 5 разных mapreduce фрэймворков, было бы круто если бы все навалились на hadoop ) Хотя я конечно понимаю, что у Яндекса свои задачи, и даже если эти задачи выполняются на своем решении на 30% лучше по железу, то в масштабах это уже существенная экономия
  • +4
    Не совсем понятно, чем достигается RealTime преимущество? Написано про реализацию отдельных мест системы, но ничего про MapReduce, каким образом существующий MapReduce код обрабатывает только часть данных, изменение которого затронуто (как определяется).
    • +2
      Риалтаймовость обеспечивается тем, что весь набор данных держится в памяти со строгой привязкой к машине по ключу. При этом данные для reduce-операций подкапливаются, что позволяет запускать этот шаг не на каждый входящий сигнал. Результаты reduce-операций дедуплицируются, чтобы не страдали последующие шаги. Кроме того, мы стараемся максимально уменьшать диапазон данных, затрагиваемых изменениями. Сейчас движемся в сторону полноценного инкрементального подхода с применением комбинаторов. Весь новый код пишется уже с учетом этого фактора, но необходимость в поддержке старого кода остается.

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Самое читаемое Разработка