Pull to refresh

EventMachine прокси демон

Reading time 11 min
Views 3.7K
Несмотря на то, что EventMachine достаточно удобный фреймворк для написания высокопроизводительных и хорошо масштабирующихся сетевых приложений, интернет не радует обилием примеров его использования и тестирования. А те примеры, которые существуют, например, на хабре, не будут корректно работать, так как не учитывают особенности передачи данных (почему-то не учитывают, что данные, в общем случае, передаются по частям). Собственно, данная статья предназначена для тех, кто ознакомился с базовыми принципами работы EM, например, в статье Ruby и EventMachine , и хочет узнать как на его основе написать что-нибудь посложнее и как затем тестировать код, полученный в результате.

Недавно мне прислали тестовое задание, суть которого заключалась в написании прокси демона на EM, который бы асинхронно принимал соединения от клиентов через unix domain socket, выстраивал их в очередь и перенаправлял эти команды на socket, подключенный к какой-то абстрактной системе, и, получив ответ от этой системы, отсылал бы их обратно клиентам. Формат клиентского сообщения — {id: 1, text: «req1»}, которое должны быть преобразовано сервером в ответ — {id: 1, text: «answ1»}.

Для упрощения задачи, клиентов и абстрактную систему я эмулировал также с помощью EM, что позволило использовать встроенный протокол передачи данных EM::P::ObjectProtocol, который учитывает, что данные могут принимается не за единичный тик реактора, а также сериализует данные для передачи.

Начнем с написания кода клиента. При установлении соединения клиента с прокси демоном происходит вызов функции post_init, в которой клиент с помощью send_object отсылает сообщение в виде хеша.
Затем ждет ответа от демона, и выводит его в консоль. Так как было необходимо эмулировать подключения сразу нескольких клиентов, были введены переменные, хранящие данные о количестве соединений и количестве клиентов, которые уже получили ответ и были отсоединены от демона. Общее количество соединений хранится в константе TOTAL_CONNECTIONS, которая устанавливается при запуске клиентов. Когда клиента отсоединяет от сервера происходит вызов unbind. Когда все клиенты получают ответ, реактор останавливается.

module EMClient<br/>  include EM::P::ObjectProtocol<br/> <br/>  @@connection_number =  0<br/>  @@dissconnected =  0<br/> <br/>  #send request as the connection has been established<br/>  def post_init<br/>    @@connection_number += 1<br/>    send_object({'id'=> rand(10), 'text' => "req#{@@connection_number}"})<br/>  end<br/> <br/> <br/>  def receive_object(obj)<br/>    #display response from server<br/>    p obj.inspect<br/>  end<br/> <br/>  def unbind<br/>    @@dissconnected += 1<br/>    #stop reactor after all requests have been processed<br/>    EM.stop if @@dissconnected == TOTAL_CONNECTIONS<br/>  end<br/>end <br/>

Далее код сервера, эмулирующего работу абстрактной системы. Его задача — получить объект, преобразовать его и отправить обратно. С помощью EM.add_time() можно отсрочить выполнение блока кода, передаваемого вторым аргументом функции, в данном случае ответа на запрос со стороны демона.

module SocketServer<br/>  include EM::P::ObjectProtocol<br/> <br/>  def receive_object(obj)<br/>    #emulation of job on server<br/>    EM.add_timer(1+rand(5)) do<br/>      #validation of obj goes here))<br/>      obj['text'].sub!(/req/,'answ')<br/>      send_object(obj)<br/>    end<br/>    p "Server received object: #{obj.inspect}"<br/>  end<br/>end <br/>

А теперь перейдем к более интересной части, созданию очереди сообщений сообщений и пула соединений с абстрактным сервером. Для это был использован класс EM::Queue, который располагает двумя методами pop(*a, &b) и push(*items), позволяющими добавлять элементы в очередь и доставать их из неё. Метод pop принимает последним аргументом блок кода, который будет исполнен, когда в очереди появится элемент.

Для установления соединения с сервером, использовался модуль EMConnection, в котором был определен метод send_request(obj, &block) , суть которого заключается в отправке сообщения сервера и передаче блока, который будет выполнен при получении ответа от сервера.

