Pull to refresh

MassTransit. Сервисная шина для обмена сообщениями на основе сервера очередей RabbitMQ (MSMQ) для .Net

Reading time 8 min
Views 45K
MassTransit
Впервые я услышал о библиотеке MassTransit (MT) около года назад от экс-коллеги, зашедшего в наш офис для обмена опытом. Компания, в которую он устроился, использовала MT для уменьшения связности между модулями разрабатываемого ими сервиса и, поскольку высокая связность начала превращаться в проблему и для нас, чужой опыт оказался нам очень кстати. Помимо уменьшения связности путем перехода на событийную модель взаимодействия между модулями, MT пригодился нам и для распределения ресурсоемких задач между несколькими процессами.



Что такое MassTransit.


MassTransit — это реализация хорошо известного паттерна DataBus. Основная задача этого паттерна — организовать взаимодействие нескольких объектов, не подозревающих о существовании друг друга, через обмен сообщениями между ними. Библиотека была написана Dru Sellers и Chris Patterson как бесплатный аналог проекта NServiceBus, способный использовать в качестве транспорта сервера сообщений RabbitMQ или MSMQ на выбор. В своем проекте мы предпочли использовать RabbitMQ, поэтому здесь будет описан опыт работы и подводные камни, поджидающие при конфигурации шины на этом сервере очередей. Несмотря на то, что MassTransit является слоем абстракции над протоколом AMQP и разработчики старались скрыть детали реализации так, что знания об устройстве сервера очередей и протокола AMQP для использования библиотеки практически не требуются, для понимания статьи и успешного обхождения граблей при конфигурации шины общее представление об устройстве сервера RabbitMQ желательно иметь. Это плохая новость, но есть и хорошая — знаний необходим самый минимум, вполне будет достаточно прочесть первые четыре урока отсюда. Уроки небольшие и понятные, изучение основ работы с RabbitMQ не займет много времени, но способно принести много пользы. К слову, на хабре даже была переведена первая пара уроков. Урок один и урок два.

К делу.


Перейдем от теории к практике и попытаемся с помощью библиотеки MassTransit выполнить задачу, похожую по своему функционалу на первый пример из туториала к RabbitMQ. Мы напишем простое консольное приложение, в котором будут взаимодействовать два объекта Publisher и Subscriber. Publisher, при нажатии на любую клавишу, будет посылать в шину сообщение “KeyWasPressed” и код нажатой клавиши. Subscriber будет захватывать это сообщение из шины и выводить его на экран.

Для начала нам придется
1) Установить Erlang
2) Установить RabbitMQ
3) Установить MassTransit.RabbitMQ в тестовое приложение, выполнив команду PM> Install-Package MassTransit.RabbitMQ.

Перейдем к непосредственно к коду. Сообщения, отправляемые в шину, это обычные DTO объекты. В первую очередь нам потребуется создать класс самого сообщения, пересылаемого от издателя к подписчику. Назовем его KeyWasPressed.
public class KeyWasPressed
{
   //в сообщении будем передавать нажатую пользователем клавишу
   public ConsoleKey PressedKey { get; set; }
}


Теперь перейдем к написанию простеньких издателя (publisher) и подписчика (subscriber). Ключевым элементом библиотеки является ServiceBus. ServiceBus в MassTransit — это среда для обмена сообщениями, в которой за транспорт сообщений отвечает сервер очередей RabbitMQ (или MSMQ). Наши подписчик и издатель будут представлять собой объекты этого самого типа — ServiceBus.

Подписчик.

 IServiceBus subscriber = ServiceBusFactory.New(sbc =>
{
     //указываем что в качестве транспорта мы будем использовать rabbitMq
     sbc.UseRabbitMq();
     //указываем очередь из которой мы будем получать сообщения на которые мы подписались
     sbc.ReceiveFrom("rabbitmq://localhost/subscriber");
      //подписываемся на сообщение KeyWasPressed. При поступлении
     //соответствующего сообщения выводим его на экран
     sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg =>
     Console.WriteLine("{0}{1}{2}{3}",Environment.NewLine,"Key  '", msg.PressedKey, "' was pressed")
     ));
});


Издатель.

IServiceBus publisher = ServiceBusFactory.New(sbc =>
{
    //указываем, что в качестве транспорта мы будем использовать rabbitMq
     sbc.UseRabbitMq();
     //указываем очередь, из которой мы будем получать сообщения             
     sbc.ReceiveFrom("rabbitmq://localhost/publisher");
});


