Pull to refresh

Асинхронные задачи в С++11

Reading time5 min
Views34K
Доброго времени суток, хотел бы поделиться с сообществом своей небольшой библиотектой.
Я программирую на С/C++, и, к сожалению, в рабочих проектах не могу использовать стандарт C++11. Но вот пришли майские праздники, появилось свободное время и я решил поэкспериментировать и по-изучать этот запретный плод. Самое лучшее для изучения чего либо — это практика. Чтение статей о языке программирования научит максимум лучше читать, поэтому я решил написать маленькую библиотеку для асинхронного выполнения функций.
Сразу оговорюсь, что я знаю, что существует std::future, std::async и тп. Мне было интересно реализовать самому нечто подобное и окунуться в мир лямбда-функций, потоков и мьютексов с головой. Праздники — отличное время для велопрогулок.


Итак начнем

Я решил что моя библиотека будет функционировать следующим образом.
Существует некоторый пул, с фиксированным количеством потоков.
В него добавляются задачи, используя синтаксис лямбда функций.
Из самой задачи можно извлечь результат ее выполнения, или просто дождаться окончания ее работы.
Забегая вперед, выглядит это примерно так:

...
act::control control(N_THREADS);
auto some_task = act::make_task([](std::vector<double>::const_iterator begin, std::vector<double>::const_iterator end)
{
  double sum = 0;
  for (auto i = begin; i != end; ++i)
  {
    sum+=(*i);
  }
  return sum;
}
, data.begin(), data.end());
control << some_task;  
cout << some_task->get() << endl;
...


Класс задачи

Для начала необходимо создать класс, описывающий задачу:

template <typename T>
  class task
      : public task<decltype(&T::operator())>
  {
  };

  template <typename ClassType, typename ReturnType, typename ... Args>
  class task<ReturnType(ClassType::*)(Args...) const>
  {
  protected:
    const ClassType &m_func;
    std::tuple<Args...> m_vars;
    ReturnType m_return;
  public:
    task(const ClassType &v, Args... args): m_func(v), m_vars(args ...) {}
    virtual ~task() {}   
  private:   
  };


Как известно, лямба функция раскрывается в класс-функтор с оператором operator().
Наш класс задачи шаблонный, его тип извлекается из типа оператора функтора &T::operator().
Класс хранит в себе указатель на функтор, аргументы функции в виде std::tuple и возвращаемое значение.

Итак теперь мы можем хранить в объекте лямбда-функцию с параметрами, теперь надо научиться ее вызывать.
Для этого необходимо вызвать opertator() у m_func с параметрами, хранящимися в m_vars.
С начала я не знал как это сделать, но усиленное использования гугла и переход по второй ссылке принесло результат:

  template<int ...>
  struct seq { };

  template<int N, int ...S>
  struct gens : gens<N-1, N-1, S...> { };

  template<int ...S>
  struct gens<0, S...> {
    typedef seq<S...> type;
  };


С помощью этой конструкции можно добавить в класс следующие функции:

  ...
  public:
  void invoke()
  {
    ReturnType r = caller(typename gens<sizeof...(Args)>::type());     
  }    
  private:
  template<int ...S>
  ReturnType caller(seq<S...>) const
  {
    return m_func(std::get<S>(m_vars) ...);
  }    
  ...


Базовый класс задачи

Теперь реализуем базовый класс задачи::

  class abstract_task
  {
  protected:
    mutable std::mutex m_mutex;
    mutable std::condition_variable m_cond_var;
    mutable bool m_complete;
  public:
    abstract_task(): m_complete(false) {}
    virtual ~abstract_task() {}
    virtual void invoke() = 0;
    virtual void wait() const
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      while (!m_complete)
      {
        m_cond_var.wait(lock);
      }
    }
  };

Класс содержит в себе мьютекс и переменную состояния, сигнализирующую о завершении задачи. Соотвественно наш класс задачи притерпит некоторые изменения, которые я опущую, так как исходный код доступен на гитхабе.

Создание задач

