Пользователь
0,0
рейтинг
27 мая 2013 в 13:52

Разработка → Разработка системы синхронизации в реальном времени с использованием SockJS, Django, Tornado и ZeroMQ

Не так давно, разрабатывая очередной программный продукт, наша команда разработчиков столкнулись с задачей реализации полноценной системы синхронизации пользовательских данных в реальном времени, путем рассылки (PUSH метод) изменений сервером. В самом приложении объем данных был не велик, но они могли просматриваться несколькими пользователями одновременно. Поэтому нам был необходим легковесный и достаточно производительный подход к синхронизации данных в рамках Веб-приложения. После того как были рассмотрены различные пути к решению этой задачи, мы остановили свой выбор на достаточно популярном эмуляторе WebSocket’ов – SockJS, который использует различные алгоритмы обмена данными между клиентом и сервером, в зависимости от браузера, которым пользуется клиент. В рамках данной статьи я не буду заострять внимание на том, почему был сделан именно такой выбор (по этому поводу написано немало статей, в том числе и на хабрахабре), а просто скажу, что мы ещё ни разу об этом не пожалели.

Изначально при изучении стандартных подходов к реализации подобного рода задач мы столкнулись с одной проблемой. Эта проблема заключалась в том, что взаимодействие с нашей системой производилось не только посредством веб интерфейса, но также посредством использования API сторонними продуктами, которые мы не могли контролировать. И конечный пользователь нашего продукта, безусловно, ожидает увидеть всю информацию об изменениях в данных, которые его касаются. Стандартный подход использования sockjs сервера подразумевает, что уведомления об изменении каких-либо данных в системе будут посылаться с использованием того же самого JS клиента, который используется для получения информации об этих изменениях. Именно поэтому в нашем случае такой подход был бы неприменим.

В этой статье я хотел бы рассказать о том, как мы решили эту задачу.

Итак, исходные данные:

  • портал (написан на Django) с API
  • веб клиент (с sockjs-client библиотекой для организации синхронизации)
  • SockJS сервер (в виде tornado-sockjs, что являлось одной из причин, по которой мы сделали свой выбор в сторону этого продукта)

Общую задачу можно разделить на две подзадачи:

  • Организация получения уведомлений конечным пользователем об изменениях в системе (тривиальная задача)
  • Создание сообщений/уведомлений о наличии изменений в системе используя средства Django (напомним, что мы не можем использовать стандартный подход и создавать такие сообщения, используя стандартный JS клиент, т.к. в таком случае нам не удастся обработать изменения, сделанные при помощи набора APIs).

Изучив доступную информацию, стало ясно, что без “посредника” эту задачу решить нельзя. В качестве посредника была выбрана система обмена сообщениями, а именно ZeroMQ. Среди основных достоинств именно этой системы стоит отметить её простоту, легкость, а также поддержку языка Python (что для нас является весьма важным условием).

В целом полученное решение можно изобразить в виде схемы, представленной ниже.



Изменения данных в системе, сделанные конечным пользователем с использованием веб интерфейса или APIs, в конечном счете попадают на наш Django сервер, где через ZeroMQ передаются в SockJS сервер, который, в свою очередь, уведомляет пользователей о факте изменения данных отображаемого объекта.

Давайте перейдем к коду

Веб клиент

Для получения уведомлений нам необходимо подключить библиотеку sockjs-client, устанавливать соединение с сервером и подписаться на нужные события. Сделать это можно примерно следующим образом:

<script src="/js/sockjs-0.3.4.min.js"></script>
<script>
var SyncServer = new SockJS("http://example.com/echo");
SyncServer.onmessage = function(e) {
    // ваш обработчик сообщений
};
</script>

ZeroMQ сервер

Для реализации этой части нам понадобится сам ZeroMQ и библиотека Python для работы с ним — pyzmq. После этого необходимо настроить и запустить сервер в режиме прокси (данный режим называется FORWARDER). Сделать это можно буквально несколькими строками кода:

import zmq

def main():
    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.SUB)
        frontend.bind("tcp://127.0.0.1:XXXX")

        frontend.setsockopt(zmq.SUBSCRIBE, "")

        # Socket facing services
        backend = context.socket(zmq.PUB)
        backend.bind("tcp://127.0.0.1:YYYY")

        zmq.device(zmq.FORWARDER, frontend, backend)
    except Exception, e:
        # Handle exception
        pass
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main() 

Запустив подобный скрипт, вы получите proxy сервер, ожидающий сообщения на порту XXXX и отправляющих их на порт YYYY.

SockJS сервер

В качестве SockJS сервера был выбран стандартный SockJS-tornado с некоторыми небольшими изменениями, позволяющими ему принимать сообщения от ZeroMQ сервера (эти изменения вы можете найти в __init__ методе класса SockJSMyRouter)

