Pull to refresh

Многопоточный Observer на С++ (практика)

Reading time12 min
Views8.2K
Есть много вариаций на тему данного паттерна, но большинство примеров не подходит для многопоточных приложений.
В этой статье я хочу поделится опытом применения паттерна в многопоточных приложениях и опишу основные проблемы, с которыми мне приходилось сталкиваться.
Цель данной стати — обратить внимание разработчиков на проблемы, с которыми можно столкнуться при создании многопоточных приложений. Выявить подводные камни в реализации коммуникации между компонентами в многопоточном приложении.
Если Вам необходимо готовое решение, обратите внимание на библиотеку Signals2, котрая включена в boost с мая 2009-го года.
Я не пытаюсь предоставить решение, которое можно было бы использовать в готовом виде. Но тем не менее, ознакомившись с материалом, можно обойтись без использования сторонних библиотек, в тех проектах, в которых они по каким-либо причинам не доступны или нежелательны (драйвера, низкоуровневые приложения и т.п.).

Предметная область


Действующие лица

NotificationSender — объект, рассылающий сообщения.
Как правило это рабочий поток, извещающий об изменении своего состояния, которое необходимо отобразить на пользовательском интерфейсе.
NotificationListener — объект, реализующий обработку уведомлений.
Как правило это объект, который управляет отображением части пользовательского интерфейса связанного с фоновой задачей.
Таких объектов может быть множество, при этом они могут подключаться/отключаться динамически (к примеру открытие далогового окна, где показываются детали выполнения задачи)
NotificationDispatcher — объект, управляющий подписчиками и рассылкой сообщений.

Взаимодействие между объектами

Рассылка сообщений всем подписчикам.
Процесс подписки/прекшащения подписки.
Время жизни объектов.
В данной статье описан метод синхронной рассылки сообщений. Это означает, что вызов функции SendMessage будет синхронным, и поток, вызывающий этот метод будет ожидать завершения обработки сообщений всеми подписчиками. В ряде случаев такой подход удобней ассинхронной рассылки, но при этом в нем есть трудности с прекращением подписки.

Простейшая реализация для однопоточной среды


typedef unsigned __int64    SubscriberId;
class CSubscriber
{
public:
    virtual ~CSubscriber(){}
    virtual void    MessageHandler(void* pContext) = 0;
    SubscriberId    GetSubscriberId()   {return (SubscriberId)this;}
};

class CDispatcher
{
private:
    typedef std::vector<CSubscriber*>     CSubscriberList;
public:
    SubscriberId Subscribe(CSubscriber* pNewSubscriber)
    {
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
            {
                return 0;
            }
        }
        m_SubscriberList.push_back(pNewSubscriber);
        return pNewSubscriber->GetSubscriberId();
    }
    bool Unsubscribe(SubscriberId id)
    {
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            if(m_SubscriberList[i]->GetSubscriberId() == id)
            {
                m_SubscriberList.erase(m_SubscriberList.begin() + i);
                return true;
            }
        }
        return false;
    }
    void SendMessage(void* pContext)
    {
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            m_SubscriberList[i]->MessageHandler(pContext);
        }
    }
private:
    CSubscriberList     m_SubscriberList;
};

Здесь уникальный идентификатор подписчика — адресс объекта подписчика, функция GetSubscriberId возвращает всегда одинаковое значение для одного объекта подписчика в не зависимости от преобразования типов.

Пример использования

class CListener:
    public CSubscriber
{
    virtual void MessageHandler(void* pContext)
    {
        wprintf(L"%d\n", *((int*)pContext));
    }
};
int _tmain(int argc, _TCHAR* argv[])
{
    CDispatcher Dispatcher;
    CListener Listener1;
    CListener Listener2;
    Dispatcher.Subscribe(&Listener1);
    Dispatcher.Subscribe(&Listener2);
    for(int i = 0; i < 5; ++i)
    {
        Dispatcher.SendMessage(&i);
    }
    Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
    Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
    return 0;
}

Отключение подписчика внутри обработчика сообщений


