Pull to refresh

Быстрые TCP сокеты на Erlang

Reading time 10 min
Views 12K
Обработка TCP соединений может запросто оказаться узким местом, когда скорость приближается к 10 тыс запросов в секунду: эффективное чтение и запись становится отдельной проблемой, а большая часть вычислительных ядер простаивает.

В этой статье я предлагаю оптимизации, которые позволяют улучшить три составляющие работы с TCP: приём соединений, получение сообщений и ответ на них.

Статья адресована как Erlang программистам, так и всем, кто просто интересуется Erlang. Глубокие знания языка не требуются.


Я разделяю “Работу с TCP” на три части:
  1. Приём соединений
  2. Получение сообщений
  3. Ответ на сообщения

В зависимости от задачи, любая из этих частей может оказаться наиболее узким местом.

Я буду рассматривать два подхода к написанию TCP сервисов — напрямую через gen_tcp и при помощи ranch, наиболее популярной библиотеки для пулов соединений на Erlang. Некоторые из предложенных оптимизаций будут применимы только в одном из случаев.

Для того чтобы оценить изменение производительности, я использую MZBench с tcp_worker, который реализует функции connect и request плюс функции синхронизации. Будут использоваться два сценария “fast_connect” и “fast_receive”. Первый открывает соединения с линейно нарастающей скоростью, а второй пытается отправить как можно больше пакетов по уже открытым соединениям. Каждый из сценариев запускался на c4.2xlarge Amazon node. Версия Erlang — 18.

Сценарии и код функций для MZBench доступны на GitHub.

Приём соединений


Быстрый приём соединений важен, если у вас много клиентов, которые постоянно переподключаются, например если клиентские процессы сильно ограничены по времени или не поддерживают постоянные соединения.

Оптимизации ranch


TCP сервисы при помощи ranch создаются довольно просто. Я поменяю код примера echo-сервиса, который идёт вместе с ranch, чтобы он отвечал “ok” на любой приходящий пакет, ниже различия:

--- a/examples/tcp_echo/src/echo_protocol.erl
+++ b/examples/tcp_echo/src/echo_protocol.erl
@@ -16,8 +16,8 @@ init(Ref, Socket, Transport, _Opts = []) ->
 
 loop(Socket, Transport) ->
        case Transport:recv(Socket, 0, 5000) of
-               {ok, Data} ->
-                       Transport:send(Socket, Data),
+               {ok, _Data} ->
+                       Transport:send(Socket, <<"ok">>),
                        loop(Socket, Transport);
                _ ->
                        ok = Transport:close(Socket)

--- a/examples/tcp_echo/src/tcp_echo_app.erl
+++ b/examples/tcp_echo/src/tcp_echo_app.erl
@@ -11,8 +11,8 @@
 %% API.
 
 start(_Type, _Args) ->
-       {ok, _} = ranch:start_listener(tcp_echo, 1,
-               ranch_tcp, [{port, 5555}], echo_protocol, []),
+       {ok, _} = ranch:start_listener(tcp_echo, 100,
+               ranch_tcp, [{port, 5555}, {max_connections, infinity}], echo_protocol, []),
        tcp_echo_sup:start_link().


Я начну с запуска сценария “fast_connect” (с нарастающей скоростью открытия соединений):


График слева показывает выброс размером в 214ms, остальные линии соответствуют персентилям временных задержек, разбитых по пятисекундным интервалам. График справа это скорость октрытия соеднинений, например в районе выброса она была около 3.5 тыс соединений в секунду. В этом сценарии каждый раз отправляется по одному сообщению, по этому количество сообщений соответствует количеству открытых соединений.

Дальнейшее увеличение скорости даёт следующие результаты:



Выбросы в 1000 msec соответствуют превышению времени ожидания. Если продолжить увеличивать скорость открытия соединений, выбросы станут более частыми. Первые выбросы появляются при скорости 5k rps и постоянно присутствуют при скорости 11k rps.

Замена timeout при приёме пакета на timer:sleep()


Я обнаружл, что простое исключение параметра timeout при приёме сообщения сильно повышает скорость установления соединений. Для того чтобы не опрашивать сокет с максимальной скоростью, я добавил timer:sleep(20):

--- a/examples/tcp_echo/src/echo_protocol.erl
+++ b/examples/tcp_echo/src/echo_protocol.erl
@@ -15,10 +15,11 @@ init(Ref, Socket, Transport, _Opts = []) ->
        loop(Socket, Transport).
 
 loop(Socket, Transport) ->
-       case Transport:recv(Socket, 0, 5000) of
-               {ok, Data} ->
-                       Transport:send(Socket, Data),
+       case Transport:recv(Socket, 0, 0) of
+               {ok, _Data} ->
+                       Transport:send(Socket, <<"ok">>),
                        loop(Socket, Transport);