from sockjs.tornado import SockJSConnection, SockJSRouter
from tornado import ioloop as tornado_ioloop, web
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

ioloop.install()
io_loop = tornado_ioloop.IOLoop.instance()

class SyncConnection(SockJSConnection):
    _connected = set()
    stream = None

    def __init__(self, session):
        super(SyncConnection, self).__init__(session)
        self.stream.on_recv(self.on_server_message)

    def on_open(self, request):
        self._connected.add(self)

    def on_message(self, data):
        pass

    def on_server_message(self, data):
        message = "your message"
        self.broadcast(self._connected, {'message': message})

    def on_close(self):
        self._connected.remove(self)


class SockJSMyRouter(SockJSRouter):

    def __init__(self, *args, **kw):
        super(SockJSMyRouter, self).__init__(*args, **kw)
        socket = context.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, "")
        socket.connect("tcp://127.0.0.1:YYYY")
        self._connection.stream = ZMQStream(socket, self.io_loop)

if __name__ == '__main__':
    context = zmq.Context()

    EchoRouter = SockJSMyRouter(SyncConnection, '/echo')

    app = web.Application(EchoRouter.urls)
    app.listen("ZZZZ")
    io_loop.start()

Django сервер

В рамках Django сервера нам необходимо добавить лишь несколько строк, ответственных за создание уведомлений и отправку их на наш ZeroMQ сервер. Сделать это можно следующим образом:

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("XXX")
socket.send("your message")
socket.close()

Вот похоже и всё! Дописав методы, отвечающие за создание и обработку сообщений в Django, Sockjs сервере и клиенте, вы получите полноценную работающую систему синхронизации в реальном времени для вашего приложения.

Update:
По просьбе пользователя timetogo размещаю ссылку на получившуюся систему управления списками задач: Cloud Checklist
Работу системы можно наблюдать при открытии одного и того же листа задач в двух отдельных сессиях Интернет браузера. В этом сценарии, конечно, наличие двух мониторов является огромным плюсом.
Антон Ахрамович @krollik
карма
24,5
рейтинг 0,0
Пользователь
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама

Самое читаемое Разработка

