Pull to refresh

Comments 19

Если вы изменили spark.driver.memory, нужно поднимать maxResultSize до эквивалентных значений.

Немного упрощаете. Далеко не всегда так. Драйвер может потреблять память по множеству разных причин, например, построить какой-то индекс в виде Map, и забродкастить его потом executor-ам. С размером результата это может быть никак не связано вообще.

Эксперты считают, что оптимальный экзекьютор — с примерно 5 ядрами и соответствующим объёмом памяти.

Сколько ни пытался у них найти какого-либо обоснования, так и не нашел. Ну вот посудите сами — в идеале же ядра на ноде стоит занимать целиком, не оставляя непонятно для чего пару или там тройку (оставить нужно одно, для ОС и хадупа). А что такое пять ядер? Под какие такие процессоры это число оптимально, считая что у нас N executors на ноду? На ноде 6 ядер, или 11 ядер? Или 16? У экспертов, так, между прочим, кластер на Амазоне, то есть у них там виртуалки, поэтому у них есть своя специфика, и я бы аккуратно относился к их выводам (вовсе не хочу при этом сказать, что они вообще не правы — сам эту серию статей читал и перечитывал). Когда у вас (скажем, как у нас) bare metal кластер, то у него могут быть другие оптимальные параметры. Считать что их «около пяти» было бы разумнее.

Чтобы понять соотношение, зайдите в YARN → Scheduler.

Вообще, там вполне несложный вызов REST API. То есть, чтобы получить список нод кластера с их параметрами, мне на скале потребовалось примерно 15 строк кода.

Кроме того, хотел бы добавить, что большинство подобных методик (я вообще что-то не вспомню исключений) исходят из того, что почему-то все ядра и память доступны приложению. В то время как в Yarn есть такая штука, как очереди, и у них есть ограничения на выделяемые приложению ресурсы. Так что при выборе ресурсов для спарка на ярне следовало бы по крайней мере представлять, что там за параметры у нашей очереди. Тоже можно получить REST запросом, но тут уже ответ сильно сложнее, потому что очереди — они в виде древовидной структуры.

>берите в 5-10 раз больше памяти, чем занимают ваши данные;

А тут было бы неплохо уточнить, какие такие данные? Все данные, что я собрался обрабатывать? А если их сотня терабайт, у меня просто нет столько памяти на кластере?

Привет! Спасибо за классные комментарии!
Перед переходом к деталям, нужно наверное отметить идею статьи. Задача - научить работать со спарком именно аналитика, а не дата инженера. Поэтому моя цель была - дать какие-то простые эвристические рекомендации для работы.

Если вы изменили spark.driver.memory, нужно поднимать maxResultSize до эквивалентных значений.

Да, про бродкаст я кстати не написал. Для него может быть тоже нужно расширять драйвер. Но не понятно, зачем ограничивать driver.maxResultSize и делать его меньше driver.memory. Нужно понимать, что аналитик делает исследование, пишет код с нуля. Он часто не знает заранее что ему потребуется

Да, про бродкаст я кстати не написал. Для него может быть тоже нужно расширять драйвер. Но не понятно, зачем ограничивать driver.maxResultSize и делать его меньше driver.memory. Нужно понимать, что аналитик делает исследование, пишет код с нуля. Он часто не знает заранее что ему потребуется

Fair комментарий, что непонятно какие ядра и прочее. В статье, на которую я дал ссылку, один человек сделал один тест и верно, что на него нельзя полагаться как на истину. Но тем не менее это референс. Опять же - у аналитика нет возможности запускать один и тот же код 10 раз, чтобы подобрать параметры. Ему нужен какой-то стандартный конфиг.

Про то, чтобы все ядра на ноде занимать, не очень понял.. На ярн потому что выделяется не 100% ядер. По умолчанию две трети вроде. Или вы про то, чтобы доступные ядра делились на количество экзекьюторов? Последнее - хорошая идея. Но там то драйвер, то еще что-то, то кто-то выделит с другим количеством. Не подгадаешь. 16 ядер это конечно немного, мб на них и правда нужно брать поменьше экзекьюторы

А кстати, как вы считаете нужно делать в этой ситуации? Сколько-то ядер нужно же попросить.

Чтобы понять соотношение, зайдите в YARN → Scheduler.

Не очень уловил комментарий. UI ярна нужно смотрать, как раз чтобы видеть, сколько выделено на ярн..

берите в 5-10 раз больше памяти, чем занимают ваши данные;

В ежедневной работе мы не читаем все данные за все время в память. Да и в витринах тоже. Обычно считаем по дням или по часам. И это уже можно вместить в память так, чтобы это было 5-10 от общего объема. Если вмещать больше, то люди будут регулятно ловить OutOfMemory. Для аналитика, которому нужно просто получить результат и нет времени оптимизировать запросы, это очень болезненно