В примере есть проблема, не связанная с многопоточностью. Эта проблема проявляется, когда мы пытаемся отписаться внутри обработчика MessageHandler. Данная проблемма будет решена копированием списка подписчиков перед вызовом MessageHandler.

Переходим к многопоточной среде


С одним потоком такой код будет работать довольно стабильно.
Давайте посмотрим что будет при работе нескольких потоков.
CDispatcher g_Dispatcher;
DWORD WINAPI WorkingThread(PVOID pParam)
{
    for(int i = 0;;++i)
    {
        g_Dispatcher.SendMessage(&i);
    }
};
int _tmain(int argc, _TCHAR* argv[])
{
    ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
    CListener Listener1;
    CListener Listener2;
    for(;;)
    {
        g_Dispatcher.Subscribe(&Listener1);
        g_Dispatcher.Subscribe(&Listener2);

        g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
        g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
    }
    return 0;
}

Рано или позно произойдет креш.
Проблема заключается в добавлении/удалении подписчиков и одновременной рассылке уведомлений (многопоточный доступ к CDispatcher::m_SubscriberList в нашем примере).
Здесь необходима синхронизация доступа к списку подписчиков.

Синхронизация доступа к списку подписчиков


class CDispatcher
{
private:
    typedef std::vector<CSubscriber*>     CSubscriberList;
public:
    SubscriberId Subscribe(CSubscriber* pNewSubscriber)
    {
        CScopeLocker ScopeLocker(m_Lock);
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
            {
                return 0;
            }
        }
        m_SubscriberList.push_back(pNewSubscriber);
        return pNewSubscriber->GetSubscriberId();
    }
    bool Unsubscribe(SubscriberId id)
    {
        CScopeLocker ScopeLocker(m_Lock);
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            if(m_SubscriberList[i]->GetSubscriberId() == id)
            {
                m_SubscriberList.erase(m_SubscriberList.begin() + i);
                return true;
            }
        }
        return false;
    }
    void SendMessage(void* pContext)
    {
        CScopeLocker ScopeLocker(m_Lock);
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            m_SubscriberList[i]->MessageHandler(pContext);
        }
    }
private:
    CSubscriberList     m_SubscriberList;
    CLock               m_Lock;
};

Синхронизация доступа была реализована при помощи объектов синхронизации (Critical section или Mutex).
Для большей переносимости и для того, чтобы не отвлекаться от сути происходящего, абстрагируемся от прямых вызовов платформенно-зависимых функций, типа EnterCriticalSection. Для этого служит класс CLock.
Для устойчивости к с++ исключениям удобно использовать технологию RAII, а именно класс CScopeLocker, который в конструкторе захватывает объект синхронизации, а в деструкторе освобождает его.
При такой реализации программа не будет падать, но нас поджидает еще одна неприятная ситуация.

Борьба с взаимной блокировкой потоков (deadlock)


Допустим у нас есть некий поток, выполняющий какую-то фоновую задачу и есть окно, где отображается ход выполнения этой задачи.
Как правило, поток посылает уведомление классу окна, который в свою очередь вызывает системную функцию SendMessage, которая инициирует какие-то действия в контексте оконной процедуры.
Системная функция SendMessage является блокирующей, она отсылает сообщение потоку окна и ждет пока тот его обработает.
Если подключение/отключение объекта-слушателя будет происходить также в контексте оконной процедуры (в потоке окна) возможна взаимная блокировка потоков, так называемый deadlock.
Такой deadlock может воспроизоводится крайне редко (в момент вызова Subscribe/Unsubscribe и одновременном вызове MessageHandler в отдельном потоке)
Следующий код эмулирует ситуацию с блокирующим вызовом системной ф-ции SendMessage.

