Мириады запущенных задач на C#

    Недавно на ресурсе Medium были опубликованы две статьи от одного и того же автора, затрагивающие функциональность C# async/await.


    Основными выводами были:


    • рекурсивный вызов асинхронного метода в C# подвержен StackOverflowException
    • goroutine'ы лучше задач (тасков) в .NET в плане производительности

    Но главная проблема вышеприведенных публикаций — абсолютное непонимание модели кооперативной многозадачности в C# с вводом читателей в заблуждение. Сами же бенчмарки — бессмысленные, как мы увидим позже.


    Далее в статье я попытаюсь раскрыть суть проблемы более подробно с примерами решения.


    TL;DR

    После небольшой правки кода исходных примеров, реализация бенчмарка на .NET оказывается быстрее варианта Go. Попутно решаем проблему переполнения стека у рекурсивных асинхронных методов.


    NB: использоваться будут свежевыпущенный .NET Core 2.0 и Go 1.8.3.


    Stack overflow & async


    Перейдем сразу к рассмотрению примера #1:


    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace CSharpAsyncRecursion
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Counted to {0}.", CountRecursivelyAsync(10000).Result);
            }
    
            static async Task<int> CountRecursivelyAsync(int count)
            {
                if (count <= 0)
                    return count;
                return 1 + await CountRecursivelyAsync(count - 1);
            }
        }
    }

    Консоль упадет со StackOverflowException. Печаль!


    Вариант реализации tail-call оптимизации здесь не подходит, т.к. мы не собираемся править компилятор, переписывать байт-код и т.п.


    Поэтому решение должно подходить для максимально общего случая.


    Обычно рекурсивный алгоритм заменяют на итерационный. Но в данном случае нам это не подходит также.


    На помощь приходит механизм отложенного выполнения.
    Реализуем простой метод Defer:


    Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) => 
        Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();

    Для того, чтобы поставить задачу в очередь необходимо указание планировщика.
    Методы Task.Run и Task.Factory.StartNew позволяют его использовать (По-умолчанию — TaskScheduler.Default, который для данного примера и так подойдет), а последний позволяет передать объект-состояние в делегат.


    На даный момент Task.Factory.StartNew не подерживает обобщенные перегрузки и вряд ли будет. Если необходимо передать состояние, то либо Action<object>, либо Func<object, TResult>.

    Перепишем пример, используя новый метод Defer:


    static async Task Main(string[] args)
    {
        Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) => 
            Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();
    
        Task<int> CountRecursivelyAsync(int count)
        {
            if (count <= 0)
                return Task.FromResult(count);
    
            return Defer(seed => CountRecursivelyAsync(seed - 1).ContinueWith(rec => rec.Result + 1), count);
        }
    
        Console.WriteLine($"Counted to {await CountRecursivelyAsync(100000)}.");
    }

    Оно не то, чем кажется


    Для начала ознакомимся с кодом бенчмарков из этой статьи.


    Код на Go:
    package main
    
    import (
        "flag";
        "fmt";
        "time"
    )
    
    func measure(start time.Time, name string) {
        elapsed := time.Since(start)
        fmt.Printf("%s took %s", name, elapsed)
        fmt.Println()
    }
    
    var maxCount = flag.Int("n", 1000000, "how many")
    
    func f(output, input chan int) {
        output <- 1 + <-input
    }
    
    func test() {
        fmt.Printf("Started, sending %d messages.", *maxCount)
        fmt.Println()
        flag.Parse()
        defer measure(time.Now(), fmt.Sprintf("Sending %d messages", *maxCount))
        finalOutput := make(chan int)
        var left, right chan int = nil, finalOutput
        for i := 0; i < *maxCount; i++ {
            left, right = right, make(chan int)
            go f(left, right)
        }
        right <- 0
        x := <-finalOutput
        fmt.Println(x)
    }
    
    func main() {
        test()
        test()
    }

    C#-код:
    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Channels;
    
    namespace ChannelsTest
    {
        class Program
        {
            public static void Measure(string title, Action<int, bool> test, int count, int warmupCount = 1)
            {
                test(warmupCount, true); // Warmup
                var sw = new Stopwatch();
                GC.Collect();
                sw.Start();
                test(count, false);
                sw.Stop();
                Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
            }
    
            static async void AddOne(WritableChannel<int> output, ReadableChannel<int> input)
            {
                await output.WriteAsync(1 + await input.ReadAsync());
            }
    
            static async Task<int> AddOne(Task<int> input)
            {
                var result = 1 + await input;
                await Task.Yield();
                return result;
            }
    
            static void Main(string[] args)
            {
                if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
                    maxCount = 1000000;
                Measure($"Sending {maxCount} messages (channels)", (count, isWarmup) => {
                    var firstChannel = Channel.CreateUnbuffered<int>();
                    var output = firstChannel;
                    for (var i = 0; i < count; i++) {
                        var input = Channel.CreateUnbuffered<int>();
                        AddOne(output.Out, input.In);
                        output = input;
                    }
                    output.Out.WriteAsync(0);
                    if (!isWarmup)
                        Console.WriteLine(firstChannel.In.ReadAsync().Result);
                }, maxCount);
                Measure($"Sending {maxCount} messages (Task<int>)", (count, isWarmup) => {
                    var tcs = new TaskCompletionSource<int>();
                    var firstTask = AddOne(tcs.Task);
                    var output = firstTask;
                    for (var i = 0; i < count; i++) {
                        var input = AddOne(output);
                        output = input;
                    }
                    tcs.SetResult(-1);
                    if (!isWarmup)
                        Console.WriteLine(output.Result);
                }, maxCount);
            }
        }
    }

    Что брасается сразу в глаза:


    1. Сам пример (что для Go, что для C#) весьма странен. Все сводится к эмуляции цепочки действий и их лавинообразном 'спуске'. Более того в Go создается chan int на каждую итерацию из 1 млн. Это вообще best-practice??
    2. автор использует Task.Yield(), оправдывая это тем, что иначе пример упадет с StackOverflowException. С таким же успехом мог бы и Task.Delay задействовать. Зачем мелочиться-то?! Но, как увидели ранее, все проистекает из-за 'неудачного' опыта с рекурсивными вызовами асинхронных методов.
    3. Изначально в примерах также фигурирует бета-версия System.Threading.Tasks.Channels для сравнения с каналами в Go. Я решил оставить только пример с тасками, т.к. библиотека System.Threading.Tasks.Channels еще не выпущена официально.
    4. Вызов GC.Collect() после прогрева. Боюсь, я откажусь от такого сомнительного преимущества.

    Go использует понятие goroutine — легковесных потоков. Соответственно каждая горутина имеет свой стек. На данный момент размер стека равен 2KB. Поэтому при запуске бенчмарков будьте осторожны (более 4GB понадобиться)!

    С одной стороны, это может быть полезно CLR JIT'у, а с другой — Go переиспользует уже созданные горутины, что позволяет исключить замеры трат на выделение памяти системой.

    Результаты до оптимизации


    Среда тестирования:


    • Core i7 6700HQ (3.5 GHz)
    • 8 GB DDR4 (2133 MHz)
    • Win 10 x64 (Creators Update)

    Ну что ж, у меня получились следующие результаты:


    Warmup (s) Benchmark (s)
    Go 9.3531 1.0249
    C# - 1.3568

    NB: Т.к. пример реализует просто цепочку вызовов, то ни GOMAXPROCS, ни размер канала не влияют на результат (уже проверено опытным путем). В расчет берем наилучшее время. Флуктуации не совсем важны, т.к. разница большая.


    Да, действительно: Go опережает C# на ~30%.
    Challange accepted!

    Используй TaskScheduler, Luke!


    Если не использовать что-то наподобие Task.Yield, то снова будет StackOverflowException.


    На этот раз не будем использовать Defer!


    Мысль реализации проста: запускаем доп. поток, который слушает/обрабатывает задачи по очереди.


    По-моему, легче реализовать собственный планировщик, чем контекст синхронизации.
    Сам класс TaskScheduler выглядит так:


    // Represents an object that handles the low-level work of queuing tasks onto threads.
    public abstract class TaskScheduler
    {
        /* остальные методы */
    
        public virtual int MaximumConcurrencyLevel { get; }
    
        public static TaskScheduler FromCurrentSynchronizationContext();
    
        protected abstract IEnumerable<Task> GetScheduledTasks();
    
        protected bool TryExecuteTask(Task task);
    
        protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
    
        protected internal abstract void QueueTask(Task task);
    
        protected internal virtual bool TryDequeue(Task task);
    }

    Как мы видим, TaskScheduler уж реализует подобие очереди: QueueTask и TryDequeue.


    Дабы не изобретать велосипед, воспользуемся уже готовыми планировщиками от команды .NET.


    Внимание! Камера! Мотор!


    Перепишем это дело на C# 7, делая его максимально приближенным к Go:


    static async Task Main(string[] args)
    {
        void Measure(string title, Action<int, bool> test, int count, int warmupCount = 1
        {
            var sw = new Stopwatch();
            sw.Start();
            test(count, false);
            sw.Stop();
            Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
        }
        async Task<int> f(Task<int> input)
        {
            return 1 + await input; // return output
        }
        await Task.Factory.StartNew(() =>
        {
            if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
                maxCount = 1000000;
            Measure($"Sending {maxCount} messages (Task<int>)", (count, isWarmup) => {
                var tcs = new TaskCompletionSource<int>();
                (var left, var right) = ((Task<int>)null, f(tcs.Task));
                for (var i = 0; i < count; i++)
                {
                    left = f(right);
                    right = left;
                }
                tcs.SetResult(-1);
                if (!isWarmup)
                    Console.WriteLine(right.Result);
            }, maxCount);
        }, CancellationToken.None, TaskCreationOptions.None, new StaTaskScheduler(2));
    }

    Здесь необходимо сделать пару ремарок:


    • GC.Collect() убираем как и говорилось выше
    • Используем StaTaskScheduler с двумя вспомогательными потоками, чтобы избежать блокировки: один ждет результата из главной/последней задачи, а др. обрабатывает саму цепочку задач.

    Проблема рекурсивных вызовов исчезает автоматически. Поэтому смело убираем из метода f(input) вызов Task.Yield(). Если этого не сделать, то можно ожидать чуть более лучший результат по сравнению с исходным, т.к. дефолтный планировщик использует ThreadPool.


    Теперь публикуем релизную сборку:
    dotnet publish -c release -r win10-x64


    И запускаем...



    Внезапно получаем около 600 ms вместо прежних 1300 ms. Not bad!
    Go, напомню, отрабатывал на уровне 1000 ms. Но меня не покидает чувство неуместности использования каналов как средство кооперативной многозадачности в исходных примерах.


    p.s.
    Я не делал огромного количества прогонов тестов с полноценной статистикой распределения значений замеров специально.
    Цель статьи заключалась в освещении определенного use-case'a async/await и попутного развенчания мифа о невозможности рекурсивного вызова асинхронных методов в C#.


    p.p.s.
    Причиной изначального отставания C# было использование Task.Yield(). Постоянное переключение контекста — не есть гуд!

    Поделиться публикацией
    Похожие публикации
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама
    Комментарии 29
    • –17

      // Оффтоп
      Хватит, горшочек, не вари! Шарп, родненкий, ну хорошо же жили вместе уже почти два десятка лет, всё у нас получалось, и космические корабли бороздили просторы вселенной, и сервера с клиентами любили друг друга не смотря на разделяющие их моря и океаны. Но то, что щас происходит — за гранью моего восприятия. Я не знаю, сколько часов должно быть в сутках и в рабочей неделе, чтобы успевать вникать/изучать в новые конструкции и приёмы. Особенно учитывая, что среднестатистический программист разве что курьером не дорабатывает в фирме дабы экономить бюджет и всё совсем не загнулось. Или на западе реально в каждом стартапе/фирме получается выбить деньги на гору программистов, которые могут себе позволить быть настолько специализированными, что таки способны изучать и использовать все эти бесконечные навороты?

      • +10

        Если хочется продвигаться в карьере и расти в з/п — надо постоянно учиться.


        "Пока ты спишь, кто-то качается".

        • +2

          Вопрос исключительно в приоритетах. Ну и не надо работать курьером. Нужна нормальная работа с .NET — пишите, если уверены, что в текущей курьерской ситуации виновата не деструктивная лень.

          • +4
            Вам не надо знать детали всех «наворотов» — вам надо понимать концепции, а это в целом проще. Нюансы можно разобрать по мере необходимости.
            Ну и да, развиваться надо постоянно. И не только в опыте с C#/.NET.
            • +1
              Не изучайте.
            • +2
              StackOverflow можно достичь и вполне естественным способом, например, при написании сетевого пинг-понг приложения. C# по умолчанию предлагает вызвать продолжение метода немедленно, если задача уже выполнена. Обычно это полезно, т.к. сокращает накладные расходы, но может привести и к казусам.

              Да, эту проблему можно решить «костылём» в виде `Task.Yield`, а проблему с накладными расходами при переключении контекста — собственным планировщиком. Но просто обойтись без `Task.Yield`, к сожалению, не получится.
              • 0
                А что если попробовать Task заменить на ValueTask?
                • +1
                  Ничего не поменяется: ValueTask ничем не отличается от Task по семантике.
                  • +1

                    Боюсь, ValueTask здесь не поможет: мы создаем цепочку вызовов, а не просто возвращаем скалярное либо др. примитивное значение.

                  • 0
                    А что по затратам памяти в пике?
                    • +3

                      .NET потребляет примерно ~300 MB в пике.

                    • 0

                      Я запустил Ваш оптимизированный через StaTaskScheduler код и он выдал ~1300мс.
                      Я набросал то же самое на F# ~700мс.


                      Код
                      Картинка из консоли.
                      Первый запуск — релизная сборка Вашей версии, второй запуск — моей.
                      Ссылка на .Net dll + pdb


                      Возможно я что-то сделал не так если разница аж в два раза между языками одной и той же платформы.


                      P.S. я там в for ошибся, мой код на 1 итерацию больше делает :)

                      • 0

                        Ссылка на аплоад dll битая. Вот верная

                        • +2
                          у меня Ваш F# пример выдает ~850-900 ms:


                          Может быть, изначально использовался неоптимизированный C#-пример?

                          • +1

                            Подозреваю что есть ещё что-то, т.к. я точно использовал оптимизированный пример.
                            Учитывая такую непостоянность результатов, нельзя сказать что они у кого-то из нас достоверные.

                            • 0

                              можно поинтересоваться характеристиками Вашей тестовой среды?
                              хотелось бы увидеть в таком случае результаты Go-бенчмарка как базиса тоже.

                              • 0

                                Processor=Intel Core i5-3450 CPU 3.10GHz (Ivy Bridge), ProcessorCount=4
                                Есть предложение лучше — прогнать с BenchmarkDotNet:
                                F# — у меня выдаёт 56.29 ms (я снизил кол-во итераций в коде до 100к)
                                C# — у меня выдаёт 355.1 ms


                                Попробуйте запустить тот же код в релизе (я собирал всё под 4.6.1, т.к. ParallelExtensionsExtra под netcore нет)

                                • +1

                                  занятно! я попробую это дело прогнать у себя тоже.


                                  p.s.
                                  очевидно, что можно просто скопировать исходник StataskScheduler из ParallelExtensionsExtra прямо в проект :D

                                  • +3

                                    изначально я запускал пример на F# без Server GC ((
                                    вот что у меня получилось после правки:


                                     Method |   Count |     Mean |    Error |   StdDev |
                                    ------- |-------- |---------:|---------:|---------:|
                                         C# | 1000000 | 592.1 ms | 97.91 ms | 15.18 ms |
                                         F# | 1000000 | 243.0 ms | 91.02 ms | 15.04 ms |

                                    csharp, fsharp


                                    чуть добавил конфигурации для


                                    F#
                                    module Test
                                    
                                    open BenchmarkDotNet
                                    open BenchmarkDotNet.Running
                                    open BenchmarkDotNet.Attributes
                                    open BenchmarkDotNet.Configs
                                    open BenchmarkDotNet.Jobs
                                    open BenchmarkDotNet.Columns
                                    
                                    let f input = 
                                        async {
                                            let! await = input
                                            return 1 + await
                                        }
                                    
                                    type DefaultConfig() as self =
                                        inherit ManualConfig()
                                        do
                                            self.Add(Job.RyuJitX64.WithTargetCount(4).WithGcServer(true).WithGcForce(false))
                                            self.Add(BaselineScaledColumn.ScaledStdDev)
                                    
                                    [<Config(typedefof<DefaultConfig>)>]
                                    type Worker() = 
                                        [<Benchmark>]
                                        member public x.DoWork() =
                                            let src = async { return -1 }
                                            let first = f src
                                            let mutable output = first
                                    
                                            for i = 1 to x.Count do
                                                let input = f output
                                                output <- input
                                    
                                            output
                                            |> Async.Ignore
                                            |> Async.RunSynchronously
                                    
                                        [<Params(1000000)>]
                                        member val Count = 0 with get, set
                                    
                                    [<EntryPoint>]
                                    let main argv =
                                        let summary = BenchmarkRunner.Run<Worker>()
                                        printfn "%A" summary
                                        0
                                    и
                                    C#
                                    using BenchmarkDotNet.Attributes;
                                    using BenchmarkDotNet.Columns;
                                    using BenchmarkDotNet.Configs;
                                    using BenchmarkDotNet.Jobs;
                                    using BenchmarkDotNet.Order;
                                    using BenchmarkDotNet.Running;
                                    using System;
                                    using System.Threading;
                                    using System.Threading.Tasks;
                                    using System.Threading.Tasks.Schedulers;
                                    
                                    namespace Test
                                    {
                                        [OrderProvider(SummaryOrderPolicy.FastestToSlowest)]
                                        [Config(typeof(Config))]
                                        public class Worker
                                        {
                                            private class Config : ManualConfig
                                            {
                                                public Config()
                                                {
                                                    Add(Job.RyuJitX64.WithTargetCount(4).WithGcServer(true));
                                                    Add(BaselineScaledColumn.ScaledStdDev);
                                                }
                                            }
                                    
                                            async Task<int> f(Task<int> input)
                                            {
                                                return 1 + await input; // return output
                                            }
                                    
                                            [Benchmark]
                                            public void DoWork()
                                            {
                                                Task.Factory.StartNew(() =>
                                                {
                                                    var tcs = new TaskCompletionSource<int>();
                                                    (var left, var right) = ((Task<int>)null, f(tcs.Task));
                                                    for (var i = 0; i < Count; i++)
                                                    {
                                                        left = f(right);
                                                        right = left;
                                                    }
                                                    tcs.SetResult(-1);
                                                }, CancellationToken.None, TaskCreationOptions.None, new StaTaskScheduler(2)).Wait();
                                            }
                                    
                                            [Params(1000000)]
                                            public int Count { get; set; }
                                        }
                                    
                                        class Program
                                        {
                                            static void Main(string[] args)
                                            {
                                                var summary = BenchmarkRunner.Run<Worker>();
                                                Console.WriteLine(summary);
                                            }
                                        }
                                    }
                                    • +2

                                      Спасибо за результаты. Я правда не знаю о чём это говорит.
                                      Могу только предположить что asyncWorkflow в F# менее оверхедный чем Task в C#.

                          • +5
                            Используйте, пожалуйста, BenchmarkDotNet для бенчмарков.
                            • +6

                              мысль здравая. я и сам по возможности рекомендую библиотечку для бенчмарков.
                              При этом у меня не возникает чувство стыда из-за её неиспользования в данном случае.


                              вычислять персентили и т.п. по отношению к примеру на Go (выступающим базисом), где всего-то используется пакет time, выглядит, по-моему, неспортивно.
                              Особенно при выдимой невооруженным глазом разнице примерно в 2 раза в плане производительности.

                            • +1
                              Кстати посоветуйте что почитать/посмотреть чтобы хорошенько вкурить async/await с полного нуля для человека, который всю жизнь использовал для параллелизации только BackgroundWorker (ну и ещё пару раз немного Akka), а теперь хочет перейти на более современное и портабельное и понять как следует, чтобы перестать думать ивентами, начать думать тасками и научиться применять их правильно и по-полной. Заранее спасибо.
                              • +4

                                Думаю, рекомендовать книги Рихтера — вещь очевидная :)
                                Из блогов/публикаций настоятельно рекомендую PFX Team. Stephen Toub с командой весьма подробно разбирают подходы и принципы использования TPL и все, что с ним связано.
                                Серия статей по C# async от Stephen Cleary (автора библиотеки AsyncEx) также интересна.

                                • 0

                                  Не знаю, Ритхера все нахваливают, но по сути он очень странно и непонятно объсняет. Тот же Гольдштейн по кишкам мне зашел в разы лучше.

                                • 0
                                  Ещё, как вариант, Async in C# 5.0 (Alex Davies).
                                • 0

                                  В первом случае мне кажется что это переписывание в стиле "Сделайте фибоначчи не рекурсивным, используя 4 аргумента". На прологе баловались, знаем, плавали :)


                                  А вот второй вот не совсем понятен, зачем нам left и right если они на одно и то же всегда указывают?.. В чем смысл этой чехарды?

                                  • +1

                                    ну это можно переписать как next = f(next). по сути, я оставил этот странный кусок, чтобы максимально синтаксически приблизить к примеру на Go.

                                  • –3
                                    А у меня подобных проблем не возникнет, потому что в архитектуре моих проектов такого паноса никогда не будет. :)

                                    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.