Pull to refresh

Взаимодействие PHP и Erlang посредством RabbitMQ

Reading time 6 min
Views 11K

Вступление


Чем больше программируешь на php, тем чаще попадаются задачи, для решения которых нужен демон на сервере. Да, конечно существует phpDaemon, cron или костыли, которые при каждом n-ом запуске скрипта вызывают какой-то определенный набор операций. Но когда мы говорим о проектах с нагрузкой больше, чем на обычном сайте, мы начинаем расстраиваться.

В одном из проектов для решения такой задачи мы решили использовать связку php+RabbitMQ+erlang. На php уже был написан необходимый функционал, нам надо было лишь разнести вызовы по времени и на разные машинки. Конкретно задача звучала так: написать парсер пользователей с внешнего хранилища данных и, самое главное, поддерживать актуальность данных, а в случае их изменения, посылать уведомления.

Исходные данные


— Rest-функция на php, добавляющая пользователя в избранное. Из этого списка в дальнейшем и будут формироваться пользователи, актуальность информации которых мы и будем поддерживать, а в случае изменения — посылать уведомления на клиента.
— Приложение на erlange, который должен координировать, каких пользователей мы сейчас обрабатывем и т.д. Обработка информации с внешнего источника осуществляется двумя путями — это парсинг html страниц или где это возможно — запросы api. Для обработки ответов используется rest-функция, написанная на php.
— Мы должны поддерживать возможность простого масштабирования на большое количество серверов. Список пользователей, которые надо парсить, находятся в очередях, формируемых RabbitMQ.

Задача номер 1 — настроить расширение php для работы с RabbitMQ


В первую очередь устанавливаем дополнительный программные пакеты:

apt-get install gcc
apt-get install php5-dev

Далее сама установка, найденная на просторах интернета:

#download and install the rabbitmq c amqp lib
wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.5.1/rabbitmq-c-0.5.1.tar.gz
tar -zxvf rabbitmq-c-0.5.1.tar.gz
cd rabbitmq-c-0.5.1/
./configure
make
sudo make install
cd ..
#download and compile the amqp
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar -zxvf amqp-1.4.0.tgz
cd amqp-1.4.0/
phpize && ./configure --with-amqp && make && sudo make install
#Add amqp extension to php mods-availabile directory
echo "extension=amqp.so" > /etc/php5/mods-available/amqp.ini
#Enabled it in cli
cd /etc/php5/cli/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini
php -m | grep amqp
#Enabled it in cli
cd /etc/php5/apache2/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini
#restart Apache and than check phpinfo on web
service apache2 restart

Если вам повезло, то все установилось верно.

Задача номер 2 — установить RabbitMQ и web-панель управления им


sudo apt-get install rabbitmq-server

rabbitmq-plugins enable rabbitmq_management

rabbitmqctl stop

rabbitmq-server -detached

Далее по данному адресу вы получаете доступ к управления очередями, по умолчанию логин(guest) и пароль(guest)
ip.addres:15672/

Задача номер 3 — через php влиять на очереди RabbitMQ


//создание точки обмена
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();
$testChannel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($testChannel);
$exchange->setName('logoooooo');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

//создания очереди
$testChannel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($testChannel); 
$queue->setName("yyyyyyy2");
$queue->declare();


//привязываем очередь к точки обмена
$testChannel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($testChannel); 
$queue->setName("yyyyyyy2");
$queue->bind('logoooooo'); 
$queue->declare();

//отправить сообщение на точку
$testChannel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($testChannel);
$exchange->setName('logoooooo');
$exchange->publish('pooooooooooooooooooooooooooooooo');


Задача номер 4 — Создать приложение на erlange используя rebar


Устанавливаем rebar
apt-get install rebar
mkdir 1
rebar create template=simpleapp   srvid=my_server46
rebar create template=simplesrv   srvid=my_server46


Задача номер 5 — Запускаем приложение на erlange


Сначала устанавливаем CMake:

apt-get install make


В Makefile прописываем следующее:

all:
	rebar compile