дать какие-то простые эвристические рекомендации для работы.

Не, рекомендации вполне хороши. Я просто уточнил некоторые детали.


зачем ограничивать driver.maxResultSize

тут не про то, чтобы его ограничивать, а скорее про то, что память, нужная драйверу, может быть не связана с размером результата вообще никак. И это два независимых параметра, в общем случае.


Про то, чтобы все ядра на ноде занимать, не очень понял…

ну тут простое соображение — если занять все ядра, почти, и оставить сколько-то простаивать — то кто их займет? Это неэффективная трата ядер. Сколько их выделено ярну — это вопрос немножко отдельный, но в любом случае, я рассчитываю что мы можем это узнать — и учесть.


Не очень уловил комментарий.

Тут я про то, что REST API ярна в этом месте совсем несложный, и узнать про ноды все что надо — легко. И дальше можно это автоматизировать как-то.

ну тут простое соображение — если занять все ядра, почти, и оставить
сколько-то простаивать — то кто их займет? Это неэффективная трата ядер.
Сколько их выделено ярну — это вопрос немножко отдельный, но в любом
случае, я рассчитываю что мы можем это узнать — и учесть.

Прошу прощения, немного не понял все-таки. А можете еще раз более подробно описать логику выбора размера экзекьюторов? Правильно я понял, что через API ярна можно посмотреть сколько доступно ядер на каждой ноде? (я про это не знал, нужно будет разобраться) И исходя из этого выбрать?

Да, из API ярна можно посмотреть. Там один запрос, не слишком сложный JSON.


GET http://rm-http-address:port/ws/v1/cluster/nodes


{
  "nodes":
  {
    "node":
    [
      {
        "rack":"\/default-rack",
        "state":"RUNNING",
        "id":"host.domain.com:54158",
        "nodeHostName":"host.domain.com",
        "nodeHTTPAddress":"host.domain.com:8042",
        "lastHealthUpdate": 1476995346399,
        "version": "3.0.0-alpha2-SNAPSHOT",
        "healthReport":"",
        "numContainers":0,
        "usedMemoryMB":0,
        "availMemoryMB":8192,
        "usedVirtualCores":0,
        "availableVirtualCores":8,
        "resourceUtilization":
        {
          "nodePhysicalMemoryMB":1027,
          "nodeVirtualMemoryMB":1027,
          "nodeCPUUsage":0.016661113128066063,
          "aggregatedContainersPhysicalMemoryMB":0,
          "aggregatedContainersVirtualMemoryMB":0,
          "containersCPUUsage":0
        }
      },

"usedMemoryMB"+"availMemoryMB" вот эта сумма — это общий объем на ноде.


Все варианты, что я видел, исходят из того, что ядер одинаковое число. Если ноды разные — я вообще не знаю, как это повлияет (точно так же я откровенно говоря не знаю, как быть в реальном случае, когда мы на кластере не одни, а реально работают тысячи приложений (как у нас это и происходит).


Ну то есть, по сути мы выбираем некую верхнюю границу, исходя из возможностей пустого кластера. То есть, число ядер как доступное на ноде, минус некий резерв (1 ядро минимум, возможно больше, тут я не уверен). Оставшиеся ядра делим поровну на N executors (которое еще не знаем). Тут два очевидных варианта — один executor и все доступные ядра, или наоборот, executor с 1 ядром. Они скорее всего плохие, тут я согласен (считаем, что ядер все-таки не 4, а скорее 16).


Выбираем что-то похожее на "5 ядер на executor". В итоге имеющееся число ядер может не делиться на 5, поэтому что-то останется. И будет простаивать. То есть, если у нас доступно скажем 12 ядер, то я бы взял бы 4, а не 5, потому что при 5 останется два ядра ни туда ни сюда...


Ну то есть, я бы брал число ядер на executor исходя из того, чтобы не было этого остатка. Ну если это возможно, разумеется. А потом бы распределял между ними память. Исходя из этого мы можем оценить, сколько вообще ресурсов имеет смысл просить. Ну и дальше можно попробовать его скорректировать исходя из того, сколько ресурсов доступно в нашей очереди (в меньшую сторону), или на кластере (это видно из того же запроса к ярну). Но тут уже мы уходим в область прогнозов, потому что никто не знает, что будет в следующие 15 минут (хотя мы можем попробовать собирать статистику).

Ого, спасибо за инсайт! Очень круто)

По названию статьи подумал что это про то как аналитики приходят к дата-инженерам чтобы те подкинули ядер им в очередь в yarn :-).

Бывает и такое ) Ну или можно попросить очередь побольше...

А в итоге вы в Сбере или вы в Авито?

Сейчас я работаю в Авито. В Сбере тоже работал, провел там хорошие 4 года =)

В pySpark результаты собирает функция toPandas().

