Пользователь
0,0
рейтинг
25 июля 2013 в 21:56

Разработка → Простая и масштабируемая подписка на события с WebSockets, STOMP, SockJS и Spring Framework 4.0 перевод

Первый промежуточный релиз Spring Framework 4.0 M1 предоставил поддержку SockJS на стороне сервера — лучшая и наиболее полная альтернативная реализация WebSocket. Вам потребуется этот резервный вариант в браузерах, не поддерживающих WebSocket и в ситуациях когда прокси препятствует их использованию. Проще говоря, SockJS позволяет строить WebSocket-приложения уже сегодня, которые, ко всему прочему, умеют прозрачно переходить на резервные возможности.

Но даже с резервными вариантами проблемы остаются. Сокет — довольно низкоуровневая абстракция и подавляющее большинство веб-приложений сегодня не адаптированы для сокетов. Вот почему протокол WebSocket определяет механизм под-протоколов, который, по существу, позволяет (и поощряет) использование протоколов более высокого уровня над WebSocket, аналогично тому как мы используем HTTP поверх TCP.

Второй промежуточный релиз Spring Framework 4.0 M2 позволяет использовать высокоуровневые протоколы обмена сообщениями поверх WebSocket. Для демонстрации этого, мы разберем пример приложения.

Приложение «Stock Portfolio»


Это тестовое приложение, доступное на Github, позволяет пользователям загружать портфель позиций, покупать и продавать акции; рассылает котировки цен и отображает обновления позиций. Это достаточно простое приложение. Тем не менее, оно покрывает ряд общих задач, которые могут возникнуть в браузерных приложениях для обмена сообщениями.


Как именно мы реализуем приложение, подобное этому? В HTTP и REST мы привыкли полагаться на URL-адреса и методы HTTP, выражающие что нужно сделать. Но здесь только сокет и куча сообщений. Как сообщить кому предназначено сообщение и что оно значит?


Браузер и сервер должны договориться об общем формате сообщений, прежде чем эта семантика сможет быть выражена. Существует несколько протоколов, которые могут помочь. Мы выбрали STOMP в этом релизе, благодаря своей простоте и широкой поддержке.

Simple/Streaming Text-Oriented Messaging Protocol (STOMP)


STOMP — протокол обмена сообщениями, созданный предельно простым. Основан на фреймах по образцу HTTP. Фрейм состоит из команды, необязательных заголовков и необязательного тела.

Например, приложение Stock Portfolio может рассылать котировки, клиент пошлет фрейм SUBSCRIBE, где заголовок «destination» показывает на что конкретно он подписывается:
SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

Как только котировки акций становятся доступными, сервер отправляет фрейм MESSAGE с соответствующим «destination» и идентификатором подписки, а также заголовок «content-type» и тело:
MESSAGE
subscription:sub-1
message-id:wm2si1tj-4
content-type: application/json
destination:/topic/stocks.PRICE.STOCK.NASDAQ.EMC
 
