Pull to refresh

Организация рабочих потоков: синхронизационный канал

Reading time7 min
Views9.1K
Представьте себе архитектуру типичного приложения:

Есть рабочий поток движка, выполняющий какую-то функциональность, допустим копирование файлов (архивирование, поиск простых чисел). В общем что-то длительное.
Данный поток должен периодически сообщать информацию о текущем копируемом файле, а также уметь обрабатывать ошибки, допустим ошибка нехватки места на диске.

Графический интерфейс такого приложения должен позволять запускать процесс копирования файлов, уметь приостановить копирование, а также, в случае ошибки, отобразить соответствующий диалог с вопросом к пользователю.

Казалось бы, как можно допустить ошибку в такой простой ситуации?

Проблемы многопоточности


Когда в программе появляется дополнительный поток — сразу же возникает проблема взаимодействия между потоками. Даже если поток ничего не делает и ни с кем не общается, всегда есть проблема правильной остановки потока.

Даже при работе с высокоуровневыми классами-обертками над потоками, легко сделать что-то не так, если до конца не понимать правильность работы с потоками. По этому в данной статье будет идти речь о работе с потоками на уровне WinAPI.

И так, вернемся к нашему примеру.

Рабочий поток движка должен каким-то образом сообщать потоку GUI о своем состоянии (текущий копируемый файл), уведомлять о постановке на паузу, а так-же инициировать сообщение об ошибке.

Два основных способа уведомлений — асинхронный и синхронный


Асинхронный способ — рабочий поток уведомляет о своем состоянии асинхронными сообщениями (PostMessage).
После посылки такого сообщения, поток, как правило, не дожидается ответа и продолжает свою работу.
А в случае невозможности продолжать, поток ожидает вызова управляющей команды от GUI.

Синхронный способ — рабочий поток уведомляет о своем состоянии синхронными вызовами (SendMessage), с ожиданием завершения обработки таких вызовов.
Такой способ удобен тем, что рабочий поток, в момент обработки сообщений, находится в заранее известном состоянии. Нет необходимости в излишней синхронизации.

Также не мало важен третий вариант — опрос состояния движка по таймеру. Таким образом лучше всего реализовывать часто меняющиеся состояния, например прогресс выполнения. Но этот способ не относится к теме данной статьи.

В асинхронном способе есть и свои преимущества, но речь пойдет о синхронных сообщениях, основная выгода которых — простота.

Подводные камни: SendMessage + остановка потока


Когда я вижу рабочий поток, то сразу задаюсь вопросом как он взаимодействует с GUI и как его при этом останавливают.

Будьте внимательны, если рабочий поток прямым или косвенным образом вызывает блокирующую функцию SendMessage для GUI потока. На примере WinAPI, это может быть что-нибудь совсем безобидное, например какой нибудь вызов SetText, который внутри вызывает SendMessage WM_SETTEXT. В этом случае нужно быть особо внимательным при попытке остановки потока в обработчиках нажатия на кнопки и при закрытии приложения (в случае если GUI поток является основным потоком приложения). Это не совсем очевидно, дальше я попытаюсь объяснить.

Правильный способ завершить поток — это дождаться завершения, с использованием одной из функций WaitFor, передав параметром HANDLE потока. Притом дожидаться полной остановки потока обязательно — никаких таймаутов с последующим вызовом TerminateThread. Например:

// INFINITE означает, что - функция не вернет управление до тех пор, пока поток не завершится
WaitForSingleObject(hThread, INFINITE);

Если этого не сделать, возможны непредсказуемые последствия (зависания и падения программы).
Особенно, если поток находится внутри динамически подключаемой библиотеки, которая должна тут-же выгрузиться.

И так, еще раз о проблеме SendMessage: если мы будем в обработчике оконных сообщений ждать завершения потока, то мы таким образом заблокируем эту самую обработку оконных сообщений. А рабочий поток, в свою очередь, пошлет сообщение и будет ждать пока его обработают. Таким образом мы гарантированно получим взаимную блокировку потоков (deadlock).
Один из вариантов решения в случае синхронных сообщений — не просто ждать завершение потока, а прокручивать оконные сообщения, пока поток не завершиться (костыль конечно, но тоже имеет право на существование)