run:
	ERL_LIBS=deps erl +K true -name myapp_app@127.0.0.1 -boot start_sasl -pa ebin -s myapp_app -sasl errlog_type error


Строчка -pa ebin -s myapp_app означает, что мы запускаем ebin/myapp_app.erl и в нем функцию myapp_app:start().
ERL_LIBS=deps означает, что мы подгружаем все библиотеки, которые расположены в папке deps.

Задача номер 6 — подключить необходимые библиотеки для связи RabbitMQ и Erlang


В rebar.config помещаем следующее:

{deps, [
	{rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq-3.0.2"}}}
]}.

{erl_opts, [
  debug_info, 
  compressed, 
  report, 
  warn_export_all, 
  warn_export_vars, 
  warn_shadow_vars, 
  warn_unused_function, 
  warn_deprecated_function, 
  warn_obsolete_guard, 
  warn_unused_import 
%  warnings_as_errors
]}.

Выполняем rebar get-deps, подтягивающий зависимости. Далее возникли сложности с оставшимися библиотеками, поэтому пришлось использовать то, что написано на официальном сайте RabbitMQ. Но перед эти доустанавливаем необходимые пакеты:

apt-get install xsltproc
apt-get install zip

После заходим в папочку deps, которую создал rebar и, используя git, все скачиваем, а после устанавливаем:

cd deps
git clone https://github.com/rabbitmq/rabbitmq-erlang-client.git
git clone https://github.com/rabbitmq/rabbitmq-server.git
git clone https://github.com/rabbitmq/rabbitmq-codegen.git
cd rabbitmq-erlang-client
make

Задача номер 7 — из Erlangа получать сообщения из очередей RabbitMQ


Файл myapp_app.erl оставляем чуть чуть редактируем, чтобы можно было запускать из makefile, что мы написали:

-module(myapp_app).

-behaviour(application).

-export([start/0,start/2, stop/1]).

start() ->
  myapp_sup:start_link().

start(_StartType, _StartArgs) ->
    myapp_sup:start_link().

stop(_State) ->
    ok.


Файл myapp_sup.erl, отвечающий за наблюдение процессами, дописываем вызов нашего модуля из init:

-module(myapp_sup).

-behaviour(supervisor).

-export([start_link/0]).

-export([init/1]).

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    {ok, { {one_for_one, 5, 10}, [{my_server46_0, {my_server46, start_link, []},permanent, brutal_kill, worker, [my_server46]}]} }.


Модуль, отвечающий за связь с RabbitMQ:

-module(my_server46).
-behaviour(gen_server).

-include("deps/rabbitmq-erlang-client/include/amqp_client.hrl").

-define(SERVER, ?MODULE).

-export([start_link/0,main/0,loop/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

init(Args) ->
  main(),
  {ok, Args}.

main() ->
  {ok, Connection} =
    amqp_connection:start(#amqp_params_network{host = "localhost"}),
  {ok, Channel} = amqp_connection:open_channel(Connection),

  io:format(" [*] Waiting for messages. To exit press CTRL+C~n"),

  amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 1}),
  amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"yyyyyyy2">>},self()),
  receive
    #'basic.consume_ok'{} -> io:fwrite(" _rec_main_ok_ "),ok
  end,
  loop(Channel),
  io:fwrite("begin~n", []).

loop(Channel)->
  receive
    {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Body}} ->
      Dots = length([C || C <- binary_to_list(Body), C == $.]),
      io:format(" [x] Received Body ~p~n", [Body]),
      receive
      after
        Dots*1000 -> io:format(" _loop_rec_after_ ~p",[0]), ok
      end,
      timer:sleep(3500),
      amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
      loop(Channel),
      io:format(" [x] Done 3~n")
  end.



handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.



Тут все достаточно просто, мы подписываемся на очередь «yyyyyyy2»:

amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"yyyyyyy2">>},self())

Затем сообщаем RabbitMQ, что сообщение успешно обработано:

amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})
Tags:
Hubs:
+8
Comments 36
Comments Comments 36

Articles