Пользователь
0,0
рейтинг
27 сентября 2014 в 13:33

Разработка → UDP и C# async/await tutorial

C#*, .NET*
Недавно возникла необходимость решить следующую несложную задачку: есть несколько десятков устройств (учебных комплексов), у которых нужно регулярно запрашивать их текущее состояние. Комплексы общаются по протоколу UDP, и хотелось сделать так, чтобы не задумываться о цикле опроса и определении, от какого же устройства пришел ответ, а просто посылать запрос — и когда пришел результат — записывать его. Задачу эту я решал и раньше, но захотелось посмотреть, насколько концепция async/await упростит и сократит код. Оказалось, что финальный результат занимает меньше странички.



Вся логика опроса состоит всего лишь из двух методов — цикла чтения сокета UDP и метода посылки команды на устройство.

Когда посылаем команду, есть две вещи, которые надо принять во внимание — это 1) после посылки команды нам надо ждать ответа от устройства и 2) ответ может не прийти — тогда необходимо вернуть исключение, которое скажет нам о таймауте.

Асинхронный метод посылки команды выглядит следующим образом (*см. Update 1):

        public async Task<byte[]> SendReceiveAsync(byte[] msg, string ip, int port, int timeOut)
        {
            var endPoint = new IPEndPoint(IPAddress.Parse(ip), port);
            var tcs = new TaskCompletionSource<byte[]>();

            try
            {
                var tokenSource = new CancellationTokenSource(timeOut);
                var token = tokenSource.Token;
                if (!_tcsDictionary.ContainsKey(endPoint)) _tcsDictionary.TryAdd(endPoint, tcs);
                _client.Send(msg, msg.Length, ip, port);

                var result = await tcs.Task.WithCancellation(token);
                return result;
            }

            finally
            {
                _tcsDictionary.TryRemove(endPoint, out tcs);
            }
        }

Здесь _client — это стандартный UdpClient.
Мы посылаем команду и по await ждем результата, который нам должен вернуть Task, сохраненный в словарике с ключом нашего соединения (именно от него мы и ждем ответ). Когда чтение начинается — мы заносим TaskCompletionSource в словарик, когда мы получаем ответ и соединение больше не нужно, либо при выброшенном исключении — удаляем из словарика.

Сам словарик (ConcurrentDictionary используем вместо Dictionary для того, чтобы избежать проблем с кросспоточными вызовами):

private ConcurrentDictionary<Tuple<string,int>, TaskCompletionSource<byte[]>> _tcsDictionary;


