Конкурентность в асинхронном приложении на примере twisted

Теоретически, проблема конкурентного доступа не характерна для асинхронных приложений. В отличие от приложений с параллельной архитектурой, в которых в каждый момент времени может выполняться несколько задач претендующих на какой то общий ресурс — в асинхронном приложении в один момент времени выполняется только одна активность.

Но на практике все выглядит немного по иному:

Предстваим следующую ситуацияю. Наш сервис работает с удаленным сервисом (S), который обрабатывает запросы параллельно. У нас есть ресурс A, который зависит от данных на S. Запрос R1 обновляет A, для этого он запрашивает данные с S. В это время приходит R2, он модифицирует соответствующие данные на S. Запрос R2 пришел позже, но изза параллельной архитектуры S — обработался раньше (см. Рис.1).

image
Рис.1. Порядок выполнения запросов.

Теперь R1 получает данные уже с учетом модификаций R2, в то время как ресурс A еще не знает, что это результат операций R2. Вот вам и race condition.

Пример:

У Коли есть жена и счет в банке. Коля с женой решили, что у них в семье равноправие и общий бюджет, а потому у каждого из них есть по карте, привязанной к этому счету. Коля работает по фрилансу и приходит время, когда ему должны заплатить за последний проект.
Жена решает порадовать Колю, и подарить ему на ДР новый iPad2. Встав с утра по раньше, она находит на счету N$! Радостная она заказывает iPad2 мужу и новые часики себе. Когда Коля проверяет счет в банке, там «по прежнему» ноль. Он не догадывается проверить последние операции и долго долго ругается с заказчиком.

Коля — запрос 1, его жена — запрос 2, счет в банке — ресурс на удаленном сервисе, банковские карты — прокси для удаленного ресурса.

На месте Коли, жены и счета в банке могут быть: финансовый сервис, АСУТП и многое другое. Суть конкретно моей проблемы разгласить не могу, так как NDA.

Как бороться.

Бороться будем с применением twisted. Хотя общая концепция верна для любого асинхронного фреймворка.

Очевидно, что решением является блокировка ресурса A. Теоретически — для блокировки подойдут те же конструкции, что и для параллельной архитектуры: semaphor, mutex, conditional variable. Вопрос в реализации. Итак, рассмотрим mutex (почему mutex? потому что conditional variable тривиальна, а для semaphor мне пока не удалось найти применения).

Как сделать mutex?

Вариант нерабочий. В параллельной архитектуре — поток может просто спать, и нам ничего за это не будет. В асинхронной архитектуре — если мы просто уснем (например time.sleep(100) ) — то все встанет колом, и мы никогда ничего не дождемся, так как нам необходимо чтобы eventloop переключился на обработку запроса, заблокировавшего ресурс, а пока мы спим — этого не произойдет.

Вариант неправильный. Можно реализовать через reactor.callLater(1, self.some_method, *args), где self.some_method и есть наш метод, который ожидает окончания блокировки.

Недостатки следующие:
  • Довольно страшный код.
  • Ждем мы целую секунду, а за нее может многое случиться, например некий запрос Rn снова заблокирует A.


Вариант. И, наконец, верный вариант. У нас асинхронное приложение, построенное на Deferred'ах. Для того, чтобы что то в нем произошло — нужно чтобы сработал Deferred. Вывод — блокировку необходимо делать на Deferred'ах.

class Mutex(object):
  
    def __init__(self):
        self.locked = False
        self.waiters = list()

    def acquire(self):
        d = Deferred()
        if self.locked:
            self.waiters.append(d)
        else:
            self.locked = True
            d.callback(True)
        return d

    def release(self):
        self.locked = False
        if self.waiters:
            self.locked = True
            d = self.waiters.pop()
            d.callback(True)

Конструкция проста: если мы хотим получить доступ к ресурсу — мы получаем Deferred. Работать с ресурсом мы начинаем только после того, как Deferred срабатывает. Сам класс отслеживает отработку Deferred'ов, и, как только один из них отрабатывает, он достает из очереди следующий и запускает его.

Приведенный пример реализации не полон, реализацию мьютекса в twisted (DeferredLock) можно посмотреть здесь: http://twistedmatrix.com/trac/browser/tags/releases/twisted-11.0.0/twisted/internet/defer.py. Там же есть и DeferredSemaphore.

Пример использования.

Использовать можно по разному: можно сделать так, что объект сам следит за доступом к нему:

