Apache Spark: что там под капотом?

    Вступление


    В последнее время проект Apache Spark привлекает к себе огромное внимание, про него написано большое количество маленьких практических статей, он стал частью Hadoop 2.0. Плюс он быстро оброс дополнительными фреймворками, такими, как Spark Streaming, SparkML, Spark SQL, GraphX, а кроме этих «официальных» фреймворков появилось море проектов — различные коннекторы, алгоритмы, библиотеки и так далее. Достаточно быстро и уверенно разобраться в этом зоопарке при отсутсвие серьезной документации, особенно учитывая факт того, что Spark содержит всякие базовые кусочки других проектов Беркли (например BlinkDB) — дело непростое. Поэтому решил написать эту статью, чтобы немножко облегчить жизнь занятым людям.

    Небольшая предыстория:


    Spark — проект лаборатории UC Berkeley, который зародился примерно в 2009г. Основатели Спарка — известные ученые из области баз данных, и по философии своей Spark в каком-то роде ответ на MapReduce. Сейчас Spark находится под «крышей» Apache, но идеологи и основные разработчики — те же люди.

    Spoiler: Spark в 2-х словах


    Spark можно описать одной фразой так — это внутренности движка массивно-параллельной СУБД. То есть Spark не продвигает свое хранилище, а живет сверх других (HDFS — распределенная файловая система Hadoop File System, HBase, JDBC, Cassandra,… ). Правда стоит сразу отметить проект IndexedRDD — key/value хранилище для Spark, которое наверное скоро будет интегрировано в проект.Также Spark не заботится о транзакциях, но в остальном это именно движок MPP DBMS.

    RDD — основная концепция Spark


    Ключ к пониманию Spark — это RDD: Resilient Distributed Dataset. По сути это надежная распределенная таблица (на самом деле RDD содержит произвольную коллекцию, но удобнее всего работать с кортежами, как в реляционной таблице). RDD может быть полностью виртуальной и просто знать, как она породилась, чтобы, например, в случае сбоя узла, восстановиться. А может быть и материализована — распределенно, в памяти или на диске (или в памяти с вытеснением на диск). Также, внутри, RDD разбита на партиции — это минимальный объем RDD, который будет обработан каждым рабочим узлом.



    Все интересное, что происходит в Spark, происходит через операции над RDD. То есть обычно приложения для Spark выглядит так — создаем RDD (например достаем данные из HDFS), мусолим его (map, reduce, join, groupBy, aggregate, reduce, ...), что-нибудь делаем с результатом — например кидаем обратно в HDFS.

    Ну и уже исходя из этого понимания следует Spark рассматривать как параллельную среду для сложных аналитических банч заданий, где есть мастер, который координирует задание, и куча рабочих узлов, которые участвуют в выполнении.

    Давайте рассмотрим такое простое приложение в деталях (напишем его на Scala — вот и повод изучить этот модный язык):

    Пример Spark приложения (не все включено, например include)


    Мы отдельно разберем, что происходит на каждом шаге.

    def main(args: Array[String]){
    
      // Инициализация, не особо интересно
     val conf = new SparkConf().setAppName(appName).setMaster(master) 
     val sc = new SparkContext(conf)
    
     // Прочитаем данные из HDFS, сразу получив RDD
     val myRDD  = sc.textFile("hdfs://mydata.txt")
    
     // Из текстового файла мы получаем строки. Не слишком интересные данные.
     // Мы из этих строк сделаем кортежи, где первый элемент (сделаем его потом 
     // ключем) - первое "слово" строки 
     val afterSplitRDD = myRDD.map( x => ( x.split(" ")( 0 ), x ) )
    
     // Сделаем группировку по ключу: ключ - первый элемент кортежа
     val groupByRDD = afterSplitRDD.groupByKey( x=>x._1 )
    
     // Посчитаем кол-во элементов в каждой группе
     val resultRDD = groupByRDD.map( x => ( x._1, x._2.length ))
    
     // Теперь можно записать результат обратно на HDFS
     resultRDD.saveAsTextFile("hdfs://myoutput.txt")
      
    }
    


    А что же там происходит?


    Теперь пробежимся по этой программе и посмотрим что происходит.

    Ну во-первых программа запускается на мастере кластера, и прежде чем пойдет какая-нибудь параллельная обработка данные есть возможность что-то поделать спокойно в одном потоке. Далее — как уже наверное заметно — каждая операция над RDD создает другой RDD (кроме saveAsTextFile). При этом RDD все создаются лениво, только когда мы просим или записать в файл, или например выгрузить в память на мастер — начинается выполнение. То есть выполнение происходит как в плане запроса, конвеером, где элемент конвеера — это партиция.

    Что происходит с самой первой RDD, которую мы сделали из файла HDFS? Spark хорошо синтегрирован с Hadoop, поэтому на каждом рабочем узле будет закачиваться свое подмножество данных, и закачиваться будет по партициям (которые в случае HDFS совпадают с блоками). То есть все узлы закачали первый блок, и пошло выполнение дальше по плану.

    После чтения с диска у нас map — он выполняется тривиально на каждом рабочем узле.

    Дальше идет groupBy. Это уже не простая конвеерная операция, а настоящая распределенная группировка. По хорошему, лучше этот оператор избегать, так как пока он реализован не слишком умно — плохо отслеживает локальность данных и по производительности будет сравним с распределенной сортировкой. Ну это уже информация к размышлению.

    Давайте задумаемся о состоянии дел в момент выполнения groupBy. Все RDD до этого были конвеерными, то есть они ничего нигде не сохраняли. В случае сбоя, они опять бы вытащили недостающие данные из HDFS и пропустили через конвеер. Но groupBy нарушает конвеерность и в результате мы получим закэшированный RDD. В случае потери теперь мы вынуждены будем переделать все RDD до groupBy полностью.

    Чтобы избежать ситуации, когда из-за сбоев в сложном приложении для Spark приходится пересчитывать весь конвеер, Spark позволяет пользователю контролировать кэширование оператором persist. Он умеет кэшировать в память (в этом случае идет пересчет при потере данных в памяти — она может случится при переполнении кэша), на диск (не всегда достаточно быстро), или в память с выбросом на диск в случае переполнения кэша.

    После, у нас опять map и запись в HDFS.

    Ну вот, теперь более менее понятно что происходит внутри Spark на простом уровне.

    А как же подробности?


    Например хочется знать как именно работает операция groupBy. Или операция reduceByKey, и почему она намного эфективнее, чем groupBy. Или как работает join и leftOuterJoin. К сожалению большинство подробностей пока легче всего узнать только из исходников Spark или задав вопрос на их mailing list (кстати, рекомендую подписаться на него, если будете что-то серьезное или нестандартное делать на Spark).

    Еще хуже с понимаем, что творится в различных коннекторах к Spark. И насколько ими вообще можно пользоваться. Например нам на время пришлось отказаться от идеи интегрироваться с Cassandra из-за их непонятной поддержки коннектора к Spark. Но надежда есть что документация качественная в скором будущем появится.

    А что у нас интересного есть сверху Spark?


    • SparkSQL: SQL движок сверху Spark. Как мы видели уже, в Sparke уже практически все для этого есть, кроме хранилища, индексов и своей статистики. Это серьезно затрудняет оптимизацию, но команда SparkSQL утверждает, что они пилят свой новый фреймворк оптимизации, а также AMP LAB (лаборатория, откуда вырос Spark) не собирается отказывать и от проекта Shark — полное замещение Apache HIVE
    • Spark MLib: Это по сути замещение Apache Mahaout, только намного серьезнее. Помимо эффективного параллельного машинного обучения (не только средствами RDD, но и дополнительными примитивами) SparkML еще намного качественнее работает с локальными данными, используя пакет нативной линейной алгебры Breeze, который притянет к вам в кластер Фортрановский код. Ну и очень хорошо продуманный API. Простой пример: параллельно обучаемся на кластере с кросс-валидацией.
    • BlinkDB: Очень интересный проект — неточные SQL запросы сверху больших объемов данных. Хотим подсчитать average по какому-нибудь полю, но хочется сделать это не дольше чем за 5 секунд (потеряв в точности) — пожалуйста. Хотим результат с погрешностью не больше заданной — тоже годится. Кстати куски этой BlinkDB можно встретить внутри Spark (это можно рассматривать как отдельный квест).
    • Ну и много-много всего сейчас пишется сверху Spark, я только самые интересные с моей точки зрения проекты перечислил
    Метки:
    Поделиться публикацией
    Комментарии 12
    • +1
      Спасибо за отличную статью!

      А можно поподробнее про непонятную поддержку коннектора для Кассандры? Пользуюсь этим коннектором для чтения, вроде все нормально.
      • 0
        А кстати как работает параллельное чтение из Кассандры — удалось выяснить?
    • 0
      Спасибо!

      Не было много времени разбираться с коннектором — была надежда, что заработает хорошо из коробки. С чем столкнулись:

      1. Не смогли собрать .jar чтобы не конфликтовал с Spark SQL, поэтому сразу вылетел SQL функционал
      2. Ладно, для начала SQL не нужен был — перелопачиваем все таблицы. Делаем RDD, читаем, сохраняем в HDFS.
      2.1. На одном узле все отлично.
      2.2. На кластере вылетает с ошибкой, что файл уже существует на HDFS.
      3. Ок, сохранять в HDFS нам и не нужно — читаем, map, пишем обратно:
      3.1. На одном узле работает отлично
      3.2. На кластере — job выполняется успешно, без ошибок. Ничего в Кассандру не попадает. Это вообще кошмар. Особенно то, что Spark рапортует, что все успешно.

      В общем пока поставили на паузу. Появится время — вернемся к этому вопросу. Но вообще впечатление нехорошее
    • +1
      Немного с опозданием, но пару комментариев:

      1) он стал частью Hadoop 2.0.
      а можно узнать где и когда? хадуп 2.0 уже давно вышел, 3.0 на подходе, и спарк там никаким боком не светится

      2) Spark можно описать одной фразой так — это внутренности движка массивно-параллельной СУБД
      не правда, если уж и описывать одной фразой, то это: ленивая обработка вычислений и pipeline map фаз
      выполнять параллельно несколько задач он не умеет, в dag последовательно проходят все агрерирующие операции, если нужно действительно параллельность, то или ручками в Tez углубляться и строить DAG, или смотреть проект Apache Flink у них на слайдах это хорошо показано.

      3) groupBy
      у этого оператора, как и всех операторов использующих ShuffleMapTask есть большая проблема: они ВСЕГДА пишут результат на диск, независимо от того кешируете вы результат в памяти или нет, следующие в списке забирают результат по http. поэтому слова на их слайды про in-memory немного приувеличения =) разница в обмене между map и reduce у них с хадупом почти нулевая.

      4) а также AMP LAB (лаборатория, откуда вырос Spark) не собирается отказывать и от проекта Shark — полное замещение Apache HIVE
      опять мимо, на шарк уже все забили, amp lab все что полезное вытянули в spark sql, то что осталось отправилось на свалку. что действительно пилят, так это hive on spark, совместные усилия клоудеры и amp lab по реализации еще одного бекенда для hive (на данный момент есть mr и tez), но с полной совместимостью hive sql

      5) не только средствами RDD, но и дополнительными примитивами
      можно подробней? так как там все построено поверх RDD, это главный примитив в самом спарке, по сути key-value, а уже поверх него строится все остальное

      ну как-то так, если совсем кратко =)
      • 0
        1) Он в Hadoop 2.0

        2) В Спарке есть все операторы реляционной алгебры + дополнителные операторы для BlinkDB

        3) Возможны другие имплементации groupBy — например если все уже отсортировано по ключу — не надо ничего делать. Такой оператор появится — например когда будут merge-joins

        4) Нет, не забили, видел 3-х месячной давности презентацию

        5) Прочитайте про broadcast variables и accumulators

        6) да, кратко, чтобы можно было быстро прочесть и дальше изучать
        • 0
          1) Хмм… да, читал ананс давно, что будут включать, но не включили. Но уже есть другие дистрибутивы со Спарком
          • +1
            1) насколько помню даже и не собирались =) в момент выхода второго хадупа, спарк еще был в настолько сырой бете…
            может вы перепутали с «Spark on Pivotal Hadoop 2.0» или с HDP 2.0 от hortonworks, но это всего лишь сборки конкретных дистрибутивов с полностью независимой от hadoop нумерацией

            2) поддержка нескольких партиций и параллельная их обработка не делает из спарка «массивно-параллельной СУБД», с таким же успехом будет тоже «массивно-параллельной СУБД». реляционной алгебры — update и транзакционность уже прикрутили?

            3) «если бы, да кабы»,
            на данный момент в случае отсортированности какой либо таблицы, мы к ней делаем join, предварительно сортируя вторую
            если отсортированы обе, то в любом случае со второй дергаем репартишинг, так как диапазоны ключей конечно же разные
            выполнение repartition выкинет в обязательном порядке все данные на диск (в офф документации я что-то не заметил упоминания этого факта, благо хоть cloudera не молчит: The MapReduce and Spark shuffles use a “pull” model. Every map task writes out data to local disk, and then the reduce tasks make remote requests to fetch that data. А то на слайдах у датабрикса все так красиво)
            MergeJoin operator relies on SortBasedShuffle to create partitions that sorted by the join key, так что магии не будет, все расходимся

            4) ну за 3 месяца многое поменялось, меньше месяца назад они заявляли про spark sql как основная цель и catalist как движок для запиливания оптимизаций. можно даже пройти на офф сайт http://shark.cs.berkeley.edu/ «Shark has been subsumed by Spark SQL, a new module in Apache Spark. Please see the following blog post for more information: Shark, Spark SQL, Hive on Spark, and the future of SQL on Spark.» ;)

            5) читал и использовал.
            Ничем accumulators от таких в хадупе не отличаются (локальная работа в пределах партиции и сброс на агрегацию в драйвер. Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value).
            Broadcast Variables вообще сугубо read only и неизменяемы, раз разослали и все.

            p.s. активно пользовался хадупом, когда он еще 0.18 был, за спарком тоже слежу почти с самого его начала, но пока вижу пиара вокруг него больше, чем он реально может. я согласен с тем, что продукт дальше развивает подход с параллельной обработкой данных, которые пнул гугл и дальше подхватил hadoop, но не согласен с уровнем маркетинга вокруг него.
            • –1
              1) уже забыл, где видел

              2) я же написал — движок, то есть процессор, а не хранилище. Но на самом деле основная масса MPP DBMS используется для аналитики, тем не особо нужны транзакции. Так что тут уже 1 шаг.

              3) я привел пример groupBy, причем здесь reshuffling? GroupBy много разных имплементаций — если будет интегрировано свое хранилище с мета-данными то можно разшардить данные так, что они будут в пределах одного ключа для группировки лежать на одном узле. MergeJoin может ничего не сортировать, если данные идут из отсортированного индекса, например B-Tree. Далее — когда у нас здоровенный план запроса с кучей joins — можно 1 раз для всех сделать сортировку, и потом ей пользоваться, конечно, если ключи будут совпадать — а это очень частый случай (куча outerjoin-ов и группировкой например — часто хватит одной сортировки на весь план выполения).

              4) broadcast очень полезны, по сути — это барьерная синхронизация из MPI. Я не писал, что их нет в хадупе, хотя не знал, что они там есть. Особо никогда за хадупом не следил.

              5) маркетинга много, но тому есть резон. Уровень людей, которые делают стэк Спарка намного серьезнее, чем Хадупа.
    • +1
      2) ок, hive это mpp dbms, так как там уже даже mvcc прикрутили

      3) к тому что это все сделано поверх shuffle примитивов

      5) резон один — набрать побольше клиентов на саппорт. и давайте не будем про уровень людей, для этого достаточно открыть репозитории и посмотреть на срач в нем в spark, если вы про «профессоров», то хочу вас расстроить, количество грантов на разработку выданных при развитии хадупа тоже не один десяток ушло в универы и написаны/защищены достаточно работ по оптимизациям.

      в общем не вижу смысла продолжать спор, так как мы сейчас в разной плоскости: я — «как сейчас сделано и работает», вы — «как МОЖНО сделать или в будущем возможно сделают». само плохое когда люди начинают вместо реальной ситуации и положении дел в продукте рассказывают об их идеальном видении как это можно сделать.
      • 0
        Ну спора тут мало — HIVE именно сверху шафлов, и сделать на них нормальный dbms не получится.
        И UC Berkley — это серьезные ребята, я им верю
        • +2
          если вы так уверенны про шафлы, то посмотрите проект Apache Tez, там точно такой же pipeline задач можно сделать, распределенные hive tmp таблицы сугубо в памяти.
          он за последнее время ооочень сильно вырос, пускай и тянет в основном только хортон его.
          а уже если действительно хочется драйва, то Apache Flink, там и грамотный pipeline и parallel обработка

          Tez и Flink позволяют строить пайплайны намного более удобные для ETL (много ETL'щиков спасибо скажут за такой функционал)
          загрузили из source1, transform1, transform2, output (out1, out2, out3)

          в спарке после второй трансформации нужно делать материализацию результата, чтобы потом в разные out скормить, если у вас большие объемы, то это гарантированно упадет на диск. у flink в разы более грамотная работа с памятью и тд.

          В итоге когда дело касается веры, то я немцам в таких вещах больше верю ;)

          spark — сделали прототип, шумиха пошла, докидываем на лету на map-reduce дополнительные модули (хотя к архитектуре вопросов более чем у меня), чисто американский подход

          flink — провели исследования, вопросы оптимизации и runtime подстройки задач на основе статистики, начали делать прототипы и продукт, в общем классический немецкий подход

          кто выживет покажет время
          • 0
            +1 сразу бы так написали :)
            Очень интересно, не знал что немцы чего-то делают вообще…

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.