Тут есть момент, который заслуживает внимания — это метод-расширение WithCancellation(token). Он нужен для того, чтобы поддержать отмену операции при помощи CancellationToken, и отменяет задачу, возвращая исключение при превышении заданного таймаута.


    static class TaskExtension
    {
        public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
        {
            var tcs = new TaskCompletionSource<bool>();

            using (cancellationToken.Register(
                        s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
                if (task != await Task.WhenAny(task, tcs.Task))
                    throw new OperationCanceledException(cancellationToken);
            return await task;
        }
    }


А вот и сам цикл чтения: читаем, пока хватит сил, и если пришедшая датаграмма имеет адресом соединение, ключ с параметрами которого мы уже занесли в словарик, то результат помещается в TaskCompletionSource по этому ключу, и мы переходим обратно в метод посылки сообщения на await tcs.Task, только уже имея на руках нужный результат от устройства, этот результат и вернем в место вызова.

            Task.Run(() =>
            {
                IPEndPoint ipEndPoint = null;

                while (true)
                {
                    try
                    {
                        var receivedBytes = _client.Receive(ref ipEndPoint);
                        TaskCompletionSource<byte[]> tcs;
                        if (_tcsDictionary.TryGetValue(ipEndPoint, out tcs)) tcs.SetResult(receivedBytes);
                    }
                    catch (SocketException)
                    {
                        ;//при невозможности соединения продолжаем работать
                    }

                }
            });

Итог радует. Вот так async-await упростил задачу опроса множества устройств по протоколу UDP.

Update 1
Как было справедливо отмечено в комментариях, метод SendReceiveUdpAsync необходимо обернуть в try{} finally{}, чтобы в случае отмены задачи и выброса исключения удалялось значение из словарика.

Update 2
Использование Reactive Extensions для той же задачи
habrahabr.ru/post/238445
Константин @NeoNN
карма
76,0
рейтинг 0,0
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Спецпроект

Самое читаемое Разработка

Комментарии (16)

  • +9
    Замечания.
    1. Достаточно было написать в основном методе
    using (cancellationToken.Register(() => tcs.TrySetCancelled()))
        await tcs.Task;
    

    Это куда проще, чем получившийся метод-расширение WithCancellation

    2. Метод-расширение WithCancellation тоже пишется проще.
    public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken) {
        await task.ContinueWith(_ => {}, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        return await task;
    }
    


    3. У вас в случае отмены задачи остается мусор — а все потому что _tcsDictionary.Remove(key) надо делать в блоке finally!

    4. По поводу самого цикла чтения — имейте в виду, этот цикл навсегда забирает один из потоков пула. Если таких циклов во всей программе случайно около 20 и более — начнутся загадочные проблемы. Вам следовало бы либо явно создать отдельный поток — либо использовать конструкцию await и тут тоже.

    5. У вас словарь _tcsDictionary никак не защищен. Следует либо использовать специальный однопоточный планировщик задач (и добавить всюду проверки, что метод именно в этом потоке и выполняется) — либо заменить Dictionary на ConcurrentDictionary
    • 0
      Спасибо, ценные комментарии. По поводу 1, 2, и особенно 3 пункта — полностью согласен. Метод SendReceiveUdpAsync должен быть обернут в try{} finally{}, где удаляется значение словарика. По пункту 4 — цикл чтения всего один, а вот насчет 5го пункта я не совсем понимаю, какие проблемы могут возникнуть и в каких ситуациях, если будет использован простой Dictionary, а не Concurrent?
      • 0
        а вот насчет 5го пункта я не совсем понимаю, какие проблемы могут возникнуть и в каких ситуациях, если будет использован простой Dictionary, а не Concurrent?
        А попробуйте — и увидите сами. Создайте 1000 потоков, которые будут сначала добавлять что-нибудь в словарь, а потом удалять обратно — и так много раз.

        Надо просто запомнить — нельзя обращаться к одному и тому же объекту из разных потоков одновременно — если в документации явно не разрешено обратное.
        • +1
          А почему нельзя доступ к ресурсу обернуть в Monitor.Enter или Lock(object)?
          • +5
            Можно. Но ConcurrentDictionary работает быстрее, чем обычный Dictionary, обернутый в lock.
            • 0
              Cпсибо
        • 0
          Так ведь метод вызывается из одного потока, а не из нескольких. Нужно ли здесь обеспечение кросспоточной безопасности, если оно все равно не понадобится (YAGNI)?
          • +1
            Я вижу как минимум два потока — в одном вызывается метод SendReceiveUdpAsync, а в другом крутится цикл чтения.
            • 0
              А и правда что. Даже несмотря на то, что в цикле чтения значения из словарика только считывается — все равно надо обеспечить безопасность.
    • 0
      2. Метод WithCancellation написан Стивеном Таубом. Вы уверены, что знаете лучше него, как правильно? :)
      • 0
        Да, уверен. Если бы я писал комментарий к той статье, на которую вы дали ссылку — я бы написал то же самое :)
        • 0
          Да, вы правы. Пролистав комментарии до конца, нашел предложение, практически идентичное по реализации:

          public static Task<T> WithCancellation<T>(this Task<T> task, CancellationToken token)
          {
              return task.ContinueWith(t => t.GetAwaiter().GetResult(), token);
          }
          


          Ответ автора:
          That's reasonable. If you're planning to use this for real, you'd probably want a few tweaks:
          1. First check task.IsCompleted, and if it's true, just return 'task' rather than doing a continuation.
          2. Pass TaskScheduler.Default as the scheduler and TaskContinuationOptions.ExecuteSynchronously as the continuation options to the call to ContinueWith.

          Итого в сухом остатке только проверка.
  • +4
    Еще два замечания.
    6. Перед вызовом _tcs.Dictionary.Remove(key) нет никакого смысла проверять наличие этого ключа там. Во-первых, этот ключ там по логике программы всегда есть, а во-вторых Remove сама умеет делать эту проверку.

    7. В цикле чтения вместо SetResult надо использовать TrySetResult, особенно если вы последовали моему совету номер 1. В противном случае, если успеть отменить чтение, когда ключ уже получен из словаря, но SetResult еще не вызван — весь цикл чтения упадет с ошибкой.

    8. Если в цикле чтения возникнет ошибка — она будет проглочена, и никто о ней не узнает. Просто программа внезапно перестанет получать ответы от серверов, и все… Лучше бы эту ошибку как-нибудь логировать, а еще лучше — пытаться пересоздать сокет.
  • +4
    9. Вместо использования Tuple<string, int> в качестве ключа в словаре можно использовать непосредственно класс IPEndPoint. Это ничуть не усложнит код SendReceiveUdpAsync — но значительно упростит код цикла чтения.

    10. В редких случаях вызов _client.Send(msg, msg.Length, ip, port) может заблокировать поток на некоторое время. Внутри асинхронных функций лучше бы использовать метод SendAsync.
  • 0
    Сделал вариант попроще на RX — habrahabr.ru/post/238445/
    • 0
      Здорово, уже прочитал! Стоит изучить Rx в свободное время.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.