Сделаем функцию-обертку для создания задач:

  template <typename T, typename ... Args>
  std::shared_ptr<task<decltype(&T::operator())>> make_task(T func, Args ... args )
  {
    return std::shared_ptr<task<decltype(&T::operator())>>(new task<decltype(&T::operator())>(func, args ...));
  }

Так как у нас класс виртуальный, логично использовать указатель и мы это сделаем, и не просто указатель, а умный указатель.

Класс управления

Теперь реализуем сущность для исполнения задач в фоновых потоках.
Приведу лишь часть кода:

...
  class control
  {
    std::deque<std::shared_ptr<abstract_task>> m_tasks;
    std::vector<std::thread> m_pool;
    std::mutex m_mutex;
    std::condition_variable m_cond_var;
    std::condition_variable m_empty_cond;
    std::atomic<bool> m_run;
    std::vector<bool> m_active;
  public:
    control(std::size_t pool_size = 2)
    {
      m_run.store(true, std::memory_order_relaxed);
      auto func = [this](int n)
      {
        while (m_run.load(std::memory_order_relaxed))
        {
          std::unique_lock<std::mutex> lock(m_mutex);
          m_active[n] = true;
          if (m_tasks.empty())
          {
            m_empty_cond.notify_all();
            m_active[n] = false;
            m_cond_var.wait(lock);
          }
          else
          {
            std::shared_ptr<abstract_task> t = m_tasks.front();
            m_tasks.pop_front();
            lock.unlock();
            t->invoke();
            lock.lock();
            m_active[n] = false;
          }
        }        
      };
      pool_size = pool_size > 0 ? pool_size : 1;
      m_active.resize(pool_size, false);
      for(std::size_t i = 0; i < pool_size; ++i)
      {
        m_pool.emplace_back(func, i);
      }
    }
    ...


Для интереса, я использовал все фичерсы нового стандарта, применение которых я хоть как то мог обосновать.
Данный класс создает массив потоков и массив переменных состояния активности для мониторинга выполнения заданий дочерними потоками.
Главный цикл дочернего потока контролируется атомарной переменной (по идее достаточно было объявить ее volatile, так как тут нет состояния гонки, главный поток в нее только пишет, а дочерние только читают)

Производительность

Я бы не стал писать эту статью скорее всего, если бы не проведенный мной тест производительности данного решения по сравнению с std::async.
Конфигурация:
Intel Core(TM) i7-2600 CPU @ 3.40GHz
$gcc --version
gcc (Debian 4.8.2-21) 4.8.2


Тест заключается в параллельном сложении массивов, а затем асинхронном сложении результатов всех сложений. Результатом операции будет:
res = sum(array)*N_P


Числа указаны в миллисекундах.

Тест 1

Оптимизация выключена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 6
Результаты:
test_act 16775 OK
test_async 16028 OK

Производительность сравнима.

Тест 2

Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 6
Результаты:
test_act 1597.6 OK
test_async 2530.5 OK

Моя реализация быстрее в полтора раза.

Тест 3

Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 7
Результаты:
test_act 1313.1 OK
test_async 2503.7 OK


Тест 4

Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 73, Количество потоков в пуле 8
Результаты:
test_act 1402 OK
test_async 2492.2 OK


Тест 5

Оптимизация включена, количество элементов в массиве 100000000, количество порождаемых задач 173, Количество потоков в пуле 8
Результаты:
test_act 4435.7 OK
test_async 5789.4 OK


Выводы и баги

Данные результаты скорее всего связаны с тем, что async порождает для каждой задачи свой поток, в моей же реализации количество потоков фиксировано и накладные расходы на их создание отсутствуют.
Баг — захват переменных области видимости (через []) в лямбда функции вызывает SIGSEGV. Хотя передача их же через параметры работает прекрасно.

Не знаю насколько ли полезна данная статья и сама библиотека, но, по крайней мере, я применил некоторые возможности нового стандарта на своей практике.
Исходный код
Only registered users can participate in poll. Log in, please.
Используете ли вы C++11 в своих рабочих проектах?
48.95% Да256
29.45% Нет, но хотел бы154
4.21% Нет, C++11 не нужен22
17.4% Использую другой язык программирования91
523 users voted. 95 users abstained.
Tags:
Hubs:
+32
Comments21

Articles

Change theme settings