class ImportantObject(object):

    def __init__(self):
         self.lock = defer.DeferredLock()

    def get_lock(self):
         return self.lock.acquire()

    def release_lock(self):
         return self.lock.release()


Или, если объекты выбираются из БД и храняться в сессиях (то есть для каждого запроса — объект ImportantObject с id 1 будет разным), можно сделать пул блокировок:

class Pool(object):
    __metaclass__ = Singleton

    def __init__(self, objects_list):
        self.__objects = dict()
        for o in objects_list:
            self.__objects[o.id] = defer.DeferredLock()

    def acquire(self, o):
        if o.id not in self.__objects:
            self.__objects[o.id] = defer.DeferredLock()
        return self.__objects[o.id].acquire()

    def release(self, o):
        self.__objects[o.id].release()


Pool здесь слегка упрощен, мы «забыли» методы для обновления списка отслеживаемых объектов, чтобы не загромождать статью. Так же мы упустили реализацию Singleton, но найти /сделать ее — не составит труда даже начинающему питонисту.

Ну и наконец:

def multiplex(self, a):

    def get_value_from_remote_service(skipped, a):
        d = some_service.do_long_boring_call(a)
        return d

    def multiply(result, a):
        return result*a

    d = Pool().acquire(a)
    d.addCallback(get_value_from_remote_service, a)
    d.addCallback(power,a)
    return d


П.С.: К сожалению, документация twisted далеко не исчерпывающая. Более того, покрывает она хорошо если процентов 30 этого фреймворка. Поэтому, когда мной решалась проблема конкурентности — я в течении 3-х дней изобретал различные велосипеды. Пока не догадался посмотреть исходники twisted. Так что общий совет — работая с twisted — больше читайте исходники, там спрятаны велосипеды на все случаи жизни.

Источники:
Официальная документация twisted.
Исходники twisted
Метки:
Поделиться публикацией
Похожие публикации
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама
Комментарии 18
  • +4
    DNA который мешает разгласить суть проблемы — это сильно ;)
    • 0
      Активирован ген молчаливости и скрытности, бывает.
    • 0
      Это DNA, не то которое «consists of two long polymers of simple units called nucleotides», а корпоративное. :)
      • +2
        Это которое обычно называют Non Disclosure Agreement?
        • 0
          Точно. Оно самое!
          • +3
            Так оно NDA тогда… Или я кэп?
            • –1
              :-D У нас в разговоре его упоминают в основном как DNA. Косяк. Сейчас поправлю.
    • 0
      А по сути статьи, не понял в чём «научная новизна»? Автор открыл для себя DeferredLock?
      • +2
        Автор открыл для себя то, что в асинхронном приложении тоже бывают проблемы с конкурентным доступом, и хотел этим поделиться. Ну и DeferredLock он тоже для себя открыл, написав вначале штуки 2 вариаций «на тему».
        • –1
          А почему «тоже»? Откуда в синхронном приложении проблемы с конкурентным доступом?
      • 0
        Насколько я понимаю, всё описанное не работает, если запускать несколько копий приложения, а так нередко делают, чтобы нагрузить равномерно многоядерную систему. Как быть в таком случае?
        • +1
          ну твистед позволит наиболее полно загрузить одно ядро, так что если нету реально необходимости параллелить (например когда большую часть времени мы ждем ответы), то лучше не параллелить.
          ну а если надо — делается аналогичная реализация на базе хотя бы memcached или flock. даже сам класс придется не сильно много переписывать
          • 0
            лучше сразу zookeeper, тогда действительно надежно получится…
        • 0
          d.addCallback(power,a) читать как d.addCallback(multiply, a)?
          ну и еще бы хорошо добавить d.addCallbacks(release_pool, release_pool), а то релиз походу не вызывается во втором примере
          • 0
            ну и d = some_service.do_long_boring_call(a) — этот вызов же скорее всего должен вернуть Deferred, иначе смысл использовать лок? в таком случае в multiply попадет не результат, а всего лишь Deferred.

            п.с. ничего личного, но с твистедом и так местами непонятно, что произошло, так что примеры без адекватной обработки ошибок и содержащие ошибки могут еще больше запутать понимание
          • 0
            Да, кстати. Deferred в Twisted — это реализация паттерна «Futures and Promises».
            • 0
              > Так что общий совет — работая с twisted — больше читайте исходники, там спрятаны велосипеды на все случаи жизни.

              Это уж точно.
              Один из основных минусов твистеда — крайне скудная документация. Без регулярного копания в кишках использовать его в полную силу несколько затруднительно.

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.