Pull to refresh

Взаимодействие PHP и Erlang посредством RabbitMQ

Reading time6 min
Views12K

Вступление


Чем больше программируешь на php, тем чаще попадаются задачи, для решения которых нужен демон на сервере. Да, конечно существует phpDaemon, cron или костыли, которые при каждом n-ом запуске скрипта вызывают какой-то определенный набор операций. Но когда мы говорим о проектах с нагрузкой больше, чем на обычном сайте, мы начинаем расстраиваться.

В одном из проектов для решения такой задачи мы решили использовать связку php+RabbitMQ+erlang. На php уже был написан необходимый функционал, нам надо было лишь разнести вызовы по времени и на разные машинки. Конкретно задача звучала так: написать парсер пользователей с внешнего хранилища данных и, самое главное, поддерживать актуальность данных, а в случае их изменения, посылать уведомления.

Исходные данные


— Rest-функция на php, добавляющая пользователя в избранное. Из этого списка в дальнейшем и будут формироваться пользователи, актуальность информации которых мы и будем поддерживать, а в случае изменения — посылать уведомления на клиента.
— Приложение на erlange, который должен координировать, каких пользователей мы сейчас обрабатывем и т.д. Обработка информации с внешнего источника осуществляется двумя путями — это парсинг html страниц или где это возможно — запросы api. Для обработки ответов используется rest-функция, написанная на php.
— Мы должны поддерживать возможность простого масштабирования на большое количество серверов. Список пользователей, которые надо парсить, находятся в очередях, формируемых RabbitMQ.

Задача номер 1 — настроить расширение php для работы с RabbitMQ


В первую очередь устанавливаем дополнительный программные пакеты:

apt-get install gcc
apt-get install php5-dev

Далее сама установка, найденная на просторах интернета:

#download and install the rabbitmq c amqp lib
wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.5.1/rabbitmq-c-0.5.1.tar.gz
tar -zxvf rabbitmq-c-0.5.1.tar.gz
cd rabbitmq-c-0.5.1/
./configure
make
sudo make install
cd ..
#download and compile the amqp
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar -zxvf amqp-1.4.0.tgz
cd amqp-1.4.0/
phpize && ./configure --with-amqp && make && sudo make install
#Add amqp extension to php mods-availabile directory
echo "extension=amqp.so" > /etc/php5/mods-available/amqp.ini
#Enabled it in cli
cd /etc/php5/cli/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini
php -m | grep amqp
#Enabled it in cli
cd /etc/php5/apache2/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini
#restart Apache and than check phpinfo on web
service apache2 restart

Если вам повезло, то все установилось верно.

Задача номер 2 — установить RabbitMQ и web-панель управления им


sudo apt-get install rabbitmq-server

rabbitmq-plugins enable rabbitmq_management

rabbitmqctl stop

rabbitmq-server -detached

Далее по данному адресу вы получаете доступ к управления очередями, по умолчанию логин(guest) и пароль(guest)
ip.addres:15672/

Задача номер 3 — через php влиять на очереди RabbitMQ


//создание точки обмена
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();
$testChannel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($testChannel);
$exchange->setName('logoooooo');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

//создания очереди
$testChannel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($testChannel); 
$queue->setName("yyyyyyy2");
$queue->declare();


//привязываем очередь к точки обмена
$testChannel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($testChannel); 
$queue->setName("yyyyyyy2");
$queue->bind('logoooooo'); 
$queue->declare();

//отправить сообщение на точку
$testChannel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($testChannel);
$exchange->setName('logoooooo');
$exchange->publish('pooooooooooooooooooooooooooooooo');


Задача номер 4 — Создать приложение на erlange используя rebar


Устанавливаем rebar
apt-get install rebar
mkdir 1
rebar create template=simpleapp   srvid=my_server46
rebar create template=simplesrv   srvid=my_server46


Задача номер 5 — Запускаем приложение на erlange


Сначала устанавливаем CMake:

apt-get install make


В Makefile прописываем следующее:

all:
	rebar compile

