MapReduce для начинающих на Erlang'e

    Я продолжаю свое погружение в Эрланг. Уже есть хитрый план переписать один из наших сервисов для мониторинга на Эрланге. Мы тут осваиваем облака Windows Azure и Amazon EC2 в качестве платформы для некоторых продуктов и внутренних задач типа QA, поэтому возможность использовать много ядер и машин без переписывания кода выглядить перспективно.

    Итак, для начала простой, но реальный пример — есть проект ~2000 файлов. Надо составить список используемых переменных окружения. То есть найти вхождения строк «getenv(...)» и «GetVariable(...)» (это наш wrapper) и выдрать из них параметр.

    Задача незамысловатая и давно решается программой на C++, которая даже обход каталогов не делает, а просто вызывает юниксовый «find», генерирующий список файлов по маске, и затем по списку лопатит файлы. На 2000 файлах работает пару секунд в один поток.

    Теперь Эрланг. Тут хочется замутить что-нибудь более кучерявое, чем последовательный обход файлов. MapReduce как раз в тему — можно составить список файлов, затем анализ каждого файла делать параллельно (Map), аккумулируя найденных имена переменных, и в конце обработать все полученные входждение (Reduce), в нашем случае просто подсчитать количество вхождения каждой переменной.

    Фактически мой код повторяет пример из "Programming Erlang" и использует модуль phofs (parallel higher-order functions) из этой же книги.

        -module(find_variables).
        -export([main/0, find_variables_in_file/2, process_found_variables/3]).
    
        -define(PATH, "/Projects/interesting_project").
        -define(MASK, "\\..*(cpp|c)").
    
        main() ->
            io:format("Creating list of files...~n", []),
            % Стандартная функция обхода файловой системы. Последний параметр -
            % функтор, накапливающий имена в списке.
            Files = filelib:fold_files(?PATH, ?MASK, true, 
                                       fun(N, A) -> [N | A] end, []),
            io:format("Found ~b file(s)~n", [length(Files)]),
            F1 = fun find_variables_in_file/2,   % Map
            F2 = fun process_found_variables/3,  % Reduce
            % Вызываем MapReduce через функцию benchmark, считающую время
            % выполнения.
            benchmark(fun() -> 
                L = phofs:mapreduce(F1, F2, [], Files),
                io:format("Found ~b variable(s)~n", [length(L)])
            end, "MapReduce").
    
        benchmark(Worker, Title) ->
            {T, _} = timer:tc(fun() -> Worker() end),
            io:format("~s: ~f sec(s)~n", [Title, T/1000000]).
    
        -define(REGEXP, "(getenv|GetVariable)\s*\\(\s*\"([^\"]+)\"\s*\\)").
    
        % Map. Анализ одного файла.
        find_variables_in_file(Pid, FileName) ->
            case file:open(FileName, [read]) of 
                {ok, File} ->
                    % Заранее компилируем регулярное выражение.
                    {_, RE} = re:compile(?REGEXP),
                    % Данный обратный вызов пошлет родительскому контролирующему
                    % потому сообщение с именем найденной переменной.
                    CallBack = fun(Var) -> Pid ! {Var, 1} end,
                    find_variable_in_file(File, RE, CallBack),
                    file:close(File);
                {error, Reason} ->
                    io:format("Unable to process '~s', ~p~n", [FileName, Reason]),
                    exit(1)
            end.
    
        % Reduce. Анализ данных. Данная функция вызывается контролирующим 
        % процессом MapReduce для каждого найденного ключа вместе со списком
        % значений, ассоциированных с ним. В нашем случае это будут пары 
        % {VarName, 1}. Мы просто подсчитаем для каждого VarName количество 
        % пришедших пар, то есть количество найденных вхождений этой переменной.
        % Это и есть наш незамысловатый анализ.
    
        process_found_variables(Key, Vals, A) ->
            [{Key, length(Vals)} | A].
    
        % Построчный обход файла.
        find_variable_in_file(File, RE, CallBack) ->
            case io:get_line(File, "") of
               eof -> void;
               Line -> 
                 scan_line_in_file(Line, RE, CallBack),
                 find_variable_in_file(File, RE, CallBack)
            end.
    
        % Поиск строки в строке по регулярному выражению (скомпилированному ранее),
        % и в случае нахождение вызов CallBack с передачей ему имени найденной
        % переменной.
        scan_line_in_file(Line, RE, CallBack) ->
            case re:run(Line, RE) of
                {match, Captured} -> 
                    [_, _, {NameP, NameL}] = Captured,
                    Name = string:substr(Line, NameP + 1, NameL),
                    CallBack(Name);
                nomatch -> void
            end.
    

    Для сборки программы нужен модуль phofs. Он является универсальным, независимым от конкретных функций Map и Reduce.

    И Makefile на всякий случай:

        target = find_variables
    
        all:
            erlc $(target).erl
            erlc phofs.erl
            erl -noshell -s $(target) main -s init stop
    
        clean:
            -rm *.beam *.dump
    


    Пузомерка. Как я уже сказал, программа на C++ вместе со временем вызова «find» на моей машине работает 1-2 секунды. Версия на Erlang'e работает ~20 секунд. Плохо? Смотря как посмотреть. Если анализ каждого файла будет более длительным (то есть программа будет основное время тратить на анализ файла, а не обход каталогов), то тут уже не совсем очевидно, какое из решений будет более практично при увеличении числа файлов и сложности анализа.

    Я новичок в Эрланге, поэтому будут признателен за критику кода.

    Посты по теме:
    Метки:
    Поделиться публикацией
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 14
    • +4
      Как я понимаю, вы проводите 2000 раз следующий цикл: «чтение файла — обработка данных». Ваш код очень неоптимален, есть несколько путей его улучшения.

      — Возможно, будет быстрее сначала считать все файлы в память, а затем обработать данные. Либо считывать их по n штук, то есть, блоками.
      — Имеет смысл вынести компиляцию регулярного выражения за пределы функции find_variables_in_file, поскольку каждый раз результаты компиляции, вроде, одинаковые.
      — Вообще заменить регулярное выражение простым поиском вхождений getenv, GetVariable. Это нетрудно и должно сэкономить кучу процессорного времени.
      — Не посылать никуда сообщение с именем переменной, а накапливать их внутри переменной, которую передать в find_variables_in_file.
      • +1
        Факт, компиляцию регулярного выражения можно было вынести еще выше.
        • +1
          Закомментировал регулярные выражения, и оно ускорилось в два раза.
          • +1
            Операция считывания файлов — медленна. Вполне логично делать чтение и обработку одновременной, чтобы обработка потом не занимала дополнительного времени, а проходила в те моменты, когда ожидаем прочтения следующих данных.
            Читать файлы целиком смысла нет вообще, вдруг это несколько гигабайт. Нужно умело пользоваться потоками.
            Читать файлы последовательно имеет смысл, учитывая свойства современных систем поддерживать файлы на винчестере одним куском. Читая параллельно даже два файла, мы бестолково мучаем винчестер.
            Вообще заменить re на штатную функцию языка, которой можно гибко управлять, например, сравнение по шаблону.

            Только вчера перевёл статью с аналогичным примером и обьяснением, что то, что вы тут делаете бестолково.
            • +1
              Простите, часть замечаний была автору.
            • +4
              Писать на эрланге можно и нужно, но стоит ли пихать его в каждую задачу? «Если анализ каждого файла будет более длительным...». А будет ли он более длительным? Появится ли таких файлов больше 2000? не упрется ли параллельная обработка в disk io? Такие утилиты как find оттачивались годами и понятны каждому админу, стоит ли плодить сущности?
              • +1
                Да, пример во многом искусственный. Но для демонстрации mapreduce весьма удобный.
                • +1
                  Отнюдь нет. Смысл mapreduce — в распределении вычислений между разными машинами. Пытаясь распараллелить процесс на одной вы наталкиваетесь сразу на несколько грабель — попытка чтения с последовательного устройства данных вперемешку; загрузка файлов одновременно в память.
                  Если бы файлы лежали на трёх-четырёх машинах, это было бы ещё более-менее логично.
                • +2
                  Вы очень, очень, очень сильно ошибаетесь насчёт find-а. Когда файлов становится много, нет не так, когда их становится МНОГО, find работать перестает. Это большой миф насчёт его отлаженности годами.

                  Годами софт отлаживали на винчестерах по 2-3 гигабайта размером. А сейчас уже 2-3 терабайта и софт типа find-а укладывает насмерть жесткий диск вместе с системой.

                  Да, эта софтина тоже работать не будет, потому что накапливает список файлов в памяти.
                  • +3
                    Я именно про это и говорю, нужна эволюция. Как только появляются реальные проблемы (типа find тупит), файлы становятся огромными, или требуется блэкджэк — именно тогда стоит задуматься об кастомном решении.
                    • +2
                      а как нужно делать правильно?
                      • +1
                        Оптимальный вариант не ползать по файловой системе в поисках файлов и информации о них, а хранить эти данные в БД.

                        Либо использовать хорошее знание того как разложены файлы, что бы делать какое-то разумное количество обращений к ФС.
                  • +3
                    glob вам в помощь
                    • +1
                      Только вчера же об этом писали:
                      habrahabr.ru/blogs/crazydev/133697/

                      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.