CDispatcher g_Dispatcher;
CLock       g_Lock;
class CListener:
    public CSubscriber
{
    virtual void MessageHandler(void* pContext)
    {
        //Эмулируем блокирующий вызов SendMessage
        g_Lock.Lock();
        wprintf(L"%d\n", *((int*)pContext));
        g_Lock.Unlock();
    }
};
DWORD WINAPI WorkingThread(PVOID pParam)
{
    for(int i = 0;;++i)
    {
        g_Dispatcher.SendMessage(&i);
    }
};
int _tmain(int argc, _TCHAR* argv[])
{
    ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
    CListener Listener1;
    CListener Listener2;
    for(;;)
    {
        //Эмулируем контекст оконной процедуры (обработчик оконного сообщения)
        g_Lock.Lock();
        g_Dispatcher.Subscribe(&Listener1);
        g_Dispatcher.Subscribe(&Listener2);
        g_Lock.Unlock();
        Sleep(0);
        g_Lock.Lock();
        g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
        g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
        g_Lock.Unlock();
    }
    return 0;
}

Проблема заключается в том, что главный поток захватывает глобальный объект синхронизации g_Lock (при аналогии с оконной процедурой — выполняется в контексте оконного потока), и затем вызывает метод Subscribe/Unsubscribe, который внутри пытается захватить второй объект синхронизации CDispatcher::m_Lock.
В этот момент рабочий поток посылает уведомление, захватив при этом CDispatcher::m_Lock в функции CDispatcher::SendMessage, и затем пытается захватить глобальный объект синхронизации g_Lock (при аналогии с оконом — вызывает системную функцию SendMessage).

Поток окна    A -> B
Рабочий поток B -> A

Это можно назвать класическим deadlock-ом.
Проблема скрывается в функции CDispatcher::SendMessage().
Здесь должно соблюдаться правило — нельзя вызывать callback-функцию захватив при этом какой-либо объект синхронизации.
Итак, убираем блокировку при рассылке уведомлений.

void SendMessage(void* pContext)
{
    CSubscriberList SubscriberList;
    {
        CScopeLocker ScopeLocker(m_Lock);
        SubscriberList = m_SubscriberList;
    }
    for(size_t i = 0; i < SubscriberList.size(); ++i)
    {
        SubscriberList[i]->MessageHandler(pContext);
    }
}

Контроль времени жизни подписчиков


После того, как мы убрали deadlock, появилась другая проблема — время жизни объектов-подписчиков.
У нас больше нет гарантии, что метод MessageHandler не будет вызван после вызова Unsubscribe, и по этому мы не можем удалять объект-подписчик непосредственно после вызова Unsubscribe.
В данной ситуации проще всего контролировать время жизни объектов-подписчиков с использованием счетчика ссылок.
Для этого можно исползовать технологию COM — унаследовать интерфейс CSubscriber от IUnknown и использовать ATL CComPtr для списка объектов-подписчиков, тоесть заменить std::vector<CSubscriber*> на std::vector<CComPtr>.
Но такая реализация чревата дополнительными расходами на реализацию классов-подписчиков, так как в каждом из них должны быть реализованы методы AddRef/Release и ненужный QueryInterface, хотя если в проекте активно используется COM, то такой подход может иметь приемущество.
Для контроля времени жизни объектов-подписчиков с исползованием счетчика ссылок хорошо подойдут умные указатели.

Простая реализация для многопоточной среды


typedef unsigned __int64    SubscriberId;
class CSubscriber
{
public:
    virtual ~CSubscriber(){}
    virtual void    MessageHandler(void* pContext) = 0;
    SubscriberId    GetSubscriberId()   {return (SubscriberId)this;}
};
typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;

class CDispatcher
{
private:
    typedef std::vector<CSubscriberPtr> CSubscriberList;
public:
    SubscriberId Subscribe(CSubscriberPtr pNewSubscriber)
    {
        CScopeLocker ScopeLocker(m_Lock);
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
            {
                return 0;
            }
        }
        m_SubscriberList.push_back(pNewSubscriber);
        return pNewSubscriber->GetSubscriberId();
    }
    bool Unsubscribe(SubscriberId id)
    {
        CSubscriberPtr toRelease;
        CScopeLocker ScopeLocker(m_Lock);
        for(size_t i = 0; i < m_SubscriberList.size(); ++i)
        {
            if(m_SubscriberList[i]->GetSubscriberId() == id)
            {
                toRelease = m_SubscriberList[i];
                m_SubscriberList.erase(m_SubscriberList.begin() + i);
                return true;
            }
        }
        return false;
    }
    void SendMessage(void* pContext)
    {
        CSubscriberList SubscriberList;
        {
            CScopeLocker ScopeLocker(m_Lock);
            SubscriberList = m_SubscriberList;
        }
        for(size_t i = 0; i < SubscriberList.size(); ++i)
        {
            SubscriberList[i]->MessageHandler(pContext);
        }
    }