run:
	ERL_LIBS=deps erl +K true -name myapp_app@127.0.0.1 -boot start_sasl -pa ebin -s myapp_app -sasl errlog_type error


Строчка -pa ebin -s myapp_app означает, что мы запускаем ebin/myapp_app.erl и в нем функцию myapp_app:start().
ERL_LIBS=deps означает, что мы подгружаем все библиотеки, которые расположены в папке deps.

Задача номер 6 — подключить необходимые библиотеки для связи RabbitMQ и Erlang


В rebar.config помещаем следующее:

{deps, [
	{rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq-3.0.2"}}}
]}.

{erl_opts, [
  debug_info, 
  compressed, 
  report, 
  warn_export_all, 
  warn_export_vars, 
  warn_shadow_vars, 
  warn_unused_function, 
  warn_deprecated_function, 
  warn_obsolete_guard, 
  warn_unused_import 
%  warnings_as_errors
]}.

Выполняем rebar get-deps, подтягивающий зависимости. Далее возникли сложности с оставшимися библиотеками, поэтому пришлось использовать то, что написано на официальном сайте RabbitMQ. Но перед эти доустанавливаем необходимые пакеты:

apt-get install xsltproc
apt-get install zip

После заходим в папочку deps, которую создал rebar и, используя git, все скачиваем, а после устанавливаем:

cd deps
git clone https://github.com/rabbitmq/rabbitmq-erlang-client.git
git clone https://github.com/rabbitmq/rabbitmq-server.git
git clone https://github.com/rabbitmq/rabbitmq-codegen.git
cd rabbitmq-erlang-client
make

Задача номер 7 — из Erlangа получать сообщения из очередей RabbitMQ


Файл myapp_app.erl оставляем чуть чуть редактируем, чтобы можно было запускать из makefile, что мы написали:

-module(myapp_app).

-behaviour(application).

-export([start/0,start/2, stop/1]).

start() ->
  myapp_sup:start_link().

start(_StartType, _StartArgs) ->
    myapp_sup:start_link().

stop(_State) ->
    ok.


Файл myapp_sup.erl, отвечающий за наблюдение процессами, дописываем вызов нашего модуля из init:

-module(myapp_sup).

-behaviour(supervisor).

-export([start_link/0]).

-export([init/1]).

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    {ok, { {one_for_one, 5, 10}, [{my_server46_0, {my_server46, start_link, []},permanent, brutal_kill, worker, [my_server46]}]} }.


Модуль, отвечающий за связь с RabbitMQ:

-module(my_server46).
-behaviour(gen_server).

-include("deps/rabbitmq-erlang-client/include/amqp_client.hrl").

-define(SERVER, ?MODULE).

-export([start_link/0,main/0,loop/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

init(Args) ->
  main(),
  {ok, Args}.

main() ->
  {ok, Connection} =
    amqp_connection:start(#amqp_params_network{host = "localhost"}),
  {ok, Channel} = amqp_connection:open_channel(Connection),

  io:format(" [*] Waiting for messages. To exit press CTRL+C~n"),

  amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 1}),
  amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"yyyyyyy2">>},self()),
  receive
    #'basic.consume_ok'{} -> io:fwrite(" _rec_main_ok_ "),ok
  end,
  loop(Channel),
  io:fwrite("begin~n", []).

loop(Channel)->
  receive
    {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Body}} ->
      Dots = length([C || C <- binary_to_list(Body), C == $.]),
      io:format(" [x] Received Body ~p~n", [Body]),
      receive
      after
        Dots*1000 -> io:format(" _loop_rec_after_ ~p",[0]), ok
      end,
      timer:sleep(3500),
      amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
      loop(Channel),
      io:format(" [x] Done 3~n")
  end.



handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.



Тут все достаточно просто, мы подписываемся на очередь «yyyyyyy2»:

amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"yyyyyyy2">>},self())

Затем сообщаем RabbitMQ, что сообщение успешно обработано:

amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})
Tags:
Hubs:
+8
Comments36

Articles

Change theme settings