Пользователь
–0,1
рейтинг
14 ноября 2014 в 14:23

Разработка → Python реализация парадигмы event-driven с помощью сопрограмм

Статья про то, как с помощью расширенных генераторов Python сделать собственную реализацию сопрограмм, переключающихся по получению событий. Простота кода получившегося модуля вас приятно удивит и прояснит новые и мало используемые возможности языка, которые можно получить, используя такие генераторы. Статья поможет разобраться и с тем, как это устроено в серьезных реализациях: asyncio, tornado, etc.

Теоретические моменты и disclaimer


Понятие сопрограмма имеет очень широкое толкование, поэтому следует определиться, какими характеристиками они будут обладать в нашей реализации:
  • Выполняются совместно в одном потоке;
  • Выполнение может прерываться для ожидания определенного события;
  • Выполнение может возобновиться после получения ожидаемого события;
  • Может вернуть результат по завершению.

Как следствие получаем: событийно-ориентированное программирование без функций обратного вызова и кооперативную многозадачность. Эффект от использования такой парадигмы программирования будет существенным только для задач, реагирующих на неравномерно поступающие события. В первую очередь это задачи обработки I/O: сетевые сервера, пользовательские интерфейсы, и т. п. Другой возможный вариант применения — это задачи расчета состояния персонажей в игровом мире. Но категорически не подойдет для задач, которые производят долгие расчеты.
Следует четко понимать, что пока выполняющаяся сопрограмма не прервалась на ожидание события, все остальные находятся в состоянии останова, даже если ожидаемое ими событие уже произошло.

Основа всего


В Python хорошей основой для всего этого являются генераторы, если их правильно приготовить в прямом и переносном смысле. Точнее расширенные генераторы, API которых окончательно сформировался в версии Python 3.3. В предыдущих версиях не было реализовано возвращение значения (результата) по завершению работы генератора и не было удобного механизма вызова одного генератора из другого. Тем не менее, реализации сопрограмм были и раньше, но из-за ограничений обычных генераторов они были не так «красивы» как то, что получится у нас. Очень хорошая статья на эту тему «A Curious Course on Coroutines and Concurrency» единственный её недостаток, так это то, что нет обновленной версии. Такой где реализация coroutine в python использует последние новшества в языке, в частности в API Enhanced Python Generators. Ниже рассмотрены возможности расширенных генераторов, которые нам понадобятся.
Передача сообщений в сопрограмму у нас будет построена на возможности задать генератору его состояние. Скопируйте код ниже в окно запущенного интерпретатора Python версии 3.3 и выше.
def gen_factory():
    state = None
    while True:
        print("state:", state)
        state = yield state

gen = gen_factory()

Генератор создан, его надо запустить.
>>> next(gen)
state: None

Получено исходное состояние. Изменим состояние:
>>> gen.send("OK")
state: OK
'OK'

Видим что состояние изменилось и возвращено в результате. Следующие вызовы send будут возвращать уже передаваемое ими состояние.

Зачем нам все это?


Представьте задачу: передавать привет Петрову раз в две секунды, Иванову раз в три секунды, а всему миру раз в пять секунд. В виде Python кода можно представить как-то так:
def hello(name, timeout):
    while True:
        sleep(timeout)
        print("Привет, {}!".format(name))

hello("Петров", 2.0)
hello("Иванов", 3.0)
hello("Мир", 5.0)

Смотрится хорошо, но приветы будет получать только Петров. Однако! Небольшая модификация не влияющая на ясность кода, а даже наоборот — уточняющая нашу мысль, и это уже может заработать как положено.
@coroutine
def hello(name, timeout):
    while True:
        yield from sleep(timeout)
        print("Привет, {}!".format(name))

hello("Петров", 2.0)
hello("Иванов", 3.0)
hello("Мир", 5.0)
run()

Код получился в стиле pythonic way — наглядно иллюстрирует задачу, линейный без калбэков, без лишних наворотов с объектами, любые комментарии в нем излишни. Осталось только реализовать декоратор coroutine, свою версию функции sleep и функцию run. В реализации, конечно, без наворотов не обойдется. Но это тоже pythonic way, прятать за фасадом библиотечных модулей всю магию.

Самое интересное


