Pull to refresh

MapReduce в Qt Concurrent

Reading time 4 min
Views 13K

MapReduce в Qt
На картинке изображен MapReduce в том виде, в каком он реализован в Qt:


QFuture<T> QtConcurrent::mappedReduced(const Sequence &sequence,
    MapFunction mapFunction, ReduceFunction reduceFunction /*...*/)
T QtConcurrent::blockingMappedReduced(const Sequence &sequence,
    MapFunction mapFunction, ReduceFunction reduceFunction /*...*/)

Столкнулся с тем, что коллеги на работе не знают про MapReduce в Qt Concurrent. Как говорил Гёте: "Чего мы не понимаем, тем не владеем". Под катом будет немножко про Map, про Reduce, про Fork–join model и пример решения простой задачки при помощи MapReduce.


Задачка


Задачка была взята с просторов интернета как есть:
Написать консольную программу, которая выполняет поиск максимального элемента в массиве с 1000000000 элементов.


MapReduce состоит из функций высшего порядка map и reduce. Функция высшего порядка- это функция которая в качестве аргументов принимает другие функции.


Map


Map применяет функцию к каждому элементу списка, возвращая список результатов. В C++ это можно описать через std::transform:


std::list<int> list{
    1, 2, 3, 4, 5, 6
};

std::list<int> newList(list.size(), 0);
std::transform(list.begin(), list.end(),newList.begin(),
               [](int v){
    return v*2;
});

for(auto i: newList){
    std::cout<<i<<" ";
}

Reduce(accumulate)


Википедия дает определение: функция высшего порядка, которая производит преобразование структуры данных к единственному атомарному значению при помощи заданной функции. Если по простому, то reduce аккумулирует множество элементов(список, вектор и т.д.).


На C++ это можно описать через std::for_each и функциональный объект


struct Max{
    Max():value(std::numeric_limits<int>::min()){
    }

  void operator()(int val){
    value = std::max(value, val);
  }

  int value;
};

struct Sum{
    Sum(): value(0){
    }

    void operator()(int val){
      value += val;
    }

    int value;
};
//...
std::list<int> list{
    1, 2, 3, 4, 5, 6
};

const auto max = std::for_each(list.begin(), list.end(), Max());
const auto sum = std::for_each(list.begin(), list.end(), Sum());

std::cout<<"Max:"<<max.value<<std::endl;
std::cout<<"Sum:"<<sum.value<<std::endl;

Fork–join model


Как решить задачку через MapReduce может быть непонятно. Тут следует посмотреть, а может есть какая-нибудь теория? Существует модель параллельных вычислений fork-join. В основе её:


  • разделение большой задачи на подзадачи поменьше и запуск их параллельно(fork);
  • объединение решений подзадач в итоговый результат(join).

fork-join
Картинка демонстрирующая модель(взята из wikipedia). Чем-то похоже на изображение в самом начале. MapReduce в Qt это реализация fork-join model.


Для такой задачи стандартное решением является взять вектор, поделить его на несколько непересекающихся отрезков, найти локальный максимум в отрезках, и в конце объединить результат. На std::thread это будет как-то так:


using DataSet = std::vector<int>;
const size_t DATASET_SIZE = 1000000000;

struct Task
{
    size_t first;
    size_t last;
    DataSet& data;

    int localMaximum;
};

using Tasks = std::vector<Task>;

void max(Task& task)
{
    int localMax = task.data[task.first];
    for(size_t item = task.first; item < task.last; ++item)
    {
        localMax = std::max(localMax, task.data[item]);
    }

    task.localMaximum = localMax;
}

DataSet data(DATASET_SIZE);
//...

const auto threadCount = std::thread::hardware_concurrency();
const auto taskSize = data.size()/threadCount;

Tasks tasks;

size_t first = 0;
size_t last  = taskSize;

// Разбиваем задачу на подзадачи
for(size_t i = 0; i < threadCount; ++i)
{
    tasks.push_back(Task{first, last, data, 0});
    first+=taskSize;
    last = std::min(last+taskSize, data.size());
}

// Запускаем подзадачи
std::vector<std::thread> threads;
for(auto& task: tasks)
{
    threads.push_back(std::thread(max, std::ref(task)));
}

// Выполняем объединение
for(auto& thread: threads)
{
    thread.join();
}
int Max = tasks[0].localMaximum;
for(const auto& task: tasks)
{
    Max = std::max(Max, task.localMaximum);
}

Разбив задачу на подзадачи, можно компактно записать через QtConcurrent::blockingMappedReduced


using DataSet = std::vector<int>;
const size_t DATASET_SIZE = 1000000000;

struct Task
{
    size_t first;
    size_t last;
    DataSet& data;
};

int mapMax(const Task& task)
{
    int localMax = task.data[task.first];
    for(size_t item = task.first; item < task.last; ++item)
    {
        localMax = std::max(localMax, task.data[item]);
    }

    return localMax;
}

void reduceMax(int& a, const int& b)
{
    a = std::max(a, b);
}

using Tasks = std::vector<Task>;

//...
const auto threadCount = std::thread::hardware_concurrency();
const auto taskSize = data.size()/threadCount;

Tasks tasks;

size_t first = 0;
size_t last  = taskSize;

for(size_t i = 0; i < threadCount; ++i)
{
    tasks.push_back(Task{first, last, data, 0});
    first+=taskSize;
    last = std::min(last+taskSize, data.size());
}

int Max = QtConcurrent::blockingMappedReduced(tasks, mapMax, reduceMax);

На что тут следует обратить внимание:


  • структура Task не имеет поля localMaximum, максимум возвращается из фунции mapMax;
  • на сигнатуру функции reduceMax, возврат значения осуществляется через первый аргумент.

Полный код примера
#include <QtCore/QtDebug>
#include <QtCore/QElapsedTimer>
#include <QtCore/QCoreApplication>

#include <QtConcurrent/QtConcurrent>

#include <cstdlib>
#include <thread>
#include <vector>
#include <algorithm>

using DataSet = std::vector<int>;
const size_t DATASET_SIZE = 1000000000;

struct Task
{
    size_t first;
    size_t last;
    DataSet& data;
};

int mapMax(const Task& task)
{
    int localMax = task.data[task.first];
    for(size_t item = task.first; item < task.last; ++item)
    {
        localMax = std::max(localMax, task.data[item]);
    }

    return localMax;
}

void reduceMax(int& a, const int& b)
{
    a = std::max(a, b);
}

using Tasks = std::vector<Task>;

int main(int argc, char *argv[])
{
    std::srand(unsigned(std::time(0)));
    QCoreApplication a(argc, argv);

    DataSet data(DATASET_SIZE);
    for(size_t i = 0; i < data.size(); ++i)
    {
        data[i] = std::rand();
    }

    QElapsedTimer timer;
    timer.start();

    const auto threadCount = std::thread::hardware_concurrency();
    const auto taskSize = data.size()/threadCount;

    Tasks tasks;

    size_t first = 0;
    size_t last  = taskSize;

    for(size_t i = 0; i < threadCount; ++i)
    {
        tasks.push_back(Task{first, last, data});
        first+=taskSize;
        last = std::min(last+taskSize, data.size());
    }

    timer.start();

    const auto Max = QtConcurrent::blockingMappedReduced(tasks, mapMax, reduceMax);

    qDebug() << "Maximum" << Max << "time" <<timer.elapsed() << "milliseconds";
    return 0;
}
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+22
Comments 9
Comments Comments 9

Articles