Pull to refresh

Push + ActiveMQ — ZendFramework =… или история одного драйвового проекта

Reading time 11 min
Views 5.8K

Одним прекрасным утром к нам в офис забежал молодой парень, с амбициозной идеей и “средствами для реализации” в кармане. “Заходишь на сайт, а там — телевизор. К нему можно подключиться через свою web-камеру. Одновременно может вещать только один человек, остальные — ждут своей очереди (но можно посмотреть скриншоты с их вебкамер). Задача каждого — удержаться в эфире, как можно дольше. Если выступающий нравится публике — все жмут “Cool!”, если подкачал — “Go away!”. И человек заменяется на следующего в очереди. Ну и можно в чат писать”.

Хорошая идея — драйвовый проект. Рисуем прототип, решаем реализовать обновление чата, списка пользователей, рейтинга и т.д. с помощью push-технологии. Это когда после загрузки страницы соединение между клиентом и сервером не закрывается, а продолжает использоваться для отправки сервером каких-либо событий на клиента.

Осторожно! Эта шняга может убить ваш сервер! Кстати, если вы вдруг решите написать высконагруженный скандинавский аукцион – истина и веселые картинки где-то рядом, под катом.


Пара слов про PUSH



Использование технологии PUSH оправдано там, где нужно очень быстро обновлять множество клиентов при определенных изменениях на стороне сервера. В общем случае общение клиента и сервера выглядит так


Очевидно, что на каждого клиента нам нужен один постоянно работающий серверный push-процесс, который называется Comet (Комета). Это накладно с точки зрения использования памяти сервера, но альтернатива этому: постоянные запросы к серверу — на порядок хуже, медленнее и тяжелее. Правда есть еще один хороший вариант — работать напрямую с сокетами, но это отдельная тема.

Процесс работы

Когда клиент совершает какое-то действие (например, пишет в чат или голосует), он отправляет на сервер ajax-запрос с соответствующей командой. Результат выполнения этой команды нужно разослать всем подключенным клиентам. Для этого, скрипт, занимающийся обработкой ajax-запроса, должен оповестить все Кометы, которые уже передадут команду на обновление своим клиентам, а те в свою очередь — аккуратно отрисуют изменение.

Не стоит говорить, что одновременно может поступать довольно много запросов на обновление. Для этого должна быть организована очередь таких событий, из которой Кометы смогут брать события и рассылать их дальше.

Примерно так выглядит происходящее, когда мы чего-то написали в чат.


Ключевых моментов в этой схеме два:
  1. Реализация кометы (хороший обзор предоставил пользователь balepc)
  2. Реализация очереди. Давайте остановимся на них более подробно.


Реализация очереди



Для нашего проекта было необходимо, чтобы:
  • Кометы мгновенно «просыпались» при поступлении новых событий в очередь, а не опрашивали ее постоянно. Это позволяет значительно снизить нагрузку на сервер, а так же реализовать мгновенную реакцию системы на действия пользователя.
  • Если Комета по какой-то причине умирает (например, пользователь обновил страницу), пользователю все равно должны дойти все события, которые произошли между моментом отмирания Кометы и моментом восстановления соединения. Паузы в отправке сообщений так же возникают, если Push-канал работает через технологии “long polling” (характерно, например, для браузера Opera). В этом случае push-соединение создается заново каждый раз, когда клиент получает сообщение от сервера.
  • Когда у нас на сайте сидит 5 человек, то голос каждого из них должен оказывать большее влияние, чем когда у нас на сайте сидит 100 человек.
  • Пользователи, отключившие камеры, должны уходить из блока скриншотов.
    В этом случае нам нужно достоверно знать, сколько и каких пользователей сейчас on-line. Дело в том, что после закрытия пользователем окна браузера на сервере не происходит никаких событий, поэтому отследить этот момент невозможно. И мы пришли к выводу о необходимости того, чтобы комета просыпалась и отчитывалась с определенной периодичностью (скажем, раз в пять секунд) о том, что она жива.
  • Каждое посылаемое в очередь событие доходило до всех пользователей, независимо от качества их канала.


В простых случаях взаимодействия между двумя процессами можно воспользоваться функциями работы с очередями, встроенными в PHP, либо обертку Zend_Queue. Однако они не обеспечивают рассылку события сразу всем слушающим процессам (кометам).

Экспериментируем с очередями


Один из возможных вариантов решения — менеджер очередей.
Способ реализации:
  • Каждая Комета поднимает свою очередь.
  • Идентификатор новой очереди заносится в общую для всех процессов память (shared memory).
  • Для предотвращения одновременной работы нескольких процессов с очередями используется семафор.
  • При отправке сообщения в очередь событие рассылается в очередь каждого процесса.


