Pull to refresh

WebSocket Akka HTTP на практике

Reading time6 min
Views20K
Довольно продолжительное время существовала только одна достойная реализация работы с HTTP поверх Akka — spray. К этой библиотеке пару умельцев написали расширения для WebSocket,
которое было вполне понятно в использовании и проблем не возникало. Но годы шли и spray, в том или ином виде, перекочевал в Akka HTTP с реализованной поддержкой WebSocket из коробки.
Для работы с WebSocket ребята из Akka предлагают нам использовать Akka Stream, тем самым упрощая нам жизнь с потоковыми данными и, одновременно, усложняя ее. Akka Stream не так прост в понимании. Далее я попытаюсь показать базовые практические примеры использования.

Коротко об Akka Stream


Это своеобразный pipeline обработки данных, каждая итерация которого что-либо делает с данными, попадающими в него. Flow делится на 3 составляющие: Source, GraphStage, Sink.
Лучше всего это показано на диаграмме из документации
image

Для реализации WebSocket нам потребуется реализовывать GraphStagе. Source нам предоставляет akka, это как раз и есть наш клиент с летящими от него сообщениями. А Sink — сама отправка наших сообщений клиенту.

Actor style


Пожалуй один из самых неэффективных способов обработки, но самый простой для понимания.
Идея его заключается в том, чтобы все входящие сообщения попадали в актор, и у него был ActorRef, который отправлял данные непосредственно клиенту.

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._

import scala.io.StdIn

object Boot extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()

  def flow: Flow[Message, Message, Any] = {
    val client = system.actorOf(Props(classOf[ClientConnectionActor]))
    val in = Sink.actorRef(client, 'sinkclose)
    val out = Source.actorRef(8, OverflowStrategy.fail).mapMaterializedValue { a ⇒
      client ! ('income → a)
      a
    }
    Flow.fromSinkAndSource(in, out)
  }

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ ⇒ system.terminate())
}

class ClientConnectionActor extends Actor {
  var connection: Option[ActorRef] = None

  val receive: Receive = {
    case ('income, a: ActorRef) ⇒ connection = Some(a); context.watch(a)
    case Terminated(a) if connection.contains(a) ⇒ connection = None; context.stop(self)
    case 'sinkclose ⇒ context.stop(self)

    case TextMessage.Strict(t) ⇒ connection.foreach(_ ! TextMessage.Strict(s"echo $t"))
    case _ ⇒ // ingone
  }

  override def postStop(): Unit = connection.foreach(context.stop)
}

На каждое подключение клиента мы создаем актор ClientConnectionActor. А также Source, который будет представлять из себя еще один актор, направляющий полученные сообщения во flow. После его создания через метод mapMaterializedValue мы получим на него ссылку. Кроме этого мы создаем Sink, который все сообщения будет отправлять в ClientConnectionActor.

Таким образом ClientConnectionActor будет получать все сообщения из сокета. Отправлять мы их будем через прилетевший ему ActorRef, который будет доставлять их клиенту.

Минусы: необходимо следить за побочными акторами; быть аккуратным с OverflowStrategy; для обработки всех сообщений у нас всего один актор, он, соотвественно, однопоточный, из-за чего могут последовать проблемы с производительностью.

Производный вариант с использованием ActorPublisher и ActorSubscriber мы рассматривать не будем, так как, судя по официальной документации, он в состоянии deprecated.

Flow style


Идея данного подхода заключается в полном ипользовании Akka Stream для достижения целей. Общий вид его сводится к построению pipeline обработки входящих сообщений клиента.

Скелет
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._

import scala.io.StdIn

object Boot extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()

  def flow: Flow[Message, Message, Any] = {
    Flow[Message].collect {
      case TextMessage.Strict(t) ⇒ t
    }.map { text ⇒
      TextMessage.Strict(s"echo: $text")
    }
  }

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ ⇒ system.terminate())
}

В данном случае мы обрабатываем только текстовые сообщения и изменяем их. Дальше TextMessage отправляется клиенту.


Теперь немного усложним скелет и добавим парсинг и сериализацию JSON.

Классы для сериализации
trait WsIncome
trait WsOutgoing
@JsonCodec case class Say(name: String) extends WsIncome with WsOutgoing

implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity)
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
  case s: Say ⇒ s.asJson
}