private:
    CSubscriberList     m_SubscriberList;
    CLock               m_Lock;
};

В данной реализации я заменил «голый» указатель CSubscriber* на «умный» указатель со счетчиком ссылок, такой оказался в библиотеке boost.
Также в функцию Unsubscribe я добавил переменную toRelease для того, чтобы вызвать деструктор объекта-подписчика уже после вызова Unlock (нельзя вызывать callback-функцию, включая деструктор объекта подписчика, захватив при этом какой-либо объект синхронизации).
Cтоит обратить внимание на то, что в функции SendMessage происходит копирование списка умных указателей (после копирования все указатели увеличивают свои счетчики ссылок, а при выходе из функции уменьшают, что и контролирует время жизни объектов-подписчиков)

Тестируем


CDispatcher g_Dispatcher;
CLock       g_Lock;
class CListener:
    public CSubscriber
{
    virtual void MessageHandler(void* pContext)
    {
        //Эмулируем блокирующий вызов SendMessage
        g_Lock.Lock();
        wprintf(L"%d\n", *((int*)pContext));
        g_Lock.Unlock();
    }
};
DWORD WINAPI WorkingThread(PVOID pParam)
{
    for(int i = 0;;++i)
    {
        g_Dispatcher.SendMessage(&i);
    }
};
int _tmain(int argc, _TCHAR* argv[])
{
    ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
    for(;;)
    {
        boost::shared_ptr<CListener> pListener1(new CListener);
        boost::shared_ptr<CListener> pListener2(new CListener);
        //Эмулируем контекст оконной процедуры (обработчик оконного сообщения)
        g_Lock.Lock();
        g_Dispatcher.Subscribe(pListener1);
        g_Dispatcher.Subscribe(pListener2);
        g_Lock.Unlock();
        Sleep(0);
        g_Lock.Lock();
        g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId());
        g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId());
        g_Lock.Unlock();
    }
    return 0;
}

Соптимизированная реализация для многопоточной среды


Как правило вызов функции SendMessage будет происходить намного чаще чем Subscribe/Unsubscribe. При большом количестве подписчиков узким местом может стать копирование списка подписчиков внутри SendMessage.
Копирование списка подписчиков можно перенести в функции Subscribe/Unsubscribe. Это будет похоже на методику из lock-free алгоритмов.
Объект CDispatcher будет хранить список подписчиков не на прямую, а при помощи умного указателя. Внутри функции SendMessage мы будем получать указатель на текущий список подписчиков и работать с ним. В функциях Subscribe/Unsubscribe мы будем каждый раз создавать новый список подписчиков и перенаправлять указатель внутри объекта CDispatcher на новый список подписчиков. Таким образом в то время, когда указатель на список подписчиков в объекте CDispatcher будет указывать уже на новый список подписчиков, ф-ция SendMessage по прежнему будет работать со старым списком. Так как старый список подписчиков никто не изменяет, то все будет работать стабильно в многопоточной среде.
В принципе, можно несколько модифицировать функции Subscribe/Unsubscribe и реализовать полностью lock-free алгоритм, но это уже другая тема.
Медот Unsubscribe является асинхронным и не гарантирует после своего завершения полное прекращение рассылки, половинное решение — подписчик получает уведомление о прекращении подписки при помощи ф-ции UnsubscribeHandler. Для реализации этого поведения добавлен промежуточный класс CSubscriberItem, который в своем деструкоторе вызывает ф-цию UnsubscribeHandler.
namespace Observer
{
    //////////////////////////
    // Subscriber
    //////////////////////////
    typedef unsigned __int64    SubscriberId;
    class CSubscriber
    {
    public:
        virtual ~CSubscriber(){}
        virtual void    MessageHandler(void* pContext) = 0;
        virtual void    UnsubscribeHandler() = 0;
        SubscriberId    GetSubscriberId()   {return (SubscriberId)this;}
    };
    typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;