Вторая архитектурная проблема — если рабочий поток вызывает напрямую код GUI, то необходимо позаботиться о синхронизации. Синхронизация потоков получается размазанной по всей программе.

Вариант решения перечисленных проблем


Рабочий поток должен быть изолирован внутри интерфейса движка.

Все уведомления от движка должны приходить синхронно и в контексте клиентского потока, по принципу COM Single-Threaded Apartments.

Вызовы должны происходить синхронно и прозрачно: рабочий поток вызывает некую функцию, которая не возвращает управление до тех пор, пока поток GUI не обработает этот вызов.
Но при этом рабочий поток должен иметь возможность завершиться, даже в момент вызова такой функции.

В итоге интерфейс движка для GUI будет однопоточным, что существенно упростит работу с таким движком.

Вариант реализации и пример на C++


Для реализации этого поведения можно создать повторно используемый объект, который будет обеспечивать переключение контекста потоков при вызове кода GUI.

Я назвал такой объект — синхронизационный канал.

И так, делаем некий синхронизационный канал, при помощи которого рабочий поток движка будет вызывать функции обратного вызова, реализуемые GUI.

Канал будет иметь функцию Execute, с параметром boost::function, куда можно передать функтор, созданный boost::bind. Таким образом, с использованием данного канала, можно будет вызвать функцию обратного вызова с любой сигнатурой, например:
class IEngineEvents
{
public:
    virtual void OnProgress(int progress) = 0;
    ...
};
//где-то в движке...
IEngineEvents* listener; //указатель на объект, реализуемый GUI
syncChannel.Execute(boost::bind(&IEngineEvents::OnProgress, listener, 30));

Функция Execute, как говорилось раньше, синхронная — она не завершается до тех пор, пока функция обратного вызова не будет завершена. Кроме исключения, описанного ниже.

Канал также должен иметь функцию Close, действие которой следующее: все вызовы функции Execute завершаются, новые вызовы функции Execute не проходят. Рабочий поток освобождается и, таким образом, решается проблема остановки рабочего потока — можно использовать функцию WaitFor без необходимости прокрутки оконных сообщений.

Для переключения контекста потоков в примере используется стандартная очередь сообщений Win32 потока и функция PostThreadMessage.

Принцип работы следующий: рабочий поток посылает сообщение при помощи PostThreadMessage, и далее ждет, пока это сообщение не будет обработано, для этого в примере используется отдельный объект событие.
В это время GUI поток должен обрабатывать свои оконные сообщения, одним из которых должно быть сообщение от рабочего потока, которое он обязан обработать и известить рабочий поток о завершении обработки, используя объект событие.

Данная реализация предполагает функцию ProcessMessage, которую необходимо вызывать из цикла обработки оконных сообщений или оконной процедуры. Возможна реализации и без такой функции, например канал может создавать себе невидимое окно, и обрабатывать все сообщения внутри. Кроме того, возможны реализации без использования оконных сообщений в принципе.

Хотелось бы еще сказать, что пример несет лишь ознакомительный характер, и не является готовым решением.
// SyncChannel.h
class CSyncChannel
{
public:
    typedef boost::function<void()> CCallback;

public:
    CSyncChannel(void);
    ~CSyncChannel(void);

public:
    bool Create(DWORD clientThreadId);
    void Close();
    bool Execute(CCallback callback);
    bool ProcessMessage(MSG msg);

private:
    DWORD                           m_clientThreadId;
    CCallback                       m_callback;
    HANDLE                          m_deliveredEvent;
    volatile bool                   m_closeFlag;
};

