0,0
рейтинг
6 ноября 2013 в 20:45

Разработка → RabbitMQ tutorial 4 — Роутинг tutorial

Продолжаю серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.
В предыдущей статье мы разработали систему логирования. Нам удалось отправлять сообщения нескольким получателям. В этой статье модернизируем нашу программу — будем отправлять получателю только часть сообщений. Например, мы сможем сохранять на диске только сообщения с критическими ошибками (экономия места на диске), а в консоли будем отображать все сообщения.

Bindings


В предыдущей статье мы создавали связи(bindings). Напомним код:
$channel->queue_bind($queue_name, 'logs');

Binding — это связь между точкой доступа и очередью. Это можно интерпретировать как: очередь хочет получить сообщения из точки доступа.
Binding может принимать параметр routing_key. Чтобы не путаться с параметром $channel::basic_publish (тоже содержит параметр routing_key), назовем его binding_key. Рассмотрим создание связи с ключом binding_key:

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);


Значение этого ключа зависит от типа точки доступа. Точка доступа с типом fanout просто проигнорирует его.

Точка доступа Direct


Наша система логирования в предыдущей статье отправляла всем подписчиками все сообщения. Мы хотим расширить нашу систему, чтобы фильтровать сообщения по степени важности. Для примера мы сделаем так, чтобы скрипт, записывающий логи на диск не тратил своё место на собщения с типом warning или info.
Ранее мы использовали точку доступа с типом fanout, которая не дает нам полной гибкости — она подходит только для простой трансляции.
Вместо это мы будем использовать тип direct. Его алгоритм очень прост — сообщения идут в ту очередь, binding_key которой совпадает с routing key сообщения.
Рассмотрим схему на картинке:

На схеме изображена точка доступа X и две связанные с ней очереди. Первая очередь связана с binding key = orange, а вторая очередь имеет две связи. Одна с ключом binding key = black, а вторая с ключом — green.
Сообщения с routing key = orange будут направляться в очередь Q1, а сообщения с ключом black или green направятся в очередь Q2. Все остальные сообщения будут удалены.

Составные связи (Multiple bindings)



Вполне допустимо связывать несколько очередей с одинаковым ключом binding key. В этом примере мы связываем точку доступа X и очередь Q1 с тем же ключом black, что и у очереди Q2. В этом примере direct ведет себя также как и fanout: отсылает сообщения во все связанные очереди. Сообщения с ключом black попадет в обе очереди Q1 и Q2.

Отправка логов


Построим алгоритм отправки сообщений. Вместо fanout для точки доступа будем использовать тип direct. Routing key будет совпадать с названием типа лога. Допустим что скрипт отправки лога будет знать тип лога.
Для начала создадим точку доступа:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

Теперь отправим сообщение:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);

Лог будет иметь 3 типа: 'info', 'warning', 'error'.

Подписка


Отправка сообщений будет таким же как и в примере предыдущей статьи, с одним условием — нужно создать для каждого типа лога свою связь binding.
foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}


Итого получаем



Код скрипта продюсера emit_log_direct.php:
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = $argv[1];
if(empty($severity)) $severity = "info";

$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," \n";

$channel->close();
$connection->close();

?>