+                {error, timeout} -> timer:sleep(20), loop(Socket, Transport);
                _ ->
                        ok = Transport:close(Socket)
        end.


С этой оптимизацией, приложение ranch может принимать больше соденинений, первый выброс появляется только при 11k rps:



Выбросов становится ещё больше, если пытаться повысить скорость дальше. Таким образом, максимальное число — 24k rps.

Вывод
С предложенной оптимизацией, я получил примерно двойной выигрыш в скорости приёма соединений, от 11k до 24k rps.

Оптимизация gen_tcp


Ниже чистая реализация при помощи gen_tcp, аналогичная тому что я сделал при помощи ranch (текст доступен в виде simple.erl в репозитории с примерами):

-export([service/1]).

-define(Options, [
    binary,
    {backlog, 128},
    {active, false},
    {buffer, 65536},
    {keepalive, true},
    {reuseaddr, true}
]).

-define(Timeout, 5000).

main([Port]) ->
    {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options),
    accept(ListenSocket).

accept(ListenSocket) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} -> erlang:spawn(?MODULE, service, [Socket]), accept(ListenSocket);
        {error, closed} -> ok
    end.

service(Socket) ->
    case gen_tcp:recv(Socket, 0, ?Timeout) of
        {ok, _Binary} -> gen_tcp:send(Socket, <<"ok">>), service(Socket);
        _ -> gen_tcp:close(Socket)
    end.


Запустив тот же сценарий, я получил результаты:



Как можно увидеть, примерно в районе 18k rps, приём соединений становится ненадёжным. Будем считать, что получается принять 18k.

Замена timeout при приёме пакета на timer:sleep()


Я просто применю ту же оптимизацию, что и для ranch:

service(Socket) ->
    case gen_tcp:recv(Socket, 0, 0) of
        {ok, _Binary} -> gen_tcp:send(Socket, <<"ok">>), service(Socket);
        {error, timeout} -> timer:sleep(20), service(Socket);
        _ -> gen_tcp:close(Socket)
    end.


В таком случае получается обработать 23k rps:



Добавление принимающих процессов


Вторая идея — увеличение количества принимающих соединение процессов. Это может быть достигнуто при помощи вызова gen_tcp:accept из нескольких процессов:

main([Port]) ->
    {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options),
    erlang:spawn(?MODULE, accept, [ListenSocket]),
    erlang:spawn(?MODULE, accept, [ListenSocket]),
    accept(ListenSocket).


Тестирование под нагрузкой даёт 32k rps:



При дальнейшем увеличении нагрузки, задержки растут.

Вывод
Оптимизация timeout для gen_tcp увеличивает скорость приёма на 5k rps, от 18k до 23k.
С несколькими принимающими процессами, gen_tcp обрабатывает 32k rps, это в 1.8 раз больше, чем без оптимизаций.

Итоги


  • Лучше не использовать параметр timeout в функции вызова, timer:sleep — лучше. Это применимо и к ranch и к чистому gen_tcp. Для ranch это удваивает скорость приёма соединений.
  • Из нескольких процессов, соединения принимаются быстрей. Это применимо только для чистого gen_tcp. В моём случае это дало 40% улучшение в скорости приёма соединений в совокупности с заменой timeout на timer:sleep().


Получение сообщений


Это часть о том как получать большое количество коротких сообщений по уже установленым соединениям. Новые соединения открываются редко, требуется читать и отвечать на сообщения максимально быстро. Этот сценарий реализуется в нагруженных приложениях с web-сокетами.

Я открываю 25k соединений с нескольких узлов и постепенно увеличиваю скорость отправки сообщений.

Оптимизация ranch


Ниже результаты для неоптимизированного кода с использованием ranch (слева временные задержки, справа скорость обработки сообщений):


Без оптимизаций, ranch обрабатывает 70k rps с максимальной временной задержкой в 800ms.

Увеличение буферов linux


Довольно популярной оптимизацией является увеличение linux буферов для сокетов. Посмотрим, как эта оптимизация скажется на результатах:



Вывод
В данном случае увеличение буферов не даёт большого выигрыша.

Оптимизация get_tcp


Ниже я проверил скорость обработки пакетов решением на gen_tcp из предыдущей части статьи:


70k rps, так же, как и ranch.

Уменьшение количества читающих процессов


В предыдущем случае, у меня 25k процессов читают из сокетов — один процесс на каждое соединение. Теперь я попробую уменьшить это количество и проверить результаты.

Я создам 100 процессов и буду распределять новые сокеты между ними:

main([Port]) ->
    {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options),
    Readers = [erlang:spawn(?MODULE, reader, []) || _X <- lists:seq(1, ?Readers)],
    accept(ListenSocket, Readers, []).

accept(ListenSocket, [], Reversed) -> accept(ListenSocket, lists:reverse(Reversed), []);
accept(ListenSocket, [Reader | Rest], Reversed) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} -> Reader ! Socket, accept(ListenSocket, Rest, [Reader | Reversed]);
        {error, closed} -> ok
    end.

reader() -> reader([]).

read_socket(S) ->
    case gen_tcp:recv(S, 0, 0) of
        {ok, _Binary} -> gen_tcp:send(S, <<"ok">>), true;
        {error, timeout} -> true;
        _ -> gen_tcp:close(S), false
    end.

reader(Sockets) ->
    Sockets2 = lists:filter(fun read_socket/1, Sockets),
    receive
        S -> reader([S | Sockets2])
    after ?SmallTimeout -> reader(Sockets)
    end.


Эта оптимизация даёт существенный прирост производительности:



Кроме увеличения скорости, временные задержки выглядят гораздо лучше, a количество обрабатываемых пакетов около 100k, кроме того, можно обработать даже 120k сообщений, но с большими временными задержками. В то время как без оптимизации этого сделать не получалось.

Вывод
Обработка нескольких соединений из одного процесса даёт как минимум 50% увеличение производительности для чистого gen_tcp сервера.

Увеличение буферов Linux


Я применю ту же оптимизацию к системе с ванильным gen_tcp скриптом:


Как и в случае с ranch, каких-то существенных результатов не видно, только появились дополнительные выбросы в виде больших временных задержек.

Применяя оптимизацию к уже оптимизированному gen_tcp я получаю множество выбросов временных задержек:



Вывод
Решения на чистом gen_tcp также не выигрывают от увеличения Linux буферов. Понижение количество читающих из сокетов процессов даёт 50% выигрыш в скорости обработки.

Итоги


  • Изначально оба решения позволяют обрабатывать примерно одинаковое количество сообщений, около 70k rps.
  • Увеличение буферов не позволяет существенным образом повысить скорость обработки и в случае с чистым gen_tcp добавляет выборосы в виде больших временных задержек.
  • Gen_tcp решение с несколькими сокетами на один процесс работает как минимум в 1.5 раза быстрей чем неоптимизированное и имеет гораздо лучшие временные задержки. К сожалению, это оптимизация не применима к ranch без изменения его архитектуры.


Ответ на сообщения


Формально, в предыдущих главах цикл обработки сообщения предполагал ответ на него, но я не делал чего-то для оптимизации этой части. Я попробую применить те же самые идеи к функциям отправки сообщения. Здесь я использую сценарий из предыдущей главы, в котором пакеты идут по уже установленым соединениям.

Оптимизации таймаутов и процессов


Те же идеи, которые я использовал в предыдущих главах можно применить к функции отправки: убрать timeout и отвечать из меньшего количества процессов. Такого параметра как timeout в функции send нет, нужно установить опцию {send_timeout, 0} при открытии соединения.

К сожалению, это оптимизация практически ничего не меняет, а изменение кода сводится к простому добавлению опции, по этой причине я решил не утруждать читателя diff-ом и графиком.

Для проверки того как влияет количество процессов, я использовал следующий скрипт:

-export([responder/0, service/2]).

-define(Options, [
    binary,
    {backlog, 128},
    {active, false},
    {buffer, 65536},
    {keepalive, true},
    {send_timeout, 0},
    {reuseaddr, true}
]).

-define(SmallTimeout, 50).
-define(Timeout, 5000).
-define(Responders, 200).

main([Port]) ->
    {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options),
    Responders = [erlang:spawn(?MODULE, responder, []) || _X <- lists:seq(1, ?Responders)],
    accept(ListenSocket, Responders, []).

accept(ListenSocket, [], Reversed) -> accept(ListenSocket, lists:reverse(Reversed), []);
accept(ListenSocket, [Responder | Rest], Reversed) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} -> erlang:spawn(?MODULE, service, [Socket, Responder]), accept(ListenSocket, Rest, [Responder | Reversed]);
        {error, closed} -> ok
    end.

responder() ->
    receive
        S -> gen_tcp:send(S, <<"ok">>), responder()
    after ?SmallTimeout -> responder()
    end.

service(Socket, Responder) ->
    case gen_tcp:recv(Socket, 0, ?Timeout) of
        {ok, _Binary} -> Responder ! Socket, service(Socket, Responder);
        _ -> gen_tcp:close(Socket)
    end.


Здесь отвечающие процессы разделены с читающими; У меня 25000 читающих и 200 отвечающих.