Назовем модуль с реализацией незатейливо — concurrency, со смыслом и отражает тот факт, что это фактически, будет реализация кооперативной многозадачности. Понятно, что декоратор должен будет сделать из обычной функции генератор и запустить его (сделать первый вызов next). Конструкция языка yield from пробрасывает вызов в следующий генератор. То есть функция sleep должна создать генератор, в котором можно спрятать всю магию. В генератор, ее вызвавший, вернется только код полученного события. Здесь возвращаемый результат не обрабатывается, код тут может получить по сути только один результат, означающий что тайм-аут истек. Ожидание же ввода-вывода может возвращать разные виды событий, например (чтение/запись/тайм аут). Более того, генераторы порождаемые функциями типа sleep могут вернуть по yield from любой тип данных и соответственно их функционал может быть не ограничен ожиданием событий. Функция run запустит диспетчер событий, его задача — получить событие извне и/или сгенерировать внутри, определить его получателя и собственно отправить.
Начнем с декоратора:
class coroutine(object):
    """Делает из функции сопрограмму на базе расширенного генератора."""
    _current = None

    def __init__(self, callable):
        self._callable = callable

    def __call__(self, *args, **kwargs):
        corogen = self._callable(*args, **kwargs)
        cls = self.__class__
        if cls._current is None:
            try:
                cls._current = corogen
                next(corogen)
            finally:
                cls._current = None
        return corogen

Он выполнен в виде класса, типичный прием, как и обещал, он создает и запускает генератор. Конструкция с _current добавлена для того, чтобы избежать запуска генератора, если декорированная функция, его создающая вызывается внутри тела другого генератора. В этом случае первый вызов будет и так сделан. Так же это поможет разобраться, в какой генератор должно быть передано событие, чтобы оно попало по цепочке в генератор, созданный функцией sleep.
def sleep(timeout):
    """Приостанавливает выполнение до получения события "таймаут истек"."""
    corogen = coroutine._current
    dispatcher.setup_timeout(corogen, timeout)
    revent = yield
    return revent

Здесь видим вызов dispatcher.setup_sleep, это сообщает диспетчеру событий, что генератор такой-то ожидает событие «тайм-аут» по истечению заданного параметром timeout количества секунд.
from collections import deque
from time import time, sleep as sys_sleep


class Dispatcher(object):
    """Объект реализующий диспечер событий."""
    def __init__(self):
        self._pending = deque()
        self._deadline = time() + 3600.0

    def setup_timeout(self, corogen, timeout):
        deadline = time() + timeout
        self._deadline = min([self._deadline, deadline])
        self._pending.append([corogen, deadline])
        self._pending = deque(sorted(self._pending, key=lambda a: a[1]))

    def run(self):
        """Запускает цикл обработки событий."""
        while len(self._pending) > 0:
            timeout = self._deadline - time()
            self._deadline = time() + 3600.0
            if timeout > 0:
                sys_sleep(timeout)
            while len(self._pending) > 0:
                if self._pending[0][1] <= time():
                    corogen, _ = self._pending.popleft()
                    try:
                        coroutine._current = corogen
                        corogen.send("timeout")
                    except StopIteration:
                        pass
                    finally:
                        coroutine._current = None
                else:
                    break

dispatcher = Dispatcher()
run = lambda: dispatcher.run()

В коде диспетчера событий тоже нет ничего необычного. Куда передавать события определяется с помощью переменной класса coroutine._current. При загрузке модуля создается экземпляр класса, в рабочей реализации это конечно же должен быть синглетон. Класс collections.deque задействован вместо списка, так как побыстрее и полезен своим методом popleft. Ну вот собственно и все, и нет какой-то особой магии. Вся она на поверку спрятана еще глубже, в реализации расширенных генераторов Python. Их остается только правильно приготовить.

Файл: concurrency.py
# concurrency.py
from collections import deque
from time import time, sleep as sys_sleep


class coroutine(object):
    """Делает из функции сопрограмму на базе расширенного генератора."""
    _current = None

    def __init__(self, callable):
        self._callable = callable

    def __call__(self, *args, **kwargs):
        corogen = self._callable(*args, **kwargs)
        cls = self.__class__
        if cls._current is None:
            try:
                cls._current = corogen
                next(corogen)
            finally:
                cls._current = None
        return corogen


def sleep(timeout):
    """Приостанавливает выполнение до получения события "таймаут истек"."""
    corogen = coroutine._current
    dispatcher.setup_timeout(corogen, timeout)
    revent = yield
    return revent


class Dispatcher(object):
    """Объект реализующий диспечер событий."""
    def __init__(self):
        self._pending = deque()
        self._deadline = time() + 3600.0

    def setup_timeout(self, corogen, timeout):
        deadline = time() + timeout
        self._deadline = min([self._deadline, deadline])
        self._pending.append([corogen, deadline])
        self._pending = deque(sorted(self._pending, key=lambda a: a[1]))

    def run(self):
        """Запускает цикл обработки событий."""
        while len(self._pending) > 0:
            timeout = self._deadline - time()
            self._deadline = time() + 3600.0
            if timeout > 0:
                sys_sleep(timeout)
            while len(self._pending) > 0:
                if self._pending[0][1] <= time():
                    corogen, _ = self._pending.popleft()
                    try:
                        coroutine._current = corogen
                        corogen.send("timeout")
                    except StopIteration:
                        pass
                    finally:
                        coroutine._current = None
                else:
                    break

dispatcher = Dispatcher()
run = lambda: dispatcher.run()


