Pull to refresh
0
ХостТрекер
Сервис мониторинга доступности сайтов

Message Passing в F#. Применение MailboxProcessor

Reading time 8 min
Views 4.3K
Данная статься продолжает серию публикаций о технологиях, которые мы используем для разработки сервиса проверки доступности веб сайтов HostTracker.
Сегодня речь пойдет о…

MailboxProcessor


image



MailboxProcessor – класс, который позволяет создать агент на языке F#, который состоит из двух частей – очереди сообщений и функции, которая обрабатывает сообщения из очереди. Он предоставляет следующий интерфейс взаимодействия:

Post – отправить сообщение 'msg в очередь процессора асинхронно, не ожидая его обработки;

PostAndReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) -> 'reply – отправить сообщение с асинхронным каналом ожидания результата AsyncReplyChannel. Поток, который вызвал данный метод, ожидает вызов AsyncReplyChannel.Reply: 'reply -> unit с обработчика сообщений из очереди для получения резульатата. msgBuilder – лямбда-функция, которая компонирует сообщение из канала;

PostAndAsyncReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) -> Async<'reply> – аналогичный к PostAndReply, но ожидание результата не происходит. Возвращается асинхронное вычисление, результатом которого является переданное в AsyncReplyChannel.Reply значение. Клиент сам решает каким образом ожидать результат — отдав текущий поток пулу или блокируя его;

TryPostAndReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg)*?timeout:int -> 'reply option – аналогичный PostAndReply, но ожидание происходит в течении timeout миллисекунд. Если результат за это время получен, то метод возвращает Some(result). В другом случае возвращает None;

PostAndTryAsyncReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg)*?timeout:int -> Async<'reply option> – аналогичный TryPostAndReply, но метод возвращает асинхронные вычисление;

Receive: unit -> Async<'msg>; TryReceive: timeout:int -> Async<'msg option> – асинхронные вычисления, которые возвращают сообщение в очередь в порядке FIFO (TryReceive – асинхронные вычисления, которые ожидают в течении timeout миллисекунд на сообщение и возвращает Some(msg), если оно пришло в данный период времени, и None, если нет). Используется, в основном, на стороне обработчика сообщений и очень редко на стороне клиента;

Scan: (scanner: 'msg -> Async<'T> option) -> Async<'T>; TryScan: (scanner: 'msg -> Async<'T> option)*?timeout:int -> Async<'T option> – возвращают асинхронные вычисления, которые будут проверять очередь сообщений на наличие такого сообщения, для которого scanner сможет построить асинхронные вычисления. Для Scan вычисления, созданные с помощью scanner, будут исполнены после его построения, и результат станет результатом асинхронного вычисления, которое возвратит Scan. Для TryScan задается время сканирования очереди. Если в течении данного времени scanner построит вычисления, то они выполнятся и их результат станет результатом асинхронного вычисления, созданного TryScan. В противном случае асинхронное вычисление возвратит None.

В формировании MailboxProcessor-а принимает участие лямбда-функция, которая генерирует асинхронные вычисления, которые будет выполнять агент. Данная функция принимает как параметр сам процессор, что позволяет в созданном вычислении использовать вышеописанный интерфейс взаимодействия с очередью (очень редко применяются методы PostAndReply, часто Post, Receive, Scan). Таким образом, происходит обработка очереди асинхронными вычислениями. Клиент, построенный процессором, использует тот же интерфейс для взаимодействия с очередью сообщений, но обычно (в большинстве сценариев) выполнение методов Receive, Scan на его стороне не происходит.

Использование MailboxProcessor:

• Кеши (caches) данных при работе с их хранилищами (SQL базы данных, ...). Независимо от типа хранилища, в приложении часто возникает задача доступа к данным по определенным критериям (как правило, по идентификатору записи). Это может привести к заметному понижению быстродействия системы. Решение на базе MaiboxProcessor приведено в следующем коде:

// Сообщения кеша:
//        GetFromCache: id - идентификатор ресурса - generic
//                               AsyncReplyChannel<Choice<'res, exn>> - канал ответа для передачи ресурса    
//        ClearCache - чистка кеша
type cacheCommands<'id,'res> = 
    | GetFromCache of 'id * AsyncReplyChannel<Choice<'res, exn>>
    | ClearCache

// класс Cache инкапсулирует MailboxProcessor
// getById - функция получения данных с источника в случае кеш промаха
// cleanupCacheInterval - интервал очистки кеша
type Cache<'itemId,'item when 'itemId : comparison > (getById, cleanupCacheInterval) =
    // MailboxProcessor работает на пуле потоков
    let inbox = MailboxProcessor.Start(fun inbox ->
        // цикл обработки сообщений в очереди
        // cache - состояние кеша - Map - отображение id на ресурс
        let rec loop cache = async {
            let! res = inbox.Receive() 
                  //берем новое сообщение
                  //поток MailboxProcessor вернется пулу до появления сообщения
            match res with
            | GetFromCache (itemId:'itemId, rc) ->
                let (item, cache')= 
                    match Map.tryFind itemId cache with 
                    | None -> 
                        // кеш промах
                        try
                        match getById itemId with
                        | None -> (Choice2Of2(KeyNotFoundException() :> exn), cache)
                        | Some (item:'item) -> 
                            (Choice1Of2(item), Map.add itemId item cache)
                        with exc ->
                            (Choice2Of2(exc), cache)
                    | Some item ->
                        // кеш попадание
                        (Choice1Of2(item), cache)
                rc.Reply item //возвращаем результат
                return! loop cache' //идем на новую итерацию цикла
            | ClearCache -> 
                return! loop Map.empty //полная очистка кеша
        }
        loop Map.empty)
    //цикл чистки кеша через фиксированые интервалы
    let rec runCleanupCicle () =
            async {
                do! Async.Sleep cleanupCacheInterval //поток вернется пулу
                inbox.Post ClearCache
                return! runCleanupCicle ()
            }
    do if cleanupCacheInterval > 0 then 
            runCleanupCicle () |> Async.Start
    with
        //интерфейс кеша
        member x.TryGet(itemId) =
            match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with
            | Choice1Of2 item -> Some item
            | _ -> None
        member x.Get(itemId) =
            match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with
            | Choice1Of2 item -> item
            | Choice2Of2 e -> raise e
        member x.Cleanup() =
            inbox.Post ClearCache


Другая проблема возникает при операция модификации данных в хранилище (insert, delete, update). Такие операции оптимально группировать (batch mode). Для этого можно реализовать кеш группировки данные с разных потоков:

// сообщения кеша:
//         Save: сохранить сущность в кеше
//         SaveToDB: сохранение данных в источнике
//         UpdateSaveState: операция в потоке кеша над полным его состоянием
type commandsSave<'itemId, 'item when 'itemId: comparison> = 
    | Save of 'itemId * 'item
    | SaveToDB
    | UpdateSaveState of (Map<'itemId, 'item> -> Map<'itemId, 'item>)

// saveToDB - непосредственное обращение к источнику с передачей Map<'itemId, 'item>
// savingInterval - интервал обращения к источнику
type SavingCache<'itemId, 'item when 'itemId : comparison>(saveToDB, savingInterval) = 
    let inbox = 
        MailboxProcessor.Start(
            fun inbox ->
                // цикл обработки сообщений
                let rec loop cache = 
                    async{
                        let! msg = inbox.Receive()                        
                        let cache' =
                            match msg with
                            | Save (key: 'itemId, value: 'item) ->
                                Map.add key value cache //добавление данных в кеш
                            | SaveToDB ->
                                saveToDB cache
                                Map.empty
                            | UpdateSaveState updater ->
                                updater cache //изменения состояния кеша через функцию updater (сообщение Save - частный случай) 
                        return! loop cache'
                    }
                loop Map.empty
        )
    // цикл сохранения данных
    let rec runCleanSaveCicle () = 
        async{
            do! Async.Sleep savingInterval
            inbox.Post(SaveToDB)
            return! runCleanSaveCicle()
        }
    do if savingInterval > 0 then runCleanSaveCicle() |> Async.Start
    with
        member x.Save(itemId, item) = inbox.Post( Save (itemId, item) )
        member x.UpdateBy f = inbox.Post <| UpdateSaveState f


• MailboxProcessor и есть машиной состояний, которая имеет хотя бы одно состояние. Для набора состояний, количество которых больше одного, для каждого из них определяется набор сообщений, которые могут быть обработаны. Во время обработки определенного сообщения возможен переход в другое состояние. Лямбда-функция, которая передается в конструктор MailboxProcessor определяет набор асинхронных рекурсивных вычислений, по одному на каждое состояние машины. В каждом таком вычислении происходит ожидание одного сообщения из всех возможных из очереди (Receive), или определенного подмножества сообщений (Scan). После ожидания происходит обработка, при которой возможен переход к вычислению для другого состояния машины, продолжение работы с текущим состоянием или завершения работы. Далее следует пример — прокси для работы с удаленным агентом. Определены состояния working, stoped, recovery. Все сообщения представлены в виде типа RemoteAgentProxyCommand:

// ProcessData - обработать сообщение
// Reconfigure  - 
//

type private RemoteAgentProxyCommand<'data, 'result> =
    | ProcessData of 'data * ('result option -> unit)
    | Reconfigure of string
    | Suspend | Resume
    | CleanupExpiredData

type RemoteAgentProxy<'data, 'result>(transport, ?cleanupInterval) =
    let cleanupInterval = defaultArg cleanupInterval 60000 
    let processor = MailboxProcessor.Start(fun inbox -> 
        let cleanup state now = …//очистка состояния
        let send state = async { … }  //отсылка данных по сети
        // рабочее состояние      
        let rec working (state: Map<_, _>) = async {            
            let! msgOpt = inbox.TryReceive(1000) //попытка получить сообщение за 1 секунду
            let now = DateTime.UtcNow
            match msgOpt with
            | None when state.Count = 0 -> //cообщение не получено но и состояние пустое
                return! working state //новая итерация в текущем состоянии
            | None ->  // сообщение не получено но состояние не пустое              
                let nextStateHandler = // следующее действие процесора
                    async {                    
                        try
                        let! state' = send state //пытаемся отослать состояние по сети
                        return! working state' //состояние отослано без проблем - продолжаем цикл обочего состояния
                        with e -> 
                            return! recovery(state, 10000) //произошел сетевой сбой - переходим в состояние восстановления
                    }
                return! nextStateHandler
            | Some CleanupExpiredData -> return! working (cleanup state now) //очистка кеша
            | Some (ProcessData (data, channelReply)) -> //добавление элемента данных на отсылку                    
                let expiration = DateTime.UtcNow.AddMilliseconds(float cleanupInterval)
                return! working (Map.add (Guid.NewGuid()) 
                            (expiration, data, channelReply) state)
            | Some Suspend -> //команда остановить работу
                return! stoped (working state)  //переход в состояние stoped 
            | Some _ -> //игнорируем другие сообщения
                return! working state
            }
        and stoped nextStateHandler = inbox.Scan(function | Resume -> Some(nextStateHandler) | _ -> None) //ждем сообщения Resume 
        and recovery(state, timeToRecieve) = async { //восстановление соединения
            let! nextTimeToRecieve =
                if timeToRecieve <= 100 then 
                    async {
                    try                    
                    let! state' = send state
                    return Choice1Of2 state'
                    with e -> return Choice2Of2 10000
                    }
                else async.Return <| Choice2Of2 timeToRecieve
            match nextTimeToRecieve with
            | Choice1Of2 state -> return! working state
            | Choice2Of2 time ->
                let nextDateTime = DateTime.UtcNow.AddMilliseconds(float time)
                let! msg = inbox.TryReceive(timeToRecieve)
                let now = DateTime.UtcNow
                let nextTime = 
                    int (nextDateTime - now).TotalMilliseconds
                match msg with
                | Some (ProcessData (data, channelReply)) -> 
                    channelReply None
                    return! recovery(state, nextTime)
                | Some CleanupExpiredData -> return! recovery (cleanup state now, nextTime)
                | Some Suspend -> return! stoped (recovery(state, nextTime))
                | None -> return! recovery(state, 0)
                | _ -> return! recovery(state, nextTime)
            }
        working Map.empty)


• MailboxProcessor можно использовать для организации асинхронного многоразового канала передачи данных. Его цель — передать данные из одной части приложения в другую (передача возможна между разными потоками), при этом жестко не связывая эти части. Канал – кортеж из двух функций: функции для отправки данных и функции для ожидания их получения без блокировки потока:

let CreateAsyncChannel<'a> () = //'a - тип данных для передачи через канал
        //MailboxProcessor синхронизирует доступ к данным
        let inbox = new MailboxProcessor<_>( fun inbox ->
            // состояние процессора при получении AsyncReplyChannel для ответа
            let rec waitResponse (repl:AsyncReplyChannel<'a*bool>) = 
                inbox.Scan <| function
                    | GetResult repl -> //если находим новый AsyncReplyChannel в очереди сообщений - используем его
                        Some <| waitResponse repl
                    | IntermediateResult res -> //промежуточный результат в очереди
                        repl.Reply (res, false)
                        Some <| waitRepl ()
                    | Result res -> //финальный результат в очереди
                        repl.Reply (res, true)
                        Some <| async.Return ()
            and waitRepl () = 
                //начальное состояние процессора - ожидаем в очереди сообщения GetResult  
                //при этом все другие сообщения остаются в очереди процессора
                //repl - канал для передачи ответа
                inbox.Scan <| function
                    | GetResult repl -> Some <| waitResponse repl
                    | _ -> None
            waitRepl ()
        )
        inbox.Start()
        // первая функция кортежа resultWaiter - ожидание данных 
        let resultWaiter timeout =             
            inbox.PostAndTryAsyncReply ((fun replChannel -> GetResult replChannel), timeout)
        // вторая функция кортежа postResult - многоразовая отсылка данных
        // данные собираются в очереди MailboxProcessor
        let postResult closeChannel = 
            if closeChannel then Result else IntermediateResult
            >> inbox.Post
        (resultWaiter, postResult)


Таким образом на основе MailboxProcessor можно построить систему из независимых компонентов, каждый из которых владеет своими ресурсами и предоставляет потокобезопасный интерфейс другим. В следующей статье будет рассмотрены особенности построения машины состояний на основе библиотеки TPL Dataflow.

Tags:
Hubs:
+4
Comments 0
Comments Leave a comment

Articles

Information

Website
www.host-tracker.com
Registered
Founded
Employees
2–10 employees
Location
Украина