Pull to refresh

SignalsyMQ — простая и быстрая очередь сообщений на PHP+Redis (и немножко Zend Framework)

Reading time10 min
Views8.1K

Приветствую читателей. Сейчас я веду разработку собственного фреймворка, основанного, в противовес главенствующей сейчас модели MVC, на базе сигнальной архитектуры (signal/slot). Пока он проходит боевую обкатку в нашем стартапе, и за это время я понял, что просто библиотеки мало — реальные задачи и виденье будущего показывает, что необходим разный функционал, но объединенный одной темой — обработка и доставка информации множеству клиентов в реальном времени (да, в чем-то схоже с Comet-ом, он там тоже есть). Поэтому решено было попробовать реализовать основной компонент — очередь сообщений, которая бы стала основной для следующих проектов, быстрой, гибкой и масштабируемой.

Что получилось? Альфа-версия SignalsyMQ — очередь сообщений на базе PHP/Redis/Zend Framework.

Об очередях сообщений мы уже писали (первая статья обзора, вторая), но еще раз критически пересмотрев все системы и даже попробовав некоторые из них, я понял, что ниша небольших, гибких и конфигурируемых очередей, максимально приближенных к платформе основного приложения (в данном случае, РНР), остается открытой и свободной. Недостает простого сервера, который бы оперировал стандартными сообщениями (в формате JSON, внутри приложения это просто ассоциативный массив), обеспечивал бы гибкое конфигурирование доставки сообщений, постоянное хранение очередей, и очень быструю с ними работу. При этом хочется простого протокола и вообще максимальной простоты.

Ближе всего к этому стоят или продукты на базе Ruby — Starling MQ и MemcacheQ. Кстати, обе системы работают по memcache-протоколу, что положительно сказывается на возможности встраивания в разнородную среду. Но memcacheq смутил своей политикой развития, вернее — отсутствием таковой, ограничениями на максимальную длину сообщения и т.п., а Starling хоть и имеет за собой опыт применения в Twitter-е, но также достаточно специфический продукт, сыроват, к тому же требующий совсем другой платформы. Поэтому решено было, в лучших традициях, написать собственную реализацию очереди сообщений.

В качестве отправной точки был взят проект Redis — очень-очень быстрая и гибкая NoSQL система с развитыми возможностями хранения и обработки структурированных данных (о ней уже писали на Хабре, не успел). Вторым «китом» стала Rediska — РНР библиотека для работы с Redis-ом с удобным синтаксисом и поддержкой многих интересных функций (например, работы с несколькими серверами, встроенная сериализация, распределение ключей по серверах и т.п., однако библиотека в активной разработке, поэтому функции добавляются постоянно). Третьим «китом» стал Zend Framework, который я теперь использую в большинстве проектов, хотя здесь он играет всего лишь вспомогательную роль (используются несколько служебных классов).

Сразу замечу, что в ZF уже есть собственная реализация очередей сообщений — Zend_Queue, поддерживающая различные бекенды для хранения сообщений, начиная от обычного РНР-массива, заканчивая MemcacheQ, базами данных и Zend Platform. Однако на практике попробовать самый многообещающий интерфейс с MemcacheQ мне пока не удалось, с базой же работа в случае плотного потока сообщений просто невозможна — очень сильно замедляется работа вплоть до вылета с ошибкой всего скрипта. Ну и сам интерфейс Zend_Queue сильно уж абстрактный, в моем случае хотелось некоторые более высокоуровневые функции, поэтому пришлось бы значительно расширять существующий код. К слову, в библиотеке Rediska есть специальный адаптер для того, чтобы работать с очередями поверх Redis, но промучившись неделю и так и не заставив работать даже тестовый пример, я окончательно решил — все, пишу свою систему!