Примерно так:


// когда мы нарисовали эту картинку на клочке бумажки — нам стало уже страшно, но отважный велосипедостроитель может попытаться довести реализацию до конца.

Даже если мы сделаем такой менеджер, то кометы будут просыпаться только при поступлении новых событий, а нам этого явно недостаточно (помните про необходимость знать, кто сейчас живой?).

К сожалению, стандартными функциями php сделать очередь с таймаутами невозможно. В голову пришла мысль реализовать какой-то сторонний серверный процесс, который бы периодически «пробуждал» кометы, подбрасывая события в очередь. Кстати, этот же процесс может так же удалять умершие идентификаторы очередей из shared memory. Вот что получаем в итоге:


При взгляде на эту картинку невольно вспоминается высказывание Луговского
«Человеческая фантазия безгранична на идиотские и нелепые архитектуры...»


Начинаем анализировать существующие решения для организации очередей с точки зрения пригодности использования в нашем приложении.


Наш выбор пал на Apache ActiveMQ (реально выбирали между ним и RabbitMQ)
Плюсы:
  • Очень быстрый
  • Очень стабильный
  • Поддержка нескольких протоколов, в том числе — stomp
  • Есть реализация взаимодействия в ZendFramework
  • Есть способ рассылки сообщений всем слушателям (так называемые Topics)
  • Умеет пробуждать ожидающий процесс по таймауту
  • Хорошо масштабируется


Минус: Не умеет возвращать количество сообщений в очереди

Более подробный обзор менеджеров очередей от хабраюзера aleks_raiden можно найти здесь
http://habrahabr.ru/blogs/hi/44907/
http://habrahabr.ru/blogs/hi/45891/

Единственное, что нельзя было реализовать напрямую, используя механизм Topics — это восстановление событий, которые могли произойти при обрыве соединения кометы, до того, как она поднялась (это можно было сделать, отказавшись от механизма Topics, и используя индивидуальные очереди на каждый процесс, но...)

Мы решили складывать свежие N штук событий в shared memory, чтобы комета смогла мгновенно восстановить свой контекст. В итоге архитектура получилась следующая:


Бренчмарки



Для реализации приложения мы использовали ZendFramework. Ajax-запросы поступали в соответствующие контроллеры, которые обрабатывали их, и складывали сообщения в очередь, откуда кометы забирали сообщения и рассылали их на клиентов. Сами кометы так же реализованы как Action внутри контроллера ZendFramework.

Запустив Apache Branchmark, мы с ужасом обнаружили, что система активно кушает процессор уже при 25 одновременных обращениях.

300 запросов на 25 одновременных потоков

Как выяснилось, 99% времени занимала инициализация ZendFramework и соединение с базой данных. О-опс. Ну, это старая новость, и лекарство от этого известно — нужно исключить подключение классов ZendFramework, кроме реально необходимых. А все необходимые классы объединить в один файл.

Мы пришли к выводу, что придется отказаться от использования каркаса MVC при обработке ajax-запросов на голосование и чат, а так же в реализации кометы (поскольку скорость ее старта очень критична для браузеров, поддерживающих только механизм Long poll). Так же мы избавились от каких-либо запросов к базе данных в процессах обработки ajax-запросов. Для чего мы вынесли все необходимые данные в shared memory. При необходимости же синхронизации shared memory и базы данных — выполняли синхронизацию сторонним процессом.

Итог



300 запросов на 100 одновременных потоков

Реакция на события от клиента стала практически мгновенной. Ура, победа! Легковесные кометы используют память и процессор очень экономно. Узкое горлышко приложения — раздача видео-потока (но если вы будете делать скандинавский аукцион, где эффективно применять схожую архитектуру очередей — это не ваша проблема ;-), возможно напишем про это в следующих статьях, если выдержим хабраэффект.

Ниже опубликован наш менеджер очередей.

<?php

require_once 'Zend/Queue.php';
require_once 'Sibirix/Queue/Smemory/Package.php';
/**
 * Менеджер очередей
 *
 */
class Sibirix_Queue_Smemory_Manager {

    const ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE = "BROKEN_OPTIONS_FOR_INIT";
    const ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE = "UNKNOWN_IDENTIFIER_STORAGE";
    const ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE = "CAN_NOT_ACTIVATE_THE_SEMAPHORE";

    const SHM_VAR_LAST_CHAT_POSTS = 1;
    const SHM_VAR_LAST_SYSTEM_COMMAND = 2;
    const SHM_VAR_QUEUE_LAST_ID = 3;

