Celery — распределенная очередь заданий

    На этот раз мы решили рассказать о замечательном продукте, который мы используем в нашей работе. Речь пойдет о Celery — «distributed task queue». Это распределенная асинхронная очередь заданий, которая обладает широким функционалом. В нашем конструкторе сайтов нам часто приходиться запускать асинхронные с точки зрения ответа пользователю задачи. На хабре, к сожалению, не много информации по данному продукту, а он заслуживает отдельного упоминания, это мы и хотим исправить.

    Итак, что же умеет Celery:

    • Выполнять задания асинхронно или синхронно
    • Выполнять периодические задания(умная замена crond)
    • Выполнять отложенные задания
    • Распределенное выполнение (может быть запущен на N серверах)
    • В пределах одного worker'а возможно конкурентное выполнение нескольких задач(одновременно)
    • Выполнять задание повторно, если вылез exception
    • Ограничивать количество заданий в единицу времени (rate limit, для задания или глобально)
    • Routing заданий (какому worker'у что делать)
    • Несложно мониторить выполнение заданий
    • Выполнять подзадания
    • Присылать отчеты об exception'ах на email
    • Проверять выполнилось ли задание (удобно для построения Ajax приложений, где юзер ждет факта завершения)

    Заинтересовало? Просим под кат.

    Начнем c конфигурации worker'a. Это демон, который собственно получает задания из очереди и выполняет их. Рекомендуемая очередь — RabbitMQ, но мы пока ограничились ghettoq, через MongoDB. Также поддерживается Redis и РСУБД.

    celeryconfig.py:

    CARROT_BACKEND = "ghettoq.taproot.MongoDB"
    
    BROKER_HOST = "xxx"  
    BROKER_PORT = 27017         
    BROKER_VHOST = "celery"    
    
    CELERY_SEND_TASK_ERROR_EMAILS = True
    ADMINS = ( ('Admin', 'admin@localhost'), )
    
    CELERYD_MAX_TASKS_PER_CHILD = 5
    
    CELERY_IMPORTS = ("tasks", )
    CELERY_DISABLE_RATE_LIMITS = True
    
    CELERY_RESULT_BACKEND = "mongodb"
    CELERY_MONGODB_BACKEND_SETTINGS = {
        "host": "xxx",
        "port": 27017,
        "database": "celery",
        "taskmeta_collection": "my_taskmeta_collection",
    }
    


    Запуск демона: celeryd -l INFO -B
    Включаем логгирование в консоль и опция -B запуск демона периодических заданий. Последний можно запустить отдельно коммандой celerybeat

    Теперь создадим тестовое задание. В конфиге мы импортируем tasks, поэтому и файл заданий у нас tasks.py:

    from celery.decorators import task
    from celery.decorators import periodic_task
    from celery.task.schedules import crontab
    
    @periodic_task(run_every=timedelta(seconds=60))
    def mail_queue():
        
        print "Task is executed every minute"
        
    @periodic_task(run_every=crontab(hour=0, minute=10))
    def transactions():
        print "Task is executed every day on 0:10"
        
    
    @task
    def delayed_function(id):
        some_function()
        
    @task
    def delayed_heavy_function(id):
        some_heavy_function()
    


    Итак, у нас 4 задания в tasks. Первые два выполняются по расписанию, т.к. они отмечены декоратором @periodic_task. А вот два последних будут вызваны непосредственно из кода программы. Вот таким образом:

    from tasks import delayed_function, delayed_heavy_function
    
    delayed_function.apply_async(args=[id], countdown=300) # Будет запущена через 300 секунд
    
    r = delayed_heavy_function.delay(id) #Будет запущена сразу(как только появится возможность), в асинхронном режиме
    


    Теперь для того чтобы отследить результат и факт завершения последнего задания выполним:

    r.ready() # Вернет True если задание отработало
    r.result # Вернет значение выполненной функции или None если еще не выполнено(асинхронно)
    r.get() #Будет ждать выполнения задания и вернет ее результат(синхронно)

    Переменную r можно прогнать через cPickle, положить значение в кеш и аяксом опрашивать статус задания. Либо можно получить task id, и положить в кеш его. Кроме того, task id вы можете задавать самостоятельно, главное чтоб он был уникальным.

    После плотного использования celery мы обнаружили несколько ошибок, связанных с отложенным выполнением задач, с менеджером очередей ghettoq, но они все были поправлены автором в день создания issue на github, за что ему спасибо.

    Не так давно вышла версия 2.0, которая перестала быть django-зависимой, а интеграция с django теперь вынесена в отдельный подпроект celery-django.

    Из ограничений celery можно выделить два, точнее это просто особенности: на штатной FreeBSD worker'ы не будут работать, т.к. там нет питоновкого multiprocessing, хотя в сети есть рецепты по сборке ядра для celery; для перегрузки заданий необходимо рестартовать воркер, чтобы он загрузил новый python-код заданий и связанных функций. На linux работает замечательно.
    Метки:
    Бигго 13,12
    Компания
    Поделиться публикацией
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Похожие публикации
    Комментарии 27
    • 0
      А тег mongodb, зачем?
      • +1
        Так бекенд — mongodb в примере.
        • 0
          Ага, как-то пропустил. Это несомненно круто.
          Спасибо.
      • 0
        А в чем заключается распределенность очереди заданий? Пока что я вижу централизованный учет и раздачу их.
        • 0
          Распределенность в плане выполнения заданий, то есть они делятся между запущенными воркерами, однако если вы хотите чтобы раздача заданий была отказоустойчивой, есть смысл использовать RabbitMQ
        • 0
          Не флейму для, а интереса ради, а gearman.org/ не рассматривали? Вроде как значительно более стабильный и проверенный вариант
          • 0
            Нет, не рассматривали. В будущем планируем перевести раздачу очередей на AMQP(RabbitMQ), а это тоже промышленное решение.
            • 0
              Instagram перешел с gearmand на celery.
              Мы, кстати, у себя гирман используем очень и очень много где и тоже подумываем переходить, привысоких нагрузках гирман начинает подтупливать, а именно выполняя одну очередь, забивает на другие полностью, и даже приоритеты задач не помогают.
          • 0
            оффтопик:
            интересно, сколько человек тянут такой проект?
            • 0
              это я про конструктор сайтов
              • 0
                4 пока
                • 0
                  Четыре программиста или админ, дизайнер, программист, менеджер?
                  • 0
                    всего 4, так что второе.
            • 0
              Есть ли математические применения этой штуки? Например, если у меня есть кластер и суперкомпьютер, могу я что-то для себя сложное на ней посчитать?
              • 0
                В принципе почему бы нет, вопрос в том что это даст в плане выигрыша в производительности. Надо смотреть особенности задачи.
              • +1
                > delayed_function.apply_async(args=[id], countdown=300) # Будет запущена через 300 секунд

                как-нибудь проверить можно, есть ли данное задание в очереди на выполнение? отменить его можно?
                • 0
                  Да, можно:
                  r = delayed_function.apply_async(args=[id], countdown=300)
                  r.status — Если PENDING, значит еще ждет

                  Отменить можно:
                  celery.task.control.revoke(task_id, destination=None, **kwargs)
              • 0
                Очередной RabbitMQ ради RabbitMQ.
                • 0
                  Могли поюзать tailable cursors в MongoDB для очередей =) Было прикольнее чем RabbitMQ.
                  • 0
                    Переменную r можно прогнать через cPickle, положить значение в кеш и аяксом опрашивать статус задания.

                    Ой-ой-ой-ой!.. не надо такие опасности советовать людям! Уточняйте четко, что значение cPickle надо хранить только на стороне сервера, а опрашивать аяксом только по какому-нибудь связному идентификатору, а то ведь люди будут передавать клиенту результат cPickle и по нему опрашивать. А там:

                    docs.python.org/library/pickle.html
                    • 0
                      Да, конечно лучше в кеш только id класть, а вобще по ситуации смотреть надо.
                    • НЛО прилетело и опубликовало эту надпись здесь
                      • НЛО прилетело и опубликовало эту надпись здесь
                        • НЛО прилетело и опубликовало эту надпись здесь
                        • 0
                          Почему же вы не вставили в пост этот прекрасный интернет-мем? :)

                          image
                          • 0
                            Спасибо, но уж поздновато статью менять)

                          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                          Самое читаемое