Код скрипта подписчика receive_logs_direct.php:
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if(empty($severities )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>


Если вы хотите сохранить в файл только логи с типом error и warning, наберите в консоли:

$ php receive_logs_direct.php warning error > logs_from_rabbit.log

Если вы хотите отобразить все логи на экране, наберите в консоли

$ php receive_logs_direct.php info warning error
 [*] Waiting for logs. To exit press CTRL+C



Или чтобы вытащить только логи error:
$ php emit_log_direct.php error "Run. Run. Or it will explode." 
 [x] Sent 'error':'Run. Run. Or it will explode.'

(исходники (emit_log_direct.php source) и (receive_logs_direct.php source))

В следующей статье будет рассмотрена прослушка сообщений соответствующих какому-либо шаблону
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Спецпроект

Самое читаемое Разработка

Комментарии (14)

  • 0
    Автор, лучи добра тебе и пожелание попасть в рай без очереди :)
    А вот что нам понадобилось в реальной практике и было сходу непонятно как реализовать:
    — как сделать подписку по маске (каюсь, не сразу нашли звездочку и решётку);
    — как сделать, когда надо чтобы оба консумера получили мессадж, и когда надо чтобы строго один консумер из N возможных;
    — как сделать, чтобы консумер выполнял действие, только получив сообщения из двух и более очередей, делая ack только по получению всех и не потребляя процессорные ресурсы между этими сообщениями (multi-queue consumer)
    • 0
      Спасибо)
      1. Подписка по маске будет в следующей статье, как я и написал в конце этой статьи.
      2. Почему бы не сделать две очереди? Одну для всех и одну строго для одного?
      3. Не очень понял последнего пункта.
    • 0
      как сделать, когда надо чтобы оба консумера получили мессадж
      — Паб/саб реадихуется через fanout exchange

      как сделать, когда надо чтобы строго один консумер из N возможных
      — Для direct exchange это дефолтное поведение реализовано через round robin

      как сделать, чтобы консумер выполнял действие, только получив сообщения из двух и более очередей
      — Из коробки можно подписываться на сколько угодно очередей, выставив ручное подверждение и отправлять ack опираясь уже на вашу бизнес логику
      • 0
        Вопросы получения сообщений касаются исключительно логики queue, т.е.
        — хотим что бы был round robin — консьюмеры должны слушать одну очередь на всех
        — хотим что бы каждый получил сообщене — консьюмеры должны слушать разные очереди

        Вопросы роутинга находятся на стыке queue and exchange, при этом тип exchange — скорее способ задавать различные способы адресации.

        И последнее — из коробки можно подписать одну очередь несколькими биндингами, которые могут иметь разные типы exchange and routing keys, а можно — слушать несколько очередей (тут есть разница между «подписываться» и «слушать»), соответственно выбор правильной стратегии (много очередей мало биндингов или наоборот) зависит от задачи (ну или уже имеющегося хозяйства)
        • 0
          — хотим что бы каждый получил сообщене — консьюмеры должны слушать разные очереди

          Ну как же? fanout exchange как раз для ситуации 1 очередь получают все.
      • 0
        Из коробки можно подписываться на сколько угодно очередей
        Уважаемый limitium, у Вас очень поверхностный взгляд на проблему.

        Во-первых, если подписываться можно, то слушать одновременно более 1 очереди (по крайней мере из PHP) нельзя. Когда PHP-скрипт повисает на consume очереди, его выполнение приостанавливается. Если мы слушаем три очереди (A, B, C), повисли на consume первой из них (A), но из внешних источников быстрее поступили сообщения во вторую (B) и третью ©, то мы так и будем тупо висеть и ждать сообщения в первой.

        Это плохо, потому что:
        — если например комбинация сообщений B и C уже позволяет выполнить следующий шаг бизнес-логики, то ждать сообщения из A нам смысла нет. Нет никакого способа «из коробки» поймать такую ситуацию в коде PHP.
        — consume на одну из очередей означает, что процессорные ресурсы потребляются по сути на прогон пустого цикла ожидания. Нет никакого способа «из коробки» дёрнуть действие только по получению всех трёх сообщений, а пока они все не пришли — заниматься другими задачами.

        Во-вторых, ack сообщений не атомарен, как например в редисе (там можно завернуть несколько действий в атомарную транзакцию). Если ваш скрипт получил сообщения из трёх очередей, сделал первому ack, второму ack, а третьему не успел — умер (в ДЦ попал метеорит) — вы получили неконсистентность данных.

        В-третьих, если у вас несколько воркеров, то вам нужно сделать чтобы все три сообщения в очередях A, B и C, составляющие «смысловую тройку» по бизнес-логике, попали строго в один воркер. Никаких round-robin.

        Естественно, речь идёт о высоконагруженных проектах, сопоставимых с… скажем с Вконтакте или Amazon. В проектах меньшего масштаба на всё перечисленное можно забить болт :)

        Надеюсь, я достаточно подробно объяснил почему нормальный multi-consumer не так просто реализовать?
        • 0
          Радость от rabbitmq, как раз в том, что можно писать разные части на разных языках. Зачем вы пишете на PHP?
          Если у вас столь сложная бизнес логика, которая выходит за рамки rabbitmq, но вы очень любите PHP, велкам к zeromq. Будете разруливать любую ситуацию как вам угодно. В противном случае у вас будет огород из воркеров, часть из которых будут разруливать только потоки данных и будут узким местом.

          То что вы описали это абстрактная проблема, возможно ее можно решить изменив потоки данных, например сваливать все во временные очереди и ждать из них.

          Давайте конкретные примеры решать?
          • 0
            Я бы сказал, что это радость от правильно организованной распределённой архитектуры, а не только rabbit :) Я не агитирую конкретно за PHP, но это популярный язык, который понятен многим читателям.

            Конкретный пример: допустим, есть «облачный сервис» — таск-менеджер (или багтрекер). Пользователю по текущему тарифу разрешено создавать в нём не более 10 проектов, за каждый из которых он платит 100 рублей в месяц. От пользователя пришла входящая задача — создать новый проект. Исполняющий сервис ожидает сообщения одновременно из четырёх очередей:

            A) подтверждение, что пользователь валиден, авторизован, и вообще имеет право выполнять действие project.create;
            B) подтверждение, что у пользователя в данный момент не более 10 проектов;
            C) подтверждение, что у пользователя на балансе вообще есть сумма не менее 100 рублей (ну или подтверждение списания 100 рублей, если так понятнее);
            D) подтверждение от валидатора, что название проекта (поддомен) не содержит нецензурных или оскорбительных слов.

            Только по получению сообщений об успехе из всех четырёх очередей — исполнительный сервис может приступать к фактическому созданию проекта.

            Однако заметно, что достаточно получить как минимум два сообщения о НЕуспехе, например из «A» и «B», и результат из «C» и «D» нам становится неинтересен. Поскольку, напомню, процессорное время стоит денег — ожидание двух последних становится лишней работой, которая расходуется впустую — а значит впустую тратятся деньги. Уточню, что сообщения приходят в непредсказуемое время, и быстрее всех могут придти как A и B, так и например B и D, в зависимости от текущей нагрузки на тот или иной сегмент системы.

            Если же мы развернём эти проверки в последовательность действий (A-B-C-D), мы удлинним всю цепочку — и снова потратим процессорные ресурсы впустую в том случае, если например A, B, C будут успешны, а D — зафэйлится. Не говоря уже о том, что приложение может потерять в «отзывчивости», визуально станет медленнее работать. Можно было бы сказать «заранее думайте о порядке действий», но это неудобно и ухудшает архитектурную простоту системы.

            Вот конкретный пример :) Как будем решать?
            • 0
              А ты хитрец.
              Данные передются только средствами рэбита или можно хранить промежуточные значения в редисе например?
              Почему имеено 2 НЕуспеха? Вроде бы любой отказ должен фейлить всю операцию.
              • 0
                Именно 2 НЕуспеха здесь для иллюстрации ситуации, когда на одном неуспехе нельзя сделать окончательный вывод. Например, если у пользователя 99 рублей на счету, то можно его кредитовать на рубль (или уйти в минус на рубль), но всё равно предоставить услугу. Два фейла — это просто пример, но его тоже нужно учесть в проектировании.

                Если пойти на то, чтобы хранить состояние [между сообщениями], то вся задача теряет актуальность. Однако и здесь вопрос гораздо шире. Дело в том, что «серьёзные» распределённые системы вообще требуют иного подхода, нежели общепринятый подход к программированию как к последовательности команд в программе.

                В идеале, SOA-архитектура должна быть построена без хранимых состояний и разделяемой памяти. Почему этих штук следует избегать — вопрос слишком большой, чтобы описать его в рамках топика. Говоря кратко, надо строить систему из «тупых», легко заменяемых и дублируемых кирпичиков — конечных автоматов, а функционировать они должны по принципам архитектуры потока данных.

                Материалы для любопытных читателей:
                — общий обзор habrahabr.ru/post/122479/
                — для самостоятельного гугления: event-driven development, паттерн observer-notifier, SOA-архитектура, архитектура потока данных, основы философии erlang, конечные автоматы, принципы фон-неймана.

                Если мы разрешаем себе хранение состояний, тогда (казалось бы) вся задача теряет актуальность: храним промежуточные результаты в редисе и не паримся. Однако я могу совершенно точно сказать, что нельзя иметь хранимые состояния «на входе» в систему, где любой неавторизованный пользователь может нам этот редис засрать, навалив кучу (миллионы) входящих запросов. Это компромисс, на который надо идти очень осторожно.
                • 0
                  Значит есть
                  Клиент — который хочет результат сервиса
                  Валидаторы — независимые раз-|| шматки кода
                  Сервис — который делает, что-то после валидаторов

                  В рамках ребита я б сделал так:
                  Сервисов много они слушают все 1 очередь service_queue(раунд робин)
                  Валидоторов много они слушают каждый свой тип очереди validator_typeN_queue(раундробин)

                  Клиент пораждает 2 временных очереди replay_queue123 и validates_queue123.
                  Кажому валидатору в котором он заинтересован шлет validates_queue123 и запрос в соотвествующую validator_typeN_queue.
                  Сервису через service_queue отсылает список валидаторов, которые были запущены, и обе временных очереди replay_queue123 и validates_queue123.
                  Начинает ожидать результат в replay_queue123

                  Валидатор сделав свое дело отсылает результат во временную очередь validates_queue123.

                  Сервис получив список валидаторов и очередь validates_queue123 подписывается на нее и ждет резалты. В нем реализуется вся логика. Получили 2 фейла стриаем очередь validates_queue123 шлем в replay_queue123 провал.
                  • 0
                    Для большей гибкости и большей производительности можно из сервиса вычленить что-то типа summary_validator в котором останется логика по результатам валидаторов. И котороый при фейле будет слать киленту сразу феил в очередь replay_queue123, а если ок то — сервису, передавая очередь replay_queue123. Сервис, тогда будет всегда получать положительныей запросы и ничего ждать не будет.
          • 0
            а zeromq позволит реализовать логику в системе очередях?
            • 0
              В zeromq нету очередей. Есть queue device, но это только побаловаться. Там есть набор сокетов комибнируя их строишь себе любую топологию

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.