Pull to refresh

Создание неблокирующего TCP сервера с использованием принципов OTP

Reading time 15 min
Views 7.6K
Original author: Serge Aleynikov

Вступление


Предполагается, что читатель этого руководства, уже знаком с gen_server и gen_fsm поведениями, взаимодействиям посредством TCP сокетов с использованием модуля gen_tcp, активным и пассивным режимами сокетов, и принципом «OTP Supervisor».

OTP предоставляет удобный инструментарий для создания надежных приложений. Отчасти, это осуществляется путем абстрагирования общей функциональности в поведения, такие как gen_server и gen_fsm, которые связаны иерархией cупервизоров OTP.

Существует несколько известных шаблонов TCP сервера. Тот, который мы собираемся рассмотреть включает в себя один слушающий процесс и процесс создания нового FSM процесса на каждого подключившегося клиента. Хотя существует поддержка TCP соединений в OTP через gen_tcp модуль, не существует стандартного поведение для создания неблокирующего TCP сервера опираясь на принципы OTP. Под неблокирующим сервером мы подразумеваем, что слушающий процесс и FSM-процесс не должны делать каких-либо блокирующих вызовов и быстро реагировать на входящие сообщения (например, изменения в конфигурации, перезапуск и т.д.), не вызывая таймауты. Обратите внимание, что блокировка в контексте Erlang означает блокировку процесса Erlang, а не процесса операционой системы.

В этом руководстве мы покажем, как создать неблокирующий TCP сервер, используя gen_server и gen_fsm, которые предоставляют контроль над поведением приложения и полностью удовлетворяют принципам OTP.

Читателю, который не знаком с OTP, рекомендуется обратить внимание на руководство Джо Армстронга о том, как построить отказоустойчивые сервера с использованием блокирующих вызовов gen_tcp:connect/3 и gen_tcp:acceept/1 без использования OTP.

Структура сервера


Дизайн нашего сервера будет включать в себя основной процесс-супервизор приложения tcp_server_app со стратегией перезапуска one_for_one и два дочерних процесса. Первый из которых является слушающим процессом, реализованным как gen_server, который будет ждать асинхронных уведомлений о клиентских подключениях. Второй является другим супервизором приложения tcp_client_sup и отвечает за запуск FSM-процесса обработки клиенских запросов и регистрации ненормальных отключений с помощью стандартных отчетов SASL об ошибках.

Для простоты этого материала, обработчик клиенских запросов (tcp_echo_fsm) будет предоставлять «Эхо» сервер, который будет возвращать запросы клиента обратно.

Поведения приложения и супервизоров


Для того, чтобы создать наше приложение, нам нужно написать модули, реализующие функции обратного вызова поведений «Supervisor» и «Application». Хотя традиционно эти функции реализуются в отдельных модулях, с учетом их краткости мы объединим их в одном.

В качестве дополнительного бонуса мы реализуем get_app_env функцию, которая показывает, как обрабатывать параметры конфигурации, а также параметры командной строки эмулятора при запуске.

Два экземпляра функции init/1 нужны для двух уровней иерархии супервизоров. Так как используется две различные стратегии перезапуска, мы реализуем их на различных уровнях.

После запуска приложения функция обратного вызова tcp_server_app:start/2 вызывает функцию supervisor: start_link/2, которая создает главный супервизор приложения вызывая tcp_server_app: init ([Port, Module]). Это супервизор создает процесс tcp_listener и дочерний супервизор tcp_client_sup ответственный за обработку клиентских подключений. Аргумент Module в функции init это имя FSM-обработчика клиентских соединений (в данном случае tcp_echo_fsm).

TCP Server Application (tcp_server_app.erl):

-module(tcp_server_app).
-author('saleyn@gmail.com').

-behaviour(application).

%% Internal API
-export([start_client/0]).

%% Application and Supervisor callbacks
-export([start/2, stop/1, init/1]).

-define(MAX_RESTART,    5).
-define(MAX_TIME,      60).
-define(DEF_PORT,    2222).

