Pull to refresh
1079.72
OTUS
Цифровые навыки от ведущих экспертов

Функциональный поиск событий с помощью cats-effect

Reading time13 min
Views1.5K
Original author: Jonas Chapuis

Достижение максимальной выразительности и абстракции домена при сохранении точности протокола актора с помощью библиотеки endless4s Scala.

Код, описывающий бизнес-логику, несомненно, является самым ценным активом в системе программного обеспечения. Также называемый кодом домена среди специалистов по доменно-ориентированному проектированию, он отражает опыт и ценностное предложение и постепенно аккумулирует в себя все богатство знаний. Хотя для зрелости таких углубленных моделей требуется время, тенденции и технологии программного обеспечения быстро меняются и даже подвергаются лицензионным изменениям, как это недавно произошло с Akka. Вместе с тем, по мере устаревания языка, методов и фреймворков, ценность программного обеспечения растет по мере расширения клиентской базы. Я слышал, что программистом на языке COBOL еще можно заработать на жизнь!

Алгебры

Есть язык, который выдержал испытание временем: это язык математики, чья универсальная абстрактная символьная нотация не стареет на протяжении веков. В идеале, я бы сказал, что наши программы должны стремиться к тому же!

FP (функциональная программирование) и такие техники, как tagless-final (неразмеченный конечный [подход]), стремятся к этому идеалу. Определение логики с помощью так называемых алгебр и DSL пытается сделать выражение домена как можно более компактным и отделить его от инфраструктуры. Разделение задач облегчает анализ системы и ее развитие. Это также позволяет обеспечить точное тестовое покрытие, а при эксплуатации сложной SaaS-платформы защита конфиденциальных областей очень важна.

Акторы

Создается впечатление, что бэкенд-системы делятся на несколько категорий: традиционные CRUD-системы, работающие за балансировщиком нагрузки, делегирующие состояние и параллелизм базе данных; потоковые сервисы, получившие популярность благодаря событийно-ориентированным архитектурам и брокерам, таким как Kafka, и еще есть акторы. Эта несколько странная и недооцененная модель вычислений способствовала успеху WhatsApp и уже в 90-х годах достигла уровня высокой доступности девяти девяток в телефонных коммутаторах. Акторы предлагают естественную семантику для распределенных систем из миллионов маленьких машин состояний, которые органично подключаются к памяти и удаляются из нее в соответствии с потребностями. Это также идеальная модель для надежной реализации активных процессов в масштабе (связанных с планированием, например, включением машины в определенное время).

Лучшее из двух возможных вариантов

В Scala Akka, без сомнения, является ведущим акторным фреймворком. Однако его API не предназначен для интеграции с системами монадических эффектов напрямую: его можно использовать из OO-кода Java (и даже для .NET, благодаря сообществу). В результате, на первый взгляд, кажется, что вы выбираете cats-effect или ZIO и отказываетесь от стека Lightbend, или придерживаетесь Akka и Futures.

В этой статье мы постараемся продемонстрировать, что если вас привлекают и алгебраическое мышление, и потенциал использования модели актора для описания стейтфул и активных сущностей, то вам не придется делать выбор: оказывается, они отлично работают вместе! Кроме того, вы получаете разумную степень защиты от будущих изменений базового фреймворка актора, что позволяет полностью поменять реализацию, если возникнет такая необходимость.

Функциональное описание сущностей источников события позволяет оградить код домена от фреймворка актора, что позволяет полностью поменятьреализацию, если возникнет такая необходимость

Читатель состояния, автор (писатель) событий

Шардированный актор называется сущностью — это удобно сопоставить один к одному с доменно-ориентированной концепцией, где он представляет собой объект, определяемый своей идентичностью. Распределенный актор-сущность получает команду, после чего он обычно проводит какую-то процедуру валидации, возможно, сохраняя некоторые события, и формулирует ответ. Состояние сущности меняется в соответствии с сохраняемыми событиями.

Комбинация монад очень хорошо подходит для этого паттерна: читатель и писатель. Как это также предлагалось другими, мы можем использовать ее для описания цепочки вычислений с возможностью записи событий, чтения состояния сущности и, наконец, возврата ответа:

