Pull to refresh
287.32
TINKOFF
IT’s Tinkoff — просто о сложном

Проблема сохранения контекста при асинхронном программировании в scala

Reading time 7 min
Views 6.6K
В какой-то момент в проекте встает вопрос отслеживания хода выполнения операции, получения или складирование какой-то информации о ней. Для этого как нельзя лучше служит контекст операции, например, контекст клиентской сессии. Если вам интересно, как это можно сделать относительно безболезненно, прошу под кат.

В мире java, зачастую (но не всегда), каждая операция выполняется в своём потоке. И тут всё получается довольно просто, можно воспользоваться ThreadLocal объектом и получать его в любом момент выполнения операции:

class Context {
  public static final ThreadLocal<Context> global = new ThreadLocal<Context>;
}
 
//где-то в месте вызова операции
Context context = new Context(...);
Context.global.set(context);
try {
  someService.someMethod();
} finally {
  Context.global.set(null);
}

В scala же, зачастую, всё не так просто, и по ходу операции поток может смениться неоднократно, например в очень асинхронных приложениях. И способ с ThreadLocal уже не подходит (как и в случае с переключением потоков в java, конечно же).

Первое, что может прийти в голову, это передавать контекст через имплиситный аргумент функции.

def foo(bar: Bar)(implicit context: Context)

Но это будет захламлять протокол сервисов. Поломав немного голову, пришла довольно простая идея: привязать контекст к объектам сервиса, и распространять его по внутренним сервисам по мере вызова функций.

Допустим, наш контекст выглядит вот так:

//data - склад для всякой информации касательно операции
class Context(val operationId: String, val data: TrieMap[String, String] = TrieMap.empty)

Создадим трейты, которыми будем помечать контекстно зависимые объекты:

trait ContextualObject {
  protected def context: Option[Context]
}
 
//объект, способный менять свой контекст
trait ChangeableContextualObject[T <: ContextualObject] extends ContextualObject {
  def withContext(ctx: Option[Context]): T
}
 
//объект с пустым контекстом
trait EmptyContext {
  _: ContextualObject =>
 
  override protected val context: Option[Context] = None
}

Теперь объявим наши сервисы и реализации:

//Говорим, что наш сервис может изменять контекст
trait ServiceA extends ChangeableContextualObject[ServiceA] {
  def someSimpleOperation: Int
 
  def someLongOperation(implicit executionContext: ExecutionContext): Future[Int]
}
 
trait ServiceAImpl extends ServiceA {
 
  override def someSimpleOperation: Int = 1
 
  override def someLongOperation(implicit executionContext: ExecutionContext): Future[Int] = {
    Future(someSimpleOperation)
      .map { res =>
        //запишем какие-нибудь данные в контекст выполнения, если он присутствует
        context.foreach(_.data.put("ServiceA.step1", res.toString))
        res * Random.nextInt(10)
      }
      .map { res =>
        context.foreach(_.data.put("ServiceA.step2", res.toString))
        res - Random.nextInt(5)
      }
      .andThen {
        case Success(res) => context.foreach(_.data.put("ServiceA.step3", res.toString))
      }
  }
 
  //создаём сервис с нужным нам контекстом
  override def withContext(ctx: Option[Context]): ServiceA = new ServiceAImpl {
    ctx.foreach(_.data.put("ServiceA.withContext", "true"))
    override protected def context: Option[Context] = ctx
  }
}
 
object ServiceAImpl {
  def apply(): ServiceAImpl = new ServiceAImpl with EmptyContext
}

И второй сервис, который будет использовать первый:

trait ServiceB extends ChangeableContextualObject[ServiceB] {
  def someOperationWithoutServiceA: Int
 
  def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean]
}
 
/**
  * При просмотре предыдущего и текущего сервиса мог возникнуть вопрос: 
  * почему это не класс и почему сервис А указан как абстрактный метод?
  * частично ответом является примешивание EmptyContext при создании сервиса,
  * но основная причина заключена в функции withContext.
  * Также, как бонус, в этом случае можно использовать cake pattern при создании объекта
  */
trait ServiceBImpl extends ServiceB {
  self =>
  protected def serviceA: ServiceA
 
  override def someOperationWithoutServiceA: Int = 1
 
  override def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean] = {
    serviceA.someLongOperation.map {
      case res if res % 2 == 0 =>
        context.foreach(_.data.put("ServiceB.res", "even"))
        true
 
      case res =>
        context.foreach(_.data.put("ServiceB.res", "odd"))
        false
    }
  }
 
  override def withContext(ctx: Option[Context]): ServiceB = new ServiceBImpl {
    ctx.foreach(_.data.put("ServiceB.withContext", "true"))
    override protected val context: Option[Context] = ctx
    // собственно, тот факт, что мы объявили сервис А как функцию
    // позволяет нам переопределить ее как lazy val,
    // и этот сервис будем инициализирован с новым контекстом, только если это будет нужно.
    // Это я и назвал распространением контекста
    override protected lazy val serviceA: ServiceA = self.serviceA.withContext(ctx)
  }
}
 
object ServiceBImpl {
  // Есть небольшой недостаток - нужно либо называть аргументы именами отличными от тех,
  // что используются в классе, либо помещать их в отдельную переменную внутри функции.
  // Но есть еще вариант объявлять так:
  // class Builder(val serviceA: ServiceA) extends ServiceBImpl with EmptyContext
  // И в месте вызова:
  // new ServiceBImpl.Builder(serviceA)
  // Имя, возможно, не самое удачное, но идея должна быть понятна.
  def apply(a: ServiceA): ServiceBImpl = new ServiceBImpl with EmptyContext {
    //  а в этом месте его можно объявить как val
    override protected val serviceA: ServiceA = a
  }
}

В итоге, в месте вызова мы получим следующий код:

val context = new Context("opId")
val serviceBWithContext = serviceB.withContext(Some(context))
serviceBWithContext.someOperationWithoutServiceA
context.data.get("ServiceB.withContext") // Some("true")
context.data.get("ServiceA.withContext") // None
 
serviceBWithContext.someOperationWithServiceA.andThen {
  case _ => 
    context.data.get("ServiceA.withContext") // Some("true")
    context.data.get("ServiceA.step1") // Some("1")
}

Всё довольно просто — таким образом, по ходу операции будет один и тот же контекст. Но нужно для этого всего найти какое-то реальное применение. Например, мы в ходе операции записывали важную информацию, и теперь хотим эту информацию залогировать. Самым простым вариантом было создавать логгер для каждого контекста, и при записи в лог к сообщению приписывать эту информацию в нём. Но появляется проблема логирования, которое происходит вне вашего кода (например, в сторонней библиотеке).

Для того, чтобы контекст можно было использовать вне нашего кода, сделаем ThreadLocal с нашим контекстом:

object Context {
  val global: ThreadLocal[Option[Context]] = ThreadLocal.withInitial[Option[Context]](() => None)
 
  //Запустить операцию в контексте
  def runWith[T](context: Context)(operation: => T): T = {
    runWith(Some(context))(operation)
  }
 
  //Запустить операцию в контексте
  def runWith[T](context: Option[Context])(operation: => T): T = {
    val old = global.get()
    global.set(context)
    // после завершения вернем старое значение на всякий случай
    try operation finally global.set(old) 
  }
}

Например, если вы используете библиотеку logback-classic для логирования, то вы можете написать свой Layout для логирования этих параметров.

Возможная реализация
class OperationContextLayout extends LayoutBase[ILoggingEvent] {
  private val separator: String = System.getProperty("line.separator")
 
  override def doLayout(event: ILoggingEvent): String = {
    val sb = new StringBuilder(256)
    sb.append(event.getFormattedMessage)
      .append(separator)
 
    appendContextParams(sb)
    appendStack(event, sb)
    sb.toString()
  }
 
  private def appendContextParams(sb: StringBuilder): Unit = {
    Context.global.get().foreach { ctx =>
      sb.append("operationId=")
        .append(ctx.operationId)
 
      ctx.data.readOnlySnapshot().foreach {
        case (key, value) =>
          sb.append(" ").append(key).append("=").append(value)
      }
 
      sb.append(separator)
    }
  }
 
  private def appendStack(event: ILoggingEvent, sb: StringBuilder): Unit = {
    if (event.getThrowableProxy != null) {
      val converter = new ThrowableProxyConverter
      converter.setOptionList(List("full").asJava)
      converter.start()
 
      sb.append()
    }
  }
}


Возможный конфиг
<configuration>
 
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="operation.context.logging.OperationContextLayout" />
        </encoder>
    </appender>
 
    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>


И попробуем что-нибудь залогировать:

  def runWithoutA(): Unit = {
    val context = Some(createContext())
    val res = serviceB.withContext(context).someOperationWithoutServiceA
    Context.runWith(context) {
      // Result of someOperationWithoutServiceA: '1'
      // operationId=GPapC6JKmY ServiceB.withContext=true
      logger.info(s"Result of someOperationWithoutServiceA: '$res'")
    }
  }

  def runWithA(): Future[_] = {
    val context = Some(createContext())
    serviceB.withContext(context).someOperationWithServiceA.andThen {
      case _ =>
        Context.runWith(context) {
          // someOperationWithServiceA completed
          // operationId=XU1SGXPq1N ServiceB.res=even ServiceA.withContext=true ServiceB.withContext=true ServiceA.step1=1 ServiceA.step2=7 ServiceA.step3=4
          logger.info("someOperationWithServiceA completed")
        }
    }
  }

И остался вопрос: как же быть с внешним кодом, который запускается в ExecutionContext? Но нам же никто не мешает написать враппер для него:

Возможная реализация враппера
class ContextualExecutionContext(context: Option[Context], executor: ExecutionContext) extends ExecutionContext {
 
  override def execute(runnable: Runnable): Unit = executor.execute(() => {
    Context.runWith(context)(runnable.run())
  })
 
  override def reportFailure(cause: Throwable): Unit = {
    Context.runWith(context)(executor.reportFailure(cause))
  }
 
}
 
object ContextualExecutionContext {
  implicit class ContextualExecutionContextOps(val executor: ExecutionContext) extends AnyVal {
    def withContext(context: Option[Context]): ContextualExecutionContext = new ContextualExecutionContext(context, executor)
  }
}


Возможная реализация внешней системы
class SomeExternalObject {
  val logger: Logger = LoggerFactory.getLogger(classOf[SomeExternalObject])
 
  def externalCall(implicit executionContext: ExecutionContext): Future[Int] = {
    Future(1).andThen {
      case Success(res) => logger.debug(s"external res $res")
    }
  }
}

Попробуем сделать вызов в нашем ExecutionContext:


  def runExternal(): Future[_] = {
    val context = Some(createContext())
    implicit val executor = global.withContext(context)
    // external res 1
    // operationId=8Hf277SV7B
    someExternalObject.externalCall
  }

Вот и вся идея. На самом деле, использование контекста не ограничивается только логированием. Можно хранить в этом контексте всё, что угодно. Например, слепок каких-то состояний, если нужно, чтобы все сервисы во время операции работали с одинаковыми данными. И так далее, и так далее.

Если есть необходимость в реализации слежения за контекстом при общении акторов, пишите в комментариях, дополню статью. Если есть идеи по поводу другой реализации, также пишете в комментариях, будет интересно почитать.

P.S. Исходный код проекта, используемый в статье github.com/eld0727/scala-operation-context.
P.P.S. Я уверен, что данный подход может быть применен и к других языкам, позволяющим создавать анонимные классы, и это всего лишь возможная реализация на scala.
Tags:
Hubs:
+17
Comments 30
Comments Comments 30

Articles

Information

Website
www.tinkoff.ru
Registered
Founded
Employees
over 10,000 employees
Location
Россия