Пользователь
0,0
рейтинг
8 июня 2013 в 19:01

Разработка → Потоки, блокировки и условные переменные в C++11 [Часть 1] tutorial

В первой части этой статьи основное внимание будет уделено потокам и блокировкам в С++11, условные переменные во всей своей красе будут подробно рассмотрены во второй части

Потоки


В C++11, работа с потокам осуществляется по средствам класса std::thread (доступного из заголовочного файла <thread>), который может работать с регулярными функциями, лямбдами и функторами. Кроме того, он позволяет вам передавать любое число параметров в функцию потока.
#include <thread>
 
void threadFunction()
{
     // do smth
}
 
int main()
{
     std::thread thr(threadFunction);
     thr.join();
     return 0;
}

В этом примере, thr — это объект, представляющий поток, в котором будет выполняться функция threadFunction(). Вызов join блокирует вызывающий поток (в нашем случае — поток main) до тех пор, пока thr (а точнее threadFunction()) не выполнит свою работу. Если функция потока возвращает значение — оно будет проигнорировано. Однако принять функция может любое количество параметров.
void threadFunction(int i, double d, const std::string &s)
{
     std::cout << i << ", " << d << ", " << s << std::endl;
}
 
int main()
{
     std::thread thr(threadFunction, 1, 2.34, "example");
     thr.join();
     return 0;
}

Несмотря на то, что передавать можно любое число параметров, все они были переданы по значению Если в функцию необходимо передать параметры по ссылке, они должны быть обернуты в std::ref или std::cref, как в примере:
void threadFunction(int &a)
{
     a++;
}
 
int main()
{
     int a = 1;
     std::thread thr(threadFunction, std::ref(a));
     thr.join();
     std::cout << a << std::endl; 
     return 0;
}

Программа напечатает в консоль 2. Если не использовать std::ref, то результатом работы программы будет 1.

Помимо метода join, следует рассмотреть еще один, похожий метод — detach.
detach позволяет отсоединить поток от объекта, иными словами, сделать его фоновым. К отсоединенным потокам больше нельзя применять join.
int main()
{
     std::thread thr(threadFunction);
     thr.detach();
     return 0;
}

Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):
try
{
     std::thread thr1(threadFunction);
     std::thread thr2(threadFunction);
 
     thr1.join();
     thr2.join();
}

catch (const std::exception &ex)
{
     std::cout << ex.what() << std::endl;
}

Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.
std::mutex                       g_mutex;
std::vector<std::exception_ptr>  g_exceptions;

void throw_function()
{
     throw std::exception("something wrong happened");
}

void threadFunction()
{
     try
     {
          throw_function();
     }
     catch (...)
     {
          std::lock_guard<std::mutex> lock(g_mutex);
          g_exceptions.push_back(std::current_exception());
     }
}

int main()
{
     g_exceptions.clear();
     std::thread thr(threadFunction);
     thr.join();

     for(auto &e: g_exceptions)
     {
          try 
          {
               if(e != nullptr)
                    std::rethrow_exception(e);
          }
          catch (const std::exception &e)
          {
               std::cout << e.what() << std::endl;
          }
     }
     return 0;
}

Прежде, чем двигаться дальше, хочу отметить некоторые полезные функции, предоставляемые <thread>, в пространстве имен std::this_thread:
  • get_id: возвращает id текущего потока
  • yield: говорит планировщику выполнять другие потоки, может использоваться при активном ожидании
  • sleep_for: блокирует выполнение текущего потока в течение установленного периода
  • sleep_until: блокирует выполнение текущего потока, пока не будет достигнут указанный момент времени

Блокировки


В последнем примере, я должен был синхронизировать доступ к вектору g_exceptions, чтобы быть уверенным, что только один поток одновременно может вставить новый элемент. Для этого я использовал мьютекс и блокировку на мьютекс. Мьютекс — базовый элемент синхронизации и в С++11 представлен в 4 формах в заголовочном файле <mutex>:

Приведу пример использования std::mutex с упомянутыми ранее функциями-помощниками get_id() и sleep_for():
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
 
std::mutex g_lock;
 
void threadFunction()
{
     g_lock.lock();
 
     std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
     std::this_thread::sleep_for(std::chrono::seconds(rand()%10));
     std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;
 
     g_lock.unlock();
}
 
int main()
{
     srand((unsigned int)time(0));
     std::thread t1(threadFunction);
     std::thread t2(threadFunction);
     std::thread t3(threadFunction);
     t1.join();
     t2.join();
     t3.join();
     return 0;
}

Программа должна выдавать примерно следующее:
entered thread 10144
leaving thread 10144
entered thread 4188
leaving thread 4188
entered thread 3424
leaving thread 3424

Перед обращением к общим данным, мьютекс должен быть заблокирован методом lock, а после окончания работы с общими данными — разблокирован методом unlock.