    private $_count_chat_package_storage;
    private $_count_system_package_storage;

    private $_activeMQoptions;
    private $_queue;

    /**
     *  semaphore key
     */
    private $_sem_key;

    /**
     *  Shared memory APP_KEY = one symbol
     */
    private $_app_key;
    private $_shared_memory_id = false;
    private $_shared_memory_handler = false;
    private $_shared_memory_size;

    private $_sem_id = false;
    private $_last_queue_message_id = false;

    public function __construct($options = array()) {

        if (!is_array($options)) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        if (!isset($options['sem_key'])) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        if (!isset($options['app_key'])) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        } else {

            $options['app_key'] = (string)$options['app_key'];
            if (strlen($options['app_key']) != 1) {
                throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
            }
        }

        if (!isset($options['activeMQ'])) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        if (!isset($options['shared_memory_size'])) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        if (!isset($options['count_chat_package_storage'])) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        if (!isset($options['count_system_package_storage'])) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        $this->_sem_key = $options['sem_key'];
        $this->_app_key = $options['app_key'];

        $this->_activeMQoptions = $options['activeMQ'];
        $this->_shared_memory_size = $options['shared_memory_size'];

        $this->_count_chat_package_storage = $options['count_chat_package_storage'];
        $this->_count_system_package_storage = $options['count_system_package_storage'];

        $this->_sem_id = sem_get($this->_sem_key, 1);

        $this->_shared_memory_id = ftok(".", $this->_app_key);
        $this->_shared_memory_handler = shm_attach($this->_shared_memory_id, $this->_shared_memory_size);
    }

    public function connectTopic() {
        if (!$this->_queue) {
            $this->_queue = new Zend_Queue('Activemq', $this->_activeMQoptions);
        }
        return $this->_queue;
    }

    public function receive() {

        $this->connectTopic();

        $message = false;
        $msg_receive = $this->_queue->receive(1);
        foreach ( $msg_receive as $msg ) {
            $pkg = new Sibirix_Queue_Smemory_Package($msg->body);
            $this->_last_queue_message_id = $pkg->id;
            $message = $pkg;
        }

        return $message;
    }

    public function receiveAll() {

        $this->connectTopic();

        $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();

        $messages = array();
        while(true) {
            $msg_receive = $this->_queue->receive(1);
            if (!$msg_receive->count()) {break;}

            foreach ( $msg_receive as $message ) {
                $pkg = new Sibirix_Queue_Smemory_Package($message->body);
                $this->_last_queue_message_id = $pkg->id;
                $messages[] = $pkg;
            }

            if ($this->_last_queue_message_id >= $last_queue_message_id_mem ) {
                $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();
                if ($this->_last_queue_message_id >= $last_queue_message_id_mem ) {
                    break;
                }
            }
        }

        return $messages;
    }

    public function send($action, $data, $recipient = false) {

        $this->connectTopic();

        $package = new Sibirix_Queue_Smemory_Package($action, $data, $recipient);
        
        $this->_lock();

        $last_queue_message_id_mem = $this->_getLastQueueMessageId();

        $last_queue_message_id_mem++;
        $this->_setLastQueueMessageId($last_queue_message_id_mem);

        $this->_queue->send( $package->normalize( $last_queue_message_id_mem ) );

        if ( $package->isPost() ) {
            $this->_savePackageToChatStorage( $package );
        } else {
            $this->_savePackageToSystemStorage( $package );
        }
        $this->_unlock();
        return $last_queue_message_id_mem;
    }

    private function _getLastQueueMessageId() {
        $last_queue_message_id_mem = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID);
        if (!$last_queue_message_id_mem) {
            shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, 0);
            $last_queue_message_id_mem = 0;
        }

        return $last_queue_message_id_mem;
    }

    private function _setLastQueueMessageId($id) {
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, $id);
        return true;
    }

    private function _sem_getLastQueueMessageId() {
        $this->_lock();
        $last_queue_message_id_mem = $this->_getLastQueueMessageId();
        $this->_unlock();

        return $last_queue_message_id_mem;
    }
    
    private function _savePackageToChatStorage( $package ) {
        $last_chat_posts = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS);
        if (!$last_chat_posts) {$last_chat_posts=array();}
        if (count($last_chat_posts) >= $this->_count_chat_package_storage) {
            while(count($last_chat_posts) >= $this->_count_chat_package_storage) {
                array_shift($last_chat_posts);
            }
        }
        array_push($last_chat_posts, $package->normalize());
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, $last_chat_posts);
    }

    private function _savePackageToSystemStorage($package) {
        $last_sys_cmd = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND);
        if (!$last_sys_cmd) {$last_sys_cmd=array();}
        if (count($last_sys_cmd) >= $this->_count_system_package_storage) {
            while(count($last_sys_cmd) >= $this->_count_system_package_storage) {
                array_shift($last_sys_cmd);
            }
        }
        array_push($last_sys_cmd, $package->normalize());
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, $last_sys_cmd);
    }
    
    public function getLastPackage($storage) {
        if ( $storage != self::SHM_VAR_LAST_SYSTEM_COMMAND && $storage != self::SHM_VAR_LAST_CHAT_POSTS ) {
            throw new Exception(self::ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE);
        }
        $this->_lock();
        $last_packages = shm_get_var($this->_shared_memory_handler, $storage);
        if (!$last_packages) {$last_packages=array();}
        $this->_unlock();

        $return_last_packages = array();
        foreach($last_packages as $normalize_package) {
            $package = new Sibirix_Queue_Smemory_Package($normalize_package);
            $return_last_packages[] = $package;
        }
        return $return_last_packages;
    }

    private function _lock() {

        if (!sem_acquire($this->_sem_id)) {

            throw new Exception(self::ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE);
        }
    }

    private function _unlock() {

        sem_release($this->_sem_id);
    }

    public function resetMemory() {
        $this->_lock();
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, array());
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, array());
        $this->_setLastQueueMessageId(0);
        $this->_lock();
    }

    public function getLastQueueMessageId() {
        $this->_lock();

        $last_queue_message_id_mem = $this->_getLastQueueMessageId();

        $this->_unlock();
        return $last_queue_message_id_mem;
    }
}

