Pull to refresh
0

Использование сервера очередей beanstalkd для распределения нашей почтовой рассылки

Reading time 4 min
Views 19K
В нашей системе (сервис покупок за рубежом Shopozz.RU и бесплатный mail forwarding в США Shopozz.COM), как и во многих других, присутствует множество почтовых рассылок разного содержания. Безусловно, такие электронные письма должны доходить до конечного пользователя крайне оперативно, тем паче, что контент большинства из них подразумевает оперативную доставку по умолчанию. Когда же суммарное время отправления очередной рассылки перевалило за 24 часа, стало понятно, что нужно как-то оптимизировать этот процесс.

image

Итак, задача: есть over 200 000 пользователей, для которых необходимо сгенерировать и доставить сообщение за максимально короткий срок. Решение задачи — под катом.

Первая же мысль, которая приходит на ум – правильно, увеличить число каналов. Однако, в дело вступает старая добрая синхронизация – на генерацию и отправку одного сообщения уходит приблизительно 0.5 – 1 секунда, а если за дело будут браться сразу несколько отправителей, нужно не дать им взяться за одного и того же пользователя дважды.

Дабы не нагружать лишний раз ни базу, ни сервер, было принято решение в роли распределителя ресурсов использовать сервер очереди beanstalkd.

image

Выбор пал именно на этот сервер во многом из-за простоты внедрения. Вeanstalkd прост и без излишеств. Подробнее про очереди сообщений уже написано тут и тут.

Общая схема отправки сообщений выглядит следующим образом:

image

Список получателей образуется обычным запросом к базе и преобразовывается в набор job’ов — сообщений для beanstalkd.

Следует учесть, что в beanstalkd’е ограничен максимальный размер job’а и по дефолту составляет 256 байт текста (максимальный размер можно указать другой). Кроме того, не следует выделять на каждого пользователя отдельный job, потому что получение и парсинг данных из beanstalkd'а также занимают ресурсы сервера.

Методом подбора мы пришли к цифре 100 пользователей на job.

Worker в данной схеме – это php-скрипт, который постоянно прослушивает канал сервера на наличие новых job’ов, получает из них список пользователей и отправляет сообщения.

Алгоритм работы worker’а:

while(1)
{
	$ci = get_instance();
	$ci->load->model('shared/queue_model');
	$ci->load->model('shared/sendmail_model');
	if ($ci->queue_model->watch_jobs())
	{
		// Действия, выполняемые по завершению job'а
		print 'Memory usage = ' . intval(memory_get_peak_usage() / 1024) . ' KB' . PHP_EOL;
	}			
	unset($ci);
}

Код модели queue_model:

class Queue_model extends CI_Model
{
	private $_pheanstalk;
	
	public function __construct()
	{
       		 $this->_pheanstalk = new Pheanstalk_Pheanstalk('XX.XX.XX.XX:XX');
       		 $this->tube = (ENVIRONMENT === 'production') ? 'www.shopozz.ru' : 'default';
	}

	public function watch_jobs()
	{
		@$job = $this->_pheanstalk->watch($this->tube)->reserve();
		if ( ! isset($job) OR ! $job)
		{
			return FALSE;
		}
		$job_data = json_decode($job->getData());		
		if ( ! isset($job_data->type))
		{
			$this->_pheanstalk->release($job);
			return FALSE;
		}

		if ($this->email_messages_model->can_send($job_data->mail))
		{
			print date('H:i:s ') . 'START JOB!' . PHP_EOL;

			$data = $job->getData();
			$this->_pheanstalk->delete($job);
			$this->_send_email($data);
			$time = round(microtime(true) - $start, 2);

			print date('H:i:s') . 'JOB DONE! Time=' . $time . PHP_EOL;

			return TRUE;
		}
		$this->_pheanstalk->release($job);
		return FALSE;
	}


Здесь важно помнить, что метод
$this->_pheanstalk->watch($this->tube)->reserve();
помечает текущий активный элемент в очереди, как зарезервированный, чтобы другие worker’ы не могли получить к нему доступ. После этого, если задача выполнена, ее необходимо удалить, а если это не представляется возможным – обязательно вернуть в предыдущий статус через
_pheanstalk->release($job);


Метод
$this->email_messages_model->can_send
проверяет не остановлена ли рассылка. Благодаря этой проверке можно остановить отправку сообщений в любой момент (например, если в тексте рассылки найдена ошибка). В этом случае каждый job будет считываться, но не будет выполнен, и, как следствие, не будет удален, а просто вернется в состояние активного ожидания.

Остался последний штрих – поскольку worker’ы находятся в состоянии постоянного зацикленного запуска, они не смогут корректно получать изменения в коде. Для этого добавим в цикл проверку на время последнего изменения в файле:

class Restarter
{
	/**
	 * Время изменения файла воркера
	 * @var integer
	 */
	private $_changed_time;
	
	/**
	 * Путь до файла со скриптом воркера
	 * @var type 
	 */
	private $_self_file;

	public function __construct() 
	{
		$this->_self_file = __FILE__;
		
		$this->_changed_time = $this->_get_changed_time();		
	}

	/**
	 * Получает время последнего изменения файла скрипта воркера
	 * @return type
	 */
	private function _get_changed_time()
	{
		$fp = fopen($this->_self_file, "r");
		$fstat = fstat($fp);
		$res = $fstat['mtime'];
		return $res;
	}
	
	/**
	 * Завершает работу воркера, если его код был изменен
	 */
	public function check_changed_time()
	{
		$time = $this->_get_changed_time();

		if ($time !== $this->_changed_time)
		{
			echo "File has been changed!";
			exit(1);
		}		
	}
}


Теперь, если в файле произошли изменения, скрипт вызовет завершение со статусом «1», после чего будет автоматически перезапущен.

В итоге, поставленная наверху поста задача решена: теперь наши пользователи получают рассылку максимально быстро. С увеличением количества пользователей, что естественно, сервис не испытывает каких-либо проблем с конечным временем доставки электронного письма до получателя — все решается банальным увеличением каналов.

Напоминаем, что вы читаете блог компании Shopozz.com – абсолютно бесплатного сервиса mail forwarding'а в США с гарантированными сроками доставки. Подписывайтесь на наш блог, регистрируйтесь на нашем сервисе и экономьте еще больше на покупках за рубежом.

image
Tags:
Hubs:
+4
Comments 14
Comments Comments 14

Articles

Information

Website
shopozz.com
Registered
Founded
Employees
51–100 employees
Location
США