Пользователь
0,0
рейтинг
28 октября 2014 в 14:15

Разработка → Как просто написать распределенный веб-сервис на Python + AMQP из песочницы

Привет, Хабр. Я уже довольно давно пишу на Python. Недавно пришлось разбираться с RabbitMQ. Мне понравилось. Потому что он без всяких проблем (понятно, что с некоторыми тонкостями) собирается в кластер. Тут я подумал: а неплохо бы его использовать в качестве очереди сообщений в кусочке API проекта, над которым я работаю. Сам API написан на tornado, основная мысль была в исключении блокирующего кода из API. Все синхронные операции выполнялись в пуле тредов.

Первое, что я решил, это сделать отдельный процесс(ы) «worker», который бы брал на себя всю синхронную работу. Задумал, чтобы «worker» был максимально прост, и делал задачи из очереди одну за другой. Скажем, выбрал из базы что-нибудь, ответил, взял на себя следующую задачу и так далее. Самих «worker»ов можно запустить много и тогда AMQP выступает уже в роли некоего подобия IPC.

Спустя некоторое время из этого вырос модуль, который берет на себя всю рутину связанную с AMQP и передачей сообщений туда и назад, а также сжимает их gzipом, если данных слишком много. Так родился crew. Собственно, используя его, мы с вами напишем простой API, который будет состоять из сервера на tornado и простых и незамысловатых «worker» процессов. Забегая вперед скажу, что весь код доступен на github, а то, о чем я буду рассказывать дальше, собрано в папке example.

Подготовка


Итак, давайте разберемся по порядку. Первое, что нам нужно будет сделать — это установить RabbitMQ. Как это делать я описывать не буду. Скажу лишь то, что на той-же убунте он ставится и работает из коробки. У меня на маке единственное, что пришлось сделать, это поставить LaunchRocket, который собрал все сервисы, что были установлены через homebrew и вывел в GUI:

LaunchRocket

Дальше создадим наш проект virtualenv и установим сам модуль через pip:

mkdir -p api
cd api
virtualenv env
source env/bin/activate
pip install crew tornado


В зависимостях модуля умышленно не указан tornado, так как на хосте с workerом его может и не быть. А на веб-части обычно создают requirements.txt, где указаны все остальные зависимости.

Код я буду писать частями, чтобы не нарушать порядок повествования. То, что у нас получится в итоге, можно посмотреть тут.

Пишем код


Сам tornado сервер состоит из двух частей. В первой части мы определяем обработчики запросов handlers, а во второй запускается event-loop. Давайте напишем сервер и создадим наш первый метод api.

Файл master.py:
# encoding: utf-8

import tornado.ioloop
import tornado.gen
import tornado.web
import tornado.options


class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        # Вызываем задачу test c приоритетом 100
        resp = yield self.application.crew.call('test', priority=100)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


application = tornado.web.Application(
    [
        ('/', MainHandler),
    ],
    autoreload=True,
    debug=True,
)


if __name__ == "__main__":
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()


Благодаря coroutine в торнадо, код выглядит просто. Можно написать тоже самое без coroutine.

Файл master.py:
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        # Вызываем задачу test c приоритетом 100
        self.application.crew.call('test', priority=100, callback=self._on_response)

    def _on_response(resp, headers):
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


Наш сервер готов. Но если мы его запустим, и сходим на /, то не дождемся ответа, его некому обрабатывать.

Теперь напишем простой worker:

Файл worker.py:
# encoding: utf-8

from crew.worker import run, context, Task

@Task('test')
def long_task(req):
    context.settings.counter += 1
    return 'Wake up Neo.\n'

run(
    counter=0,      # This is a part of this worker context
)


Итак, как видно в коде, есть простая функция, обернутая декоратором Task(«test»), где test — это уникальный идентификатор задачи. В вашем worker не может быть двух задач с одинаковыми идентификаторами. Конечно, правильно было бы назвать задачу «crew.example.test» (так обычно и называю в продакшн среде), но для нашего примера достаточно просто «test».