    //////////////////////////////////////////////////////////////////////
    // Dispatcher
    ///////////////////////////////////
    class CDispatcher
    {
    private:
        class CSubscriberItem
        {
        public:
            CSubscriberItem(CSubscriberPtr pSubscriber)
                :m_pSubscriber(pSubscriber)
            {
            }
            ~CSubscriberItem()
            {
                m_pSubscriber->UnsubscribeHandler();
            };
            CSubscriberPtr Subscriber()const    {return m_pSubscriber;}
        private:
            CSubscriberPtr  m_pSubscriber;
        };
        typedef boost::shared_ptr<CSubscriberItem>  CSubscriberItemPtr;
        typedef std::vector<CSubscriberItemPtr>     CSubscriberList;
        typedef boost::shared_ptr<CSubscriberList>  CSubscriberListPtr;
    public:
        CDispatcher()
        {
        }
    private:
        CDispatcher(const CDispatcher&){}
        CDispatcher& operator=(const CDispatcher&){return *this;}
    public:
        SubscriberId Subscribe(CSubscriberPtr pNewSubscriber)
        {
            //Declaration of the next shared pointer before ScopeLocker
            //prevents release of subscribers from under lock
            CSubscriberListPtr pNewSubscriberList(new CSubscriberList());
            //Enter to locked section
            CScopeLocker ScopeLocker(m_Lock);
            if(m_pSubscriberList)
            {
                //Copy existing subscribers
                pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end());
            }
            for(size_t i = 0; i < pNewSubscriberList->size(); ++i)
            {
                CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i];
                if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
                {
                    return 0;
                }
            }
            //Add new subscriber to new subscriber list
            pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber)));
            //Exchange subscriber lists
            m_pSubscriberList = pNewSubscriberList;
            return pNewSubscriber->GetSubscriberId();
        }
        bool Unsubscribe(SubscriberId id)
        {
            //Declaration of the next shared pointers before ScopeLocker
            //prevents release of subscribers from under lock
            CSubscriberItemPtr  pSubscriberItemToRelease;
            CSubscriberListPtr  pNewSubscriberList;
            //Enter to locked section
            CScopeLocker ScopeLocker(m_Lock);
            if(!m_pSubscriberList)
            {
                //No subscribers
                return false;
            }
            pNewSubscriberList = CSubscriberListPtr(new CSubscriberList());
            for(size_t i = 0; i < m_pSubscriberList->size(); ++i)
            {
                CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i];
                if(pSubscriberItem->Subscriber()->GetSubscriberId() == id)
                {
                    pSubscriberItemToRelease = pSubscriberItem;
                }
                else
                {
                    pNewSubscriberList->push_back(pSubscriberItem);
                }
            }
            //Exchange subscriber lists
            m_pSubscriberList = pNewSubscriberList;
            if(!pSubscriberItemToRelease.get())
            {
                return false;
            }
            return true;
        }
        void SendMessage(void* pContext)
        {
            CSubscriberListPtr pSubscriberList;
            {
                CScopeLocker ScopeLocker(m_Lock);
                if(!m_pSubscriberList)
                {
                    //No subscribers
                    return;
                }
                //Get shared pointer to an existing list of subscribers
                pSubscriberList = m_pSubscriberList;
            }
            //pSubscriberList pointer to copy of subscribers' list
            for(size_t i = 0; i < pSubscriberList->size(); ++i)
            {
                (*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext);
            }
        }
    private:
        CSubscriberListPtr  m_pSubscriberList;
        CLock               m_Lock;
    };

}; //namespace Observer

Ссылки


Библиотека boost::signals2 статья
Умные указатели Джефф Элджер
Resource Acquisition Is Initialization (RAII) википедия
Комментарии к первой версии этой статьи можно найти здесь
Tags:
Hubs:
Total votes 43: ↑43 and ↓0+43
Comments16

Articles