?>



<?php
/**
 * Пакет в очереди
 *
 */ 
class Sibirix_Queue_Smemory_Package {

    const ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE = 'ERROR_NOT_SUPPORTED_TYPE_ACTION';
    const ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE = 'FOR_NORMALIZE_NEED_ID';

    const CHAT_ACTION = 'pasteChatMessages';

    private $_supported_actions = array(
        'updateRating',
        'setChannel',
        'pasteChatMessages',
        'updateTimer',
        'connectToServer',
        'statistics'
    );

    private $_read_private_property = array(
        'id','action','time','data','recipient'
    );

    private $_action;
    private $_data;
    private $_recipient;
    private $_time;
    private $_id;

    public function __construct() {

        $fg_args = func_get_args();

        if (count($fg_args) == 1) {
            list(
                $this->_id,
                $this->_action,
                $this->_time,
                $this->_data,
                $this->_recipient
            ) = unserialize($fg_args[0]);
        } else {

            list($action, $data, $recipient) = $fg_args;
            $this->_validation($action, $data, $recipient);

            $this->_action = $action;
            $this->_data = $data;
            $this->_time = $this->_getmicrotime();
            $this->_recipient = $recipient;
        }
    }

    public function normalize( $id = false ) {

        if ($id) {
            $this->_id = $id;
        } else {
            if ($this->_id === null) {
                throw new Exception(self::ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE);
            }
        }

        return serialize(array(
            0 => $this->_id,
            1 => $this->_action,
            2 => $this->_time,
            3 => $this->_data,
            4 => $this->_recipient
        )); 
    }

    private function _validation($action, $data, $recipient) {
        if (!in_array($action, $this->_supported_actions)) {
            throw new Exception(self::ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE);
        }

        return true;
    }

    private function _getmicrotime() {
        list($usec, $sec) = explode(' ', microtime());
        return number_format(((float)$usec + (float)$sec), 2, '.', '');
    }

    public function __get($key) {

        if (in_array($key, $this->_read_private_property)) {

            return $this->{'_'.$key};
        }

        return null;
    }
    
    public function isPost() {

        if ( $this->_action == self::CHAT_ACTION ) {
            return true;
        }
        return false;
    }
}

?>

Выводы


  • Для очередей необходимо использовать сторонние менеджеры. В нашем случае хорошо себя проявил Active MQ.
  • Работа с кометами и отправляемыми на сервер запросами должна осуществляться легковесными скриптами, без полной инициализации фреймворка, коннектов к базе и других излишеств. Спасибо, кэп.

Сейчас, смотря на то какой был проделан путь, становится просто страшно. Надеюсь, эта статья поможет хабравчанам сделать правильные выводы и не повторить наших ошибок в будущем.

Спасибо за внимание, буду рад услышать вопросы и комментарии и с удовольствием на них отвечу.

UPD. По просьбам читателей — публикую ссылку на проект http://feelyoustar.com/
Tags:
Hubs:
+118
Comments 87
Comments Comments 87

Articles