MassTransit не делит подключенные к серверу сообщений экземпляры ServiceBus на издателей и подписчиков, через каждый подключенный экземпляр можно как публиковать так и обрабатывать сообщения. Поэтому нам всегда необходимо указывать очередь для получения сообщений, хотя иногда, как и в случае с объектом publisher, мы не собираемся ничего получать.
Теперь напишем бесконечный цикл, в котором каждая нажатая клавиша будет отправляться в шину.
while (true)
{
   publisher.Publish(new KeyWasPressed() { PressedKey = Console.ReadKey().Key });
}

Все готово, запускаем наше приложение и жмем на клавиши.


Что происходит на сервере очередей.


Давайте посмотрим — что происходит на сервере очередей при запуске нашего приложения. По умолчанию установщик RabbitMQ регистрирует RabbitMQ как службу Windows, так что мы всегда можем смотреть что присходит в данный момент времени на сервере очередей через утилиты командной строки. Но удобнее пользоваться веб плагином, также входящим в стандартную поставку дистрибутива.

Для его установки нам придется выполнить следующие несколько шагов
1) В командной строке перейдем в папку sbin из каталога установки сервера (например, %PROGRAMFILES%\RabbitMQ Server\rabbitmq_server_2.7.1\sbin\)

2) Далее выполним следующую команду.
rabbitmq-plugins.bat enable rabbitmq_management

3) Наконец, чтобы включить плагин управления мы должны переустановить службу RabbitMQ. Выполним следующую последовательность команд для установки службы:
rabbitmq-service.bat stop
rabbitmq-service.bat install
rabbitmq-service.bat start
Чтобы убедиться, что плагин управления сервером RabbitMQ установлен и запущен, запустим браузер и перейдем на следующую страницу (для версии 3.0 порт по умолчанию 55672). Если все прошло нормально, появится экран, аналогичный следующему:

Логин/пароль по умолчанию guest/guest. Заходим внутрь и нажимаем на список точек обмена.

Для каждого сообщения, на которое существует хотя бы один подписчик, MT создает точку обмена (exchange) с именем по шаблону Namespace:ClassName и привязывает к ней очереди подписчиков. В нашем приложении точка обмена только одна, с названием MT:KeyWasPressed и эта точка обмена привязана к одной очереди — subscriber.

В новых версиях MT очереди и точки обмена по умолчанию стали постоянными (durable), я так и не понял баг это или фича, но раньше при создании каждой подписки требовалось явно вызывать метод Permanent(), чтобы закрепить соответствующую ей точку обмена на сервере очередей. Устойчивые точки обмена и очереди удобны тем, что, если в момент публикации сообщения издателем подписчик отключен от сервера очередей, опубликованное сообщение все равно не пройдёт мимо подписчика, а встанет в очередь и тихонько дождется его (подписчика) подключения.

Добавление новых подписчиков.


Давайте изменим наше приложение добавив в него еще одного подписчика на сообщение KeyWasPressed. В отличии от первого он будет подписан на очередь под названием anothersubscriber и будет выводить на экран числовое представление каждой нажимаемой пользователем клавиши.

IServiceBus anotherSubscriber = ServiceBusFactory.New(sbc =>
{
   sbc.UseRabbitMq();
   sbc.ReceiveFrom("rabbitmq://localhost/anothersubscriber");
   sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg => 
      Console.WriteLine("{0}{1}{2}{3}", Environment.NewLine, "Key with code  ", (int) msg.PressedKey, " was pressed")
   ));
});

Запустим приложение и понажимаем на клавиши.

Теперь при нажатии на каждую клавишу на экране появляется две строки — с цифровым и символьным кодом каждой нажатой клавиши. Если зайти в панель управления сервером очередей, можно увидеть, что теперь к точке обмена MT:KeyWasPressed привязано уже две очереди subscriber и anothersubscriber. И каждое полученное сообщение типа MT.KeyWasPressed сервер очередей RabbitMQ отправляет в обе очереди.

Распределение ресурсоемких задач.


Теперь давайте посмотрим, как с помощью связки MassTransit + RabbitMQ можно распределять ресурсоемкие задач между несколькими процессами.
Представим, что перед нами стоит задача создать сервис для конвертации видео файлов. Под эту задачу у нас есть два сервера. Опытным путем мы установили, что оптимальная нагрузка для сервера под номером один — это три паралельно конвертируемых видеофайла, для сервера номер два — количество одновременно конвертируемых видеофайлов не должно превышать пяти. Процесс конвертации мы, само собой, будем эмулировать. Представим, что у нас есть очередь под названием filesToConvert, в которую поступают файлы для конвертации. Каждый файл будет представлять у нас объект типа VideoFile.