Класс ConnectionPool отвечает за создания пула соединений. При его инициализации определятся размер пула и происходит инициализация очереди. Затем в методе start_queue устанавливается заданное количество соединений и для каждой соединения запускается worker (queue_worker_loop), который представляется собой proc, принимающий соединения как аргумент. Суть его работы заключается в получении из очереди элемента, представляющего из себя объект, который должен быть отослан серверу, и блок кода, который должен быть выполнен после получения объекта. Причем, после выполнения этого блока кода proc вызывает сам себя, таким образом получается некое подобие бесконечного цикла.

module EMConnection<br/>  include EM::P::ObjectProtocol<br/> <br/>  def receive_object(obj)<br/>    #calling callback on object receiving<br/>    @callback.call(obj)<br/>  end<br/> <br/>  def send_request obj, &block<br/>    #sending data to server and setting callback<br/>    send_object obj<br/>    @callback = block<br/>  end<br/> <br/>end<br/> <br/>#simple connection pool using EM queue, default size 10<br/>class ConnectionPool<br/> <br/>  def initialize(conf)<br/>    @pool_size = conf[:size] || 10<br/>    @connections = []<br/>    @query_queue = EM::Queue.new<br/>    start_queue conf<br/>  end<br/> <br/>  def queue_worker_loop<br/>    proc{ |connection|<br/>      @query_queue.pop do |request|<br/>        connection.send_request(request[:obj]) do |response|<br/>          request[:callback].call response #if request[:callback]<br/>          queue_worker_loop.call connection<br/>        end<br/>      end<br/>    }<br/>  end<br/> <br/>  def start_queue(conf)<br/>    @pool_size.times do<br/>      connection = EM.connect('0.0.0.0', 8080, EMConnection)<br/>      @connections << connection<br/>      queue_worker_loop.call connection<br/>    end<br/>  end<br/> <br/>  def request(obj, &block)<br/>    @query_queue.push :obj => obj, :callback => block<br/>  end<br/>end <br/>

Теперь перейдем к коду, отвечающему за работу прокси демона. Его работа заключается в инициализации пула соединений, конечно, это можно сделать и не коде демона, но так пул будет инициализирован только, когда он понадобится. При получении объекта от клиента он передает объект и блок кода очереди пула соединений, и когда будет получен ответ от абстрактного сервера, отсылает сообщение обратно клиенты и закрывает соединение с помощью метода close_connection_after_writing, удостоверившись, что сообщение полностью отослано клиенту ( в отличии от метода close_connection, который сразу закрывает соединение).

module DaemonServer<br/>  include EM::P::ObjectProtocol<br/> <br/>  def post_init<br/>    @@connections_pool ||= ConnectionPool.new(:size => 5)<br/>  end<br/> <br/>  def receive_object(obj)<br/>    @@connections_pool.request obj do |response|<br/>      send_object(response)<br/>      close_connection_after_writing<br/>    end<br/>  end<br/>end <br/>

Теперь перейдем к скриптам, отвечающим за запуск сервера, клиентов и прокси демона.
Сервер запускаем на TCPSocket. Тут все очень просто.

EventMachine.run {<br/>  EventMachine.start_server "127.0.0.1", 8080, SocketServer<br/>} <br/>


С клиентами чуть посложнее, так как необходимо дать возможность задавать количество эмулируемых клиентов, что реализовано путем передачи параметра при запуске скрипта. Запуск клиента на Unix Domain Socket отличается от случая c TCP Socket, только вызовом connect_unix_domain вместо connect и передачей вместо ip адреса и порта, имени файла первым аргументом функции.

tc = ARGV[ 0].to_i<br/>TOTAL_CONNECTIONS = tc >  0 ? tc : 25<br/> <br/>file = File.expand_path('../tmp/daemon.sock',__FILE__)<br/>p "Starting #{TOTAL_CONNECTIONS} client(s)"<br/>EventMachine::run {<br/>  TOTAL_CONNECTIONS.times{ EM.connect_unix_domain(file, EMClient) }<br/>} <br/>

Для того, чтобы прокси стал демоном, его, конечно, нужно демонизировать (капитан). Для этого я использовал гем 'daemons'. В случае если скрипт запускает с ключом -d, происходит его демонизация, путем вызова метода Daemons.daemonize с дополнительными опциями, определяющими где хранить логи и файл содержащий pid процесса, для того, чтобы демон можно было затем остановить.

