Многопоточный QuickSort на С++ 2011

    Лично я, при всей моей вере в C++, считаю, что даже в редакции 2011, этот язык крайне недружелюбен в плане многозадачности и многопоточности. В качестве очередной попытки переубедить себя в этом я попробовал сделать многопоточный QuickSort.

    В этом алгоритме получается после фазы разбиения запустить сортировки подчастей параллельно.

    Вот мой наивный велосипед:

    int naive_quick_sort(std::vector<Type>::iterator begin, std::vector<Type>::iterator end) {
      auto const sz = end - begin;
      if (sz <= 1) return 0;
    
      auto pivot = begin + sz/2;
      auto const pivot_v = *pivot;
    
      std::swap(*pivot, *(end - 1));
      auto p = std::partition(begin, end, [&](const Type& a) { return a < pivot_v; } );
      std::swap(*p, *(end - 1));
    
      if (sz > 4096) {
        auto left = std::async(std::launch::async, [&]() {
          return naive_quick_sort(begin, p);
        });
        naive_quick_sort(p + 1, end);
      } else {
        naive_quick_sort(begin, p);
        naive_quick_sort(p + 1, end);
      }
      return 0;
    }
    
    void quick_sort(std::vector<Type>& arr) {
      naive_quick_sort(arr.begin(), arr.end());
    }
    

    Реализация крайне простая, но стоит отметить несколько моментов. Есть некая константа 4096, которая определяет порог, когда отключается параллельное выполнение. Почему именно такое значение? Не знаю. Взято из воздуха с минимальным чувством здравого смысла. Когда же параллельность активна, то сортировка левого массива запускается через async в другом потоке, а правый сортируется как и раньше в текущем потоке. При выходе из контекста функции гарантируется, что задача, запущенная через async, будет завершена (ее завершения будут ждать).

    Традиционно, пузомерка сферических коней в вакууме. Три кандидата:
    • приведенная выше реализация (через async)
    • она же, но в один поток (if (sz > 4096) заменить на if (false))
    • std::sort() (naive_quick_sort(arr.begin(), arr.end()) заменить на std::sort(arr.begin(), arr.end()))

    Сортируется массив из 50000000 элементов типа int64 (со знаком). Делается 10 экспериментов, и считается среднее. Значения генерируются случайно:

    std::tr1::uniform_int<Type> uniform(
      std::numeric_limits<Type>::min(),
      std::numeric_limits<Type>::max());
    std::mt19937_64 engine;
    
    void generate(std::vector<Type>& v) {
      std::for_each(v.begin(), v.end(), [](Type& i) {
        i = uniform(engine); 
      });
    }
    

    Не спрашивайте, почему тут делается перегон из big endian туда и обратно. Это было сделано для сравнения с другой программой, на Java. При замерах времени учитывается только «чистое время».

    Компилятор VS 2011, 64-bit. Процессор Intel Core i5 2.53GHz, 4 ядра.

    Итерация   Через async()   Один поток   std::sort()
    --------- --------------- ------------ ------------
     1         2512            6555         7309
     2         2337            6320         6977
     3         2450            6516         7180
     4         2372            6388         6933
     5         2387            7074         7189
     6         2339            7399         7040
     7         2434            6875         7040
     8         2562            7060         7187
     9         2470            7050         7145
    10         2422            6846         6898
    --------- --------------- ------------ ------------    
    Среднее    2428.5          6808.3       7089.8
    

    Время указано в миллисекундах.

    Получается где-то в три раза быстрее. Странное небольшое отставание std::sort() скорее всего связано c тем, что данные «хорошие», и на них моей простецкой реализации просто везет. Видно, что у времени std::sort() девиация гораздо меньше. Все-таки stl::sort() стабилен по времени вне зависимости от данных.

    Есть ли в этой параллельности практическая польза? Думаю нет. Очень сложно оценить стабильность алгоритма на разных данных. Например, совершенно не ясно, как выбрать порог отключения многозадачности? Стоит ли использовать пул потоков?

    Если кому интересно, внизу полный текст этого велосипеда, включая генератор данных.

    Сборка и генерация данных:

    call "%VS110COMNTOOLS%..\..\VC\vcvarsall.bat" amd64 && ^
    cl /Ox /DWIN32 sort_async.cpp && ^
    sort_async generate
    

    Осторожно! Генератор создаст данных на 8 гигов.

    Сборка и эксперимент:

    call "%VS110COMNTOOLS%..\..\VC\vcvarsall.bat" amd64 && ^
    cl /Ox /DWIN32 sort_async.cpp && ^
    sort_async
    

    Файл sort_async.cpp:
    #include <vector>                      
    #include <iostream>
    #include <fstream>
    #include <sstream>
    #include <algorithm>
    #include <iomanip>
    #include <future>
    #include <random>
    #include <chrono>
    #include <cstdlib>
    
    const int ITERATIONS_NUM = 10;
    const int DATA_SIZE = 50000000;
    
    typedef __int64 Type;
    
    inline void endian_swap(Type& x) {
      x =
        (0x00000000000000FF & (x >> 56))
      | (0x000000000000FF00 & (x >> 40))
      | (0x0000000000FF0000 & (x >> 24))
      | (0x00000000FF000000 & (x >>  8))
      | (0x000000FF00000000 & (x <<  8))
      | (0x0000FF0000000000 & (x << 24))
      | (0x00FF000000000000 & (x << 40))
      | (0xFF00000000000000 & (x << 56));
    }
    
    std::tr1::uniform_int<Type> uniform(
      std::numeric_limits<Type>::min(),
      std::numeric_limits<Type>::max());
    std::mt19937_64 engine;
    
    void generate(std::vector<Type>& v) {
      std::for_each(v.begin(), v.end(), [](Type& i) { i = uniform(engine); });
    }
    
    void check_sorted(const std::vector<Type>& v, const std::string& msg) {
      for (auto i = 0; i < v.size() - 1; ++i) {
        if (v[i] > v[i + 1]) {
          std::cout << "\nUnsorted: " << msg << "\n";
          std::cout << "\n" << i << "\n";
          std::cout << v[i] << " " << v[i + 1] << "\n";
          std::exit(1);
        }
      }
    }
    
    std::string data_file_name(const int i, const std::string& suffix) {
      std::ostringstream fmt;
      fmt << "trash_for_sort_" << i << suffix << ".bin";
      return fmt.str();
    }
    
    void save_file(std::vector<Type> array, const std::string& name) {
      std::for_each(array.begin(), array.end(), [](Type& i) { endian_swap(i); });
      std::ofstream os(name.c_str(), std::ios::binary|std::ios::out);
      auto const bytes_to_write = array.size() * sizeof(array[0]);
      std::cout << "Saving " << array.size() << " bytes to " << name << "\n";
      os.write((char *)&array[0], bytes_to_write);
    }
    
    int main_generate(int argc, char* argv[]) {
      std::cout << "Generation\n";
      for (auto i = 0; i < ITERATIONS_NUM; ++i) {
        std::vector<Type> unsorted(DATA_SIZE);
        generate(unsorted);
        save_file(unsorted, data_file_name(i, ""));
        std::cout << "Sorting...\n";
        std::sort(unsorted.begin(), unsorted.end());
        check_sorted(unsorted, "check sorted array");
        save_file(unsorted, data_file_name(i, "_sorted"));
      }
      return 0;
    }
    
    void load_file(std::vector<Type>& array, const std::string& name) {
      std::cout << "Loading " << name;
      array.resize(DATA_SIZE, 0);
    
      std::ifstream is(name.c_str(), std::ios::binary|std::ios::in);
      auto const to_load = array.size() * sizeof(array[0]);
      is.read((char *)&array[0], to_load);
      if (is.gcount() != to_load) {
        std::cerr << ", Bad file " << name
          << ", loaded " << is.gcount() << " words but should be " << to_load << "\n";
        std::exit(1);
      }
      std::for_each(array.begin(), array.end(), [](Type& v){ endian_swap(v); });
    }
    
    int naive_quick_sort(std::vector<Type>::iterator begin, std::vector<Type>::iterator end) {
      auto const sz = end - begin;
      if (sz <= 1) return 0;
    
      auto pivot = begin + sz/2;
      auto const pivot_v = *pivot;
    
      std::swap(*pivot, *(end - 1));
      auto p = std::partition(begin, end, [&](const Type& a) { return a < pivot_v; } );
      std::swap(*p, *(end - 1));
    
      if (sz > 4096) {
        auto left = std::async(std::launch::async, [&]() {
          return naive_quick_sort(begin, p);
        });
        naive_quick_sort(p + 1, end);
      } else {
        naive_quick_sort(begin, p);
        naive_quick_sort(p + 1, end);
      }
      return 0;
    }
    
    void quick_sort(std::vector<Type>& arr) {
      naive_quick_sort(arr.begin(), arr.end());
    }
    
    int main(int argc, char* argv[]) {
      if (argc == 2 && !std::strcmp(argv[1], "generate"))
        return main_generate(argc, argv);
    
      std::vector<double> times;
      auto times_sum = 0.0;
      for (auto i = 0; i < ITERATIONS_NUM; ++i) {
        std::vector<Type> unsorted;
        load_file(unsorted, data_file_name(i, ""));
    
        std::vector<Type> verify;
        std::cout << ", ";
        load_file(verify, data_file_name(i, "_sorted"));
        check_sorted(verify, "verify array");
    
        std::cout << ", Started";
        auto start = std::chrono::high_resolution_clock::now();
    
        quick_sort(unsorted);
    
        auto stop = std::chrono::high_resolution_clock::now();
        std::cout << ", Stopped, ";
        auto duration = std::chrono::duration<double>(stop - start).count();
        std::cout << duration;
    
        check_sorted(unsorted, "sorted array");
    
        const auto match = unsorted == verify;
        std::cout << (match ? ", OK" : ", DON'T MATCH");
    
        times.push_back(duration);
        times_sum += duration;
    
        std::cout << "\n";
      }
    
      auto const average = times_sum / ITERATIONS_NUM;
      auto const max_element = *std::max_element(times.begin(), times.end());
      auto const min_element = *std::min_element(times.begin(), times.end());
      auto const average_fixed = (times_sum - max_element - min_element) /
                                 (ITERATIONS_NUM - 2);
    
      std::cout << "Average: " << average << "s, " 
                << "Average without max/min: "
                << average_fixed << "s." << std::endl;
    }
    


    Под занавес, картинка загрузки процессоров. Явно видны всплески на каждой итерации, когда система используется подзавязку.



    Добавка

    Интересная статья от Microsoft, Dynamic Task Parallelism, где также показывается вариант многопоточного Quicksort'а.
    Метки:
    Поделиться публикацией
    Похожие публикации
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 15
    • +2
      В этом алгоритме получается после фазы разбиения запустить сортировки подчастей параллельно.

      Не очень удачное решение с точки зрения параллельности. Мне кажется, более thread-friendly было бы разбить массив на N одинаковых частей и запустить для каждой части quicksort в своем потоке.
      • 0
        Но ведь потом придется сливать куски в один, а это уже только в один поток.
        • +1
          Ну а у вас в самом-самом начале std::partition весь массив жуёт тоже в один поток.
          • 0
            Не так уж это и долго, особенно если составить хороший алгоритм для слияния.
            • 0
              Научите сливать K кусков суммарного размера N быстрее чем за O(NK)?
              • 0
                И, конечно, без использования разного рода логарифмических структур данных, типа сетов, мапов и прочего. При маленьких K они сильно затормозят финальную стадию процесса.
                • +2
                  Почему без использования? При маленьких K выгоднее запустить QS в один поток.
                • +1
                  А разве нельзя рекурсивно посливать по два в сумме за O(N log(K))?
                  • 0
                    Для слияния O(N log K) наилучшая асимптотика. Правда там константа очень маленькая, и можно реализовать не-рекурсивную версию, будет ещё быстрее.

                    Но можно подойти с другой стороны, и перед сортировкой обработать массив неким подобием quicksort, который не сортирует, а просто разбивает массив на K частей (при этом K — степень двойки) таким образом, что каждый элемент 1-ой части меньше любого элемента 2-ой части и т.д. Т.е. после сортировки в K потоков мы получаем уже окончательный массив.
            • +1
              Это уже не совсем quicksort будет.
            • 0
              Интересно, а если предположить что процессоров бесконечное количество, хотя бы сравнимо с log(N)… реально ли будет добиться O(N log K)? или даже O(N)?

              Много ядер уже сейчас можно найти в GPU (там только куча ограничений для эффективного доступа к оперативке), в будущем, я думаю, ситуация с многопроцессорными сопроцессорами будет только улучшаться.
              • 0
                Там ограничение в межпроцессной коммуникации и передаче данных через PCI-E шину…

                А у CPU — обмен данными через L3 кеш весьма медленный (и это если нам еще повезло, и не нужно в процессор на «соседнем» сокете идти).
              • –3
                А брать-то надо не среднее время выполнения, а наименьшее. Результаты выше минимального просто больше «пострадали» от отсутствия данных в кеше CPU, переключения процессов и др.
                • +2
                  Стоп-стоп-стоп, отсутствие данных в кеше — это как раз ожидаемый фактор замедления работы алгоритма )
                • 0
                  Не совсем понял как работает такая многопоточность. Task Manager показывает сначала 8 потоков, а потом на 3-ем или 4ом проходе количество потоков на приложении увеличивается до 13ти.

                  По идее, если ограничить число потоков по количеству ядер, то будет меньше переключений, лучше доступ к памяти и т.п. Интересно было бы проверить, будет ли быстрее.

                  ЗЫ в инклюде присутствует future и chrono, это буст приходит в студию, или это уже входит стандарт c++11?

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.