public class VideoFile
{
  public int Num { get; set; }
  //Время, требующееся для конвертации файла в мс
  public int TimeToConvert { get; set; }
}

Подписчик, получив такое сообщение, по правилам игры должен будет заснуть на количество миллисекунд, заданных в поле TimeToConvert пришедшего сообщения.
Код, по легенде выполняющийся на первом сервере.
int firstServerFilesCount = 0;
IServiceBus firstServer = ServiceBusFactory.New(sbc =>
{
     sbc.UseRabbitMq();
     //указываем количество паралельных потоков, получающих сообщения с сервера очередей
     sbc.SetConcurrentConsumerLimit(3);
     sbc.Subscribe(subs => subs.Handler<VideoFile>(msg =>
       {
       firstServerFilesCount++; 
       Thread.Sleep(msg.TimeToConvert);
       Console.WriteLine("Сервер 1. {0}Файл {1} обработан за {2} мс. Потоков: {3} из 3. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, firstServerFilesCount, Thread.CurrentThread.ManagedThreadId); 
        firstServerFilesCount--;
       }));
       //prefetch=3. Сообщаем серверу очередей, что мы готовы разбирать до трех сообщений одновременно 
       sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=3");
 });

По легенде мы на первом сервере решили ограничить число одновременно разбираемых сообщений тремя. Поэтому, мы вызываем метод SetConcurrentConsumerLimit с аргументом 3. Это означает, что при подключении объекта firstServer к серверу сообщений, MassTransit будет держать наготове пул из трех потоков, предназначенных для обработки сообщений с сервера. Но надо помнить, что распределением сообщений занимается RabbitMQ, и он никак не может знать того факта, что объект firstServer готов разбирать до трех сообщений одновременно. Передать ему эту информацию мы можем указав параметр prefetch в Uri, по которому firstServer подключается к серверу сообщений.

Код, по легенде выполняющийся на втором сервере.
int secondServerFilesCount = 0;
IServiceBus secondServer = ServiceBusFactory.New(sbc =>
{
   sbc.UseRabbitMq();
   //указываем количество паралельных потоков, получающих сообщения с сервера очередей
   sbc.SetConcurrentConsumerLimit(5);
   sbc.Subscribe(subs => subs.Handler<VideoFile>(msg =>
   {
     secondServerFilesCount++;
     Thread.Sleep(msg.TimeToConvert);
     Console.WriteLine("Сервер 2. {0}Файл {1} обработан за {2} мс. Потоков: {3} из 5. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, secondServerFilesCount, Thread.CurrentThread.ManagedThreadId);
     secondServerFilesCount--;
    }));
    //prefetch=3. Сообщаем серверу очередей, что мы готовы разбирать до пяти сообщений одновременно 
    sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=5");
});

Отличия, как можно было догадаться, есть только в количестве потоков в пуле, призванных разбирать сообщения, и значении prefetch в Uri. Куда важнее отметить тот факт, что мы подключили secondServer к той же очереди, куда был подключен firstServer, тем самым создавая конкуренцию между подписчиками за сообщения, появляющиеся в этой очереди. Если объекты firstServer и secondServer будут подключены к разным очередям, то мы столкнемся с тем, что каждый файл будет сконвертирован дважды, по разу на каждом сервере.

Теперь напишем код, наполняющим очередь filesToConvert сотней “видеофайлов”, с заданным рандомом временем конвертации.
Random rnd = new Random();
for (int i = 1; i <= 100; i++)
{
  publisher.Publish(new VideoFile() {Num = i, TimeToConvert = rnd.Next(100, 5000)});
}

Запускаем и убеждаемся, что наши сервера-подписчики работают паралельно, используя назначенное нами число потоков.


Какие еще возможности может предложить нам MassTransit.


В рамках одной статьи рассмотреть все возможности, предлагаемые библиотекой MassTransit, невозможно. Но можно перечислить (чем я и займусь).

  • Sagas. Механизм для координации распределенных процессов


  • Scheduling. Интеграция с библиотекой Quartz.net позволяет отправлять сообщения в очереди по расписанию






  • Encryption. Шифрование отправляемых сообщений. Для шифрования используется блочный шифр Rijndael


  • Unit Testability. Для целей тестирования, MassTransit может использовать встроенный транспорт (Loopback), не требующий внешних MQ серверов


Код примеров из статьи можно взять здесь.
Tags:
Hubs:
+22
Comments 9
Comments Comments 9

Articles