// SyncChannel.cpp
UINT WM_SYNC_CHANNEL_COMMAND = WM_APP + 500;
CSyncChannel::CSyncChannel(void) : m_closeFlag(true)
{}
CSyncChannel::~CSyncChannel(void)
{}
bool CSyncChannel::Create(DWORD clientThreadId)
{
    if (!m_closeFlag)
    {
        return false;
    }
    m_clientThreadId = clientThreadId;
    m_deliveredEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    if (!m_deliveredEvent)
    {
        return false;
    }
    m_closeFlag = false;
    return true;
}
void CSyncChannel::Close()
{
    m_closeFlag = true;
    if (m_deliveredEvent)
    {
        CloseHandle(m_deliveredEvent);
        m_deliveredEvent = NULL;
    }
}
bool CSyncChannel::Execute(CCallback callback)
{
    // Эта функция может быть вызвана с любого потока.
    // Дело в том, что некоторые функции движка могут вызываться клиентским потоком.
    // Например функция Pause(), в которой может быть
    // тут-же вызвана функция обратного вызова,
    // изменяющая состояние движка на что нибудь вроде "pause pending"
    if (m_closeFlag)
    {
        return false;
    }
    if (GetCurrentThreadId() == m_clientThreadId)
    {
        // Если вызывающий поток - это клиентский поток,
        // то мы должны вызвать колбек напрямую, без переключения контекста потоков.
        // Иначе поток сам себе пошлет сообщение, и будет ждать пока он сам его обработает,
        // что привело бы к блокировке потоков - поток будет ждать сам себя.
        callback();
    }
    else
    {
        // Функция Execute была вызвана из рабочего потока,
        // по этому мы должны послать сообщение клиентскому потоку,
        // и вызвать функцию обратного вызова уже в нем.

        // Сохраняем функцию обратного вызова для того,
        // чтобы она была вызвана в клиентском потоке.
        // Данная реализация предполагает один рабочий поток, и один клиентский,
        // если рабочих потоков будет сразу несколько,
        // то здесь необходимо добавить синхронизацию.
        m_callback = callback;

        // Сбрасываем объект событие для того, чтобы клиентский поток
        // мог нам его установить после того, как он обработает вызов
        ResetEvent(m_deliveredEvent);

        // Уведомляем клиентский поток о том, что необходимо
        // вызвать функцию обратного вызова.
        // Для этого посылаем в клиентский поток сообщение, получив которое,
        // клиентский поток должен будет вызвать функцию CSyncChannel::ProcessMessage()
        if (!PostThreadMessage(m_clientThreadId, WM_SYNC_CHANNEL_COMMAND, NULL, NULL))
        {
            return false;
        }

        // Ждем, пока клиенсткий поток вызовет функцию CSyncChannel::ProcessMessage(),
        // в которой установится событие m_deliveredEvent,
        // либо пока не будет установлен флаг m_closeFlag
        // Можно заменить флаг m_closeFlag на объект событие
        // и использовать WaitForMultipleObjects, но канал будет закрываться не часто,
        // и моментальной реакции на это не требуется.
        DWORD waitResult = WAIT_TIMEOUT;
        while (waitResult == WAIT_TIMEOUT && !m_closeFlag)
        {
            waitResult = WaitForSingleObject(m_deliveredEvent, 100);
        }
        if (waitResult != WAIT_OBJECT_0)
        {
            // Мы не дождались сообщения о доставке, а значит мы дождались флага закрытия
            return false;
        }
    }
    // Функция обратного вызова была успешно вызвана в клиентском потоке
    return true;
}
bool CSyncChannel::ProcessMessage(MSG msg)
{
    // Эта функция вызывается только из клиентского потока
    if (msg.message != WM_SYNC_CHANNEL_COMMAND)
    {
        // Клиентский код вызывает эту функцию для всех сообщений потока,
        // фильтруем не наши сообщения
        return false;
    }
    if (!m_closeFlag)
    {
        // Мы переключились в контекст клиентского потока,
        // и теперь мы можем вызвать функцию обратного вызова
        m_callback();

        // После обработки вызова, отпускаем рабочий поток.
        // Для этого необходимо установить объект событие
        SetEvent(m_deliveredEvent);
    }
    return true;
}

Пример использования класса CSyncChannel смотрите в следующей статье — Организация рабочих потоков: управление состоянием движка.
Tags:
Hubs:
+4
Comments8

Articles

Change theme settings