%% A startup function for spawning new client connection handling FSM.
%% To be called by the TCP listener process.
start_client() ->
    supervisor:start_child(tcp_client_sup, []).

%%----------------------------------------------------------------------
%% Application behaviour callbacks
%%----------------------------------------------------------------------
start(_Type, _Args) ->
    ListenPort = get_app_env(listen_port, ?DEF_PORT),
    supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]).

stop(_S) ->
    ok.

%%----------------------------------------------------------------------
%% Supervisor behaviour callbacks
%%----------------------------------------------------------------------
init([Port, Module]) ->
    {ok,
        {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
            [
              % TCP Listener
              {   tcp_server_sup,                          % Id       = internal id
                  {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A}
                  permanent,                               % Restart  = permanent | transient | temporary
                  2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
                  worker,                                  % Type     = worker | supervisor
                  [tcp_listener]                           % Modules  = [Module] | dynamic
              },
              % Client instance supervisor
              {   tcp_client_sup,
                  {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]},
                  permanent,                               % Restart  = permanent | transient | temporary
                  infinity,                                % Shutdown = brutal_kill | int() >= 0 | infinity
                  supervisor,                              % Type     = worker | supervisor
                  []                                       % Modules  = [Module] | dynamic
              }
            ]
        }
    };

init([Module]) ->
    {ok,
        {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
            [
              % TCP Client
              {   undefined,                               % Id       = internal id
                  {Module,start_link,[]},                  % StartFun = {M, F, A}
                  temporary,                               % Restart  = permanent | transient | temporary
                  2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
                  worker,                                  % Type     = worker | supervisor
                  []                                       % Modules  = [Module] | dynamic
              }
            ]
        }
    }.

%%----------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------
get_app_env(Opt, Default) ->
    case application:get_env(application:get_application(), Opt) of
    {ok, Val} -> Val;
    _ ->
        case init:get_argument(Opt) of
        [[Val | _]] -> Val;
        error       -> Default
        end
    end.


Слушающий процесс


Один из недостатков gen_tcp модуля является то, что он предоставляется интерфейс только для блокирующиего принятия соединений.

Тестирование модуля prim_inet показало интересный факт, что команда сетовому драйверу принять клиентское соединение является асинхронной. Хотя это и не является задокументированным, что означает, что команда OTP может в любой момент изменить это, мы будем использовать эту функциональность в создании нашего сервера.

Слушающий процесс реализуется как gen_server.
TCP Listener Process (tcp_listener.erl):

-module(tcp_listener).
-author('saleyn@gmail.com').

-behaviour(gen_server).

%% External API
-export([start_link/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-record(state, {
                listener,       % Listening socket
                acceptor,       % Asynchronous acceptor's internal reference
                module          % FSM handling module
               }).

%%--------------------------------------------------------------------
%% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason}
%
%% @doc Called by a supervisor to start the listening process.
%% @end
%%----------------------------------------------------------------------
start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).

%%%------------------------------------------------------------------------
%%% Callback functions from gen_server
%%%------------------------------------------------------------------------

%%----------------------------------------------------------------------
%% @spec (Port::integer()) -> {ok, State}           |
%%                            {ok, State, Timeout}  |
%%                            ignore                |
%%                            {stop, Reason}
%%
%% @doc Called by gen_server framework at process startup.
%%      Create listening socket.
%% @end
%%----------------------------------------------------------------------
init([Port, Module]) ->
    process_flag(trap_exit, true),
    Opts = [binary, {packet, 2}, {reuseaddr, true},
            {keepalive, true}, {backlog, 30}, {active, false}],
    case gen_tcp:listen(Port, Opts) of
    {ok, Listen_socket} ->
        %%Create first accepting process
        {ok, Ref} = prim_inet:async_accept(Listen_socket, -1),
        {ok, #state{listener = Listen_socket,
                    acceptor = Ref,
                    module   = Module}};
    {error, Reason} ->
        {stop, Reason}
    end.

