Pull to refresh

Symfony + RabbitMQ Быстрый старт для молодых

Reading time 9 min
Views 58K
Всем доброго времени суток, друзья.

Сегодня захотелось поговорить о том, как можно работать с RabbitMQ в Symfony и совсем чуть-чуть о некоторых подводных комнях. В конце я напишу парочку интересных моментов о кролике (рус. перевод «rabbit») для тех, кто совсем в танке.

Я не буду рассказывать про сам RabbitMQ, поэтому если вы пока и этого не знаете, почитайте следующие переводы:

Статья 1
Статья 2
Статья 3
Статья 4
Статья 5

Не бойтесь примеров на перле или пайтоне — это не страшно, все достаточно понятно из исходного кода.

+ Все достаточно подробно описано, когда я читал это в свое время, достаточно было интерпретировать код мысленно, чтобы понять как что и зачем.

Если вы уже знаете, что такое консумер и почему в нем нужно делать $em->clear() + gc_collect_cycles, а после закрывать соединение с базой данных, то, скорее всего, вы ничего нового для себя не узнаете. Статья скорее для тех, кто не хочет разбираться с AMQP протоколом, но которым нужно прямо сейчас применять очереди и выбор почему-то бездумно пал на RabbitMQ, а не тот же легковесный beanstalkd.

Если же у вас микросервисная архитектура и вы ждете, что я расскажу вам как сварить коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на Хабре…

Перед нами задача: отправлять сообщения на Email в очереди, используя RabbitMQ, а так же обеспечить отказоустойчивость: если почтовый сервер ответил таймаутом или что-то ещё сломалось — нужно попробовать выполнить задачу через 30 секунд ещё раз.

Итак, устанавливаем наш бандл.

Я слишком ленив, чтобы описывать вам, как нужно копировать composer require команду и строку в AppKernel.

Я очень надеюсь, что вы сами это сделали и готовы приступать к конфигурированию нашего бандла.

Если нет, то вот вам полный гайд для самых маленьких:
Установка RabbitMQ:

echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management

Теперь вы можете открыть ваш localhost:15672 под учеткой: guest guest и увидеть много прикольных вещей, в которых скоро вы будете разбираться и чувствовать себя мужиком.

Теперь устанавливаем сам бандл:

composer require php-amqplib/rabbitmq-bundle

И регистрируем его в нашем приложении:

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

Вот и всё.

Конфигурация бандла для нас:

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3
            keepalive: false
            heartbeat: 0
            use_socket: true
    producers:
        send_email:
            connection:       default
            exchange_options: { name: 'notification.v1.send_email', type: direct }

    consumers:
        send_email:
            connection:       default
            exchange_options: { name: 'notification.v1.send_email', type: direct }
            queue_options:    { name: 'notification.v1.send_email' }
            callback:         app.consumer.mail_sender

Здесь огромное внимание следует обратить на producers и consumers. Если очень коротко и просто: producer — это то, что отправляет сообщения через RabbitMQ в consumer, а consumer в свою очередь — та вещь, которая получает и обрабатывает эти сообщения. Здесь же exchange_options — опции для обменника (вы же прочитали статьи про rabbitmq, которые были в начале статьи?), queue_options — опции для очереди (аналогично). Так же стоит обратить внимание на callback в consumer — здесь указывается ID сервиса, который расширяет ConsumerInterface (execute метод с аргументом сообщения).

Т.к. пока что у вас его нету, при запуске приложения или компиляции контейнера мы получим какое-то DI исключение, что сервис не найден, но мы его запрашиваем. Поэтому давайте создавать наш сервис:

#app/config/services.yml
services:
    app.consumer.mail_sender:
        class: AppBundle\Consumer\MailSenderConsumer 

И сам класс:

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class NotificationConsumer
 */
class MailSenderConsumer implements ConsumerInterface
{
    /**
     * @var AMQPMessage $msg
     * @return void
     */
    public function execute(AMQPMessage $msg)
    {
        echo 'Ну тут типа сообщение пытаюсь отправить: '.$msg->getBody().PHP_EOL;
        echo 'Отправлено успешно!...';
    }
}

Ну вы же не обиделись, что я не включил в статью как работать со SwiftMailer? :) Нам важно, чтобы сюда асинхронно доставлялась строка через очередь сообщений, то, как мы будем обрабатывать эту строку — дело наше. Почта — всего лишь пример кейса.