Модифицируем flow

Flow[Message]
 .collect {
   case tm: TextMessage ⇒ tm.textStream
 }
 .mapAsync(CORE_COUNT * 2 - 1)(in ⇒ in.runFold("")(_ + _).flatMap(in ⇒ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
 .collect {
   case Say(name) ⇒ Say(s"hello: $name")
 }
 .mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))

Сперва мы отсекаем все бинарные сообщения, далее парсим входящий поток в JSON, обрабатываем его и сериализуем в текст для отправки клиенту.

Усложним конструкцию, добавив контекст для каждого клиента. В этом нам поможет statefulMapConcat.

ClientContext
class ClientContext {
  @volatile var userName: Option[String] = None
}
object ClientContext {
  def unapply(arg: ClientContext): Option[String] = arg.userName
}

@JsonCodec case class SetName(name: String) extends WsIncome
@JsonCodec case class Say(text: String) extends WsIncome with WsOutgoing

implicit val WsIncomeDecoder: Decoder[WsIncome] =
  Decoder[Say].map[WsIncome](identity)
    .or(Decoder[SetName].map[WsIncome](identity))


def flow: Flow[Message, Message, Any] = {
  Flow[Message]
    .collect {
      case tm: TextMessage ⇒ tm.textStream
    }
    .mapAsync(CORE_COUNT * 2 - 1)(in ⇒ in.runFold("")(_ + _).flatMap(in ⇒ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
    .statefulMapConcat(() ⇒ {
      val context = new ClientContext
      m ⇒ (context → m) :: Nil
    })
    .mapConcat {
      case (c: ClientContext, SetName(name)) ⇒
        c.userName = Some(name)
        Nil
      case a ⇒ a :: Nil
    }
    .collect {
      case (ClientContext(userName), Say(text)) ⇒ Say(s"$userName: $text")
      case (_, Say(text)) ⇒ Say(s"unknown: $text")
    }
    .mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))
}

Есть и другой способ: можно реализовать свой filter/map унаследовав GraphStage[FlowShape[A, A]].

Пример (не адаптировано под предыдущий код)
class AuthFilter(auth: ws.AuthMessage ⇒ Future[Option[UserProfile]])(implicit ec: ExecutionContext) extends GraphStage[FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage]] {

  val in = Inlet[ws.WsIncomeMessage]("AuthFilter.in")
  val out = Outlet[ws.WsContextIncomeMessage]("AuthFilter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      @volatile var profile: Option[UserProfile] = None
      setHandler(in, new InHandler {
        override def onPush(): Unit = profile match {
          case Some(p) ⇒ push(out, ws.WsContextIncomeMessage(p, grab(in)))
          case _ ⇒ grab(in) match {
            case a: ws.AuthMessage ⇒ auth(a) onComplete {
              case Success(p) ⇒
                profile = p
                pull(in)
              case Failure(e) ⇒ fail(out, e)
            }
            case _ ⇒ pull(in)
          }
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
  }
}

В данном варианте фильтруется все сообщения до тех пор, пока не получено сообщение на авторизацию. Если авторизация пройдет успешно, сообщения проходят дальше совместно с профилем пользователя.

И напоследок сделаем так, чтобы всем подключенным пользователям каждую секунду отправлялось текущее время:

case object Tick extends WsOutgoing
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
   case s: Say ⇒ s.asJson
   case Tick ⇒ Json.obj("time" → DateTime.now.toIsoDateTimeString().asJson)
}

...

val broadcast = Source.tick[WsOutgoing](1.second, 1.second, Tick)

...

.collect {
  case (ClientContext(userName), Say(text)) ⇒ Say(s"$userName: $text")
  case (_, Say(text)) ⇒ Say(s"unknown: $text")
}
.merge(broadcast)
.mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))

Это базовые примеры того, как можно реализовать поддержку WebSocket в своем проекте. Пакет Akka Stream большой и разннобразный, он поможет решить довольно большой пласт задач, не переживая за масштабирование и параллелизацию.

PS: Используя новую для вас технологию в более-менее нагруженном проекте, не забывайте проводить нагрузочное тестирование, следить за памятью и горячими участками кода (в этом вам может помочь gatling). Всем добра.
Tags:
Hubs:
+16
Comments7

Articles

Change theme settings