company_banner

BI в Одноклассниках: сбор данных и их доставка до DWH

    В прошлый раз (http://habrahabr.ru/company/odnoklassniki/blog/149391/) мы говорили о системе графиков и дешбордов, которые используем для мониторинга сайта и активности пользователей. Нам приходится логировать более двух триллионов (2 000 000 000 000) событий в день. В этом посте мы расскажем, как мы собираем эти данные, обрабатываем и загружаем в хранилище.

    Природа данных


    В системе мониторинга мы собираем данные, не привязанные к конкретному пользователю. Данные агрегируем по классификаторам (одному или нескольким на каждую таблицу), что позволяет существенно сжать их общий объём. При этом мы практически ничего не теряем – информации об общей активности всех пользователей в определённый момент времени достаточно.

    Точное время события мы тоже не логируем, а агрегируем данные по пятиминуткам. Это также позволяет существенно уменьшать объем данных и создавать графики, более «легкие» для восприятия и отображения в браузере.

    Логгер


    Почти на каждом сервере нашего сайта (а их более 4000), в каждом приложении используется библиотека Logger на базе Log4j, которая агрегирует по таблицам, пятиминуткам и классификаторам в асинхронном режиме все записанные логи. Агрегация проводится в памяти, то есть работа с диском не происходит вообще.

    В результате агрегации мы сжимаем данные в 1000 раз – вместо 20 миллиардов записей за 5 минут в час пик получаем всего 20 миллионов записей. Это, мягко говоря, очень большая разница.

    Раз в пять минут Logger сбрасывает агрегированные данные в промежуточное хранилище данных, а после – стирает их из памяти. Причем успеть сбросить эти данные надо за 2 минуты, чтобы они как можно быстрее дошли до конечного потребителя и чтобы был резерв мощностей (максимальное время, отведенное на сброс данных – 5 минут, ведь после этого начнется сброс данных за следующую пятиминутку).

    Давайте сравним два подхода. Средняя длина записи — 200 байт.
    1) Неагрегированные данные
    объем данных в секунду – 20 000 000 000 (записей) *200 (байт) /120 сек = 33 GB/cек
    объем данных в день – 2 000 000 000 000 (записей) *200 (байт) = 400 TB/день
    2) Агрегированные данные
    объем данных в секунду – 20 000 000 (записей) * 200 (байт) / 120 сек = 33 MB/cек
    объем данных в день – 2 000 000 000 (записей) * 200 (байт) = 400 GB/день

    Если бы мы не агрегировали данные, то только под статистику нам необходимо было бы 33 GB * 8 = 264 Gb/с сетевых мощностей.

    При таком подходе мы используем часть ресурсов сервера (CPU и память) для агрегации данных, но так как эта часть очень мала, это нас не особо волнует.

    Промежуточное хранилище данных: эволюция


    Если бы не было промежуточного хранилища, то на долю DWH-серверов пришлось бы 4000 параллельных сессий, которые бы писали данные со скоростью 33 MB в секунду. Параллельно с такой нагрузкой заниматься какой-либо обработкой данных было бы невозможно.

    Задача промежуточного хранилища данных:
    1) быстро сохранить полученные данные из 4000 Logger-приложений;
    2) быстро отдать свежие данные DWH-серверам;
    3) быстро удалить старые данные после того, как они выгружены в DWH.

    Хоть это и звучит просто, к архитектуре промежуточного хранилища данных, которая у нас есть сейчас и которая нас устраивает, мы пришли не сразу.

    2008 год

    В 2008 году DWH у нас еще не было, но было 300 серверов, которые писали свои логи в одну базу c 20 таблицами. Отдельное приложение генерировало графики, обращаясь напрямую к данным в этой же базе. У каждой таблицы для обеспечения быстрого чтения по определенному периоду был кластерный индекс по дате и времени.

    По мере роста количества серверов и объёма данных графики стали генерироваться все медленнее и медленнее. Ими запросто можно было так нагрузить базу, что она переставала принимать новые записи. Поэтому приняли решение создать отдельное хранилище данных, которое будет выкачивать из этой базы данные и эффективно обслуживать запросы с приложений бизнес-анализа.

    2009 год

    Наша новая система выкачивала данные из базы логов, которую мы стали называть «промежуточным хранилищем». Структуру данных не меняли, потому что было необходимо, чтобы старые графики и Логгер продолжали работать без изменений.

    Данные для выгрузки мы фильтровали по дате и времени. При этом данных с одинаковым временем было много, и появлялись они асинхронно. Приходилось выгружать данные с «большим запасом прочности». Поэтому к каждой таблице добавили колонку ID типа bigint с автоинкрементом и создали индекс на эту колонку. После этого мы уже могли выгружать одну запись только один раз.

    Какое-то время это работало хорошо. Количество серверов выросло до 700. И вдруг оказалось, что база логов «не резиновая» – заканчивалось место на диске. Старые логи, выгруженные в DWH, пришлось удалять, но эффективного способа это сделать не было. Командой DELETE нельзя, так как на больших объёмах это слишком тяжелая и медленная операция. Пришлось удалять записи путём пересоздания таблиц, что приводило к потере данных и недоступности сервиса.

    2010 год

    Количество серверов и объёмов данных стремительно росло. Старые логи таким неэффективным способом приходилось удалять всё чаще и чаще. Это переросло в реальную проблему, которую надо было решать, причём желательно таким образом, чтобы ни Логгер, ни DWH не пришлось переписывать.

    Мы смогли сделать это с помощью представлений (views) и триггеров.

    image

    Для каждой таблицы создали идентичную пару и переименовали ее, добавив окончание _1 и _2. Создали представление с тем же именем, что у оригинальной таблицы. Серверы пишут в это представление. Добавили к представлению триггер instead of insert, который одну неделю пишет в таблицу с окончанием _1, а вторую неделю – в таблицу с окончанием _2.

    Переключение происходит в два этапа. Сначала переключается только триггер, а само представление читает данные из предыдущей таблицы. После того, как DWH выкачал к себе все данные из предыдущей таблицы, переключается представление.

    После этого данные из старых таблиц эффективно удаляются командой truncate table.

    Таким образом, мы получили решение, которое позволяло удалять данные без простоя и потери данных, при этом не требовало менять Логгер.

    С течением времени старой системой графиков пользоваться перестали, то есть отпала необходимость в кластерном индексе по дате и времени. Поэтому мы его удалили, а индекс по ID сделали кластерным. Благодаря этому система стала ещё более эффективной.

    2011 год

    Количество серверов превысило 3000. Решение c представлениями и триггерами стало очень нагружать CPU сервера при достаточно низкой дисковой очереди. Нагрузку необходимо было снижать, но наличие триггеров не позволяло это сделать. Решили, что нужно всё-таки переделать Логгер, чтобы одну неделю он писал в таблицы с _1, а вторую – в _2. Так и сделали.

    Часть инфраструктуры перевели на новый Логгер. Нагрузка на CPU упала, но дисковая очередь выросла до критического уровня. Структура базы максимально проста – улучшать нечего. Надо было искать другое решение.

    2012 год

    Дисковую очередь создавал MS SQL Log Manager, работая с лог-файлом базы данных. В тот момент у нас была база данных MS SQL 2005 Standard Edition. Эксперты утверждали, что у MS SQL 2008 Log Manager на порядок лучше, чем у MS SQL 2005. Мы решили это проверить и перешли на на MS SQL 2008 Enterprise Edition. Дисковая очередь действительно снизилась на порядок!

    Заодно для удаления данных мы решили использовать partitioning таблиц. То есть вернулись к модели 2009 года, но только кластерный индекс по ID разделили на партиции по 20 000 000 записей. После того, как партиция заполнилась на 100% и все DWH-серверы данные выкачали, эти партиции удаляются.

    Партиции в SQL-серверах «невидимы» для разработчиков, и это решение не усложняет код. Удаление и добавление партиций происходит онлайн, потери подключений и данных не происходят.

    В результате получилась следующая архитектура:



    Промежуточные хранилища в схеме названы базами Logs. Все они одинаковы по структуре и содержат полный комплект возможных таблиц. Все агрегированные данные 1 раз в 5 минут скидываются в эти таблицы. Администраторы сайта настраивают Логгер каждого проинсталлированного приложения, чтобы нагрузка равномерно распределялась по всем базам Logs. При этом информация не дублируется, то есть одна и та же информация может попасть только в одну из баз Logs.

    Изначально у нас была одна такая база, но сейчас у нас их несколько. Причина в следующем:
    1) в каждом большом хостинге должно быть по одной базе Logs, чтобы при проблемах с хостингом мы не теряли всю статистику;
    2) базы Logs перестают справляться с задачами по мере роста собираемого объёма данных.

    Выгрузка данных из промежуточного хранилища данных


    Параллельно c записью в базы Logs происходит выкачка данных из них в базы DWH.



    DWH-сервер в одном потоке опрашивает каждую таблицу во всех базах Logs и выкачивает все новые записи, которые появились после предыдущей выкачки.

    Посчитаем, насколько быстрой должна быть выгрузка данных, чтобы уложится в 5 минут. Количество опрашиваемых таблиц – 300 * 4 = 1200. Получается, что в среднем на одну таблицу отводится 300 / 1200 = 0,25 сек. Что делать, если в среднем на таблицу уходит больше времени? Масштабировать! Ставим еще один DWH-сервер и половину таблиц выгружаем в один сервер, а вторую половину – во второй сервер.



    Если таблиц или данных стало больше и опять невозможно уложится в 2 минуты, то ставим ещё один DWH-сервер и т.д. Сейчас у нас 3 таких DWH-сервера.

    В следующей статье


    В следующей статье мы расскажем о том, как мы построили загрузку данных в DWH-серверах, и на что же тратится в среднем по 0,25 сек на таблицу.
    Метки:
    Mail.Ru Group 958,34
    Строим Интернет
    Поделиться публикацией
    Похожие публикации
    Комментарии 0

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

    Самое читаемое