Pull to refresh

RabbitMQ tutorial 3 — Публикация/Подписка

Reading time 5 min
Views 112K
Хочу продолжить серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.

Публикация/Подписка


В предыдущей статье было рассмотрено создание рабочей очереди сообщений. Было сделано допущение, что каждое сообщение будет направлено одному обработчику(worker). В этой статье усложним задачу – отправим сообщение нескольким подписчикам. Этот паттерн известен как "publish/subscribe" (публикация/подписка).
Чтобы понять этот шаблон, создадим простую систему логирования. Она будет состоять из двух программ – первая будет создавать логи, вторая считывать и печатать их.

В нашей систему логирования каждая программа подписчик будет получать каждое сообщение. Благодаря этому, мы сможем запустить одного подписчика на сохранение логов на диск, а потом в любое время сможем создать другого подписчика для отображения логов на экран.

По существу, каждое сообщение будет транслироваться каждому подписчику.

Точки обмена(exchanges)


В предыдущих статьях для отправки и принятия сообщений мы работали с очередью. Теперь рассмотрим расширенную модель отправки сообщений Rabbit.

Напомним термины предыдущей статьи:
  • Producer (поставщик) ‒ программа, отправляющая сообщения
  • Queue (очередь) – буффер, хранящий сообщение
  • Consumer (подписчик) ‒ программа, принимающая сообщения.


Основная идея в модели отправки сообщений Rabbit – Поставщик(producer) никогда не отправляет сообщения напрямую в очередь. Фактически, довольно часто поставщик не знает, дошло ли его сообщение до конкретной очереди.
Вместо этого поставщик отправляет сообщение в точку доступа. В точке доступа нет ничего сложного. Точка доступа выполняет две функции:

— получает сообщения от поставщика;
— отправляет эти сообщения в очередь.

Точка доступа точно знает, что делать с поступившими сообщениями. Отправить сообщение в конкретную очередь, либо в несколько очередей, либо не отправлять никому и удалить его. Эти правила описываются в типе точки доступа (exchange type).

image

Существуют несколько типов: direct, topic, headers и fanout. Мы остановимся на последнем типе fanout. Создадим точку с доступа с этим типом и назовем её – logs:

$channel->exchange_declare('logs', 'fanout', false, false, false);

Тип fanout – очень прост. Он копирует все сообщения которые поступают к нему во все очереди, которые ему доступны. Это то что нам нужно для нашей системы логирования.

Просмотр списка точек доступа:


Чтобы посмотреть все точки доступа на сервере, необходимо выполнить команду rabbitmqctl:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

Мы видим список точек доступа с наименованием amq.* и точку доступа без имени, которая используется по умолчанию (она не подходит для выполнения нашей задачи).

Наименование точек доступа.


В предыдущих статьях мы ничего не знали о точках доступа, но всё-таки могли отправлять письма в очередь. Это было возможно, потому что использовали точку доступа по умолчанию, которая идентифицируется пустой строкой “”.
Вспомним как раньше мы отправляли письма:

$channel->basic_publish($msg, '', 'hello');

Здесь используется точка доступа по умолчанию или безымянная точка доступа: сообщение направляется в очередь, идентифицированную через ключ “routing_key”. Ключ “routing_key” передается через третий параметр функции basic_publish.

Теперь мы можем отправить сообщение в нашу именованную точку доступа:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');


Временные очереди:


Всё это время мы использовали наименование очередей (“hello“ или “task_queue”). Возможность давать наименования помогает указать обработчикам (workers) определенную очередь, а также делить очередь между продюсерами и подписчиками.

Но наша система логирования требует, чтобы в очередь поступали все сообщения, а не только часть. Также мы хотим, чтобы сообщения были актуальными, а не старыми. Для этого нам понадобиться 2 вещи:

— Каждый раз когда мы соединяемся с Rabbit, мы создаем новую очередь, или даем создать серверу случайное наименование;
— Каждый раз когда подписчик отключается от Rabbit, мы удаляем очередь.

В php-amqplib клиенте, когда мы обращаемся к очереди без наименовании, мы создаем временную очередь и автоматически сгенерированным наименованием:

list($queue_name, ,) = $channel->queue_declare("");

Метод вернет автоматически сгенерированное имя очереди. Она может быть такой – ‘amq.gen-JzTY20BRgKO-HjmUJj0wLg.’.
Когда заявленное соединение оборвется, очередь автоматически удалиться.

Переплеты(Bindings)


image

Итак, у нас есть точка доступа с типом fanout и очередь. Сейчас нам нужно сказать точке доступа, чтобы она отправила сообщение в очередь. Отношение между точкой доступа и очередью называется bindings.

$channel->queue_bind($queue_name, 'logs');

С этого момента, сообщения для нашей очереди проходят через точку доступа.
Посмотреть список binding-ов можно используя команду rabbitmqctl list_bindings.

Отправка во все очереди:


image

Программа продюсер, которая создает сообщения, не изменилась с предыдущей статьи. Единственное важное отличие – теперь мы направляем сообщения в нашу именованную точку доступа ‘logs’, вместо точки доступа по умолчанию. Нам нужно было указать имя очереди при отправки сообщения. Но для точки доступа с типом fanout в этом нет необходимости.

Рассмотрим код скрипта emit_log.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

?>


(emit_log.php source)

Как вы видите, после установки соединения мы создаем точку доступа. Этот шаг необходим, так как использование несуществующей точки доступа – запрещено.

Сообщение в точке доступа будут потеряны, так как ни одна очередь не связана с точкой доступа. Но это хорошо для нас: пока нет ни одного подписчика нашей точки доступа, все сообщения могут безопасно удалятся.

Код подписчика receive_logs.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>


(receive_logs.php source)

Если вы хотите сохранить логи в файл, вам потребуется открыть консоль и набрать:

$ php receive_logs.php > logs_from_rabbit.log
Если вы хотите отобразить логи на экран, откройте еще одно окно и наберите:

$ php receive_logs.php
Ну и, конечно, запуск продюсера сообщений:

$ php emit_log.php

С помощью команды rabbitmqctl list_bindings мы можем удостовериться, что код правильно создал очередь и связал её с точкой доступа. С двумя открытыми программами receive_logs.php у вас должно получиться следующее:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.

Здесь отображено, что данные точки доступа logs отправляются в две очереди, имена которых созданы автоматически. Это именно то, чего мы добивались.

В следующей статье будет описано, как прослушать только часть сообщений.
Tags:
Hubs:
+25
Comments 6
Comments Comments 6

Articles