Это упрощение. Как минимум есть еще классический collect(). А проблемы с нехваткой памяти также можно легко получить и при неосторожном использовании методов для pyspark.DataFrame coalesce() или repartition()

К тому же можно сделать не toPandas().to_csv(), а записать датафрейм в hive или hdfs, например, а потом оттуда забрать данные.

берите в 5-10 раз больше памяти, чем занимают ваши данные;

А shuffle, возникающий при джойне, изменяет количество партиций до параметра spark.sql.shuffle.partitions, который по умолчанию 200.

Не все данные могут так просто влезть в память. И как раз настройка этого параметра позволяет не брать десятки гигабайт памяти на экзекутор, и pyspark сам разобьет датафреймы на более мелкие партиции.

По ядрам все просто: чем их больше, тем быстрее идут расчёты.

Но чем больше ядер на экзекутор, тем меньше памяти на ядро. И тем больше потоков при shuffle read/write, что тоже не всегда хорошо. Это палка о двух концах.

И если основная цель - это все же ускорить вычисления, то как минимум стоит задумываться о том, как партицированы данные при джойнах, чтобы избежать лишней передачи данных.

настройка этого параметра позволяет не брать десятки гигабайт памяти на экзекутор

Более того, у спарка есть лимит на размер партиции, в который иногда упираешься при дефолтном числе 200. Помнится мне, что там 2 гигабайта, то есть при размере данных всего-то 400 гигабайт, и равном размере партиций — мы уже можем утыкаться и в него.

Привет! Тоже спасибо за классные каменты!

В pySpark результаты собирает функция toPandas()

Валидно.. Тема сохранения достаточно большая и у меня даже есть драфт отдельной статьи на эту тему. Там есть бенчмарки по toPandas с и без arrow оптимизации, а также про скачивания с hdfs. Для теста кстати получилось выгрузить через toPandas датасет в 40Гб (за 5 мин, с arrow оптимизацией)

берите в 5-10 раз больше памяти, чем занимают ваши данные;

Вы смотрите с перспективы дата инженера, где вы оптимизируете жесткие расчеты =) Я думаю для аналитика такое не норма. Я пытаюсь донести аналитикам мысль, если данные занимают так много, что не влезают, то нужно считать меньшими кусочками. Или нести задачу дата инженерам. Потому что если аналитик будет регулярно бесконтрольно в ноутбуке запускать жесткие расчеты (а в них скорее всего будут дата спиллы), это будет руинить кластер. У меня такое в практике увы было не раз( Периодически умирал кластер, начинали выяснять, оказывалось, что кто-то регулярно насиловал кластер чем-то очень жестким.

По ядрам все просто: чем их больше, тем быстрее идут расчёты.

Это правда.. Нужно будет про партиционирование и джойны написать отдельный гайд.

А расскажите при случае подробнее про Arrow. Мне его довольно активно рекламировали, и у меня нет причин не доверять этой рекламе, но на практике пока не удается добраться и пощупать.


про партиционирование и джойны написать отдельный гайд.
Особенно если есть опыт применения нового оптимизатора Spark 3.

Я считаю, что нужно сначала вообще озвучить какую версию Spark вы используете. И начать с более приземлённых понятий о работе Spark, на узлах кластера, и например не стоит мешать синтаксис Spark с наивным Python.

На мой взгляд, по большому счету с версии 1.6 по 3.2 ничего особо не поменялось. И потом, это же не учебник, а статья про собственный опыт автора.

Саш, подскажи, пжл, почему это все еще надо донастраивать руками, почему нет какой-то программной реализации распределения ресурсов (хотя бы эмпирически)?

Наверное потому что спарк изначально занял нишу продвинутого инструмента. В качестве основного инструмента у нас большая часть аналитиков сидит на вертике. В большинстве других компаний тоже стоит какая-то MPP (massive parallel processing) база, гринплам там или еще что-то. И эти MPP из коробки закрывают очень много всего.

Отличие спарка от MPP баз отдельный вопрос. Но одно из преимуществ спарка - наличие контроля: выделяемых ресурсов, оптимизации, данных, которые в памяти, а которые нет и прочее. За счет этого спарк может решать потенциально более сложные задачи и масштабироваться.

В Сбере нам приходилось использовать хадуп и спарк в первую очередь потому что только он позволял нам построить единое федеративное хранилище на десятки независимых подразделений и тысячи пользователей. Кто поднимал свой гринплам, например, мучался - с трудом забирал и отдавал данные во вне. Хотя с точки зрения пользовательского опыта гринплам гораздо комфортнее спарка.

Задача планирования ресурсов в многозадачной системе не имеет простого решения, если нагрузка не предсказуема заранее (а это как правило так, у аналитиков так в особенности). Может поэтому (в том числе)?

Sign up to leave a comment.