Pull to refresh

Простой сервер задач с очередью в MySQL (без проблем с блокировками)

Reading time 2 min
Views 11K
Почти в каждом более менее динамическом проекте бывает возникает необходимость выполнять очереди задач в фоне (отправка email, обновления кеша, реиндексация поиска и т.д.). Job сервера (Gearman и т.п.) хороши, но для большинства простых задач они избыточны. Классическая реализация очередей в MySQL (при помощи SELECT … LOCK FOR UPDATE) при росте нагрузки со временем начинает приводить к проблемам с блокировкой. Потому, как это обычно бывает, пришлось написать свой «велосипед» для работы с фоновыми задачами, который бы «точно работал» и был предельно прост.

Основа: Cron, PHP 5.3 (mysqli), MySQL > 5.1 — легко «влепить» почти на любой хостинг.
Операция получения (захвата) задачи — атомарна (один UPDATE запрос). Никаких проблем с блокировкой и RC.
Возможность распределения воркерам задач по группам и приоритетам, передача массива данных в исполняемый метод (функцию).
Три режима обработки завершенных задач: переместить запись в отдельную таблицу, удалить запись, оставить запись и отметить как успешно обработанная.
Обработка незавершенных задач или задач, обработанных с ошибкой — на совести разработчика.
На всё про всё 400 строк кода (с полными PHPDOC).
Ограничения: текущая реализация не подходит для persistent соединений, но если кому-то потребуется, несложно допилить. Даже при желании переписать на другой язык :)

Возможность неблокирующей работы с очередью реализована через использование пользовательских переменных в UPDATE запросе с их последующей выборкой. Посвящать этому приему целую статью — глупо. Гораздо приятнее конечная реализация, которую можно применить в дело (Мы же с вами практики, не так ли?). Во всём остальном исключительно классическая очередь с группами и приоритетами.

Пример использования (клиент):
$task_server = \DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue');
$task_server->addTask('mywork', $data);

mywork — функция, которая должна быть доступна воркеру. В нее будет передан массив $data. Также возможно указывать вызов статических методов класса.
$task_server->addTask('MyWork::doWork', $data);


Пример воркера:
\DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue') // Создаем сервер.
		->setByCLIAgruments($argv) // Устанавливаем параметры вызова из консоли.
		->setMode(\DBTaskServer::MODE_MARK_AS_COMPLETED) // Выбираем режим обработки.
		->run(); // Запускам воркера.


Запуск воркера из консоли с параметрами:
/path/to/script/worker.php [max_tasks_per_lifecycle] [comma_separated_group_ids]

Как понятно из названия, первая опция говорит о том сколько максимум задач может выполнить воркер прежде чем завершит работу (если конечно таковые для него будут доступны), вторая опция — это значения group_id заданий, которые данный воркер должен обрабатывать. Если группы не указаны, то воркер обрабатывает любые группы.

Например:
/path/to/script/worker.php 100 3,5,6

Выполнить 100 заданий из групп 3, 5 и 6.
Если заданий не будет найдено, то воркер сразу завершит свою работу.

Добавляем воркера в крон:
0-59/5 * * * * /path/to/script/worker.php 5 3 >/dev/null 2>&1

Каждые 5 минут обрабатывать по 5 заданий с group_id=3.

В архиве примеры клиента, воркера, сам класс сервера (задокументирован), sql файл с таблицей задач.
Качать тут (аж целых 5kB).

Приятного вам кода.
Tags:
Hubs:
+17
Comments 35
Comments Comments 35

Articles