Для начала определимся с общей архитектурой и особенностями. Основные понятия это — сообщение, очередь (канал), сервер и хранилище.
  • Сообщение — некоторая структура данных, в нашем случае — ассоциативный массив, содержащая информацию, которую мы сохраняем. Сообщение состоит из нескольких служебных полей и тела. В теле сообщения хранятся сами данные, которые и передаются получателю, и, что главное, в неизменном виде. Служебные поля могут изменяться в процессе обработки. Пока обязательными полями у меня являются — тип сообщения (произвольная строка), источник сообщения (также строка), канал, куда необходимо доставить сообщение и метка времени создания сообщения. Тело сообщения также одно из полей, но оно вполне может быть и пустым — оно имеем смысл только для конкретного получателя, обязанность всей системы — просто сохранить и доставить.
  • Очередь (канал, channel) — это некоторый уникальный адрес, имя очереди, куда доставляются сообщения. Я принял самую простую и понятную форму адресации — нечто подобное URL-адресам. Например, /users/aleks — это очередь сообщений для пользователя aleks. Внутри происходит некоторая дополнительная обработка, но это никак не влияет на то, как с очередью работают другие скрипты. Также допускается рассылка сообщений сразу в несколько очередей — для этого используются мета-символы '*' (любое количество символов), '?' (произвольный один символ) и '[a..z]' — произвольный набор символов, который должен быть в имени, чтобы сообщением было доставлено. Этим моя разработка выгодно отличается от того же Zend_Queue — можно легко разослать сообщение, например, всем пользователям, которые имеют имя aleks (при условии, что мы очереди называем по имени пользователей). Очереди также характеризуются длиной — максимальным количеством сообщений, которое может там хранится. Учитывая, что сообщения хранятся в памяти (да еще и в сериализированном виде), следует учесть расход памяти, поэтому выбирайте длину очереди исходя из конкретики проекта. В случае переполнения очереди, самые старые сообщения будут удалены.
  • Сервер очередей — собственно, сам скрипт библиотеки. Текущая реализация напрямую работает с Redis серверами, следующим шагом будет создание выделенного сервера. То есть, клиентское приложение будет соединяться по сокету с моим сервером, посылать сообщения с мета-информацией (как минимум — указание канала, куда послать сообщение), остальное будет дело сервера (иили скорее, демона). Для расширения возможностей, планируется поддерживать как различные форматы данных (сериализации), так и несколько протоколов работы — через сокеты, XML/JSON-RPC, HTTP-REST, SOAP и любые другие.
  • Хранилище — в текущем варианте это один или несколько запущенных Redis-cерверов, в будущем, возможно, реализую по аналогии с Zend_Queue, несколько вариантов хранения сообщений, хотя маловероятно.
Еще одним замечанием об архитектуре и отличиях от Rediska_Queue будет то, что насколько я понял из исходников, в угоду быстродействию, они положили ценное свойство распределённости. Например, команда getQueues, которая возвращает массив всех хранимых очередей (не сообщений, а только имена очередей), работает с локальной копией списка очередей. Поэтому, если другой клиент создаст новую очередь во время работы первого, он об этом не узнает. В случае веб-приложения, когда запрос обрабатывается, пока идет формирование страницы, это может и не важно, мне же надо именно сервер сообщений, который может работать как демон и обрабатывать множество очередей и команды от многих клиентов. Хотя это требует дополнительных ресурсов, хотя благодаря Redis-у все операции имеют очень хорошие показатели сложности.

