Comments 19
Если вы изменили spark.driver.memory, нужно поднимать maxResultSize до эквивалентных значений.
Немного упрощаете. Далеко не всегда так. Драйвер может потреблять память по множеству разных причин, например, построить какой-то индекс в виде Map, и забродкастить его потом executor-ам. С размером результата это может быть никак не связано вообще.
Эксперты считают, что оптимальный экзекьютор — с примерно 5 ядрами и соответствующим объёмом памяти.
Сколько ни пытался у них найти какого-либо обоснования, так и не нашел. Ну вот посудите сами — в идеале же ядра на ноде стоит занимать целиком, не оставляя непонятно для чего пару или там тройку (оставить нужно одно, для ОС и хадупа). А что такое пять ядер? Под какие такие процессоры это число оптимально, считая что у нас N executors на ноду? На ноде 6 ядер, или 11 ядер? Или 16? У экспертов, так, между прочим, кластер на Амазоне, то есть у них там виртуалки, поэтому у них есть своя специфика, и я бы аккуратно относился к их выводам (вовсе не хочу при этом сказать, что они вообще не правы — сам эту серию статей читал и перечитывал). Когда у вас (скажем, как у нас) bare metal кластер, то у него могут быть другие оптимальные параметры. Считать что их «около пяти» было бы разумнее.
Чтобы понять соотношение, зайдите в YARN → Scheduler.
Вообще, там вполне несложный вызов REST API. То есть, чтобы получить список нод кластера с их параметрами, мне на скале потребовалось примерно 15 строк кода.
Кроме того, хотел бы добавить, что большинство подобных методик (я вообще что-то не вспомню исключений) исходят из того, что почему-то все ядра и память доступны приложению. В то время как в Yarn есть такая штука, как очереди, и у них есть ограничения на выделяемые приложению ресурсы. Так что при выборе ресурсов для спарка на ярне следовало бы по крайней мере представлять, что там за параметры у нашей очереди. Тоже можно получить REST запросом, но тут уже ответ сильно сложнее, потому что очереди — они в виде древовидной структуры.
>берите в 5-10 раз больше памяти, чем занимают ваши данные;
А тут было бы неплохо уточнить, какие такие данные? Все данные, что я собрался обрабатывать? А если их сотня терабайт, у меня просто нет столько памяти на кластере?
Привет! Спасибо за классные комментарии!
Перед переходом к деталям, нужно наверное отметить идею статьи. Задача - научить работать со спарком именно аналитика, а не дата инженера. Поэтому моя цель была - дать какие-то простые эвристические рекомендации для работы.
Если вы изменили spark.driver.memory, нужно поднимать maxResultSize до эквивалентных значений.
Да, про бродкаст я кстати не написал. Для него может быть тоже нужно расширять драйвер. Но не понятно, зачем ограничивать driver.maxResultSize и делать его меньше driver.memory. Нужно понимать, что аналитик делает исследование, пишет код с нуля. Он часто не знает заранее что ему потребуется
Да, про бродкаст я кстати не написал. Для него может быть тоже нужно расширять драйвер. Но не понятно, зачем ограничивать driver.maxResultSize и делать его меньше driver.memory. Нужно понимать, что аналитик делает исследование, пишет код с нуля. Он часто не знает заранее что ему потребуется
Fair комментарий, что непонятно какие ядра и прочее. В статье, на которую я дал ссылку, один человек сделал один тест и верно, что на него нельзя полагаться как на истину. Но тем не менее это референс. Опять же - у аналитика нет возможности запускать один и тот же код 10 раз, чтобы подобрать параметры. Ему нужен какой-то стандартный конфиг.
Про то, чтобы все ядра на ноде занимать, не очень понял.. На ярн потому что выделяется не 100% ядер. По умолчанию две трети вроде. Или вы про то, чтобы доступные ядра делились на количество экзекьюторов? Последнее - хорошая идея. Но там то драйвер, то еще что-то, то кто-то выделит с другим количеством. Не подгадаешь. 16 ядер это конечно немного, мб на них и правда нужно брать поменьше экзекьюторы
А кстати, как вы считаете нужно делать в этой ситуации? Сколько-то ядер нужно же попросить.
Чтобы понять соотношение, зайдите в YARN → Scheduler.
Не очень уловил комментарий. UI ярна нужно смотрать, как раз чтобы видеть, сколько выделено на ярн..
берите в 5-10 раз больше памяти, чем занимают ваши данные;
В ежедневной работе мы не читаем все данные за все время в память. Да и в витринах тоже. Обычно считаем по дням или по часам. И это уже можно вместить в память так, чтобы это было 5-10 от общего объема. Если вмещать больше, то люди будут регулятно ловить OutOfMemory. Для аналитика, которому нужно просто получить результат и нет времени оптимизировать запросы, это очень болезненно
дать какие-то простые эвристические рекомендации для работы.
Не, рекомендации вполне хороши. Я просто уточнил некоторые детали.
зачем ограничивать driver.maxResultSize
тут не про то, чтобы его ограничивать, а скорее про то, что память, нужная драйверу, может быть не связана с размером результата вообще никак. И это два независимых параметра, в общем случае.
Про то, чтобы все ядра на ноде занимать, не очень понял…
ну тут простое соображение — если занять все ядра, почти, и оставить сколько-то простаивать — то кто их займет? Это неэффективная трата ядер. Сколько их выделено ярну — это вопрос немножко отдельный, но в любом случае, я рассчитываю что мы можем это узнать — и учесть.
Не очень уловил комментарий.
Тут я про то, что REST API ярна в этом месте совсем несложный, и узнать про ноды все что надо — легко. И дальше можно это автоматизировать как-то.
ну тут простое соображение — если занять все ядра, почти, и оставить
сколько-то простаивать — то кто их займет? Это неэффективная трата ядер.
Сколько их выделено ярну — это вопрос немножко отдельный, но в любом
случае, я рассчитываю что мы можем это узнать — и учесть.
Прошу прощения, немного не понял все-таки. А можете еще раз более подробно описать логику выбора размера экзекьюторов? Правильно я понял, что через API ярна можно посмотреть сколько доступно ядер на каждой ноде? (я про это не знал, нужно будет разобраться) И исходя из этого выбрать?
Да, из API ярна можно посмотреть. Там один запрос, не слишком сложный JSON.
GET http://rm-http-address:port/ws/v1/cluster/nodes
{
"nodes":
{
"node":
[
{
"rack":"\/default-rack",
"state":"RUNNING",
"id":"host.domain.com:54158",
"nodeHostName":"host.domain.com",
"nodeHTTPAddress":"host.domain.com:8042",
"lastHealthUpdate": 1476995346399,
"version": "3.0.0-alpha2-SNAPSHOT",
"healthReport":"",
"numContainers":0,
"usedMemoryMB":0,
"availMemoryMB":8192,
"usedVirtualCores":0,
"availableVirtualCores":8,
"resourceUtilization":
{
"nodePhysicalMemoryMB":1027,
"nodeVirtualMemoryMB":1027,
"nodeCPUUsage":0.016661113128066063,
"aggregatedContainersPhysicalMemoryMB":0,
"aggregatedContainersVirtualMemoryMB":0,
"containersCPUUsage":0
}
},
"usedMemoryMB"+"availMemoryMB" вот эта сумма — это общий объем на ноде.
Все варианты, что я видел, исходят из того, что ядер одинаковое число. Если ноды разные — я вообще не знаю, как это повлияет (точно так же я откровенно говоря не знаю, как быть в реальном случае, когда мы на кластере не одни, а реально работают тысячи приложений (как у нас это и происходит).
Ну то есть, по сути мы выбираем некую верхнюю границу, исходя из возможностей пустого кластера. То есть, число ядер как доступное на ноде, минус некий резерв (1 ядро минимум, возможно больше, тут я не уверен). Оставшиеся ядра делим поровну на N executors (которое еще не знаем). Тут два очевидных варианта — один executor и все доступные ядра, или наоборот, executor с 1 ядром. Они скорее всего плохие, тут я согласен (считаем, что ядер все-таки не 4, а скорее 16).
Выбираем что-то похожее на "5 ядер на executor". В итоге имеющееся число ядер может не делиться на 5, поэтому что-то останется. И будет простаивать. То есть, если у нас доступно скажем 12 ядер, то я бы взял бы 4, а не 5, потому что при 5 останется два ядра ни туда ни сюда...
Ну то есть, я бы брал число ядер на executor исходя из того, чтобы не было этого остатка. Ну если это возможно, разумеется. А потом бы распределял между ними память. Исходя из этого мы можем оценить, сколько вообще ресурсов имеет смысл просить. Ну и дальше можно попробовать его скорректировать исходя из того, сколько ресурсов доступно в нашей очереди (в меньшую сторону), или на кластере (это видно из того же запроса к ярну). Но тут уже мы уходим в область прогнозов, потому что никто не знает, что будет в следующие 15 минут (хотя мы можем попробовать собирать статистику).
По названию статьи подумал что это про то как аналитики приходят к дата-инженерам чтобы те подкинули ядер им в очередь в yarn :-).
А в итоге вы в Сбере или вы в Авито?
В pySpark результаты собирает функция toPandas().
Это упрощение. Как минимум есть еще классический collect(). А проблемы с нехваткой памяти также можно легко получить и при неосторожном использовании методов для pyspark.DataFrame coalesce() или repartition()
К тому же можно сделать не toPandas().to_csv(), а записать датафрейм в hive или hdfs, например, а потом оттуда забрать данные.
берите в 5-10 раз больше памяти, чем занимают ваши данные;
А shuffle, возникающий при джойне, изменяет количество партиций до параметра spark.sql.shuffle.partitions, который по умолчанию 200.
Не все данные могут так просто влезть в память. И как раз настройка этого параметра позволяет не брать десятки гигабайт памяти на экзекутор, и pyspark сам разобьет датафреймы на более мелкие партиции.
По ядрам все просто: чем их больше, тем быстрее идут расчёты.
Но чем больше ядер на экзекутор, тем меньше памяти на ядро. И тем больше потоков при shuffle read/write, что тоже не всегда хорошо. Это палка о двух концах.
И если основная цель - это все же ускорить вычисления, то как минимум стоит задумываться о том, как партицированы данные при джойнах, чтобы избежать лишней передачи данных.
настройка этого параметра позволяет не брать десятки гигабайт памяти на экзекутор
Более того, у спарка есть лимит на размер партиции, в который иногда упираешься при дефолтном числе 200. Помнится мне, что там 2 гигабайта, то есть при размере данных всего-то 400 гигабайт, и равном размере партиций — мы уже можем утыкаться и в него.
Привет! Тоже спасибо за классные каменты!
В pySpark результаты собирает функция toPandas()
Валидно.. Тема сохранения достаточно большая и у меня даже есть драфт отдельной статьи на эту тему. Там есть бенчмарки по toPandas с и без arrow оптимизации, а также про скачивания с hdfs. Для теста кстати получилось выгрузить через toPandas датасет в 40Гб (за 5 мин, с arrow оптимизацией)
берите в 5-10 раз больше памяти, чем занимают ваши данные;
Вы смотрите с перспективы дата инженера, где вы оптимизируете жесткие расчеты =) Я думаю для аналитика такое не норма. Я пытаюсь донести аналитикам мысль, если данные занимают так много, что не влезают, то нужно считать меньшими кусочками. Или нести задачу дата инженерам. Потому что если аналитик будет регулярно бесконтрольно в ноутбуке запускать жесткие расчеты (а в них скорее всего будут дата спиллы), это будет руинить кластер. У меня такое в практике увы было не раз( Периодически умирал кластер, начинали выяснять, оказывалось, что кто-то регулярно насиловал кластер чем-то очень жестким.
По ядрам все просто: чем их больше, тем быстрее идут расчёты.
Это правда.. Нужно будет про партиционирование и джойны написать отдельный гайд.
А расскажите при случае подробнее про Arrow. Мне его довольно активно рекламировали, и у меня нет причин не доверять этой рекламе, но на практике пока не удается добраться и пощупать.
про партиционирование и джойны написать отдельный гайд.
Особенно если есть опыт применения нового оптимизатора Spark 3.
Я считаю, что нужно сначала вообще озвучить какую версию Spark вы используете. И начать с более приземлённых понятий о работе Spark, на узлах кластера, и например не стоит мешать синтаксис Spark с наивным Python.
Саш, подскажи, пжл, почему это все еще надо донастраивать руками, почему нет какой-то программной реализации распределения ресурсов (хотя бы эмпирически)?
Наверное потому что спарк изначально занял нишу продвинутого инструмента. В качестве основного инструмента у нас большая часть аналитиков сидит на вертике. В большинстве других компаний тоже стоит какая-то MPP (massive parallel processing) база, гринплам там или еще что-то. И эти MPP из коробки закрывают очень много всего.
Отличие спарка от MPP баз отдельный вопрос. Но одно из преимуществ спарка - наличие контроля: выделяемых ресурсов, оптимизации, данных, которые в памяти, а которые нет и прочее. За счет этого спарк может решать потенциально более сложные задачи и масштабироваться.
В Сбере нам приходилось использовать хадуп и спарк в первую очередь потому что только он позволял нам построить единое федеративное хранилище на десятки независимых подразделений и тысячи пользователей. Кто поднимал свой гринплам, например, мучался - с трудом забирал и отдавал данные во вне. Хотя с точки зрения пользовательского опыта гринплам гораздо комфортнее спарка.
Задача планирования ресурсов в многозадачной системе не имеет простого решения, если нагрузка не предсказуема заранее (а это как правило так, у аналитиков так в особенности). Может поэтому (в том числе)?
PySpark для аналитика. Как правильно просить ресурсы и как понять, сколько нужно брать