Примеры использования asyncio: HTTPServer?!

    Не так давно зарелизилась новая версия Python 3.4 в changelog которой вошло много «вкусностей». Одна из таких — модуль asyncio, содержащий инфраструктуру пригодную для написания асинхронных сетевых приложений. Благодаря концепции сопрограмм (coroutines), код асинхронного приложения прост для понимания и поддержки.

    В статье на примере простого TCP (Echo) сервера я постараюсь показать с чем едят asyncio, и рискну устранить «фатальный недостаток» этого модуля, а именно отсутствие реализации асинхронного HTTP сервера.

    INTRO


    Прямой конкурент и «брат» — это фреймворк tornado, который хорошо зарекомендовал себя и пользуется заслуженной популярностью. Однако на мой взгляд, asyncore выглядит проще, более логичен и продуман. Впрочем это не удивительно, ведь мы имеем дело со стандартной библиотекой языка.

    Вы скажете, что на Python можно было и раньше писать асинхронные сервисы и будете правы. Но для этого требовались сторонние библиотеки и/или использования callback стиля программирования. Концепция coroutine доведенная в этой версии Python практически до совершенства позволяет писать линейный асинхронный код использую лишь возможности стандартных библиотек языка.

    Сразу хочу оговориться, что все это я писал под Linux, однако все используемые компоненты кроссплатформенные и под Windows тоже должно заработать. Но версия Python 3.4 обязательна.

    EchoServer


    Пример Echo сервера есть в стандартной документации, но относится это к low-level API «Transports and protocols». Для «повседневного» использования рекомендуется high-level API «Streams». Пример кода TCP сервера в нем отсутствует, однако изучив пример из low-level API и посмотрев исходники того и другого модуля, написать простой TCP сервер не составляет труда.

    import asyncio
    import logging
    import concurrent.futures
    
    @asyncio.coroutine
    def handle_connection(reader, writer):
        peername = writer.get_extra_info('peername')
        logging.info('Accepted connection from {}'.format(peername))
        while True:
            try:
                data = yield from asyncio.wait_for(reader.readline(), timeout=10.0)
                if data: 
                    writer.write(data)
                else:
                    logging.info('Connection from {} closed by peer'.format(peername))
                    break
            except concurrent.futures.TimeoutError:
                logging.info('Connection from {} closed by timeout'.format(peername))
                break
        writer.close()
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        logging.basicConfig(level=logging.INFO)
        server_gen = asyncio.start_server(handle_connection, port=2007)
        server = loop.run_until_complete(server_gen)
        logging.info('Listening established on {0}'.format(server.sockets[0].getsockname()))
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass # Press Ctrl+C to stop
        finally:
            server.close()
            loop.close()
    

    Все достаточно очевидно, но есть пара нюансов на которые стоит обратить внимание.

        server_gen = asyncio.start_server(handle_connection, port=2007)
        server = loop.run_until_complete(server_gen)
    

    В первой строке создается не сам сервер, а генератор, который при первом обращении к нему и недр asyncio создает и инициализирует по заданным параметрам TCP сервер. Вторая строка и есть пример такого обращения.

            try:
                data = yield from asyncio.wait_for(reader.readline(), timeout=10.0)
                if data: 
                    writer.write(data)
                else:
                    logging.info('Connection from {} closed by peer'.format(peername))
                    break
            except concurrent.futures.TimeoutError:
                logging.info('Connection from {} closed by timeout'.format(peername))
                break
    

    Функция-coroutine reader.readline() производит асинхронное чтение данных из входного потока. Но ожидание данных для чтения не ограниченно по времени, если нужно его прекратить по таймауту необходимо обернуть вызов функции-coroutine в asyncio.wait_for(). В этом случае по истечению заданного в секундах интервала времени будет поднято исключение concurrent.futures.TimeoutError, которое можно обработать необходимым образом.
    Проверка что reader.readline() возвращает не пустое значение в данном примере обязательна. Иначе после разрыва соединения клиентом (connection reset by peer), попытки чтения и возврат пустого значения будут продолжаться до бесконечности.

    А как же ООП?

    С ООП тоже все хорошо. Достаточно обернуть методы использующие вызовы функций-coroutine в декоратор @asyncio.coroutine. Какие функции запускаются как coroutine в API явно указывается. Ниже пример реализующий класс EchoServer.

    import asyncio
    import logging
    import concurrent.futures
    
    class EchoServer(object):
        """Echo server class"""
        
        def __init__(self, host, port, loop=None):
            self._loop = loop or asyncio.get_event_loop() 
            self._server = asyncio.start_server(self.handle_connection, host=host, port=port)
        
        def start(self, and_loop=True):
            self._server = self._loop.run_until_complete(self._server)
            logging.info('Listening established on {0}'.format(self._server.sockets[0].getsockname()))
            if and_loop:
                self._loop.run_forever()
        
        def stop(self, and_loop=True):
            self._server.close()
            if and_loop:
                self._loop.close()
        
        @asyncio.coroutine    
        def handle_connection(self, reader, writer):
            peername = writer.get_extra_info('peername')
            logging.info('Accepted connection from {}'.format(peername))
            while not reader.at_eof():
                try:
                    data = yield from asyncio.wait_for(reader.readline(), timeout=10.0)
                    writer.write(data)
                except concurrent.futures.TimeoutError:
                    break
            writer.close()
    
    if __name__ == '__main__':
        logging.basicConfig(level=logging.DEBUG)
        server = EchoServer('127.0.0.1', 2007)
        try:
            server.start()
        except KeyboardInterrupt:
            pass # Press Ctrl+C to stop
        finally:
            server.stop()
    


    Как видно и в первом и во втором случае, код линейный и вполне читаемый. А во втором случае к тому же код оформлен в самодостаточный класс.

    HTTP Server


    Разобравшись со всем этим невольно возникает желание сделать что-то более существенное. Модуль asyncio предоставляет нам и такую возможность. В нем в отличии например от tornado не реализован HTTP сервер. Как говориться грех не попробовать исправить это упущение :)

    Писать целиком с нуля HTTP сервер со всеми его классами типа HTTPRequest и т. п. — не спортивно, учитывая что есть масса готовых фреймворков работающих поверх протокола WSGI. Те кто в курсе справедливо заметят, что WSGI синхронный протокол. Это верно, но считать данные для environ и тело запроса можно асинхронно. Выдача результата в WSGI рекомендована в виде генератора, и это хорошо вписывается в концепцию coroutines используемую в asyncio.

    Одним из фреймворков, который все делает правильно с отдачей контента является bottle. Так он например выдает содержимое файла не целиком, а порциями через генератор. Поэтому я выбрал его для тестирование разрабатываемого WSGI сервера и остался доволен результатом. Например демо приложение вполне оказалось способно отдавать большой файл на несколько клиентских подключений одновременно.

    Полностью посмотреть что получилось можно у меня на github. Ни тестов, ни документации там пока нет, зато есть демо приложение использующее bottle фреймворк. Оно выдает список файлов в определенном каталоге и отдает выбранный в асинхронном режиме в независимости от размера. Так что если накидать в этот каталог фильмов, можно организовать небольшой видеохостинг :)

    Хотелось бы сказать отдельное спасибо команде разработчиков CherryPy, в их код я часто поглядывал и кое-что взял целиком, что бы не придумывать «своих велосипедов».

    Посмотреть пример приложения
    import bottle
    import os.path
    from os import listdir
    from bottle import route, template, static_file
    
    root = os.path.abspath(os.path.dirname(__file__)) 
    
    
    @route('/')
    def index():
        tmpl = """<!DOCTYPE html>
    <html>
    <head><title>Bottle of Aqua</title></head>
    </body>
    <h3>List of files:</h3>
    <ul>
      % for item in files:
        <li><a href="/files/{{item}}">{{item}}</a></li>
      % end
    </ul>
    </body>
    </html>
    """
        files = [file_name for file_name in listdir(os.path.join(root, 'files'))
                            if os.path.isfile(os.path.join(root, 'files', file_name))]
        return template(tmpl, files=files)
    
    
    @route('/files/<filename>')
    def server_static(filename):
        return static_file(filename, root=os.path.join(root,'files'))
    
    
    class AquaServer(bottle.ServerAdapter):
        """Bottle server adapter"""
        def run(self, handler):
            import asyncio
            import logging
            from aqua.wsgiserver import WSGIServer
            
            logging.basicConfig(level=logging.ERROR)
            loop = asyncio.get_event_loop()
            server = WSGIServer(handler, loop=loop)
            server.bind(self.host, self.port)
    
            try:
                loop.run_forever()
            except KeyboardInterrupt:
                pass # Press Ctrl+C to stop
            finally:
                server.unbindAll()
                loop.close()
    
    if __name__ == '__main__':
        bottle.run(server=AquaServer, port=5000)
    


    При написании кода WSGI сервера, я не заметил каких-то нюансов, которые можно бы было отнести на счет модуля asyncio. Единственный момент, это особенность браузеров (например хрома), сбрасывать запрос если он видит что начинает получать большой файл. Очевидно это сделано с целью переключения на более оптимизированный способ загрузки больших файлов, ибо следом запрос повторяется и файл начинает приниматься штатно. Но первый сброшенный запрос вызывает исключение ConnectionResetError, если отдача файла по нему уже началась с помощь вызова функции StreamWriter.write(). Этот случай надо обрабатывать и закрывать соединение с помощью StreamWriter.close().

    Производительность


    Для сравнительного теста я выбрал утилиту siege. В качестве подопытных выступили «наш пациент» (он же aqua :) в связке с bottle, достаточно популярный Waitress WSGI сервер тоже в связке с bottle и конечно же Tornado. В качестве приложения был минимально возможный helloword. Тесты проводил со следующими параметрами: 100 и 1000 одновременных подключений; длительность теста 10 секунд для 13 байт и килобайт; длительность теста 60 секунд для 13 мегабайт; три варианта размера отдаваемых данных соответственно 13 байт, 13 килобайт и 13 мегабайт. Ниже результат:
    100
    concurent users
    13 b (10 sec)
    13 Kb (10 sec)
    13 Mb (60 sec)
    Avail.
    Trans/sec
    Avail.
    Trans/sec
    Avail.
    Trans/sec
    aqua+bottle
    100,0%
    835,24
    100,0%
    804,49
    99,9%
    26,28
    waitress+bootle
    100,0%
    707,24
    100,0%
    642,03
    100,0%
    8,67
    tornado
    100,0%
    2282,45
    100,0%
    2071,27
    100,0%
    15,78

    1000
    concurent users
    13 b (10 sec)
    13 Kb (10 sec)
    13 Mb (60 sec)
    Avail.
    Trans/sec
    Avail.
    Trans/sec
    Avail.
    Trans/sec
    aqua+bottle
    99,9%
    800,41
    99,9%
    777,15
    60,2%
    26,24
    waitress+bootle
    94,9%
    689,23
    99,9%
    621,03
    37,5%
    8,89
    tornado
    100,0%
    1239,88
    100,0%
    978,73
    55,7%
    14,51

    Ну что сказать? Tornado конечно рулит, но «наш пациент» кажется вырывается вперед на больших файлах и улучшил относительные показатели на большем числе соединений. К тому же он уверено обошел waitress (с его четырьмя дочерними процессами по числу ядер), который не на плохом счету среди разработчиков. Не скажу что моё тестирование адекватно на 100%, но как оценочное наверно сгодиться.

    Updated: Обратил внимание на странные цифры для 13 мегабайт тела ответа. И действительно за 10 секунд тест там толком наверно и не успел начаться :) Исправил на цифры которые получил при продолжительности теста в 60 сек.

    Пример запуска утилиты siege и полные результаты для последней колонки второй таблицы
    $ siege -c 1000 -b -t 60S http://127.0.0.1:5000/
    ** SIEGE 2.70
    ** Preparing 1000 concurrent users for battle.
    Transactions:               1570 hits
    Availability:              60.18 %
    Elapsed time:              59.84 secs
    Data transferred:       20410.00 MB
    Response time:              5.56 secs
    Transaction rate:          26.24 trans/sec
    Throughput:           341.08 MB/sec
    Concurrency:              145.80
    Successful transactions:        1570
    Failed transactions:            1039
    Longest transaction:           20.44
    Shortest transaction:           0.00
    
    
    $ siege -c 1000 -b -t 60S http://127.0.0.1:5001/
    ** SIEGE 2.70
    ** Preparing 1000 concurrent users for battle.
    The server is now under siege...
    Lifting the server siege...      done.
    Transactions:                526 hits
    Availability:              37.49 %
    Elapsed time:              59.20 secs
    Data transferred:        6838.00 MB
    Response time:             16.05 secs
    Transaction rate:           8.89 trans/sec
    Throughput:           115.51 MB/sec
    Concurrency:              142.58
    Successful transactions:         526
    Failed transactions:             877
    Longest transaction:           42.43
    Shortest transaction:           0.00
    
    $ siege -c 1000 -b -t 60S http://127.0.0.1:5002/
    ** SIEGE 2.70
    ** Preparing 1000 concurrent users for battle.
    The server is now under siege...
    Lifting the server siege...      done.
    Transactions:                857 hits
    Availability:              55.65 %
    Elapsed time:              59.07 secs
    Data transferred:       11141.00 MB
    Response time:             20.14 secs
    Transaction rate:          14.51 trans/sec
    Throughput:           188.61 MB/sec
    Concurrency:              292.16
    Successful transactions:         857
    Failed transactions:             683
    Longest transaction:           51.19
    Shortest transaction:           3.26
    



    OUTRO


    Асинхронный вебсервер с использованием asyncio имеет право на жизнь. Возможно говорить об использовании таких серверов в серьезных проектах пока рано, но после тестирования, обкатки и с появлением асинхронных драйверов asyncio к базам данных и key-value хранилищам — это вполне может быть возможно.
    • +29
    • 29,5k
    • 9
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама
    Комментарии 9
    • +2
      Добавлю, что хотя Tornado внутри использует лапшу из callback'ов, некоторая поддержка сопрограмм (coroutine) там тоже есть: www.tornadoweb.org/en/stable/gen.html.
      • +1
        Поддержка сопрограмм (coroutine) в Tornado достаточно серьёзная. В двух небольших сервисах с успехом их использовал. Другое дело что конструкции с ними там более громоздкие получаются, чем в asyncio. Но это связано с тем, что до Python 3.3 нельзя было вернуть из генератора результат, как из обычной функции.
      • 0
        Было бы также интересно посмотреть бенчмарки в сравнении с gevent.
        • 0
          Для того чтобы бенчмарки были корректными нужно брать tornado+bottle. Потому что WSGI контекст для bottle будет потяжелее не-WSGI контекста tornado.
          • 0
            Торнадо+ботле будет наверняка меньше выдаст, чем даже waitres+ботле. WSGI у торнадо сделан просто для галочки. «Пациент» же изначально был задуман как WSGI сервер с поддержкой асинхронности.

            Тот же Waitress в этом тесте запускался с дефолтными настройками, то есть — он запустил четыре дочерних процесса, по числу ядер на компе на котором тестировалось. Когда как «пациент» и торнадо были запущены в одном экземпляре процесса. То есть данные waitess можно смело поделить на четыре.

            Если вы сейчас посмотрите мой гитхаб, то там добавилось новое демо. На флак приложении сделан чат, с почти мгновенной доставкой сообщений, использующий методику (патерн) long polling. И всё только Flask приложение запущенное под сервером aqua. Немного пришлось модифицировать Flask, но совсем чуть-чуть).
            • 0
              На github.com/Alesh/aqua смотреть не на что, извините.

              Возвращаясь к теме. Написал статью «почему нет http в asyncio»: asvetlov.blogspot.com/2014/04/wsgi.html
              • 0
                dev ветку не пробовали открыть? До мастер ветки пока я не готов выложить)

                github.com/Alesh/aqua/tree/dev/demo/chat
                • 0
                  Блин, это залет. Все интересное в ветке dev; а я не дал на неё ссылку в тексте)

                  github.com/Alesh/aqua/tree/dev
                  • 0
                    Андрей, в блог у вас что-то запись не кладется. Так и не понял отправил ли хабр мое послание вам. Свяжитесь со мной, HTTP сервер на asyncio можно сделать. Давайте обсудим это, потому что вижу вы тоже в теме.

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.