Следующий пример показывает простой потокобезопасный контейнер (реализованный на базе std::vector), имеющий методы add() для добавления одного элемента и addrange() для добавления нескольких элементов.
Примечание: и всё же этот контейнер не является полностью потокобезопасным по нескольким причинам, включая использование va_args. Также, метод dump() не должен принадлежать контейнеру, а должен быть автономной функцией. Цель этого примера в том, что показать основные концепции использования мьютексов, а не не сделать полноценный, безошибочный, потокобезопасный контейнер.
template <typename T>
class container 
{
     std::mutex _lock;
     std::vector<T> _elements;
public:
     void add(T element) 
     {
          _lock.lock();
          _elements.push_back(element);
          _lock.unlock();
     } 
     void addrange(int num, ...)
     {
          va_list arguments;
          va_start(arguments, num);
          for (int i = 0; i < num; i++)
          {
               _lock.lock();
               add(va_arg(arguments, T));
               _lock.unlock();
          }
          va_end(arguments); 
     }
     void dump()
     {
          _lock.lock();
          for(auto e: _elements)
          std::cout << e << std::endl;
          _lock.unlock();
     }
};
 
void threadFunction(container<int> &c)
{
     c.addrange(3, rand(), rand(), rand());
}
 
int main()
{
     srand((unsigned int)time(0));
     container<int> cntr;
     std::thread t1(threadFunction, std::ref(cntr));
     std::thread t2(threadFunction, std::ref(cntr));
     std::thread t3(threadFunction, std::ref(cntr));
     t1.join();
     t2.join();
     t3.join();
     cntr.dump();
     return 0;
}

При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения (вызова unlock), что невозможно. Здесь и выходит на сцену std::recursive_mutex, который позволяет получать тот же мьютекс несколько раз. Максимальное количество получения мьютекса не определено, но если это количество будет достигно, то lock бросит исключение std::system_error. Поэтому, решение проблемы в коде выше (кроме изменения реализации addrange(), чтобы не вызывались lock и unlock), заключается в замене мьютекса на std::recursive_mutex.
template <typename T>
class container 
{
     std::recursive_mutex _lock;
     // ...
};

Теперь, результат работы программы будет следующего вида:
6334
18467
41
6334
18467
41
6334
18467
41

Вы, наверное, заметили, что при вызове threadFunction(), генерируются одни и те же числа. Это происходит потому, что функция void srand (unsigned int seed); инициализирует seed только для потока main. В других потоках, генератор псевдо-случайных чисел не инициализируется и получаются каждый раз одни и те же числа.
Явная блокировка и разблокировка могут привести к ошибкам, например, если вы забудете разблокировать поток или, наоборот, будет неправильный порядок блокировок — все это вызовет deadlock. Std предоставляет несколько классов и функций для решения этой проблемы.
Классы «обертки» позволяют непротиворечиво использовать мьютекс в RAII-стиле с автоматической блокировкой и разблокировкой в рамках одного блока. Эти классы:
  • lock_guard: когда объект создан, он пытается получить мьютекс (вызывая lock()), а когда объект уничтожен, он автоматически освобождает мьютекс (вызывая unlock())
  • unique_lock: в отличие от lock_guard, также поддерживает отложенную блокировку, временную блокировку, рекурсивную блокировку и использование условных переменных

С учетом этого, мы можем переписать класс контейнер следующим образом:
template <typename T>
class container 
{
     std::recursive_mutex _lock;
     std::vector<T> _elements;
public:
     void add(T element) 
     {
          std::lock_guard<std::recursive_mutex> locker(_lock);
          _elements.push_back(element);
     }
     void addrange(int num, ...)
     {
          va_list arguments;
          va_start(arguments, num);
          for (int i = 0; i < num; i++)
          {
               std::lock_guard<std::recursive_mutex> locker(_lock);
               add(va_arg(arguments, T));
          }
          va_end(arguments); 
     }
     void dump()
     {
          std::lock_guard<std::recursive_mutex> locker(_lock);
          for(auto e: _elements)
               std::cout << e << std::endl;
     }
};

Можно поспорить насчет того, что метод dump() должен быть константным, ибо не изменяет состояние контейнера. Попробуйте сделать его таковым и получите ошибку при компиляции:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' 
                                                  to ‘std::recursive_mutex &'

Мьютекс (не зависимо от формы реализации), должен быть получен и освобожден, а это подразумевает использование не константных методов lock() и unlock(). Таким образом, аргумент lock_guard не может быть константой. Решение этой проблемы заключается в том, чтобы сделать мьютекс mutable, тогда спецификатор const будет игнорироваться и это позволит изменять состояние из константных функций.
template <typename T>
class container 
{
     mutable std::recursive_mutex _lock;
     std::vector<T> _elements;
public:
     void dump() const
     {
          std::lock_guard<std::recursive_mutex> locker(_lock);
          for(auto e: _elements)
               std::cout << e << std::endl;
     }
};