Файл: sample.py
# sample.py
from concurency import coroutine, sleep, run

@coroutine
def hello(name, timeout):
    while True:
        yield from sleep(timeout)
        print("Привет, {}!".format(name))

hello("Петров", 2.0)
hello("Иванов", 3.0)
hello("Мир", 5.0)
run()



Outro


Если тема интересная, можно продолжить в сторону реализации ожидания событий ввода/вывода с асинхронным TCP Echo сервером в качестве примера. С реальным диспетчером событий, реализованным в виде динамической библиотеки написанной на другом, более быстром, чем Python языке.
@Alesh
карма
21,0
рейтинг –0,1
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама

Самое читаемое Разработка

Комментарии (17)

  • +2
    О многом что говорится в статье достаточно ясно рассказано в главе 6 книги «Python. Подробный справочник». 4 изд. Дэвид Бизли, ISBN 978-5-93286-157-8. Рекомендую.
    • 0
      Позволю с вами не согласиться, издание от 2010 года, тогда расширенные генераторы Python были разве что в головах тех, кто их потом начал вводить в язык начиная с года 2012. А без наличия этих самых расширенных генераторов, то о чем я писал не имеет смысла вообще. Если же говорить о реализации coroutine, то я упоминал что тема не нова. Интерес и реализации в Python есть уже достаточно давно. Но! Последние изменения в API расширенных генераторов, позволяют сделать реализацию coroutin очень простой и прозрачной.

      Попробуйте например без усложнения кода sapmle.py заставить это заработать на Python 2.6, я уже не говорю о том насколько усложниться модуль concurrency. И я в своем примере еще не задействовал возможность возвращения результата из функции sleep, а она есть. На обычных генераторах, без хаков, это в принципе не возможно сделать.

      Хотя согласен с тем что книга очень полезная, хотя и малопопулярная почему-то.
      • +2
        Наверное, ни одна статья о генераторах/короутинах в python не должна обходиться без ссылок на Бизли) В частности все, что описано в этой статье, более подробно рассмотрено здесь http://www.dabeaz.com/finalgenerator/
        • 0
          Ниже я исправил это упущение :), но там coroutine реализованы без использования возможностей последних новшеств API Enhanced Python Generators, таких как например yield from и возврат значения по return из генератора через свойство value экземпляра класса StopIteration.
      • +1
        Вы не заметили моих слов «О многом». Вернее не придали значения этим двум токенам. Если человек почитает Д.Бизли и вернется к Вашей статье, то уверяю материал ляжет в голову значительно легче. IMHO
        • 0
          Согласен, отсутствие ссылки на Бизли, было упущение при написании статьи. Наверно даже стоит поправить саму статью, а не ограничится комментарием.
  • 0
    Пожалуй наверно стоит добавить, что я не в коей мере не претендую на «открытие coroutine» в Python. статья о том, что последние новшества введенные в язык начиная с версии Python 3.3 делают реализацию сопрограм очень простым и понятным делом. Сама же идея сопрограмм в Pythone основанных на генераторах кажется впервые наиболее полно освещена в этой статье. И я рекомендую её для прочтения, тем кто заинтересовался этой темой, даже не смотря на то что информация там существенно устарела.
  • +1
    Небезынтересно.
    • 0
      Спасибо, старался.
  • 0
    Посмотрите стандартный модуль sched, там сама очередь сделана на heapq без пересортировок.
    Да и вообще, если честно, данный пример повторяет функциональность sched сложным способом, не давая никаких преимуществ.
    • 0
      Причем тут sched? Он помогает создавать сопрограммы и реализует парадигму event-driven? Вроде нет.
      • –1
        sched тут при двух вещах — он 1) показывает какую структуру данных лучше использовать для очереди событий и 2) показывает что пока в данном примере корутины вам никакой элегантности не дали, все можно было сделать на питоне уровня 1.7 без потери читаемости. Вероятно, дальше мы увидим их мощь когда задача осложнится событиями кроме таймера.
        • 0
          Вероятно, дальше мы увидим их мощь когда задача осложнится событиями кроме таймера.

          Безусловно, например когда мы будем останавливать выполнение короутины для ожидания готовности сетевого соединения к чтению и/или записи. Подход будет использован точно такой же как и в примере отправляющем приветы. А насколько я помню sched — это только запуск калбэка по таймеру.

          Насчет очереди событий, да тут собственно и не очередь, а просто добавление в отсортированный список.
  • 0
    Видим что состояние изменилось и возвращено в результате. Следующие вызовы next будут возвращать уже его.


    Не будет. И не должен.
    • 0
      Да конечно, я опечатался next/send. Я потом еще и при вычитке немного изменил предложение уже видя next вместо send :)
      Спасибо сейчас поправлю.
  • 0
    Хочу отметить высочайший уровень статьи. Пожалуйста, хотя бы еще 2 в том же духе!

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.