Комментарии (20)

  • 0
    Какие ощущения от ZeroMQ?
    • 0
      Хотя использовался один из достаточно простых сценариев ZeroMQ, общие впечатления продукт оставил весьма и весьма позитивные. Судите сами – быстро развернуть, просто настроить и достаточно приятный результат организации системы рассылки уведомлений.
      Общая производительность (скорость прихода пакета обновлений объекта) системы оказалась тоже весьма достойной. Если не сочтется за PR, то могу раскрыть ссылку на приложение.
      • 0
        Покажите, пожалуйста, ссылку, можно в личку
        • 0
          Данное решение использовалось в системе управления списками задач
          P.S. Чтобы остальным пользователям было легче я обновил пост
          • +2
            Как вы решаете такую задачу: один из пользователей потерял соединение с интернетом, в это время добавилось 20 новых задач другим пользователем. После восстановления соединения у первого пользователя новые 20 задач не появились. Появляются только если обновить страницу или произвести какое-либо действие на странице.
            • 0
              Здесь вы правы, что узкое место такого рода систем – это потеря связи актуальной сессии браузера определённого пользователя с сервером сообщений. В данном случае ключевым является так называемое понятие времени «клинической смерти» сессии. Для оптимизации загрузки сервера рассылки сообщений мы установили максимальное число попыток переподключений и интервалы между ними. После этого, к сожалению для пользователей с непостоянным Интернет соединением, регистрация сессии на сервере будет окончена. НО, в рамках данной версии приложения дополнительно мы пытаемся переподключится перед каждым действием пользователя.

              Пока мы видим необходимость доработки этой части решения, и данная задача входит в наш roadmap.

              С другой стороны, я не могу сказать, что это узкое место решаемой нами задачи, потому что наличие у приложения только лишь веб-клиента предполагает наличие достаточно устойчивого Интернет соединения. Также пользователь не сможет добавлять задачи, не имея связи с сервером
              • +1
                Можно решить вызовом из SyncServer.onopen функции get_items.
            • +1
              Можно при получении новых данных посылать некий acknowledge, это как в tcp тройное рукопожатие.
  • 0
    Как горизонтально масштабироваться будете с ZeroMQ?
    • 0
      А в чем проблема?
  • 0
    какое количество уведомлений в минуту рассылаете? не тестировали nginx_push_stream_module для решения данной задачи?
  • 0
    SockJS сервер (в виде tornado-sockjs, что являлось одной из причин, по которой мы сделали свой выбор в сторону этого продукта)

    Не совсем понял причину выбора? Наличие библиотеки? Вы рассматривали альтернативы?

    Сегодня как раз занимался подобным — сделал websocket «звено», для оперативного взаимодействия связки: Клиента (browser) + IP-Телефония + Веб-сервер в виде «Call» центра, tornado уже пробовал поэтому решил попробовать gevent — pypi.python.org/pypi/gevent-websocket/, некоторые из основных причин — «синхронный» стиль кода (для меня более комфортен) + позволяет напрямую вызывать API web-сервера (синхронный, многопоточный), + не плохая производительность.
    • +2
      А что делать когда у клиента нет поддержки ws?
      • +1
        Теперь понято, конечно есть варианты, но SockJS хороший выбор.
  • 0
    Так и не понял, зачем тут ZeroMQ. Можно же у Tornado сервера сделать «секретное» HTTP API по которому принимать информацию о новых событиях.
    Ну ок, решили что ZeroMQ всё-же удобнее… Но зачем вам понадобился ZeroMQ Proxy??? Какую он функцию выполняет?

    Кстати, можете объяснить как интегрируется event loop ZeroMQ и event loop Tornado? Я так понял, дело в этой строчке self._connection.stream = ZMQStream(socket, self.io_loop)?

    P.S.: шикарно
    except Exception, e:
            # Handle exception
            pass
    
    • 0
      Так и не понял, зачем тут ZeroMQ. Можно же у Tornado сервера сделать «секретное» HTTP API по которому принимать информацию о новых событиях.
      Безусловно, предложенный Вами подход имеет права на жизнь. Но в случае с “секретным” HTTP API нам бы пришлось каким-либо образом обеспечивать “секретность”. Я не думаю, что полученное решение получилось бы намного более легким, чем текущее решение с ZeroMQ.
      Ну ок, решили что ZeroMQ всё-же удобнее… Но зачем вам понадобился ZeroMQ Proxy??? Какую он функцию выполняет?
      ZeroMQ Proxy использовался как прокси между django и tornado. Возможно, что я не совсем точно понял Ваш вопрос, не могли бы Вы уточнить, как именно в данной ситуации Вы бы использовали ZeroMQ в подобной ситуации?

      Касательно примера с обработкой исключений, то, как Вы могли заметить, исходные коды, приведённые в этой статье, максимально упрощены в целях облегчения понимания кода. Я могу Вас заверить, что в данном виде Вы бы не увидели их в работающем приложении.
      • 0
        Обеспечить секретность можно передавая GET параметр, который знает только Django и Tornado. Если GET параметр кажется недостаточно секретным, можно использовать Basic HTTP авторизацию, но я не уверен что для неё есть готовые обработчики в Tornado.
        В конце концов, ZeroMQ у вас тоже НИКАК не защищён. Просто он слушает на localhost. Так же и секретное API можно повесть слушать только на localhost на отдельном порту.

        Не такой спец по ZeroMQ, но по аналогии с TCP сокетами Tornado был бы сервером (сабскрайбером) а Django клиентом (паблишером).

        Tornado:
        class SockJSMyRouter(SockJSRouter):
        
            def __init__(self, *args, **kw):
                super(SockJSMyRouter, self).__init__(*args, **kw)
                socket = context.socket(zmq.SUB)
                socket.setsockopt(zmq.SUBSCRIBE, "")
                socket.bind("tcp://127.0.0.1:XXXX")  # !!!!!!!!!!!!!!!
                self._connection.stream = ZMQStream(socket, self.io_loop)
        

        Django:
        import zmq
        
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.connect("XXXX")
        socket.send("your message")
        socket.close()
        

        Не проверял, но просто по логике должно работать.
      • +1
        Так и не понял, зачем тут ZeroMQ. Можно же у Tornado сервера сделать «секретное» HTTP API по которому принимать информацию о новых событиях.


        http медленный. 1.5*rtt только на установление tcp-коннекта и большой оверхед на коротких запросах.
        одним процессом на cpython я получал ~ 80-90 тысяч сообщений в секунду. если взять pypy, будет еще веселее.
        • 0
          В принципе можно навернуть keepalive и pipelining, но в стандартной библиотеке питона этого нет. Ну ок. Если ZeroMQ так легко интегрируется в Event loop Tornado то почему бы и нет?
          Но вот зачем Proxy я всё равно не понимаю
    • +2
      Могу назвать две причины:
      1. Накладные расходы на разбор HTTP запроса будут выше чем разбор пакета ZMQ
      2. С ZMQ будет легче масштабироваться в будущем. Добавление ноды в кластер не требует написания кода вообще.

      В принципе, это применимо к любой message bus (AMQP, redis pub/sub, etc).

      Но HTTP API тоже можно, если не планируется какой либо серьезной нагрузки.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.