Конструкторы классов «оберток» могут принимать параметр, определяющий политику блокировки:
  • defer_lock типа defer_lock_t: не получать мьютекс
  • try_to_lock типа try_to_lock_t: попытаться получить мьютекс без блокировки
  • adopt_lock типа adopt_lock_t: предполагается, что у вызывающего потока уже есть мьютекс

Объявлены они следующим образом:
struct defer_lock_t { };
struct try_to_lock_t { };
struct adopt_lock_t { };
 
constexpr std::defer_lock_t defer_lock = std::defer_lock_t();
constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();
constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();

Помимо «оберток» для мьютексов, std также предоставляет несколько методов для блокировки одного или нескольких мьютексов:
  • lock: блокирует мьютекс, используя алгоритм избегания deadlock'ов (используя lock(), try_lock() и unlock())
  • try_lock: пытается блокировать мьютексы в порядке, в котором они были указаны

Вот типичный пример возникновения взаимоблокировки (deadlock): у нас есть некий контейнер с элементами и функция exchange(), которая меняет местами два элемента разных контейнеров. Для потокобезопасности, функция синхронизирует доступ к этим контейнерам, получая мьютекс, связанный с каждым контейнером.
template <typename T>
class container 
{
public:
     std::mutex _lock;
     std::set<T> _elements;
     void add(T element) 
     {
          _elements.insert(element);
     }
     void remove(T element) 
     {
          _elements.erase(element);
     }
};
 
void exchange(container<int> &c1, container<int> &c2, int value)
{
     c1._lock.lock();
     std::this_thread::sleep_for(std::chrono::seconds(1)); // симулируем deadlock
     c2._lock.lock();    
     c1.remove(value);
     c2.add(value);
     c1._lock.unlock();
     c2._lock.unlock();
}

Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).
int main()
{
     srand((unsigned int)time(NULL));
     container<int> cntr1; 
     cntr1.add(1);
     cntr1.add(2);
     cntr1.add(3);
     container<int> cntr2; 
     cntr2.add(4);
     cntr2.add(5);
     cntr2.add(6);
     std::thread t1(exchange, std::ref(cntr1), std::ref(cntr2), 3);
     std::thread t2(exchange, std::ref(cntr2), std::ref(cntr1), 6);
     t1.join();
     t2.join();
     return 0;
}

Для решения этой проблемы можно использовать std::lock, который гарантирует блокировку безопасным (с точки зрения взаимоблокировки) способом:
void exchange(container<int> &c1, container<int> &c2, int value)
{
     std::lock(c1._lock, c2._lock); 
     c1.remove(value);
     c2.add(value);
     c1._lock.unlock();
     c2._lock.unlock();
}

Продолжение: условные переменные
Блохин Алексей @Renzo
карма
28,2
рейтинг 0,0
Пользователь
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама

Самое читаемое Разработка