Как же нам передать строку в наш консьюмер? Для этого давайте создадим тестовую команду:

namespace AppBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestConsumerCommand extends ContainerAwareCommand
{
    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName('app:test-consumer')
            ->setDescription('Hello PhpStorm');
    }

    /**
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Сообщенька для отправки на мыло...');
    }

Снова извиняюсь, что не сделал для вас контроллер с красивой формочкой — я слишком ленив для этого. Да и слишком уж излишне. А я экономный лентяй и люблю рисовать, мечтаю немного в сторону теорий и архитектуры приложений. Отвлеклись.

Теперь запускаем наш consumer и приказываем ему ждать сообщения из RabbitMQ:

bin/console rabbitmq:consumer send_email -vvv

И отправим ему сообщение из нашей тестовой команды:

bin/console app:test-consumer

И вот сейчас, в процессе rabbitmq:consumer, мы можем увидеть наше сообщение! И что псевдо отправка завершилась успехом.

А теперь давайте посмотрим, как можно реализовать отложенную обработку сообщений в случае ошибок. Я не буду использовать плагин RabbitMQ для отложенных сообщений. Мы будем достигать этого путем создания новой очереди, в которой укажем время жизни сообщений 30сек и установим настройку: после смерти — перекладываться в основную очередь.

Достаточно лишь добавить новый producer:

producers:
        send_email:
            connection:       default
            exchange_options: { name: 'notification.v1.send_email', type: direct }
            
        delayed_send_email:
            connection: default
            exchange_options:
                name: 'notification.v1.send_email_delayed_30000'
                type: direct
            queue_options:
                name: 'notification.v1.send_email_delayed_30000'
                arguments:
                    x-message-ttl: ['I', 30000]
                    x-dead-letter-exchange: ['S', 'notification.v1.send_email']

Теперь давайте изменим логику консумера:

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class NotificationConsumer
 */
class MailSenderConsumer implements ConsumerInterface
{
    private $delayedProducer;

    /**
     * MailSenderConsumer constructor.
     * @param ProducerInterface $delayedProducer
     */
    public function __construct(ProducerInterface $delayedProducer)
    {
        $this->delayedProducer = $delayedProducer;
    }

    /**
     * @var AMQPMessage $msg
     * @return void
     */
    public function execute(AMQPMessage $msg)
    {
        $body = $msg->getBody();

        echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;

        try {
            if ($body == 'bad') {
                throw new \Exception();
            }

            echo 'Успешно отправлено...'.PHP_EOL;
        } catch (\Exception $exception) {
            echo 'ERROR'.PHP_EOL;
            $this->delayedProducer->publish($body);
        }
    }
}

А вообще для вывода полезно использовать LoggerInterface — и красиво и масштабируется.
Но нам же лень и мы не хотим создавать дополнительные «думки», верно? Просто знайте.

Теперь мы должны прокинуть producer для отложенной очереди:

#app/config/services.yml
services:
    app.consumer.mail_sender:
        class: AppBundle\Consumer\MailSenderConsumer
        arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']

И изменим команду:

namespace AppBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestConsumerCommand extends ContainerAwareCommand
{
    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName('app:test-consumer')
            ->setDescription('Hello PhpStorm');
    }

    /**
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Ура, сообщенька...');
        $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('bad');
    }
}

Теперь вместе с нормальным сообщением она будет отправлять и плохое сообщение.

Если мы запустим, то увидим следующий вывод:

Ну тут типа сообщение отправляю Ура, сообщенька...
Успешно отправлено...

Ну тут типа сообщение отправляю bad...
ERROR

Спустя 30 секунд еще раз появится сообщение об обработке:

Ну тут типа сообщение отправляю bad...
ERROR

И так бесконечно. Логику максимальных попыток и т.п. продумывайте сами. Далее я дам пару советов для вашего прода и некоторых фич.

Теперь советы для вашего прода:

1) Не отходя от темы с максимальными попытками обработки: знайте на все 102% все возможные исключения контекста с которым вы работаете! Умейте представлять, когда повторная обработка требуется, а когда нет, иначе — привет мусорке из логов и отсутствия понимания что происходит. В случае, если битая задача будет крутится в RabbitMQ, с реальными данными, нормальными задачами, вы вряд ли сможете выкинуть сломанные задачи без костылей, не обновляя код консьюмера и не перезапуская его. Поэтому продумывайте это сразу. В данном случае правильным было бы ловить только лишь SMTPTimeOutException какой-нибудь.

Так же с такой моделью важно понимать, что: на 1 очередь — одна «глобальная ответственность смены состояния чего либо». Не стоит давать слишком много рискованных задач своему воркеру. Если рассмотреть вариант с 1С, то проблема может быть в следующем: допустим при успешном или неуспешном изменении\добавлении товара в 1С мы записываем в базу данных что-нибудь, например, дату последней удачной синхронизации или неудачной. Т.е. тут обновляются сразу 2 базы данных: бд 1С и бд вашего приложения. Допустим в 1С все успешно создалось, далее идет обновление в базе данных поля «дата последней удачной синхронизации» — хоп, вылезла ошибочка, опять же, сервер бд не отвечает — задача откладывается на «потом» и повторяется, пока база данных не начнет отвечать. И при этом каждый раз «подзадача» связанная с созданием сущности в 1С будет успешно выполняться, каждый раз при неудачной попытке записи в базу данных сайта, что неправильно.

2) Прочитайте про durable, раз уж мы с вами используем RabbitMQ. P.S: это заводится как true\false флаг «durable» в конфиге бандла, конкретно — в exchange_options и queue_options

3) Всю свою жизнь закрывайте соединение к базе данных после выполнения работы программы. А так же запускайте очистку EM и после сборщик мусора для чистки ссылок. Т.е. в конце концов наш консьюмер должен выглядеть как-то так:

class MailSenderConsumer implements ConsumerInterface
{
    private $delayedProducer;

    private $entityManager;

    /**
     * MailSenderConsumer constructor.
     * @param ProducerInterface      $delayedProducer
     * @param EntityManagerInterface $entityManager
     */
    public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager)
    {
        $this->delayedProducer = $delayedProducer;
        $this->entityManager = $entityManager;
        
        gc_enable();
    }

    /**
     * @var AMQPMessage $msg
     * @return void
     */
    public function execute(AMQPMessage $msg)
    {
        $body = $msg->getBody();

        echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;

        try {
            if ($body == 'bad') {
                throw new \Exception();
            }

            echo 'Успешно отправлено...'.PHP_EOL;
        } catch (\Exception $exception) {
            echo 'ERROR'.PHP_EOL;
            $this->delayedProducer->publish($body);
        }
        
        $this->entityManager->clear();
        $this->entityManager->getConnection()->close();

        gc_collect_cycles();
    }
}

Консьюмер работает как демон, поэтому постоянно копить в нем ссылки и держать соединение с бд — это плохо. В случае с MySQL вы получите MySQL server has gone away.

4) Много думайте, почему ваша модель отложенных сообщений может неожиданно убить ваш бизнес. Например у нас есть механизм, который при изменении товара в админке заливает эти изменения через очередь в 1С. Теперь представим ситуацию: администратор меняет товар -> создается задача #1 на попытку изменить те же данные в базе 1С. Сервер 1С не отвечает, поэтому задачка просто перекладывается постоянно, пока все не заработает. За это время администратор решил еще кое-что подправить в том же товаре, что он и делает. Регистрируется задача #2.

А теперь представьте ситуацию, когда поочередно выполняются и откладываются задачи #1 и #2.

Что если 1С заработает к моменту выполнения задачи #2? Задача выполнится и зальёт последние изменения. Далее пойдет в ход задача #1 и затрет собой стабильные изменения :)

Выход: отправляем timestamp в качестве version, и, если задача «из прошлого» — выкидываем её.

5) Идешь в асинхронность — прочитай про многие архитектурные проблемы, а также race condition, несогласованность консумеров на разных машинах и прочее.

6) Пишите версии вашим очередям… Ух как помогает на реальном проде. В принципе мы так и сделали в этом примере.

7) Возможно тебе не нужен RabbitMQ и целый AMQP протокол. Посмотри в сторону beanstalkd.

8) Запускайте консумеры и всякое прочее демоническое на php через supervisor и подключите полное логирование падения процессов в нём. У него так же есть web интерфейс для управления всем этим делом, что так же очень удобно. Проблемы будут всегда.
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+8
Comments 13
Comments Comments 13

Articles