Наглядная иллюстрация монадической цепочки, включающей поиск событий с помощью читателя-писателя: F[ _ ] — это тип оболочки с эффектом более высокого рода, A обозначает тип команды, write (запись) позволяет производить события, read (чтение) приводит к промежуточному свертыванию событий по начальному состоянию, а B представляет ответ команды. Типичная цепочка, естественно, будет включать некоторые этапы вычислений, например, валидацию.

Если это кажется абстрактным, не волнуйтесь: на примере ниже вы увидите, что за этими понятиями скрываются более простые конструкции.

Пример: система бронирования поездок

Предположим, что мы создаем распределенную систему для бронирования поездок. Вот как мы репрезентуем бронирование:

final case class LatLon(lat: Double, lon: Double)
final case class Booking(
  time: Instant,
  passengerCount: Int,
  origin: LatLon,
  destination: LatLon
)

Упрощенная модель домена для бронирования пассажирских перевозок

Представим также, что бронирование имеет связанный с ним маршрут транспортного средства. Такое взаимодействие можно представить с помощью следующей алгебры без тегов:

trait BookingAlg[F[_]] {
  def place(booking: Booking): F[AlreadyExists.type \/ Unit]
  def getBooking: F[Unknown.type \/ Booking]
  
  def setRoute(steps: List[LatLon]): F[Unknown.type \/ Unit]
  def getRoute: F[Unknown.type \/ List[LatLon]]
}
trait BookingRepositoryAlg[F[_]] {
  def bookingFor(bookingID: BookingID): BookingAlg[F]
}
// note: type \/[A, B] = Either[A, B]

Упрощенная алгебра для бронирования транспорта: place создает сущность, getBooking возвращает ее определение, если сущность существует, а setRoute и getRoute позволяют указать географические данные. С этими определениями бронирование поездки в системе пишется как repository.bookingFor(bookingID).place(booking), а уведомление о том, что маршрут составлен, принимает вид repository.bookingFor(bookingID).setRoute(steps).

Этот трейт репозитория представляет кластер сущностей и может быть включен в другие выражения, описывающие общий процесс бронирования, вычисления маршрута и т.д. Важным моментом здесь является то, что на этом уровне код не опирается на знания об акторах, командах и шардинге. Напротив, абстрагирование от этих понятий позволяет использовать более простые выражения и облегчает тестирование.

Примечание: благодаря абстракции BookingRepositoryAlg трейту BookingAlg не нужно включать ID в качестве параметра во все методы.

Реализация

Все это хорошо и замечательно, но как описать поведение и где события? Именно здесь на помощь приходит читатель-писатель.

Как и с Ask и Tell в cats MTL, возможности читателя-писателя раскрываются через класс типа Entity: trait Entity[F[_], S, E] extends StateReader[F, S] with EventWriter[F, E] with Monad[F]:

  • StateReader позволяет выполнять read: F[S] (и другие вспомогательные методы, см. ниже).

  • EventWriter предоставляет возможность сохранять события: write(events: E*): F[Unit].

Но, достаточно сложного жаргона:

final case class BookingEntity[F[_]: Monad: Logger](
  entity: Entity[F, BookingState, BookingEvent]) extends BookingAlg[F] {
  
  def place(booking: Booking): F[AlreadyExists.type \/ Unit] = 
    entity.ifUnknownF(entity.write(BookingPlaced(booking)))(_ => AlreadyExists)
    
  def getBooking: F[Unknown.type \/ Booking] = 
    entity.ifKnown(_.definition)(Unknown)
  
  def setRoute(steps: List[LatLon]): F[Unknown.type \/ Unit] = 
    entity.ifKnownF(entity.write(RouteSet(steps))(Unknown)
  
  def getRoute: F[Unknown.type \/ List[LatLon]] = 
    entity.ifKnown(_.route)(Unknown)
}
// note: type \/[A, B] = Either[A, B]

Пример простой реализации поведения для объекта бронирования (booking entity). Entity instance инжектируется непосредственно в конструктор (мы рассмотрим это ниже). write (запись) позволяет сохранять события. методы-хендлеры ifKnown, ifUnknown, связанные с reader (читатель), облегчают описание дифференцированного поведения для этих случаев. Любая комбинация таких вызовов, включая записи, перемежающиеся с чтениями, может быть соединена в цепочку для более сложной логики: чтение всегда обеспечивает последовательное представление состояния, сворачивая события за кулисами.

Свертывание событий

Применение события к состоянию сущности (также известное как свертка событий по состоянию) является чистой функцией формы ((Option[S], E) => String \/ Option[S]). Проще говоря, это кортеж из возможно определенного состояния (до первого события оно еще пустое) и события, приводящего либо к новой версии состояния, либо к ошибке. Для нашего простого набора событий бронирования это:

class BookingEventApplier extends EventApplier[BookingState, BookingEvent] {
  def apply(state: Option[BookingState], event: BookingEvent): String \/ Option[BookingState] =
    (event match {
      case BookingPlaced(booking) =>
        state
          .toLeft(BookingState(definition = booking, route = Nil))
          .leftMap(_ => "Booking already exists")
      case RouteSet(steps: List[LatLon]) =>
        state
          .toRight("Attempt to set route on unknown booking")
          .map(_.copy(route = steps))
    }).map(Option(_))
}
// note: type \/[A, B] = Either[A, B]

Определив алгебру, поведение и аппликатор событий, мы получили все необходимые определения на уровне домена (обычно они находятся в модуле с ограниченным количеством зависимостей в кодовой базе, которая строго отделяет бизнес-логику от инфраструктуры).

Тестирование

Тестирование этого кода может быть выполнено синхронно с помощью cats.ID, или с помощью инструментария для тестирования IO, например, такого, который предоставляется для munit:

class BookingEntitySuite
    extends munit.CatsEffectSuite
    with munit.ScalaCheckEffectSuite
    with Generators {
  private val bookingAlg = BookingEntity(EntityT.instance[IO, BookingAlg, BookingEvent])
  private implicit val eventApplier: BookingEventApplier = new BookingEventApplier

  test("place booking") {
    forAllF { booking: Booking =>
      bookingAlg
        .place(booking)
        .run(None)
        .map {
          case Right((events, _)) =>
            assertEquals(
              events,
              Chain(BookingPlaced(booking))
            )
          case Left(error) => fail(error)
        }
    }
  }
}      

Пример сьюта munit, использующего scalacheck для поведения сущностей. Обратите внимание на использование монады-трансформера EntityT, предоставляемой endless, которая интерпретирует программы, использующие Entity, в целевую монаду (в данном случае IO). Кроме того, EntityT требует неявного определения функции сворачивания событий eventApplier (чтобы иметь возможность сворачивать состояние в случае чередующихся чтений и записей).

Реализация репозитория

Реализация BookingRepositoryAlg тривиальна:

final case class BookingRepository[F[_]: Monad](
    repository: Repository[F, BookingID, BookingAlg])
        extends BookingRepositoryAlg[F] {
  def bookingFor(bookingID: BookingID): BookingAlg[F] = repository.entityFor(bookingID)
}

Аналогично реализации BookingEntity, опирающейся на класс типа Entity, в конструкторе передается экземпляр Repository: именно этот экземпляр предоставляет возможность получить хэндл сущности для определенного ID, поэтому мы делегируем ему полномочия.

Проводной протокол 

Выше мы упоминали, как эти абстракции позволяют отделить логику домена от акторов и сообщений. На уровне инфраструктуры данные аспекты "проводного протокола" определяются с помощью двух специализированных реализаций трейта Bookings, одна для "клиентской" стороны (отправитель команды), другая для "серверной" (актор, получающий сообщение).

Изюминка заключается в использовании типа эффекта F для указания типов команды и ответа, а также кодирующих и декодирующих устройств.

Исходящая команда

Предоставляя реализации методов алгебры каждой сущности типа OutgoingCommand[*], мы можем определить двоичную кодировку для параметров, а также указать декодер, необходимый для парсинга ответа:

/** Represents an outgoing command. Embeds the binary payload and indicates 
  * the decoder to use for the reply of type `R`.
  * @tparam R
  *   reply
  */
trait OutgoingCommand[+R] {
  def payload: Array[Byte]
  def replyDecoder: Decoder[R]
}

Определение для типа эффекта, представляющего исходящую команду, используемое при реализации протокола "клиент"

Входная команда

На принимающей стороне нам необходимо декодировать команду, запустить ее логику и закодировать возвращаемое значение в ответ:

/** Represents an incoming entity command. Embeds the `Reply` type, 
  * the ability to run it on the entity algebra in `F` context 
  * and specifies the encoder to be used to encode the reply.
  * @tparam F
  *   context
  * @tparam Alg
  *   entity algebra
  */
trait IncomingCommand[F[_], Alg[_[_]]] {
  type Reply
  def runWith(alg: Alg[F]): F[Reply]
  def replyEncoder: Encoder[Reply]
}

Определение для типа эффекта, представляющего входную команду, используемое для реализации "серверного" протокола. runWith отвечает за декодирование сообщения актора и вызов соответствующего поведения.

Клиентские и серверные реализации снабжены одним классом, расширяющим trait CommandProtocol[Alg[_[_]]. В нашем примере мы используем вспомогательный подкласс ProtobufCommandProtocol, который предоставляет хелперов для протоколов protobuf (есть также встроенные хелперы для JSON). Получившийся код относительно плотный, но малосложный; он просто обеспечивает маппинг доменных типов в сериализованные представления сообщений и действует как коммутатор для входных команд:

class BookingCommandProtocol extends ProtobufCommandProtocol[BookingAlg] {
  override def client: BookingAlg[OutgoingCommand[*]] =
    new BookingAlg[OutgoingCommand[*]] {
      def place(booking: Booking): OutgoingCommand[AlreadyExists.type \/ Unit] =
        outgoingCommand[proto.BookingCommand, proto.PlaceCommandReply, AlreadyExists.type \/ Unit](
          command = proto.PlaceBooking(booking.transformInto[proto.Booking]),
          replyMapper = _.reply match {
            case proto.PlaceCommandReply.AlreadyExistsV1(_) => AlreadyExists.asLeft
            case proto.PlaceCommandReply.Unit(_) => ().asRight
          }
        )
      // ...
    }

  override def server[F[_]]: Decoder[IncomingCommand[F, BookingAlg]] =
    ProtobufDecoder[proto.BookingCommand].map(_.command match {
      case proto.PlaceBookingV1(booking) =>
        incomingCommand[F, proto.PlaceCommandReply, AlreadyExists.type \/ Unit](
          run = _.place(booking.transformInto[Booking]),
          replyContramapper = {
            case Left(AlreadyExists) => proto.PlaceCommandReply.Reply.AlreadyExistsV1()
            case Right(()) => proto.PlaceCommandReply.Reply.Unit()
          }  
        )
      // ..
    })
}
// note: type \/[A, B] = Either[A, B]

Определение командного протокола для сущности Booking: класс расширяет CommandProtocol, который определяет как "сервер", так и "клиент". На стороне клиента реализация предоставляет OutgoingCommand, которая преобразует параметры в сообщение protobuf и устанавливает маппер для декодирования ответа. На стороне сервера выполняется декодирование входящего сообщения protobuf и вызов соответствующего метода сущности, а также кодирование ответа (обратите внимание на вызовы transformInto, как правило, мы используем chimney для маппинга данных).

Существенным преимуществом разделения задач является то, что версионирование протокола не влияет на домен, защищая его от устаревших типов и маппинга бойлерплейта. В приведенном выше примере можно заметить, что в именах сообщений protobuf мы даже приняли соглашение о явном кодировании номера версии.

Команда «маршрутизация»

При естественной трансформации происходит доставка команды целевому объекту: OutgoingCommand[*] ~> F :

trait CommandRouter[F[_], ID] {
  def routerForID(id: ID): OutgoingCommand[*] ~> F
}

CommandRouter обеспечивает естественную трансформацию для идентификатора сущности, которая может осуществить маппинг алгебры сущностей, интерпретируемой CommandProtocol.client, в контекст OutgoingCommand[*] обратно в F. Это представляет собой "маршрутизацию" команды к целевому актору, получение ответа, декодирование и доставку ответа вызывающему актору.

Примечание 1: Чтобы обеспечить поддержку такой естественной трансформации, для алгебры сущностей должен быть предоставлен инстанс FunctorK без тегов cats (cats-tagless), но это можно автоматизировать с помощью макросов деривации cats-tagless: cats.tagless.derive.functorK[BookingAlg].

Примечание 2: команда router используется при интерпретации монадических значений, полученных в результате обращений к алгебре репозитория (которые внутренне интерпретируются трансформером монад RepositoryT).

Диаграмма последовательности, на которой показаны различные элементы в процессе получения экземпляра BookingAlg[F].
Диаграмма последовательности, на которой показаны различные элементы в процессе получения экземпляра BookingAlg[F].

Встроенный рантайм Akka

Endless предоставляет модуль endless-runtime-akka, который представляет собой PnP (plug & play) рантайм для Akka. Однако ничто не мешает вам разработать свой кастомный рантайм.

После определения необходимых реализаций трейтов деплой объекта с помощью рантайма Akka сводится к выполнению одного вызова deployEntity. Этот вызов собирает все вместе и обеспечивает cats-эффект Resource с инстансом хранилища, работающим через Akka Cluster Sharding:

deployEntity[F, BookingState, BookingEvent, BookingID, BookingAlg, BookingRepositoryAlg](
  BookingEntity(_),
  BookingRepository(_),
  (effector, _) => BookingEffector(effector)
): Resource[F, (BookingRepositoryAlg[F], ActorRef[ShardingEnvelope[Command]])]

Пример вызова deployEntity для нашей сущности Booking (с указанием возвращаемого типа). Вызов параметризован всеми связанными типами и принимает конструкторы для различных реализаций. Результирующий ресурс предоставляет кортеж с экземпляром хранилища, интерпретированным в целевую монаду, и ссылкой на актора региона, полученной в результате внутреннего вызова ClusterSharding.init, на случай, если потребуются дальнейшие взаимодействия.

Примечание 1: чтобы без побочных эффектов соединить неявную асинхронность Akka с контекстом F, используемым для алгебр, командный маршрутизатор и deployEntity запрашивают Async из F. Это позволяет использовать механизм Dispatcher для синхронного запуска монадической цепочки обработки команд из потока актора.

Примечание 2: Ask от Akka обеспечивает реализацию трейта командного маршрутизатора. Внутри endless рантайма Akka команды передаются с помощью "несущего" сообщения protobuf, содержащего идентификатор цели, полезную нагрузку в двоичном виде и сериализованную ссылку актора-отправителя для получения ответа.

Подробное описание взаимодействий на "клиентской" стороне вызова place(booking) с рантаймом Akka, которые заканчиваются передачей команды и получением ответа.
Подробное описание взаимодействий на "клиентской" стороне вызова place(booking) с рантаймом Akka, которые заканчиваются передачей команды и получением ответа.
Что происходит на "серверной" стороне, кульминацией чего является использование DSL эффекта Akka's Cluster Sharding.
Что происходит на "серверной" стороне, кульминацией чего является использование DSL эффекта Akka's Cluster Sharding.

Effector (Эффектор)

Мы еще не упомянули о том, как работать с побочными эффектами: для этого существует гибкий класс типа Effector: trait Effector[F[_], S] extends StateReader[F, S] with Passivator[F] with Self[F]. Давайте рассмотрим каждую из этих возможностей поочередно:

  • StateReader позволяет чтение обновленного состояния сущности после сохранения или восстановления событий.

  • Passivator позволяет осуществлять тонкий контроль над пассивацией. В некоторых областях сущности могут переходить в "спящие" формы (например, после события BookingCancelled), для которых выгодно инициировать пассивацию немедленно или с некоторой задержкой, что позволяет проактивно оптимизировать ресурсы кластера.

  • Self раскрывает алгебру сущности в контексте эффектора, что делает возможным определение асинхронных процессов, включающих взаимодействие с одной и той же сущностью, обычно для определения сущностей, действующих как менеджеры процессов (более подробно см. документацию).

Примечание: монадические значения, использующие Effector, интерпретируются с помощью трансформера монад EffectorT, который предоставляет инстансы cats-эффекта вплоть до Async, чтобы можно было использовать операторы, ориентированные на время, такие как sleep, timeoutTo.

End-less (бесконечный)

Мы решили наконец-то присвоить этой библиотеке тег endless, чтобы упомянуть о непрерывном потоке событий, которые отражают эволюцию состояния (намек на tag-less). По нашему опыту, поиск событий и акторы являются мощными инструментами для представления распределенных машин состояний в масштабе. Применение алгебраического стиля для описания сущностей позволяет легко интегрировать их с функциональной логикой домена, сохраняя при этом независимость от структуры акторов.


Приглашаем всех желающих на открытое занятие онлай-курса «Scala-разработчик», на котором познакомимся с парсер-комбинаторами на Scala и будем парсить описание REST API написанное с помощью markdown. Регистрация открыта по ссылке.

Tags:
Hubs:
Total votes 6: ↑5 and ↓1+4
Comments0

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS