.NET

индекс
121,07

Асинхронное программирование — цепочки вызовов

Когда в коде фигурирует пара вызовов BeginXxx()/EndXxx(), это приемлимо. Но что если алгоритм требует несколько таких вызовов подряд, то количество методов (или анонимных делегатов) преумножится и код станет менее читабельным. К счастью, эта проблема решена как в F# так и в C#.


Задача


Итак, представьте что вы хотите в полностью асинхронном режиме скачать веб-страницу и сохранить ее у себя на жестком диске. Для этого нужно

  • Начать скачивать страничку сайта
  • Когда она скачается, начать запись файла на диск
  • Когда запись завершилась, закрыть файловый поток и уведомить пользователя

Простое решение


Наивное решение задачи выглядит примерно вот так:

static void Main(string[] args)
{
  Program p = new Program();
  // начинаем загрузку
  p.DownloadPage("http://habrahabr.ru");
  // ждем 10сек.
  p.waitHandle.WaitOne(10000);
}

// пришлось вывесить несколько переменных
private WebRequest wr;
private FileStream fs;
private AutoResetEvent waitHandle = new AutoResetEvent(false);

// тут мы начинаем скачивать страницу
public void DownloadPage(string url)
{
  wr = WebRequest.Create(url);
  wr.BeginGetResponse(AfterGotResponse, null);
}

// тут мы получаем текст со страницы
private void AfterGotResponse(IAsyncResult ar)
{
  var resp = wr.EndGetResponse(ar);
  var stream = resp.GetResponseStream();
  var reader = new StreamReader(stream);
  string html = reader.ReadToEnd();
  // последний параметр true позволяет писать файлы асинхронно
  fs = new FileStream(@"c:\temp\file.htm", FileMode.CreateNew,
                      FileAccess.Write, FileShare.None, 1024, true);
  var bytes = Encoding.UTF8.GetBytes(html);
  // начинаем запись файла
  fs.BeginWrite(bytes, 0, bytes.Length, AfterDoneWriting, null);
}

// когда файл записан, устанавливаем wait handle
private void AfterDoneWriting(IAsyncResult ar)
{
  fs.EndWrite(ar);
  fs.Flush();
  fs.Close();
  waitHandle.Set();
}

Это решение еще цветочки. Представьте, например, что вам дополнительно нужно из файла асинхронно скачать и сохранить все картинки, и только когда они все сохранены открыть папку в которую они были записаны. Используя парадигму выше, это настоящий кошмар.

Решение через анонимные делегаты


Первое, что можно сделать – это сгруппировать кусочки функционала в анонимные делегаты[1]. Тогда получится примерно следующее:

private AutoResetEvent waitHandle = new AutoResetEvent(false);
public void DownloadPage(string url)
{
  var wr = WebRequest.Create(url);
  wr.BeginGetResponse(ar =>
  {
    var resp = wr.EndGetResponse(ar);
    var stream = resp.GetResponseStream();
    var reader = new StreamReader(stream);
    string html = reader.ReadToEnd();
    var fs = new FileStream(@"c:\temp\file.htm", FileMode.CreateNew,
                        FileAccess.Write, FileShare.None, 1024, true);
    var bytes = Encoding.UTF8.GetBytes(html);
    fs.BeginWrite(bytes, 0, bytes.Length, ar1 =>
      {
        fs.EndWrite(ar1);
        fs.Flush();
        fs.Close();
        waitHandle.Set();
      }, null);
  }, null);
}

Это решение выглядит хорошо, но если цепочка вызовов действительно большая, то код не будет читабельным, и им будет сложно управлять. Это особенно касается ситуаций когда, например, выполнение цепочки нужно приостановить и отменить.

Решение с использованием asynchronous workflows


Workflow – это конструкт F#. Идея примерно такая – вы определяете некий блок, в котором некоторые операторы (такие как let, например) переопределены. Asynchronous workflow – это такой workflow, внутри которого переопределены операторы (let!, do! и другие) так, что эти операторы позволяют «дождаться» завершения операции. То есть, когда мы пишем

async {
  ⋮
  let! x = Y()
  ⋮
}

это значит что мы делаем вызов BeginYyy() для Y, а когда результат доступен, записываем результат в x.

Тем самым, аналог скачивания и записи файла на F# будет выглядеть примерно вот так:

// вот как надо строить пару начало/конец для асинхронных вызовов
// этот метод был по непонятным причинам убран из последних сборок F#
type WebRequest with
  member x.GetResponseAsync() =
    Async.BuildPrimitive(x.BeginGetResponse, x.EndGetResponse)
let private DownloadPage(url:string) =
  async {
    try
      let r = WebRequest.Create(url)
      let! resp = r.GetResponseAsync() // let! позволяет дождаться результата
      use stream = resp.GetResponseStream()
      use reader = new StreamReader(stream)
      let html = reader.ReadToEnd()
      use fs = new FileStream(@"c:\temp\file.htm", FileMode.Create,
                              FileAccess.Write, FileShare.None, 1024, true);
      let bytes = Encoding.UTF8.GetBytes(html);
      do! fs.AsyncWrite(bytes, 0, bytes.Length) // ждем пока все запишется
    with
      | :? WebException -> ()
  }
// вызов ниже делает синхронный вызов метода, но поведение внутри метода - асинхронное
Async.RunSynchronously(DownloadPage("http://habrahabr.ru"))

Используя хитрые синтактические конструкции, F# позволяет нам с помощью специально созданных «примитивов» (таких как GetResponseAsync() и AsyncWrite()) производить вызовы с Begin/End семантикой, но без разделения их на отдельные методы или делегаты. Как ни странно, примерно то же самое можно делать и в C#.

Решение с использованием asynchronous enumerator


Джефри Рихтер, всем известный автор книги CLR via C#, является также автором библиотеки PowerThreading. Эта библиотека[2] предоставляет ряд интересных фич, одна из которых – реализация аналога asynchronous workflow на C#.

Делается это очень просто – у нас появляется некий «менеджер токенов» под названием AsyncEnumerator. Этот класс фактически позволяет прерывать исполнение метода и продолжать его снова. Как можно прервать исполнение метода? Это делается с помощью нехитрого выражения yield return.

Использовать AsyncEnumerator просто. Берем и добавляем его как параметр в наш метод, а также меняем возвращаемое значение на IEnumerator<int>:

public IEnumerator<int> DownloadPage(string url, AsyncEnumerator ae)
{
  ⋮
}

Далее, пишем код с использованием BeginXxx()/EndXxx(), используя три простых правила:

  • Каждый BeginXxx() в качестве callback-параметра получает ae.End()
  • Каждый EndXxx() в качестве токена IAsyncResult получает ae.DequeueAsyncResult()
  • Каждый раз когда нужно чего-то ждать, мы делаем yield return X, где X – количество начатых операций

Вот как выглядит наш метод скачивания при использовании AsyncEnumerator:

public IEnumerator<int> DownloadPage(string url, AsyncEnumerator ae)
{
  var wr = WebRequest.Create(url);
  wr.BeginGetResponse(ae.End(), null);
  yield return 1;
  var resp = wr.EndGetResponse(ae.DequeueAsyncResult());
  var stream = resp.GetResponseStream();
  var reader = new StreamReader(stream);
  string html = reader.ReadToEnd();
  using (var fs = new FileStream(@"c:\temp\file.htm", FileMode.Create,
                      FileAccess.Write, FileShare.None, 1024, true))
  {
    var bytes = Encoding.UTF8.GetBytes(html);
    fs.BeginWrite(bytes, 0, bytes.Length, ae.End(), null);
    yield return 1;
    fs.EndWrite(ae.DequeueAsyncResult());
  }
}

Как видите, асинхронный метод теперь записан в синхронном виде – мы даже умудрились использовать using для файлового потока. Код стал более читабелен, если конечно не считать дополнительные вызовы yield return.

Теперь осталось только вызвать этот метод:

static void Main()
{
  Program p = new Program();
  var ae = new AsyncEnumerator();
  ae.Execute(p.DownloadPage("http://habrahabr.ru", ae));
}   

Заключение


Проблема цепочек асинхронных вызовов оказалась не такой страшной. Для простых ситуаций подойдут анонимные делегаты; для сложных есть асинхронные воркфлоу и AsyncEnumerator, в зависимости от того, какой язык вам ближе.

Цепочки – это просто, а что делать с целыми графами зависимостей? Об этом – в следующем посте. ■

Заметки


  1. ↑ Примечательно то, что можно изначально сделать отдельные методы, а потом «заинлайнить» их с помощью ReSharper’а.
  2. ↑ Библиотека действительно интересная – советую открывать ее в Reflector’е, там много вкусного. Также, заметьте что лицензия библиотеки позволяет использовать ее только на Windows, что наверняка разозлит фанатов Mono
+32
6 октября 2009, 11:47
36

комментарии (15)

0
Meroving #
Хм, вроде только была на главной, а теперь нету. Видимо не всем интересно :)
Вообще очень неплохо, но особенно конечно доставляет удовлетворение своей очевидностью способ F#. Кстати сейчас готовлю один перевод про F#, там это упоминается как одно из достоинств. И, в принципе, резонно.
0
mezastel #
Пока еще на главной.
–1
IBB4 #
Повторюсь: Я, конечно, зануда но…
1. Нет такого понятия как анонимные делегаты, есть анонимные методы.
2. Для асинхронной работы помимо библиотеки Рихтера есть еще и TPL, точнее будет в .Net 4. (С Рихтером, кстати, забавно на эту тему весной пообщались)
3. Вообще же такие вещи, там где это поддерживается, делаются посредством continuation passing style (http://en.wikipedia.org/wiki/Continuation-passing_style)
И небезизвестный Эрик Мейер сейчас пишет довольно забавный фреймворк на шарпе для работы в таком стиле через LINQ — вот это действительно круто :)
Посмотреть об этом можно, например, здесь: channel9.msdn.com/shows/Going+Deep/Expert-to-Expert-Brian-Beckman-and-Erik-Meijer-Inside-the-NET-Reactive-Framework-Rx/
–1
mezastel #
Не пишите комментарии на зеркала блогов – никто не ответит :)

1. Семантика. Можно погуглить ‘anonymous delegate’ и найти достаточно упоминаний.
2. Никто не спорит что в task-level parallelizm в дотнете все в порядке. Но вот всякие Parallel.For оттуда же – не круто.
3. Continuation passing style работает в простых ситуациях, когда задачу можно разбить на несколько Func<U,V,...>. AsyncEnumerator и F# позволяют формировать цепочки так же, как если бы вы писали синхронный код. То есть там например могут переплетаться изменения состояний, scope’ы, и так далее.
Про Reactive Framework – тут уже на хабре было, в принципе bindable linq неплохо работает :)
0
IBB4 #
Тогда закройте комментарии в зеркале.

1. Например, погуглив по словам «папа римский педофил», тоже много чего найти можно, но это не значит, что так и есть на самом деле, иными словами это лишь доказывает, что ошиблись вы не один… Формально есть термин «анонимный метод», а термина «анонимный делегат» — нет. Хотя для блога простительно, как выразился один мой знакомый — это концерт в халате на лестничной клетке… )
2. Parallel.For — как раз очень круто. Но речь не о нем, а о том, что там тоже есть поддержка континуейшенов в достаточно приемлемом виде.
3. Continuation, как раз и были придуманы для того, чтобы писать асинхронный код, как синхронный, а AsyncEnumerator-ы уже попытка эмулировать это дело, но Reactive кажется удачнее.
Вообще CPS был придуман функциональщиками, где каждая функция независима и общего состояния нет (что здорово) и ими же показана успешность применения такого подхода, отсюда утверждение, что могут переплетаться изменение состояний, и т. д. — звучит как антиреклама… ))
И в принципе, я бы хотел посмотреть на задачу, которую невозможно разбить на несколько независимых функций с явной передачей состояния так, чтобы она не стала от этого понятнее и читаемее… :)
–1
nochnoy #
Спасибо. Узнал, что в дотнете живётся не так плохо как я думал )
+1
mezastel #
У нас в дотнете все ажурно. Точнее Azure’но. :)
0
fuCtor #
Интересно но не привычно (после С и ему подобных языков). Еще подробно расписано про данную возможность здесь:
msdn.microsoft.com/ru-ru/magazine/cc967279.aspx
0
mezastel #
Попробовал прочитать, но, увы… на русском языке читать нереально. English version
НЛО прилетело и опубликовало эту надпись здесь
+1
mezastel #
Я об этом хотел через пост написать. В следующем про обычный граф, в потом — про conditional graph, что у нас и имеется.

Задачка прикольная. И соглашусь — с такими вещами «голый» шарп справляется достаточно плохо.
0
iZENfire #
На Java, например, каждая часть задачи заводилась бы в собственный java.lang.Thread (обеспечение инкапсуляции данных и кода), а те, в свою очередь, «цеплялись» друг к другу через вызов join() и, таким образом, обеспечивали последовательное взаимоувязанное выполнение этапов задачи.
Из нитей, увязанных в граф зависимостей через join(), можно получать весьма нетривиальную функциональность, не теряя управляемости кодом.
0
mezastel #
А топологическая сортировка зависимостей сама собой произойдет?
0
iZENfire #
В данном случае, что я привёл, топологическая сортировка зависимостей в самом графе — join() ждёт смерти объекта-нити, окончания которой ожидается в текущей нити, прежде чем запустить текущую нить.
0
Smerig #
Ничего против Вас не имею, но не слишком ли много var?

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.