options = {<br/>  :app_name => 'ProxyServer',<br/>  :backtrace => true,<br/>  :log_output => true,<br/>  :dir_mode => :normal,<br/>  :dir => File.expand_path('../tmp',__FILE__)<br/>}<br/> <br/>file = File.expand_path('../tmp/daemon.sock',__FILE__)<br/>File.unlink(file) if File.exists?(file)<br/> <br/>Daemons.daemonize(options) if ARGV.index('-d')<br/> <br/>EventMachine::run {<br/>  EventMachine::start_unix_domain_server(file, DaemonServer)<br/>} <br/>

Я считаю, что практически любой код без тестов стоит очень мало, даже если он рабочий, так как может доставить очень много проблем и головной боли в будущем как тому, кто его писал, так и тем, кому его нужно править.

Существует готовое решение для тестирования программ, созданных на базе EM, — EMSpec. Но я им не воспользовался и поэтому продемонстрирую, как можно обойтись без него, используя rspec.
Для начала нужно создать тестовый клиент. В нем определим метод send_request(obj, &block), позволяющий отсылать запросы прокси демону и задавать колбек в виде блока кода, который будет вызван, когда клиент получит ответ. Также создадим метод onclose=(proc), который будет определять колбек, который будет вызываться при закрытии соединения.

module TestClient<br/>  include EM::P::ObjectProtocol<br/> <br/>  #on object received callback<br/>  def receive_object(obj)<br/>    @onresponse.call(obj)<br/>    p "Client received object: #{obj.inspect}"<br/>  end<br/> <br/>  def send_request obj, &block<br/>    @onresponse = block<br/>    send_object obj<br/>  end<br/> <br/>  # on disconnect callback<br/>  def onclose=(proc)<br/>    @onclosed = proc<br/>  end<br/> <br/>  def unbind<br/>    @onclosed.call<br/>  end<br/> <br/>end <br/>

Теперь можно перейти к созданию методов, который позволят тестировать написанный код. Первый метод start_serv отвечает за запуск сервера, тестового клиента и прокси, при этом, принимает аргументом блок, который получит в свое распоряжение переменную client, позволяющую производить манипуляции с клиентом. Метод timer нужен на случай, если что-то пойдет не так, и клиент не получит ответ от сервера, тогда rspec выдаст, что тест не пройден, а не просто зависнет. Основой для тестов послужит метод server_test, который использует вышеуказанные методы для запуска сервера, клиента и прокси, определяет, что при закрытии соединения реактор нужно остановить, а также принимает аргументом запрос, который должен быть послан клиентом серверу.

module HelperMethods<br/>  def start_serv<br/>    File.unlink(SOCK_FILE) if File.exists?(SOCK_FILE)<br/>    EM.run {<br/>      EventMachine.start_server "127.0.0.1", 8080, SocketServer<br/>      EventMachine.start_unix_domain_server(SOCK_FILE, DaemonServer)<br/>      client = EM.connect_unix_domain(SOCK_FILE, TestClient)<br/>      yield client<br/>    }<br/>  end<br/> <br/>  # if request takes to long it will show fail<br/>  def timer start<br/>    timeout = 6<br/>    EM.add_timer(timeout){<br/>      (Time.now-start).should be_within( 0).of(timeout)<br/>      EM.stop<br/>    }<br/>  end<br/> <br/>  #main wrapper for test starts server daemon and client<br/>  def server_test request<br/>    time = Time.now<br/>    start_serv do |client|<br/>      client.send_request request do |response|<br/>        yield response<br/>      end<br/>      client.onclose= lambda{EM.stop}<br/>      timer(time)<br/>    end<br/>  end<br/>end <br/>

Для примера, приведу тест, проверяющий, что данное задание выполняется корректно.

describe "on sending test request" do<br/> include HelperMethods<br/> it "should responsend with right answer" do<br/>    server_test({'id'=>  0, 'text' => "req1"}) do |response|<br/>      response['text'].should == "answ1"<br/>      response['id'].should ==  0<br/>    end<br/> end<br/>end <br/>

Вот, собственно, и все, надеюсь эта статья будет кому-нибудь полезна. Весь код доступен на житхабе.

P.S. Получилось довольно много текста, кто осилил дочитать до конца, тем спасибо.
Tags:
Hubs:
+25
Comments 18
Comments Comments 18

Articles