Но опять, эта оптимизация не показывает существенного прироста производительности в сравнении с решением на gen_tcp из предыдущего раздела:


Тюнинг Erlang


В случае если один процесс используется для работы с несколькими сокетами, один медленный клиент может затормозить всех остальных. Для того чтобы избежать такой ситуации, можно установить {send_timeout, 0} при открытии сокета и в случае неудачи повторять отправку следующим циклом.

К сожалению, функция send не возвращает количество отправленых байт. Возвращается только ошибка POSIX, либо атом “ok”. Это делает невозможным отправку с последнего успешно отправленого байта. Кроме того, зная это количество можно использовать сеть более эффективно, что становится особенно важно, если клиенты имеют плохие каналы.

Далее я привожу пример как это можно исправить:

  1. Скачаем исходники Erlang с официального сайта:
    $ wget http://erlang.org/download/otp_src_18.2.1.tar.gz
    $ tar -xf otp_src_18.2.1.tar.gz
    $ cd otp_src_18.2.1
    

  2. Обновим функцию драйвера inet erts/emulator/drivers/common/inet_drv.c:
    1. Добавим возможность отвечать числом:
      static int inet_reply_ok_int(inet_descriptor* desc, int Val)
      {
          ErlDrvTermData spec[2*LOAD_ATOM_CNT + 2*LOAD_PORT_CNT + 2*LOAD_TUPLE_CNT];
          ErlDrvTermData caller = desc->caller;
          int i = 0;
      
          i = LOAD_ATOM(spec, i, am_inet_reply);
          i = LOAD_PORT(spec, i, desc->dport);
          i = LOAD_ATOM(spec, i, am_ok);
          i = LOAD_INT(spec, i, Val);
          i = LOAD_TUPLE(spec, i, 2);
          i = LOAD_TUPLE(spec, i, 3);
          ASSERT(i == sizeof(spec)/sizeof(*spec));
      
          desc->caller = 0;
          return erl_drv_send_term(desc->dport, caller, spec, i);
      }
      

    2. Уберём отправку атома “ok” из функции tcp_inet_commandv:

             else
                  inet_reply_error(INETP(desc), ENOTCONN);
          }
          else if (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUTDOWN)
              tcp_shutdown_error(desc, EPIPE);
      >>    else tcp_sendv(desc, ev);
          DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port));
      }
      

    3. Добавим отправку int вместо возврата 0 in в функции tcp_sendv:
          default:
               if (len == 0)
      >>             return inet_reply_ok_int(desc, 0);
               h_len = 0;
               break;
           }
      -----------------------------------
             else if (n == ev->size) {
                  ASSERT(NO_SUBSCRIBERS(&INETP(desc)->empty_out_q_subs));
      >>            return inet_reply_ok_int(desc, n);
              }
              else {
                  DEBUGF(("tcp_sendv(%ld): s=%d, only sent "
                          LLU"/%d of "LLU"/%d bytes/items\r\n",
                          (long)desc->inet.port, desc->inet.s,
                          (llu_t)n, vsize, (llu_t)ev->size, ev->vsize));
              }
      
              DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n",
                      (long)desc->inet.port, desc->inet.s));
              driver_enqv(ix, ev, n);
              if (!INETP(desc)->is_ignored)
                  sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
          }
      >>    return inet_reply_ok_int(desc, n);
      


  3. Запустим /configure && make && make install.


И всё, теперь функция gen_tcp:send будет возвращать {ok, Number} в случае успеха. Приведённый фрагмент кода выведет “9”:

   {ok, Sock} = gen_tcp:connect(SomeHostInNet, 5555,
                                 [binary, {packet, 0}]),
    {ok, N} = gen_tcp:send(Sock, "Some Data"),
    io:format("~p", [N])


Вывод
Если вы обрабатываете несколько соединений из одного процесса, необходимо использовать опцию {send_timeout, 0} при создании сокета, в противном случае один медленный клиент может затормозить отправку всем остальным.

Если ваш протокол может обрабатывать частичные сообщения, лучше пропатчить OTP и учитывать количество отправленных байт.

Кратко


  • Если вам нужно быстро принимать соединения, нужно принимать их из нескольких процессов.
  • Если нужно быстро читать из сокетов, нужно обрабатывать несколько сокетов из одного процесса и не пользоваться ranch.
  • Увеличение буферов linux приводит к понижению стабильности системы и не даёт существенного выигрыша производительности.
  • При использовании нескольких сокетов из одного процесса необходимо убирать таймаут на отправку.
  • Если нужно знать точное количество отправленных байт — можно пропатчить OTP.


Ссылки


Tags:
Hubs:
+30
Comments 14
Comments Comments 14

Articles