Сразу бросается в глаза context.settings.counter. Это некий контекст, который инициализируется в worker процессе при вызове функции run. Также в контексте уже есть context.headers — это заголовки ответа для отделения метаданных от ответа. В примере с callback-функцией именно этот словарь передается в _on_response.

Заголовки сбрасываются после каждого ответа, а вот context.settings — нет. Я использую context.settings для передачи в функции worker(ы) соединения с базой данных и вообще любого другого объекта.

Также worker обрабатывает ключи запуска, их не много:

$ python worker.py --help
Usage: worker.py [options]

Options:
  -h, --help            show this help message and exit
  -v, --verbose         make lots of noise
  --logging=LOGGING     Logging level
  -H HOST, --host=HOST  RabbitMQ host
  -P PORT, --port=PORT  RabbitMQ port


URL подключения к базе и прочие переменные можно брать из переменный окружения. Поэтому worker в параметрах ждет только как ему соединиться c AMQP (хост и порт) и уровень логирования.

Итак, запускаем все и проверяем:

$ python master.py & python worker.py


image

Работает, но что случилось за ширмой?


При запуске tornado-сервера tornado подключился к RabbitMQ, создал Exchange DLX и начал слушать очередь DLX. Это Dead-Letter-Exchange — специальная очередь, в которую попадают задачи, которые не взял ни один worker за определенный timeout. Также он создал очередь с уникальным идентификатором, куда будут поступать ответы от workerов.

После запуска worker создал по очереди на каждую обернутую декоратором Task очередь и подписался на них. При поступлении задачи воркер main-loop создает один поток, контролируя в главном потоке время исполнения задачи и выполняет обернутую функцию. После return из обернутой функции сериализует его и ставит в очередь ответов сервера.

После поступления запроса tornado-сервер cтавит задачу в соответствующую очередь, указывая при этом идентификатор своей уникальной очереди, в которую должен поступить ответ. Если ни один воркер не взял задачу, тогда RabbitMQ перенаправляет задачу в exchange DLX и tornado-сервер получает сообщение о том, что истек таймаут пребывания очереди, генерируя исключение.

Зависшая задача


Чтобы продемонстрировать, как работает механизм завершения задач, которые повисли в процессе выполнения, напишем еще один веб-метод и задачу в worker.

В файл master.py добавим:

class FastHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        try:
            resp = yield self.application.crew.call(
                'dead', persistent=False, priority=255, expiration=3,
            )
            self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
        except TimeoutError:
            self.write('Timeout')
        except ExpirationError:
            self.write('All workers are gone')


И добавим его в список хендлеров:

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/stat", StatHandler),
    ],
    autoreload=True,
    debug=True,
)


А в worker.py:
@Task('dead')
def infinite_loop_task(req):
    while True:
        pass


Как видно из приведенного выше примера, задача уйдет в бесконечный цикл. Однако, если задача не выполнится за 3 секунды (считая время получения из очереди), main-loop в воркере пошлет потоку исключение SystemExit. И да, вам придется обработать его.

Контекст


Как уже упоминалось выше, контекст — это такой специальный объект, который импортируется и имеет несколько встроенных переменных.

Давайте сделаем простую статистику по ответам нашего worker.

В файл master.py добавим следующий handler:

class StatHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self):
        resp = yield self.application.crew.call('stat', persistent=False, priority=0)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


Также зарегистрируем в списке обработчиков запросов:

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/fast", FastHandler),
        (r"/stat", StatHandler),
    ],
    autoreload=True,
    debug=True,
)


Этот handler не очень отличается от предыдущих, просто возвращает значение, которое ему передал worker.

Теперь сама задача.

В файл worker.py добавим:

@Task('stat')
def get_counter(req):
    context.settings.counter += 1
    return 'I\'m worker "%s". And I serve %s tasks' % (context.settings.uuid, context.settings.counter)


Функция возвращает строку, с информацией о количестве задач, обработанных workerом.

PubSub и Long polling


Теперь реализуем пару обработчиков. Один при запросе будет просто висеть и ждать, а второй будет принимать POST данные. После передачи последних первый будет их отдавать.

master.py:

class LongPoolingHandler(tornado.web.RequestHandler):
    LISTENERS = []

    @tornado.web.asynchronous
    def get(self):
        self.LISTENERS.append(self.response)

    def response(self, data):
        self.finish(str(data))

    @classmethod
    def responder(cls, data):
        for cb in cls.LISTENERS:
            cb(data)

        cls.LISTENERS = []

class PublishHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def post(self, *args, **kwargs):
        resp = yield self.application.crew.call('publish', self.request.body)
        self.finish(str(resp))

...

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/stat", StatHandler),
        (r"/fast", FastHandler),
        (r'/subscribe', LongPoolingHandler),
        (r'/publish', PublishHandler),
    ],
    autoreload=True,
    debug=True,
)

application.crew = Client()
application.crew.subscribe('test', LongPoolingHandler.responder)

if __name__ == "__main__":
    application.crew.connect()
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()


Напишем задачу publish.

worker.py:

@Task('publish')
def publish(req):
    context.pubsub.publish('test', req)


Если же вам не нужно передавать управление в worker, можно просто публиковать прямо из tornado-сервера

class PublishHandler2(tornado.web.RequestHandler):

    def post(self, *args, **kwargs):
        self.application.crew.publish('test', self.request.body)


Параллельное выполнение заданий


Часто бывает ситуация, когда мы можем выполнить несколько заданий параллельно. В crew есть для этого небольшой синтаксический сахар:

class Multitaskhandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        with self.application.crew.parallel() as mc:
            # mc - multiple calls
            mc.call('test')
            mc.call('stat')
            test_result, stat_result = yield mc.result()
            self.set_header('Content-Type', 'text/plain')
            self.write("Test result: {0}\nStat result: {1}".format(test_result, stat_result))


В этом случае, задаче будут поставлены две задачи параллельно и выход из with будет произведен по окончании последней.

Но нужно быть осторожным, так как какая-то задача может вызвать исключение. Оно будет приравнено непосредственно переменной. Таким образом, вам нужно проверить, не является ли test_result и stat_result экземплярами класса Exception.

Планы на будущее


Когда eigrad предложил написать прослойку, которой можно запустить любое wsgi приложение с помощью crew, мне эта идея сразу понравилась. Только представьте, запросы хлынут не на ваше wsgi приложение, а будут равномерно поступать через очередь на wsgi-worker.

Я никогда не писал wsgi сервер и даже не знаю, с чего начать. Но вы можете мне помочь, pull-requestы я принимаю.

Также думаю дописать client для еще одного популярного асинхронного фреймворка, для twisted. Но пока разбираюсь с ним, да и свободного времени не хватает.

Благодарности


Спасибо разработчикам RabbitMQ и AMQP. Замечательные идеи.

Также спасибо вам, читатели. Надеюсь, что вы не зря потратили время.
Дмитрий @orlovdl
карма
6,0
рейтинг 0,0
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама

Самое читаемое Разработка