{\"ticker\":\"EMC\",\"price\":24.19}

Чтобы объединить это все в браузере мы используем stomp.js и клиент SockJS:
var socket = new SockJS('/spring-websocket-portfolio/portfolio');
var client = Stomp.over(socket);
 
var onConnect = function() {
  client.subscribe("/topic/price.stock.*", function(message) {
      // обработка квоты
  });
};
client.connect('guest', 'guest', onConnect);

Это серьезное продвижение! У нас есть формат стандартного сообщения и поддержка на стороне клиента.

Теперь мы можем двигаться далее — на сторону сервера.

Message-Broker


Message-broker — это типичное серверное решение, где сообщения пересылаются между традиционными брокерами, такими как RabbitMQ, ActiveMQ и т.п. Большинство, если не все, поддерживают STOMP over TCP, некоторые — WebSocket, но дальше всех продвинулся RabbitMQ, он, ко всему прочему, работает и с SockJS.

Наша архитектура будет выглядеть следующим образом:


Это надежное и масштабируемое решение, но возможно не самое оптимальное для рассматриваемой задачи. Message-broker обычно используется в рамках предприятия. Выставлять его напрямую в сеть не тянет на идеальное решение.

Если мы что-то усвоили из подхода REST, так это то, что не стоит раскрывать детали реализации нашей системы, устройство базы данных или модель предметной области.

Кроме того, вы как Java-разработчик, вероятно хотите настроить права доступа, валидации, добавить прикладную логику. В подходе с message-broker сервер приложений находится за брокером, что существенно отличается от того к чему привыкло большинство веб-разработчиков.

Вот почему популярны библиотеки вроде socket.io. Она проста и она нацелена на потребности веб-приложений. С другой стороны мы не должны игнорировать возможности message-broker в плане обработки сообщений, они в этом действительно хороши — непростая дилемма. Возьмем лучшее от обоих.

Приложение + Message-broker


Другой подход заключается в том, чтобы сделать приложение, обрабатывающее входящие сообщения и выступающее в качестве посредника между веб-клиентами и брокером. Сообщения от клиентов к брокеру можно отправлять через приложение, обратное сообщение также пройдет через приложение к клиенту. Это позволяет приложению определить тип сообщения и заголовок «destination», после чего решить обрабатывать самостоятельно или перенаправить брокеру.


Мы выбрали этот подход. Чтобы проиллюстрировать его лучше вот несколько сценариев.

Загрузка портфеля позиций
  • Клиент запрашивает портфель позиций
  • Приложение обрабатывает запрос путем загрузки и возврата данных для подписки
  • Message-broker не участвует в этом взаимодействии

Подписка на котировки акций
  • Клиент отправляет запрос на подписку
  • Приложение передает сообщение брокеру
  • Message-broker передает сообщение всем подписанным клиентам

Получение котировок акций
  • QuoteService посылает брокеру сообщение с котировками акций
  • Message-broker передает сообщение всем подписанным клиентам

Проведение сделки
  • Клиент отправляет торговый запрос
  • Приложение не обрабатывает его, все сделки проходят через TradeService
  • Message-broker не участвует в этом взаимодействии

Получение обновленных позиций
  • TradeService отправляет сообщение об обновлении позицию в очередь брокера
  • Message-broker отправляет обновление позиций клиенту
  • Отправка сообщений определенному пользователю рассматривается подробно ниже

Строго говоря, использование брокера сообщений не является обязательным. Мы предлагаем «простую» альтернативу, работающую из коробки. Однако использование брокера сообщений рекомендуется для обеспечения масштабируемости и при развертывании с несколькими серверами приложений.

Фрагменты кода


Давайте рассмотрим некоторые примеры серверного и клиентского кода.

Это запрос портфеля позиций из portfolio.js:
stompClient.subscribe("/app/positions", function(message) {
  self.portfolio().loadPositions(JSON.parse(message.body));
});

На стороне сервера PortfolioController обнаруживает запрос и возвращает портфель позиций, демонстрируя взаимодействие запрос-ответ, которое очень часто используется в веб-приложениях. Поскольку мы используем Spring Security для защиты HTTP-запросов, включая один начальный, относящийся к рукопожатию с WebSocket, как аргумент метода в примере ниже, передается principal пользователя Spring Security, полученный из HttpServletRequest:
@Controller
public class PortfolioController {
 
  // ...
 
  @SubscribeEvent("/app/positions")
  public List<PortfolioPosition> getPortfolios(Principal principal) {
    String user = principal.getName();
    Portfolio portfolio = this.portfolioService.findPortfolio(user);
    return portfolio.getPositions();
  }
}

Здесь portfolio.js отправляет торговый запрос:
stompClient.send("/app/trade", {}, JSON.stringify(trade));

На стороне сервера PortfolioController передает его на исполнение:
@Controller
public class PortfolioController {
 
  // ...
 
  @MessageMapping(value="/app/trade")
  public void executeTrade(Trade trade, Principal principal) {
    trade.setUsername(principal.getName());
    this.tradeService.executeTrade(trade);
  }
}

PortfolioController также может обрабатывать неожиданные исключительные ситуации, отсылая сообщение пользователю:
@Controller
public class PortfolioController {
 
  // ...
 
  @MessageExceptionHandler
  @ReplyToUser(value="/queue/errors")
  public String handleException(Throwable exception) {
    return exception.getMessage();
  }
}

Что насчет рассылки сообщений всем подписанным клиентам? Так QuoteService рассылает квоты:
@Service
public class QuoteService {
 
  private final MessageSendingOperations<String> messagingTemplate;
 
  @Scheduled(fixedDelay=1000)
  public void sendQuotes() {
    for (Quote quote : this.quoteGenerator.generateQuotes()) {
      String destination = "/topic/price.stock." + quote.getTicker();
      this.messagingTemplate.convertAndSend(destination, quote);
    }
  }
}

А так TradeService отправляет обновление позиций после того, как выполнил сделку:
@Service
public class TradeService {
 
  // ...
 
  @Scheduled(fixedDelay=1500)
  public void sendTradeNotifications() {
    for (TradeResult tr : this.tradeResults) {
      String queue = "/queue/position-updates";
      this.messagingTemplate.convertAndSendToUser(tr.user, queue, tr.position);
    }
  }
}

И на всякий случай если вам интересно… да, PortfolioController может содержать Spring MVC методы (например, @RequestMapping), как предложено в этом тикете разработчиком, который ранее строил онлайн-игры:
Цитата
Yes, having [message] mappings and spring mvc mappings consolidated would be nice. There is no reason why they can't be unified.

И так же как QuoteService и TradeService, методы Spring MVC контроллера тоже могут публиковать сообщения.

Поддержка обмена сообщениями в Spring-приложениях


Долгое время Spring Integration предоставляет первоклассные абстракции для известных Enterprise Integration паттернов, а также легковесный обмен сообщениями. Во время работы над этим релизом мы поняли, что последний был именно тем, на что нам необходимо было опираться.

В результате, и я рад это объявить, мы переместили набор Spring Integration типов в новый модуль Spring Framework, который предсказуемо называется spring-messaging. Помимо основных абстракции, таких как Message, MessageChannel, MessageHandler и др., модуль содержит все аннотации и классы для поддержки новых функций, описанных в этом посте.

Теперь, имея это в виду, мы можем взглянуть на диаграмму внутренней структуры приложения Stock Portfolio:


StompWebSocketHandler помещает входящие клиентские сообщения в канал «dispatch». У этого канала есть 3 подписчика. Первый отвечает за аннотированные методы, второй — пересылает сообщения STOMP-брокеру, в то время как третий обрабатывает сообщения отдельных пользователей путем преобразования адреса назначения в очередь уникальных имен, на которые клиент был подписан.

По-умолчанию приложение работает с «простым» брокером, предоставленным для знакомства. Как поясняется в README, вы можете выбрать между «простым» и полнофункциональный брокером путем активации и деактивации профилей.


Другое возможное изменение в конфигурации заключается в переходе от Executor к реализации MessageChannel, основанной на Reactor. Проект Reactor недавно выпустил первый промежуточный релиз и также используется для управления TCP-соединениями между приложением и Message-broker.

Здесь вы можете увидеть полную конфигурацию приложения, которая также включает новую конфигурацию Spring Security. Вам также может быть интересно узнать об улучшенной поддержке конфигов в STS.

Посылка сообщений отдельным пользователям


Легко понять, как можно разослать сообщения всем подписанным клиентам — просто послать сообщение в канал. Чуть сложнее дело обстоит с отсылкой сообщения конкретному клиенту. Например, вы поймали исключение и хотите отправить сообщение об ошибке. Или вы получили подтверждение о завершении сделки и хотите обрадовать пользователя.

В традиционных приложениях для обмена сообщениями эта задача обычно решается созданием временной очереди и задания заголовка «reply-to» во всех сообщениях, подразумевающих ответ. Это работает, но выглядит довольно громоздким. Клиент не должен забывать выставлять необходимые заголовки на всех применяемых сообщениях и серверному приложению может потребоваться отслеживать их. Иногда такая информация может быть недоступна (например, если используется HTTP POST вместо обмена сообщениями).

Для поддержки этого требования, мы отправляем уникальный суффикс очереди каждому подключенному клиенту. Суффикс затем может быть добавлен к идентификатору для создания уникального имени очереди:
client.connect('guest', 'guest', function(frame) {
 
  var suffix = frame.headers['queue-suffix'];
 
  client.subscribe("/queue/error" + suffix, function(msg) {
    // обработка ошибок
  });
 
  client.subscribe("/queue/position-updates" + suffix, function(msg) {
    // обработка обновления позиций
  });
 
});

Затем на стороне сервера к методу, помеченному @MessageExceptionHandler (или любому другой методу обработки сообщений) можно добавить аннотацию @ReplyToUser и отправить возвращаемое значение как сообщение:
@MessageExceptionHandler
@ReplyToUser(value="/queue/errors")
public String handleException(Throwable exception) {
  // ...
}

Все остальные классы, такие как TradeService, могут использовать messagingTemplate для достижения того же:
String user = "fabrice";
String queue = "/queue/position-updates";
this.messagingTemplate.convertAndSendToUser(user, queue, position);

В обоих случаях внутри мы располагаем суффиксом пользовательской очереди (через сконфигурированный UserQueueSuffixResolver) для того чтобы восстановить корректное имя очереди. На данный момент существует только одна реализация простого резолвера. Однако, довольно легко добавить реализацию, основанную на Redis, которая будет поддерживать функцию независимо от того, подключен ли пользователь или другой сервер приложений.

Выводы


Надеюсь, что это было полезным введением для новой функциональности. Вместо того, чтобы сделать пост еще длиннее, я советую изучить пример и попробуйте применить это в приложениях, которые вы пишете или намерены написать. Это идеальное время для обратной связи, т.к. как мы планируем релиз-кандидат в начале сентября.
Перевод: Rossen Stoyanchev
Александр @Lucyfer
карма
32,0
рейтинг 0,0
Пользователь
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама

Самое читаемое Разработка

Комментарии (3)

  • 0
    Интересно, я до этого Atmosphere использовал. Во множестве подобных фреймворков почему-то отсутствуют нормальные средства отлавливания событий life-cycle для коннекшна. То есть такую простую вещь как обновляемый список онлайн пользователей приходится реализовывать через одно место. Дело усложняется, если необходимо добавить прозрачный реконнект.
    • 0
      В Spring это более-менее просто реализуется:
      Скрытый текст
      // допустим храним в памяти
      interface SomeInMemoryStorage {
          Collection<Principal> getActivePrincipals();
          void connectPrincipal(Principal principal);
          void disconnectPrincipal(Principal principal);
      }
      
      // какой-то storage
      @Service
      class SomeInMemoryStorageImpl implements SomeInMemoryStorage {
      
          @Override
          public void connectPrincipal(Principal principal) {
              // восстанавливаем предыдущее состояние
          }
      
          @Override
          public Collection<Principal> getActivePrincipals() {
              return new ArrayList<>(0);
          }
      
          @Override
          public void disconnectPrincipal(Principal principal) {
              // можем сохранить состояние
          }
      }
      
      public class LifecycleStompWebSocketHandler extends StompWebSocketHandler {
      
          private static final Log logger = LogFactory.getLog(LifecycleStompWebSocketHandler.class);
      
          private SomeInMemoryStorage someInMemoryStorage;
      
          public LifecycleStompWebSocketHandler(SomeInMemoryStorage storage, MessageChannel dispatchChannel) {
              super(dispatchChannel);
              this.someInMemoryStorage = storage;
          }
      
          // переопределяем пару методов
      
          @Override
          public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
              super.afterConnectionClosed(session, status);
      
              logger.debug("Principal disconnected: " + session.getPrincipal().getName());
              someInMemoryStorage.connectPrincipal(session.getPrincipal());
          }
      
          @Override
          public void afterConnectionEstablished(WebSocketSession session) throws Exception {
              super.afterConnectionEstablished(session);
      
              logger.debug("Principal connected: " + session.getPrincipal().getName());
              someInMemoryStorage.disconnectPrincipal(session.getPrincipal());
          }
      }
      
  • +1
    Спасибо, мне помогло:)

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.