Комментарии (14)

  • +7
    Статья в целом понравилась, но есть несколько моментов, на которых, на мой взгляд, стоит заострять внимание и не использовать их в примерах. Не хватает некоторой глубины объяснения, для чего нужны те или иные фичи.
    Например, не объяснено, для чего нужны scoped lock'и. Более того, большая часть примеров их не использует. А нужны они для написания exception safe кода. Если исключение будет кинуто после lock, но до unlock, то мьютекс останется захваченным и, через некоторое время программа зависьнет. Использование scoped lock'ов позволяет избежать таких ситуаций и всегда автоматически разблокировать мьютекс при выходе из критической секции.

    Про пример с перебрасыванием исключение в другом треде, хотелось бы узнать, кто и когда освобождает память, алоцированную под это исключение. Я могу ошибаться, но разве указатель останется валидным после того, как исключение будет обработано?

    Про recursive_mutex стоит сказать, что его надо расценивать как некий хак. Первым делом надо искать возможность обойтись обычным мьютексом.

    Про mutable стоит упомянуть смысл этого модификатора. Иногда метод ведет себя как константный, снаружи изменение объекта незаметно, семантика у метода не модифицирующая. Но по факту что-то меняется. Это может быть мьютекс, как в это примере, а может быть кеш. В таком случае можно метод сделать const, а изменяемые им переменные mutable. Если метод при этом стал не thread safe, то надо это написать в омментарии в заголовочном файле.
    • +1
      Спасибо за конструктивные дополнения!
      По поводу примера с перебрасыванием исключений: нет, указатель валидным не останется, поэтому в контейнер помещаются копии исключений (используя std::current_exception), а не указатели на них.
      • 0
        Действительно. Собственно такие детали очень важны для понимания материала.
        • 0
          Да, как-то статья не для новичков вышла, а для тех, кто переходит с С++ 2003 на С++11. Но тогда можно кое-что и повыкидывать из статьи. Я понимаю ваши сложности — вы сами с опытом и переходили с С++ 2003, работали с потоками 100500 лет, но попытались написать статью для новичков. Но мне в целом понравилось, видимо бэкграунд у нас схожий.
    • 0
      Поддерживаю, особенно о мелочах, которые следует уточнять. Например, в начале статьи не совсем понятно, в какой момент происходит запуск нового потока, но очень акцентируется внимание на join. В разделе о блокировках я бы, в первую очередь, расписал о проблемах с конкуренцией потоков в работе с данными и чем отличаются мутексы от простых переменных.

      Возможно, раз статья затрагивает с++, нужно упомянуть о других библиотеках, например старину pthread.
  • +6
    Вообще очень странное, мягко говоря, решение ставить блокировку внутри цикла for в функции addrange.
    Блокировка мьютексами — это достаточно дорогая операция и лучше бы ее вынести из цикла.
  • 0
    При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения

    По моему вы выбрали не самый лучший пример для deadlock. Это проблема проектирования классов, и есть весьма определенные приемы как не встать на эти грабли.
    Да и к тому же такую проблему с лета увидеть вообще сложно (собстно поэтому она и появляется), мне потребовалось пару минут что бы вчухать где же там deadlock.
    • 0
      Аналогично. Причем, не знаю как реализовано в c++11, но в бусте, например, для платформы Win в качестве обычного нерекурсивного mutex все равно используется критическая секция, которая сама по себе поддерживает рекурсию и, следовательно, никакого дедлока в данном примере и в помине не будет, что немного сбивает с толку.
  • +1
    А зачем вообще в container::addrange() блокировки?
    • +1
      Если по хорошему, то в addrange нужна общая блокировка над циклом for, но при этом не стоит вызывать внутри функцию add() содержащую блокировки. Лучше напрямую в addrange() использовать push_back(). Ну и еще совсем было бы хорошо, раз уж используется добавление в контейнер STL, добавлять диапазон не одиночными значениями, а используя итераторы — тогда в функцию addrange() достаточно было бы передать всего два параметра begin_iterator и end_iterator для добавляемого диапазона и вообще избавиться от этого внешнего цикла, но это уже не имеет к теме поста никакого отношения конечно.
  • 0
    Здесь и выходит на сцену std::recursive_mutex, который позволяет получать тот же мьютекс несколько раз. Максимальное количество получения мьютекса не определено, но если это количество будет достигно, то lock бросит исключение std::system_error.


    Не понял, как то, что не определено, может быть достигнуто?
    • 0
      Неправильно выразился: «Максимальное кол-во мьютексов заранее не известно»
  • +1
    Возможно, конечно, с опозданием, но все-таки лучше помечать статью как Перевод, если вы делаете перевод, или хотя бы сделать ссылку на оригинальную статью (http://www.codeproject.com/Articles/598695/Cplusplus11-threads-locks-and-condition-variables), иначе как-то некрасиво получается.
  • 0
    В C++17 будет shared_mutex. Это то же самое что rwlock. Его свойства вполне известны — он позволяет захватываться в режиме shared и exclusive. Если захвачен в режиме shared, то при попытке захвата в режиме exclusive будет блокировка потока, однако остальные захваты в режиме shared не приводят к блокировке. Если захвачен в режиме exclusive, то любые последующие попытки захвата в любых режимах приведут к блокировке.
    Существует множество алгоритмов (см. https://ru.wikipedia.org/wiki/%D0%97%D0%B0%D0%B4%D0%B0%D1%87%D0%B0_%D0%BE_%D1%87%D0%B8%D1%82%D0%B0%D1%82%D0%B5%D0%BB%D1%8F%D1%85-%D0%BF%D0%B8%D1%81%D0%B0%D1%82%D0%B5%D0%BB%D1%8F%D1%85)
    В Windows (копать InitializeSRWLock()) и Linux (см pthread_rwlock) поддерживается нативно ОСью

    Надеюсь кому нибудь будет полезно и познавательно.

    Однако меня интересует следующий объект синхронизации:
    — два метода lock( int index) и unlock( int index )
    — если сначала был вызван метод lock( 1 ), а затем lock( 2) то есть если методы lock() вызываются с разными аргументами, то блокировки не происходит. Если аргументы совпадают, то последний вызвавший поток блокируется.
    То есть блокировка происходит по индексу. Объект синхронизации хранит в себе индексы. Хранит в себе столько индексов, сколько потоков юзают этот Объект синхронизации.
    — метод unlock() освобождает индекс.
    — Произвольное количество потоков

    Интересует как такой объект синхронизации называется, какие у него есть известные реализации, ссылки, статьи и т.п.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.