%%-------------------------------------------------------------------------
%% @spec (Request, From, State) -> {reply, Reply, State}          |
%%                                 {reply, Reply, State, Timeout} |
%%                                 {noreply, State}               |
%%                                 {noreply, State, Timeout}      |
%%                                 {stop, Reason, Reply, State}   |
%%                                 {stop, Reason, State}
%% @doc Callback for synchronous server calls.  If `{stop, ...}' tuple
%%      is returned, the server is stopped and `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_call(Request, _From, State) ->
    {stop, {unknown_call, Request}, State}.

%%-------------------------------------------------------------------------
%% @spec (Msg, State) ->{noreply, State}          |
%%                      {noreply, State, Timeout} |
%%                      {stop, Reason, State}
%% @doc Callback for asyncrous server calls.  If `{stop, ...}' tuple
%%      is returned, the server is stopped and `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_cast(_Msg, State) ->
    {noreply, State}.

%%-------------------------------------------------------------------------
%% @spec (Msg, State) ->{noreply, State}          |
%%                      {noreply, State, Timeout} |
%%                      {stop, Reason, State}
%% @doc Callback for messages sent directly to server's mailbox.
%%      If `{stop, ...}' tuple is returned, the server is stopped and
%%      `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
            #state{listener=ListSock, acceptor=Ref, module=Module} = State) ->
    try
        case set_sockopt(ListSock, CliSocket) of
        ok              -> ok;
        {error, Reason} -> exit({set_sockopt, Reason})
        end,

        %% New client connected - spawn a new process using the simple_one_for_one
        %% supervisor.
        {ok, Pid} = tcp_server_app:start_client(),
        gen_tcp:controlling_process(CliSocket, Pid),
        %% Instruct the new FSM that it owns the socket.
        Module:set_socket(Pid, CliSocket),

        %% Signal the network driver that we are ready to accept another connection
        case prim_inet:async_accept(ListSock, -1) of
        {ok,    NewRef} -> ok;
        {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
        end,

        {noreply, State#state{acceptor=NewRef}}
    catch exit:Why ->
        error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
        {stop, Why, State}
    end;

handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) ->
    error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
    {stop, Error, State};

handle_info(_Info, State) ->
    {noreply, State}.

%%-------------------------------------------------------------------------
%% @spec (Reason, State) -> any
%% @doc  Callback executed on server shutdown. It is only invoked if
%%       `process_flag(trap_exit, true)' is set by the server process.
%%       The return value is ignored.
%% @end
%% @private
%%-------------------------------------------------------------------------
terminate(_Reason, State) ->
    gen_tcp:close(State#state.listener),
    ok.

%%-------------------------------------------------------------------------
%% @spec (OldVsn, State, Extra) -> {ok, NewState}
%% @doc  Convert process state when code is changed.
%% @end
%% @private
%%-------------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%------------------------------------------------------------------------
%%% Internal functions
%%%------------------------------------------------------------------------

%% Taken from prim_inet.  We are merely copying some socket options from the
%% listening socket to the new client socket.
set_sockopt(ListSock, CliSocket) ->
    true = inet_db:register_socket(CliSocket, inet_tcp),
    case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
    {ok, Opts} ->
        case prim_inet:setopts(CliSocket, Opts) of
        ok    -> ok;
        Error -> gen_tcp:close(CliSocket), Error
        end;
    Error ->
        gen_tcp:close(CliSocket), Error
    end.


В этом модуле init/1 принимает два параметра — номер порта, который слушающий процесс должен открыть и имя обработчика клиентских подключений. Функция инициализации открывает сокет в пассивном режиме. Это делается для того, чтобы у нас был контроль над потоком данных, полученных от клиента.

Самая интересная часть этого кода это вызов prim_inet:async_accept/2. Для того чтобы заставить это работать нам необходимо скопировать часть внутреннего кода OTP из функции set_sockopt/2, которая обрабатывает регистрацию сокетов и копирование некоторых опций для клиентского сокета.

Как только клиентский сокет будет подключен, сетевой драйвер уведомит об этом слушающий процесс с помощью сообщения {inet_async, ListSock, Ref, {OK, CliSocket}}. На данный момент мы запускаем экземпляр процесса обработки клиентских запросов и передаем ему во владение CliSocket.

Процесс обработки клиентских сообщений


В то время как tcp_listener является обобщенной реализацией, tcp_echo_fsm есть не что иное, как FSM заглушка для описания, как создавать TCP сервера. Из этот модуля необходимо экспортировать две функции — start_link/0 для супервизора tcp_client_sup и set_socket/2 для слушающего процесса для того, чтобы последний уведомил процесс обработки клиентских сообщений что то становится владельцем сокета, и может начать получать сообщения, установив {active, once} или {active, true} опцию.

Мы хотели бы подчеркнуть шаблон синхронизации, используемый между слушающим процессом и клиентскими, для избежания возможных потерь сообщение в связи с передачей их не тому (слушающему) процессу. Процесс, владеющий сокетом, держит его открытым в пассивном режиме. Далее клиентский процесс принимает сокет, который наследует опции(в том числе пассивный режим) от слушающего процесса. Право владения сокетом передается клиентскому процессу с помощью вызовов gen_tcp:controlling_process/2 и set_socket/2, который уведомит клиентский процесс, что он может начать получать сообщения от сокета. До момента, когда сокет будет установлен в активный режим, все принятые данные будут хранится в буфере сокета.

Когда право владения сокетом передается клиентскому FSM-прицессу в состоянии «WAIT_FOR_SOCKET», устанавлвается {active, once} режим, чтобы позвалить сетевому драйверу передавать одно сообщение за раз. Это принцип OTP применяемый для сохранения контроля над потоком данных и избежания смешивания сообщений и TCP траффика в очереди процесса.

FSM-состояния реализуются с помощью специальных функций в модуле tcp_echo_fsm, который используют соглашение об именовании. FSM состоит из двух состояний. WAIT_FOR_SOCKET является начальным состоянием, в котором FSM ждет права владения сокетом, и WAIT_FOR_DATA, которое является состоянием ожидания TCP сообщения от клиента. В этом состоянии FSM также обрабатывает специальные «timeout» сообщение, что означает отсутствие активности от клиента и вызывает процесс, чтобы закрыть соединение с клиентом.


-module(tcp_echo_fsm).
-author('saleyn@gmail.com').

-behaviour(gen_fsm).

-export([start_link/0, set_socket/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
         handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

%% FSM States
-export([
    'WAIT_FOR_SOCKET'/2,
    'WAIT_FOR_DATA'/2
]).

-record(state, {
                socket,    % client socket
                addr       % client address
               }).

-define(TIMEOUT, 120000).

%%%------------------------------------------------------------------------
%%% API
%%%------------------------------------------------------------------------

%%-------------------------------------------------------------------------
%% @spec (Socket) -> {ok,Pid} | ignore | {error,Error}
%% @doc To be called by the supervisor in order to start the server.
%%      If init/1 fails with Reason, the function returns {error,Reason}.
%%      If init/1 returns {stop,Reason} or ignore, the process is
%%      terminated and the function returns {error,Reason} or ignore,
%%      respectively.
%% @end
%%-------------------------------------------------------------------------
start_link() ->
    gen_fsm:start_link(?MODULE, [], []).

set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
    gen_fsm:send_event(Pid, {socket_ready, Socket}).

%%%------------------------------------------------------------------------
%%% Callback functions from gen_server
%%%------------------------------------------------------------------------

%%-------------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData}          |
%%          {ok, StateName, StateData, Timeout} |
%%          ignore                              |
%%          {stop, StopReason}
%% @private
%%-------------------------------------------------------------------------
init([]) ->
    process_flag(trap_exit, true),
    {ok, 'WAIT_FOR_SOCKET', #state{}}.

%%-------------------------------------------------------------------------
%% Func: StateName/2
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
    % Now we own the socket
    inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
    {ok, {IP, _Port}} = inet:peername(Socket),
    {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};
'WAIT_FOR_SOCKET'(Other, State) ->
    error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]),
    %% Allow to receive async messages
    {next_state, 'WAIT_FOR_SOCKET', State}.

%% Notification event coming from client
'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
    ok = gen_tcp:send(S, Data),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};

'WAIT_FOR_DATA'(timeout, State) ->
    error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
    {stop, normal, State};

'WAIT_FOR_DATA'(Data, State) ->
    io:format("~p Ignoring data: ~p\n", [self(), Data]),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.

%%-------------------------------------------------------------------------
%% Func: handle_event/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%%-------------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_sync_event(Event, _From, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%%-------------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
    % Flow control: enable forwarding of next TCP message
    inet:setopts(Socket, [{active, once}]),
    ?MODULE:StateName({data, Bin}, StateData);

handle_info({tcp_closed, Socket}, _StateName,
            #state{socket=Socket, addr=Addr} = StateData) ->
    error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
    {stop, normal, StateData};

handle_info(_Info, StateName, StateData) ->
    {noreply, StateName, StateData}.

%%-------------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%% @private
%%-------------------------------------------------------------------------
terminate(_Reason, _StateName, #state{socket=Socket}) ->
    (catch gen_tcp:close(Socket)),
    ok.

%%-------------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%% @private
%%-------------------------------------------------------------------------
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.


Описание приложения


Другая необходимая часть создания приложения OTP является создание конфигурациооного файла, который содержит название приложения, версию, стартовый модуль и переменные окружения.

Application File (tcp_server.app):

{application, tcp_server,
 [
  {description, "Demo TCP server"},
  {vsn, "1.0"},
  {id, "tcp_server"},
  {modules,      [tcp_listener, tcp_echo_fsm]},
  {registered,   [tcp_server_sup, tcp_listener]},
  {applications, [kernel, stdlib]},
  %%
  %% mod: Specify the module name to start the application, plus args
  %%
  {mod, {tcp_server_app, []}},
  {env, []}
 ]
}.


Компиляция


Создайте следующую структуру каталогов для этого приложения:


./tcp_server
./tcp_server/ebin/
./tcp_server/ebin/tcp_server.app
./tcp_server/src/tcp_server_app.erl
./tcp_server/src/tcp_listener.erl
./tcp_server/src/tcp_echo_fsm.erl


$ cd tcp_server/src
$ for f in tcp*.erl ; do erlc +debug_info -o ../ebin $f

Запуск


Мы собираемся запустить оболочку Erlang с поддержкой SASL, чтобы мы могли видеть состояние процессов и отчеты об ошибках для нашего приложения. Кроме того, мы собираемся запсутить приложение appmon в целях визуального изучения иерархии супервизоров.


$ cd ../ebin
$ erl -boot start_sasl
...
1> appmon:start().
{ok,<0.44.0>}
2> application:start(tcp_server).
ok


Теперь нажмите на кнопку tcp_server в приложении appmon, чтобы отобразить иерархию супервизоров приложения.

 
3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]).
{ok,#Port<0.150>}


Мы только что инициировали новое соединение клиента Echo Server.


4> gen_tcp:send(S,<<"hello">>).
ok
5> f(M), receive M -> M end.
{tcp,#Port<0.150>,"hello"}


Мы проверили, что эхо-сервер работает, как ожидалось. Теперь давайте попробуем «положить» соединение клиента на сервере и посмотреть за генерацией сообщения об ошибке.


6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup).
[{undefined,<0.64.0>,worker,[]}]
7> exit(Pid,kill).
true
=SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 ===
     Supervisor: {local,tcp_client_sup}
     Context:    child_terminated
     Reason:     killed
     Offender:   [{pid,<0.77.0>},
                  {name,undefined},
                  {mfa,{tcp_echo_fsm,start_link,[]}},
                  {restart_type,temporary},
                  {shutdown,2000},
                  {child_type,worker}]


Заметим, что если вы сервер попадет под нагрузку с помощью большого число соединений, слушающий процесс может не принимать новые подключения после определенного предела, установленного в операционной системе. В этом случае вы увидите сообщение об ошибке:


"too many open files"


Заключение


OTP предоставляет строительные блоки для построения неблокирующих TCP серверов. В этом руководстве показано, как создать простой сервер с использованием стандартного поведения OTP.
Tags:
Hubs:
+31
Comments 10
Comments Comments 10

Articles