И так, общий API состоит из ряда методов, максимально приближенных к интерфейсу Zend_Queue, однако с некоторыми расширениями, которые я опишу ниже.
  • getInstance ($config) — Используется один экземпляр библиотеки и адаптера для доступа к Redis, поэтому вся работа идет через статичный метод, которые сам принимает решение о создании или использовании уже инициализированного экземпляра. В конфигурации задаются параметры серверов (адреса, порты), а также функции-сериализаторы и механизм распределения ключей в случае нескольких серверов.
    private $_options = Array(
      'maxQueues' => 100000, //максимальное количество очередей
      'maxMessagesPerQueues' => 1000, //максимальное количество сообщений в одной очереди
      'expireQueues' => 15552000, //максимальное время жизни очереди - 180 дней
      'expireMessages' => 604800, //время жизни максимальное одного сообщения
      'maxMessageLength' => 32768, // максимальный размер в байтах сообщения (после сериализации)
      'log' => false, //если указан, то объект Zend_Log куда логгировать сообщения,
      'redis' => Array(
    	'namespace' => 'smq_', //неймспейс для ключей, для сокращения памяти выбран небольшим
    	'servers' => array(
     	  array('host'     => 'localhost', // Хост
    	          'port'     => 6379, // порт
    	          'weight'   => 1, // Weight of server for key distribution
    	          'password' => '' // пароль
    	 )
      ),
     'keyDistributor' => 'crc32',  //тип распределения ключей
     'serializer' => array('Signalsy_MQ', '_serialize'),
     'unserializer' => array('Signalsy_MQ', '_unserialize'))
    );

    Также есть возможность задать логгер (используем Zend_Log) для записи всех значимых событий, ошибок и даже самих сообщений (скорее для отладки, в продакшине достаточно настроить Redis на сохранение + встроенный в него механизм appendonly.log-а для предотвращения потери данных, или же репликация на запасной сервер).
  • _serialize/_unserialize — встроенные функции сериализации, которые применяются при добавлении в хранилище и извлечении из него сохраненных сообщений. Redis может хранить в качестве значений только числа и строки, поэтому мы должны сериализовать данные перед сохранением. Это делается прозрачно самим сервером, необходимо лишь указать методы для этих действий, иначе используется по умолчанию сериализация РНР. Я выбрал более удобный и «родной» для AJAX-приложений JSON, так как в будущем хочется минимизировать количество прослоек между клиентом и хранилищем. В следующей версии будет использоваться новый компонент Zend_Serializer из инкубатора Zend-а, что позволит использовать и другие алгоритмы. Кроме этого, данные, сохраненные в PHP, сможет прочитать и любая другая программа на любом языке, где есть JSON-парсер, что очень важно для распределенных систем.
  • createQueue — основной метод, который создает очередь, имя которой ему передается. Изначально проверяется корректность имени (удаляются различные спец-символы и т.п.), а также проверяется существование такой очереди в системе. После создания в очередь помещается первое служебное сообщение, в котором просто указана метка времени. Это сделано так потому, что Redis автоматически создает пустой список (структура данных List), когда мы добавляем первый элемент, поэтому чтобы не указывать что-то служебное, типа пустой строки, решено было инициализировать очередь специальным служебным типом сообщений.
  • isExists — проверяет существование указанной очереди на сервере. В отличие от реализации адаптера Rediska для Zend_Queue, мы производим более серьезные проверки, включая проверку существование такого объекта в памяти базы, также совпадение типов данных.
  • count — позволяет получить количество сообщений в очереди.
  • deleteQueue — удаление очереди из хранилища сервера, соответственно, все сообщения в ней будут также удалены. Возможно, следует инициализировать принудительную запись состояния базы при таких изменениях, чтобы в случае краха данные были актуальны.
  • send — это основной метод, который рассылает сообщения в указанные каналы. Кроме обязательного параметра — сообщения, можно указать дополнительные проверки, которые гарантируют корректность работы системы очередей, но могут существенно замедлить работу. skipCheckQueueLength отвечает за проверку длины очереди и ее усечения при превышении установленного в конфигурации лимита. При усечении самые старые сообщения будут удалены. Если этот параметр равен целому числу, то оно задает вероятность того, что такая проверка будет произведена в этом запросе.Это некий компромисс между быстродействием и надежностью, так как требует один или два запроса к серверу. Параметр skipCheckQueueExists позволяет отключить проверку на существование очереди. Как уже говорилось выше, сообщение это стандартного вида массив, который после сериализации сохраняется в очереди. В обязательном поле channel мы указываем очередь (или несколько, используя маску), куда следует поместить сообщение. В случае, если задана маска, сначала мы извлекаем по этой маске все имена очередей, удовлетворяющие заданным параметрам, а потом в цикле добавляем сообщение в каждую из них, предварительно скорректировав поле channel в сообщении. Больше никакие служебные заголовки на данный момент не изменятся и не анализируются, так же как и тело самого сообщения, которое может быть как строкой, так и произвольным типом данных в РНР, который допускает корректную сериализацию. Еще раз — основным отличием от других реализаций является возможность рассылать одним вызовом сообщения в несколько (или даже все) очереди. В случае, если установлена опция логгированя, сообщение будет записано и в лог, используя Zend_Log
  • receive — вторая по важности операция — получение из очереди одного или нескольких сообщений. Здесь уже не поддерживаются маски, сообщения можно получить только из конкретной заданной очереди. Также обязательно проверяется существование очереди на сервере (возможно, для максимальной производительности, особенно, если с Redis-сервером никто больше не работает, все эти проверки можно проводить только раз, при инициализации работы). Для получения мы указываем максимальное количество сообщений, сколько мы можем получить, однако реально может быть получено меньше, например, если их столько нет в очереди. Еще одна интересная опция (не скрою, пока функционал не совершенен в плане оптимизации) — получения только сообщений, отправленных после указанной даты. Например, мы хотим получить не более ста сообщений, отправленных за последний час. Сначала будет произведена выборка указанного числа сообщений, потом проверка и отбрасывание тех, которые не удовлетворяют критерию времени создания. Необходимо отметить, что сообщения из очереди только читаются, не удаляются сразу после чтения — для этого необходимо использовать специальную команду. Очередь сама по себе является персистентной, ее состояние изменяется только вручную клиентом (или в результате проверки на длину очереди). В будущем будет реализован механизм устаревания (expires) — он изначально поддерживается в Redis-е, но в клиентской части (для отдельных элементов списков) еще не реализовано.
  • deleteMessage — удаление сообщения из очереди.
  • getQueues — позволяет просмотреть список всех доступных очередей на сервере.
  • getQueuesByPattern — выводит список очередей, удовлетворяющих указанному паттерну (используя метасимволы '*', '?' и группы символов [a..z]

Честно скажу, что исходный код проекта, хоть и доступен, но никак не готов к реальной работе — скорее просто прототип и первая альфа-версия. Там много еще не оптимальных мест, разная документация, в том числе куски из Zend_Queue, на интерфейсе которого я основывался, не до конца продуманный алгоритм работы и структуры данных (например, deleteMessage требует два параметра, имя очереди и сообщение, хотя в служебном поле channel сообщения уже есть имя его очереди и т.п.). Тем не менее, текущий код уже показывает результаты, он рабочий и пригоден к экспериментам.

Говорить о производительности сейчас также реально нельзя, однако я проводил несколько тестов, используя Amazon EC2 small instance (32 bit, Debian, PHP 5.2.11, Redis 1.1.9) — однопоточный сервер в режиме выборочной записи большого количества сообщений в рандомные очереди (1000 сообщений и пачками по 100) показывает быстродействие в пределах 1500 — 3000 сообщений в секунду (на запись, так как на сервере еще другие системы крутятся, то колебания очень зависит от загрузки, а также при таком режиме очень чувствительно как выставлен интервал сохранения в самом Redis).

В случае, когда каждый клиент работает с небольшим количеством сообщений и одной очередью, но параллельных клиентов множество, достигается наибольшее быстродействие, в принципе, вплотную приближаясь к максимум, что по тестам на этой конфигурации можно вытянуть (5 — 7 тыс. сообщений в секунду, в случае когда тест и сервер на одной машине). Большего результата можно достичь, применяя многопоточность, вынеся Redis на отдельный сервер, а также максимально разгрузив сервер от других задач, кроме того, быстродействие упирается в параметры CPU сервера — уже инстанс Large (2 ядра и 7 Гб RAM) показывает в 5 — 6 раз большее быстродействие (конечно, это лишь усредненный показатель, далекий от реального).

Вот такие эксперименты. Тема очередей сообщений интересная, надо бы написать еще пару статей, например, почти никто не знает про компоненты Zend_Queue и их применение, даже из разработчиков, которые активно используют ZF в работе. Ну и впереди много доработок и совершенствование сервера, превращение его в сетевого демона и сборка на этой базе специального сервера, работающего с различными протоколами (возможно и стандартными для MQ — Stomp, AMQP), настройка репликации и учет загруженности при фоновом сохранении, работа с несколькими параллельными серверами и т.п. Вам интересно об этом почитать?
Tags:
Hubs:
+18
Comments51

Articles

Change theme settings