Pull to refresh

Использование RabbitMQ вместе с MonsterMQ часть 5

Reading time4 min
Views1.3K
В предыдущей части мы улучшили нашу систему логгирования. Вместо использования обменника типа fanout мы использовали обменник типа direct, позволившему нам выборочно принимать сообщения. Не смотря на улучшения, наша система до сих пор имеет ограничения, например мы не можем принимать сообщения основываясь на нескольких критериях. Например в нашей системе мы могли бы хотеть перенаправлять сообщения основываясь не только на уровне строгости сообщения, но и на источнике сообщения. Например как в unix-инструменте syslog, который перенаправляет сообщения не только в зависимости от уровня строгости (info/warn/crit..), но и в зависимости от источника (auth/cron/kern...). Это может дать нам дополнительную гибкость, например мы сможем получать только критические сообщения от 'cron', но также все сообщения от 'kern'. Для реализации такой системы нам предстоит познакомиться с более сложным типом обменника — topic.

Оглавление всех частей:
Часть 1
Часть 2
Часть 3
Часть 4
Часть 5

Обменник типа topic


Сообщения отправляемые обменнику типа topic должны иметь ключ маршрутизации, представляющий из себя набор слов разделённых точками. Слова могут быть любыми, но обычно они являются связанными с какой-нибудь особенностью сообщения. Вот примеры некоторых допустимых ключей маршрутизации: «stock.usd.nyse», «nyse.vmw», «quick.orange.rabbit». Ключ ограничен размером в 255 байт.

Ключ связывания должен быть указан похожим образом. Маршрутизация сообщений в обменнике типа topic похожа на маршрутизацию обменника типа direct — сообщение направляется в очередь с ключом связывания совпадающим с ключом маршрутизации. Однако есть два отличия:

  1. *(звёздочка) в ключе связывания может быть заменена только одним словом
  2. #(решётка) в ключе связывания может быть заменена нулём или более слов

Это можно проиллюстрировать следующим изображением:

image
(изображение взято с официального сайта RabbitMQ)

В этом примере мы будем отсылать сообщения которые представляют животных. Ключи маршрутизации сообщений состоят из трёх слов(и двух точек). Первое слово представляет скорость, второе — цвет, третье — вид.

В нашем примере с обменником связаны две очереди: Q1 с ключом связывания "*.orange.*" и Q2 с двумя ключами связывания "*.*.rabbit" и «lazy.#».

Эти связи будут означать следующую маршрутизацию:

  • Q1 заинтересована во всех оранжевых (orange) животных
  • Q2 заинтересована во всех кроликах (rabbit) и ленивых (lazy) животных

Сообщение с ключом маршрутизации «quick.orange.rabbit» будет доставлено в обе очереди. Сообщение с ключом «lazy.orange.elephant» также будет доставлено в обе.

Однако сообщение с ключом «quick.orange.fox» попадёт только в первую очередь, а с ключом «lazy.brown.fox» только во вторую. «lazy.pink.rabbit» будет доставлено во вторую очередь один раз, не смотря на то, что ключ маршрутизации совпадает с обоими ключами связывания.

«quick.brown.fox» не попадёт ни в одну очередь, так как ключ маршрутизации не совпадает ни с одним из ключей связывания. Если попытаться отослать сообщения с количеством слов в ключе маршрутизации меньшим или большим (например «orange» или «quick.orange.male.rabbit») чем в ключе связывание сообщение будет отброшено.

Обменник типа topic может вести себе как обменник типа fanout если в качестве ключа связывания указать #. Или как direct если в ключе связывания не указывать ни * ни #, а указать просто какое-нибудь слово.

Собираем код вместе


Будем использовать обменник типа topic для нашей системы логгирования. Начнём с предположения о том что наши ключи маршрутизации сообщений будут иметь вид «источник.строгость». Код будет почти такой же как в предыдущей части, вот send.php:

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newTopicExchange('topic-logs');

   $routingKey = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $routingKey, 'topic-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Код worker-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-1')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Код worker-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-2')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-2');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Чтобы получать только critical сообщения первым воркером вызовите

php worker-1.php "*.critical"

Чтобы связать очередь, которую использует второй воркер, с обменником двумя ключами связывания вызовите:

php worker-2.php "kern.*" "*.critical"

Для отправки сообщения выполните что-нибудь вроде:

php send.php "kern.critical" "A critical kernel error"

Поэкспериментируйте с этими программами.
Tags:
Hubs:
Total votes 6: ↑6 and ↓0+6
Comments0

Articles