Комментарии (22)

  • 0
    Выглядит интересно. Спасибо.
  • 0
    RabbitMQ хорошо работает когда вам нужно выполнять таски на нескольких серверах. В пределах одного сервера Rabbit представляет из себя суровую пушку стреляющую по воробьям, для этого лучше подходит, например, 0MQ.

    На клиент сайде так же можно сделать обвязку поверх транспортного уровня, чтобы не писать напрямую в вебсокет (socket.io или sockjs).

    В идее с wsgi есть проблема, по крайней мере если вы рассматриваете торнадо как фронтенд и воркеры как бэкенд обработки. Воркеры работают с чистыми данными, им не нужен request и заголовки, сессии и прочее, с этим должен работать торнадо. Уровень wsgi здесь кажется лишним, так как сразу напрямую можно использовать объекты бизнес-логики без каких-либо дополнительных прослоек.
    • +1
      RabbitMQ хорошо работает когда вам нужно выполнять таски на нескольких серверах. В пределах одного сервера Rabbit представляет из себя суровую пушку стреляющую по воробьям, для этого лучше подходит, например, 0MQ.


      Конечно, речь идет о распределенной системе, которая работает на нескольких серверах. На одном сервере тоже можно, но для разработки ИМХО. А вот в боевых условиях, можно тиражировать воркеров как угодно, и единственное что им нужно это связь с RabbitMQ.

      На клиент сайде так же можно сделать обвязку поверх транспортного уровня, чтобы не писать напрямую в вебсокет (socket.io или sockjs).


      Немного не понял о чем вы.

      В идее с wsgi есть проблема, по крайней мере если вы рассматриваете торнадо как фронтенд и воркеры как бэкенд обработки. Воркеры работают с чистыми данными, им не нужен request и заголовки, сессии и прочее, с этим должен работать торнадо. Уровень wsgi здесь кажется лишним, так как сразу напрямую можно использовать объекты бизнес-логики без каких-либо дополнительных прослоек.


      Идея с wsgi в том, чтобы взять django/flask/etc. приложение и запустить его «воркером». Тогда на все ноды с приложением, будет равномерно распределена нагрузка + очереди.
      • 0
        Вы в воркере уже можете получить готовые параметры для работы, зачем поверх этого вы еще хотите накрутить дополнительный уровень wsgi? Если так, то nginx умеет делать load balancing и работает с бэкендом по wsgi, так же, судя по всему, умеет отправлять сообщения напрямую в rabbit.
      • 0
        Название статьи можно перефразировать как «Масштабируем Слоупочность lvl 100»

        1. AMQP толст — если размер Payload'a будет меньше размера Header'а — получаем Amplification в рамках кластера / ноды, сервис ложится при первом же скачке нагрузки. Согласен с prefer ZMQ менее избыточен.

        2. Масштабировать вертикально с помощью AMQP в рамках одной ноды — ересь! Сжечь, и не смотреть где прах закопают!
        Реализуйте очереди обработки задач по типу Disruptor'a в рамках приложения, или используйте готовые сурогатные модели актёров по типу Greenlet'ов. Пилите SOA.

        3. Хотите строить «распределённые» сервисы — проводите нагрузочное тестирование jMeter'ом и Яндекс.танком и профилируйте до потери пульса! Иначе получите бутылочные горлышки и Amplification'ы в самых не очевидных местах.
        • 0
          AMQP толст — если размер Payload'a будет меньше размера Header'а
          Любая распределенная система (я говорю не об одной ноде с RabbitMQ а о кластере хотя-бы из двух трех нод) это overhead на пересылку пакетов по сети сериализацию/десериализацию. Хотите «тонкий» протокол общения ок, используйте 0MQ (кладите все в «shared memory» наконец), хотите работать на 10 виртуалках в облаке, планируйте жизнь с неким overhead.

          Масштабировать вертикально с помощью AMQP в рамках одной ноды — ересь!
          Именно поэтому слово «распределенный» и является ключевым. В статье нет ни слова о вертикальном масштабировании, crew это именно про горизонтальное масштабирование.

          Хотите строить «распределённые» сервисы — проводите нагрузочное тестирование jMeter'ом и Яндекс.танком и профилируйте до потери пульса!
          Согласен, сурового highload добиться можно только так. Сейчас речь идет скорее о надежности (непроизвольный отвал нескольких нод и тому подобные жизненные реалии). Сейчас версия модуля 0.7.0, у меня он покрывает все кейсы, работает и все суровые баги я уже почистил, сейчас начинаю всестороннее тестирование. Если вам интересен проект, давайте проведем нагрузочные или интеграционные тесты, помогите проекту.

          Статья называется «Как просто написать распределенный веб-сервис на Python + AMQP» и тут нет и намека на суровый highload. Но у меня на практике в основном все упиралось не в amqp и не в данный модуль, а скорее в базу, которую ходят воркеры.

          В случае highload слово «просто» вообще отпадает. Кто нибудь видел «простой» highload кластер наяву?
          • 0
            У меня распределёнка и highload'ы начинаются с 10Гбит трафика, я просто сужу по своим потребностям и задачам которые приходится решать. Не могу сказать что разработку подобных решений можно назвать простой, но она и не особо отличается от разработки любых других RESTful SOA приложений, просто есть свои особенности в архитектуре, и в организации работы.

            Если у меня будет 10ток машин в виртуалке, то перво-наперво я сделаю так что бы они между собой не разделяли общих состояний, а только логировали запросы. В итоге это CQRS-ES с логированием текущих транзакций посредством Raft алгоритма.

            Нужно отличать вертикальное масштабирование от горизонтального.
            Вертикальное — эффективная утилизация мощности каждой конкретной машины с минимальными издержками на коммуникацию, тут работают модели актёров и разнообразные очереди с приоритетами (типа Disruptor), sharedmem тут не обязателен так как почти все сервисы выполняются в рамках одного процесса, ну в общем по аналогии с tasklet'ами линуксовского ядра.

            Горизонтальное — распределение нагрузки на несколько машин без существенного увеличения времени отклика, тут нужно понимать что всякие master-master репликации — непозволительная роскошь. У меня обычно всё сводится к Raft'у и нескольким ведомым машинам дублирующим текущие запросы посредством зеркалирования трафика. Шардинг и репликация в таких условиях решается уже на уровне приложения, и с БД совсем не нужно заморачиваться… но это тема отдельной статьи.

            Обычно когда люди вспоминают о распределёнке — ни первый, ни второй тип масштабирования не реализовывается должным образом. То отклик медленный, то избыточность не там где надо… то бутылочные горлышки без профилирования, в общем больше половины проектов выглядят довольно костыльно, по этому я так и отреагировал.
            • 0
              Понимаю вашу реакцию и спасибо за развернутый ответ. Статья предназначена скорее не вам, однако ваш опыт был бы очень полезен проекту в целом.

              У вас судя по всему есть свой проверенный временем подход. В моей реализации (на моем проекте где используется crew), в целом, все workers — stateless и роль «ведущих машинок» выполняет RabbitMQ и пяток серверов с tornado, по которым размазывает 2 haproxy. База MongoDB поэтому вопрос шардирования и репликаций решен та том уровне. Но никто не мешает написать логику шардирования в воркере и бегать по шарду из SQLBased СУБД.
            • 0
              Комменты можно перефразировать так: «в моей системе раздачи контента AMQP не нужен».

              Ну да. Если задача стоит в пересылке данных, то пересылать данные дополнительно через AMQP смысла нет. Но статья же не про это.

              Хотя оверхед на AMQP вообще-то меньше чем на HTTP.

              ps. 10гбит трафика — «распределенка и highload»?.. Ребята из Netflix на YaC'е рассказывали что у них по 40гбит с ноды отдается… :-)
              • 0
                Зачем CDN'ам CQRS-ES'ы и реактивность?
                Отдавать 40Гбит DWDM'у не проблема…

                Я не раздаю контент, в основном Restful API и различные конкурирующие транзакции с большой стоимостью отката и синхронизации.
  • 0
    Возможно, это глупый вопрос, но почему нельзя было взять Сelery?
    • 0
      В задачи celery не входит доставка результата в браузер, правда для этого можно использовать celery + pusher. Но если нужно без обращений на сторонние сервера, то ничего лучше не приходит, как использовать для этих целей tornado.
    • 0
      Отличный вопрос, у celery действительно похожая идеология, но немного не тот подход. А так да, почти celery, но заточена на amqp-фичи, типа DLX и прочего.
      • 0
        Возможно, об этом стоит расписать поподробее, в смысле, описать кейсы, где это может быть нужно.
        • 0
          На самом деле ответ кроется в названии. «распределенный веб-сервис» это именно тот самый кейс. Когда у вас части системы живут отдельно на разных машинках.
  • 0
    Я не хочу чтобы вы это воспринимали как еще один клон celery. Тут идея в асинхронном приеме заданий, и синхронных обработчиках.
  • 0
    Когда eigrad предложил написать прослойку, которой можно запустить любое wsgi приложение с помощью crew, мне эта идея сразу понравилась. Только представьте, запросы хлынут не на ваше wsgi приложение, а будут равномерно поступать через очередь на wsgi-worker.


    Реализовать именно прослойку будет скорее всего достаточно просто, т.к. WSGI достаточно простой протокол. Меня лично здесь стали бы волновать другие вещи: логирование (должно быть централизовано или централизуемо), graceful restart и версионность кода.
    • 0
      graceful restart


      Должен работать уже, т.к. ACK на задачу посылается после ее обработки. И если убить воркер до посыла ACK задачу RabbitMQ переназначит другому.

      версионность кода


      Этот вопрос я бы на вашем месте расписал подробнее. Что понимается под версионностью кода.

      логирование (должно быть централизовано или централизуемо)


      Опять же на совести разработчика. Worker это просто функция, лично у меня есть декоратор который шлет в sentry ошибки.
      • 0
        Ну, это в целом стандартные проблемы.

        Опять же на совести разработчика. Worker это просто функция, лично у меня есть декоратор который шлет в sentry ошибки.


        WSGI-приложение — это тоже в принципе просто функция. Если вы хотите делать возможность подключения в качестве обработчиков wsgi-приложения на Django или других фреймворках, то, наверное, имеет смысл делать это не ради спортивного интереса, а чтобы разработчики могли этим пользоваться, не задумываясь о множестве деталей, типа логирования. С распределением нагрузки по серверам не так уж и плохо справляется nginx, так что тут непонятно, зачем придумывать ещё одну прослойку. Но вот чего реально не хватает — это возможности «из коробки» собирать логи, а главное, отчётов об ошибках, централизованно, без поиска по логам инстансов и привлечения сторонних лог-сервисов и лог-серверов.

        Этот вопрос я бы на вашем месте расписал подробнее. Что понимается под версионностью кода.


        Я про версии кода приложения. Вот у вас обновляется код wsgi-приложения, например, разработчик фиксит баг какой-то или меняет бизнес-логику. Обновить везде код и разом перезапустить — плохой вариант. Перезапускать просто всё по-очереди — вариант лучше, но есть некоторый риск в том, что у нас будут одновременно работать две версии кода (что, при смене бизнес-логики иногда бывает очень нехорошо). Было бы неплохо, если бы такой сервер (или шина) позволяли делать рестарт так, чтобы направлять новые запросы на обновлённые обработчики, давая старым обработчикам доработать своё, обновиться и перезапуститься.
        • 0
          С распределением нагрузки по серверам не так уж и плохо справляется nginx
          Да только вот если к вам придет 10к запросов, nginx сразу их пошлет на backend. И тут уж вашему приложению придется самому справляться с нагрузкой. В данном же случае, у вас все ляжет в очередь, и workerы потихоничку и без напряга все отработают.

          Зачем же вообще wsgi это для того чтобы вы максимально не переписывали код, просто запустили уже работающее приложение.

          Я про версии кода приложения. Без проблем называете несовместимый по запросу/ответу таск «app.mysupertask.v1» и фронтенд с уже обновленным кодом все будет слать на новые воркеры. Даже если вы остановите все воркеры, у вас приходящие сообщения будут копиться в очереди (учитывая timeout который вы уже обработаете сами), а после запуска воркеров, все побежит.
          • 0
            В данном же случае, у вас все ляжет в очередь, и workerы потихоничку и без напряга все отработают.


            Может быть я это не заметил в статье, но у вас же есть таймауты для задач в очереди?

            Я про версии кода приложения. Без проблем называете несовместимый по запросу/ответу таск «app.mysupertask.v1» и фронтенд с уже обновленным кодом все будет слать на новые воркеры. Даже если вы остановите все воркеры, у вас приходящие сообщения будут копиться в очереди (учитывая timeout который вы уже обработаете сами), а после запуска воркеров, все побежит.


            А, вот и про таймаут. Не очень понятно, что означает «обрабатываете сами», правда.

            Кажется, я понял, в чём заключается предлагаемый подход. Согласен, он может применятся, хотя я предпочёл бы, чтобы фреймворк/сервер/шина умела решать это за меня.
            • 0
              В примере есть TimeoutError и ExpirationError которые выбрасываются в зависимости от того что с задачей. Первая в случае если таск был взят, но воркер не обработал его за ttl. Вторая если таск так никто и не взял. А обрабатывайте сами — это значит, что можно попробовать повторить или сказать клиенту 500 404 etc. Вобщем в зависимости от требований. А идея wsgi-worker и предпологает решение всего что касается wsgi средствами этого wsgi слоя.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.