Разработка системы синхронизации в реальном времени с использованием SockJS, Django, Tornado и ZeroMQ

    Не так давно, разрабатывая очередной программный продукт, наша команда разработчиков столкнулись с задачей реализации полноценной системы синхронизации пользовательских данных в реальном времени, путем рассылки (PUSH метод) изменений сервером. В самом приложении объем данных был не велик, но они могли просматриваться несколькими пользователями одновременно. Поэтому нам был необходим легковесный и достаточно производительный подход к синхронизации данных в рамках Веб-приложения. После того как были рассмотрены различные пути к решению этой задачи, мы остановили свой выбор на достаточно популярном эмуляторе WebSocket’ов – SockJS, который использует различные алгоритмы обмена данными между клиентом и сервером, в зависимости от браузера, которым пользуется клиент. В рамках данной статьи я не буду заострять внимание на том, почему был сделан именно такой выбор (по этому поводу написано немало статей, в том числе и на хабрахабре), а просто скажу, что мы ещё ни разу об этом не пожалели.

    Изначально при изучении стандартных подходов к реализации подобного рода задач мы столкнулись с одной проблемой. Эта проблема заключалась в том, что взаимодействие с нашей системой производилось не только посредством веб интерфейса, но также посредством использования API сторонними продуктами, которые мы не могли контролировать. И конечный пользователь нашего продукта, безусловно, ожидает увидеть всю информацию об изменениях в данных, которые его касаются. Стандартный подход использования sockjs сервера подразумевает, что уведомления об изменении каких-либо данных в системе будут посылаться с использованием того же самого JS клиента, который используется для получения информации об этих изменениях. Именно поэтому в нашем случае такой подход был бы неприменим.

    В этой статье я хотел бы рассказать о том, как мы решили эту задачу.

    Итак, исходные данные:

    • портал (написан на Django) с API
    • веб клиент (с sockjs-client библиотекой для организации синхронизации)
    • SockJS сервер (в виде tornado-sockjs, что являлось одной из причин, по которой мы сделали свой выбор в сторону этого продукта)

    Общую задачу можно разделить на две подзадачи:

    • Организация получения уведомлений конечным пользователем об изменениях в системе (тривиальная задача)
    • Создание сообщений/уведомлений о наличии изменений в системе используя средства Django (напомним, что мы не можем использовать стандартный подход и создавать такие сообщения, используя стандартный JS клиент, т.к. в таком случае нам не удастся обработать изменения, сделанные при помощи набора APIs).

    Изучив доступную информацию, стало ясно, что без “посредника” эту задачу решить нельзя. В качестве посредника была выбрана система обмена сообщениями, а именно ZeroMQ. Среди основных достоинств именно этой системы стоит отметить её простоту, легкость, а также поддержку языка Python (что для нас является весьма важным условием).

    В целом полученное решение можно изобразить в виде схемы, представленной ниже.



    Изменения данных в системе, сделанные конечным пользователем с использованием веб интерфейса или APIs, в конечном счете попадают на наш Django сервер, где через ZeroMQ передаются в SockJS сервер, который, в свою очередь, уведомляет пользователей о факте изменения данных отображаемого объекта.

    Давайте перейдем к коду

    Веб клиент

    Для получения уведомлений нам необходимо подключить библиотеку sockjs-client, устанавливать соединение с сервером и подписаться на нужные события. Сделать это можно примерно следующим образом:

    <script src="/js/sockjs-0.3.4.min.js"></script>
    <script>
    var SyncServer = new SockJS("http://example.com/echo");
    SyncServer.onmessage = function(e) {
        // ваш обработчик сообщений
    };
    </script>
    

    ZeroMQ сервер

    Для реализации этой части нам понадобится сам ZeroMQ и библиотека Python для работы с ним — pyzmq. После этого необходимо настроить и запустить сервер в режиме прокси (данный режим называется FORWARDER). Сделать это можно буквально несколькими строками кода:

    import zmq
    
    def main():
        try:
            context = zmq.Context(1)
            # Socket facing clients
            frontend = context.socket(zmq.SUB)
            frontend.bind("tcp://127.0.0.1:XXXX")
    
            frontend.setsockopt(zmq.SUBSCRIBE, "")
    
            # Socket facing services
            backend = context.socket(zmq.PUB)
            backend.bind("tcp://127.0.0.1:YYYY")
    
            zmq.device(zmq.FORWARDER, frontend, backend)
        except Exception, e:
            # Handle exception
            pass
        finally:
            frontend.close()
            backend.close()
            context.term()
    
    if __name__ == '__main__':
        main() 
    

    Запустив подобный скрипт, вы получите proxy сервер, ожидающий сообщения на порту XXXX и отправляющих их на порт YYYY.

    SockJS сервер

    В качестве SockJS сервера был выбран стандартный SockJS-tornado с некоторыми небольшими изменениями, позволяющими ему принимать сообщения от ZeroMQ сервера (эти изменения вы можете найти в __init__ методе класса SockJSMyRouter)

    from sockjs.tornado import SockJSConnection, SockJSRouter
    from tornado import ioloop as tornado_ioloop, web
    import zmq
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream
    
    ioloop.install()
    io_loop = tornado_ioloop.IOLoop.instance()
    
    class SyncConnection(SockJSConnection):
        _connected = set()
        stream = None
    
        def __init__(self, session):
            super(SyncConnection, self).__init__(session)
            self.stream.on_recv(self.on_server_message)
    
        def on_open(self, request):
            self._connected.add(self)
    
        def on_message(self, data):
            pass
    
        def on_server_message(self, data):
            message = "your message"
            self.broadcast(self._connected, {'message': message})
    
        def on_close(self):
            self._connected.remove(self)
    
    
    class SockJSMyRouter(SockJSRouter):
    
        def __init__(self, *args, **kw):
            super(SockJSMyRouter, self).__init__(*args, **kw)
            socket = context.socket(zmq.SUB)
            socket.setsockopt(zmq.SUBSCRIBE, "")
            socket.connect("tcp://127.0.0.1:YYYY")
            self._connection.stream = ZMQStream(socket, self.io_loop)
    
    if __name__ == '__main__':
        context = zmq.Context()
    
        EchoRouter = SockJSMyRouter(SyncConnection, '/echo')
    
        app = web.Application(EchoRouter.urls)
        app.listen("ZZZZ")
        io_loop.start()
    

    Django сервер

    В рамках Django сервера нам необходимо добавить лишь несколько строк, ответственных за создание уведомлений и отправку их на наш ZeroMQ сервер. Сделать это можно следующим образом:

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect("XXX")
    socket.send("your message")
    socket.close()
    

    Вот похоже и всё! Дописав методы, отвечающие за создание и обработку сообщений в Django, Sockjs сервере и клиенте, вы получите полноценную работающую систему синхронизации в реальном времени для вашего приложения.

    Update:
    По просьбе пользователя timetogo размещаю ссылку на получившуюся систему управления списками задач: Cloud Checklist
    Работу системы можно наблюдать при открытии одного и того же листа задач в двух отдельных сессиях Интернет браузера. В этом сценарии, конечно, наличие двух мониторов является огромным плюсом.
    Поделиться публикацией
    Похожие публикации
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама
    Комментарии 20
    • 0
      Какие ощущения от ZeroMQ?
      • 0
        Хотя использовался один из достаточно простых сценариев ZeroMQ, общие впечатления продукт оставил весьма и весьма позитивные. Судите сами – быстро развернуть, просто настроить и достаточно приятный результат организации системы рассылки уведомлений.
        Общая производительность (скорость прихода пакета обновлений объекта) системы оказалась тоже весьма достойной. Если не сочтется за PR, то могу раскрыть ссылку на приложение.
        • 0
          Покажите, пожалуйста, ссылку, можно в личку
          • 0
            Данное решение использовалось в системе управления списками задач
            P.S. Чтобы остальным пользователям было легче я обновил пост
            • +2
              Как вы решаете такую задачу: один из пользователей потерял соединение с интернетом, в это время добавилось 20 новых задач другим пользователем. После восстановления соединения у первого пользователя новые 20 задач не появились. Появляются только если обновить страницу или произвести какое-либо действие на странице.
              • 0
                Здесь вы правы, что узкое место такого рода систем – это потеря связи актуальной сессии браузера определённого пользователя с сервером сообщений. В данном случае ключевым является так называемое понятие времени «клинической смерти» сессии. Для оптимизации загрузки сервера рассылки сообщений мы установили максимальное число попыток переподключений и интервалы между ними. После этого, к сожалению для пользователей с непостоянным Интернет соединением, регистрация сессии на сервере будет окончена. НО, в рамках данной версии приложения дополнительно мы пытаемся переподключится перед каждым действием пользователя.

                Пока мы видим необходимость доработки этой части решения, и данная задача входит в наш roadmap.

                С другой стороны, я не могу сказать, что это узкое место решаемой нами задачи, потому что наличие у приложения только лишь веб-клиента предполагает наличие достаточно устойчивого Интернет соединения. Также пользователь не сможет добавлять задачи, не имея связи с сервером
                • +1
                  Можно решить вызовом из SyncServer.onopen функции get_items.
                • +1
                  Можно при получении новых данных посылать некий acknowledge, это как в tcp тройное рукопожатие.
        • 0
          Как горизонтально масштабироваться будете с ZeroMQ?
        • 0
          какое количество уведомлений в минуту рассылаете? не тестировали nginx_push_stream_module для решения данной задачи?
          • 0
            SockJS сервер (в виде tornado-sockjs, что являлось одной из причин, по которой мы сделали свой выбор в сторону этого продукта)

            Не совсем понял причину выбора? Наличие библиотеки? Вы рассматривали альтернативы?

            Сегодня как раз занимался подобным — сделал websocket «звено», для оперативного взаимодействия связки: Клиента (browser) + IP-Телефония + Веб-сервер в виде «Call» центра, tornado уже пробовал поэтому решил попробовать gevent — pypi.python.org/pypi/gevent-websocket/, некоторые из основных причин — «синхронный» стиль кода (для меня более комфортен) + позволяет напрямую вызывать API web-сервера (синхронный, многопоточный), + не плохая производительность.
            • +2
              А что делать когда у клиента нет поддержки ws?
              • +1
                Теперь понято, конечно есть варианты, но SockJS хороший выбор.
            • 0
              Так и не понял, зачем тут ZeroMQ. Можно же у Tornado сервера сделать «секретное» HTTP API по которому принимать информацию о новых событиях.
              Ну ок, решили что ZeroMQ всё-же удобнее… Но зачем вам понадобился ZeroMQ Proxy??? Какую он функцию выполняет?

              Кстати, можете объяснить как интегрируется event loop ZeroMQ и event loop Tornado? Я так понял, дело в этой строчке self._connection.stream = ZMQStream(socket, self.io_loop)?

              P.S.: шикарно
              except Exception, e:
                      # Handle exception
                      pass
              
              • 0
                Так и не понял, зачем тут ZeroMQ. Можно же у Tornado сервера сделать «секретное» HTTP API по которому принимать информацию о новых событиях.
                Безусловно, предложенный Вами подход имеет права на жизнь. Но в случае с “секретным” HTTP API нам бы пришлось каким-либо образом обеспечивать “секретность”. Я не думаю, что полученное решение получилось бы намного более легким, чем текущее решение с ZeroMQ.
                Ну ок, решили что ZeroMQ всё-же удобнее… Но зачем вам понадобился ZeroMQ Proxy??? Какую он функцию выполняет?
                ZeroMQ Proxy использовался как прокси между django и tornado. Возможно, что я не совсем точно понял Ваш вопрос, не могли бы Вы уточнить, как именно в данной ситуации Вы бы использовали ZeroMQ в подобной ситуации?

                Касательно примера с обработкой исключений, то, как Вы могли заметить, исходные коды, приведённые в этой статье, максимально упрощены в целях облегчения понимания кода. Я могу Вас заверить, что в данном виде Вы бы не увидели их в работающем приложении.
                • 0
                  Обеспечить секретность можно передавая GET параметр, который знает только Django и Tornado. Если GET параметр кажется недостаточно секретным, можно использовать Basic HTTP авторизацию, но я не уверен что для неё есть готовые обработчики в Tornado.
                  В конце концов, ZeroMQ у вас тоже НИКАК не защищён. Просто он слушает на localhost. Так же и секретное API можно повесть слушать только на localhost на отдельном порту.

                  Не такой спец по ZeroMQ, но по аналогии с TCP сокетами Tornado был бы сервером (сабскрайбером) а Django клиентом (паблишером).

                  Tornado:
                  class SockJSMyRouter(SockJSRouter):
                  
                      def __init__(self, *args, **kw):
                          super(SockJSMyRouter, self).__init__(*args, **kw)
                          socket = context.socket(zmq.SUB)
                          socket.setsockopt(zmq.SUBSCRIBE, "")
                          socket.bind("tcp://127.0.0.1:XXXX")  # !!!!!!!!!!!!!!!
                          self._connection.stream = ZMQStream(socket, self.io_loop)
                  

                  Django:
                  import zmq
                  
                  context = zmq.Context()
                  socket = context.socket(zmq.PUB)
                  socket.connect("XXXX")
                  socket.send("your message")
                  socket.close()
                  

                  Не проверял, но просто по логике должно работать.
                  • +1
                    Так и не понял, зачем тут ZeroMQ. Можно же у Tornado сервера сделать «секретное» HTTP API по которому принимать информацию о новых событиях.


                    http медленный. 1.5*rtt только на установление tcp-коннекта и большой оверхед на коротких запросах.
                    одним процессом на cpython я получал ~ 80-90 тысяч сообщений в секунду. если взять pypy, будет еще веселее.
                    • 0
                      В принципе можно навернуть keepalive и pipelining, но в стандартной библиотеке питона этого нет. Ну ок. Если ZeroMQ так легко интегрируется в Event loop Tornado то почему бы и нет?
                      Но вот зачем Proxy я всё равно не понимаю
                  • +2
                    Могу назвать две причины:
                    1. Накладные расходы на разбор HTTP запроса будут выше чем разбор пакета ZMQ
                    2. С ZMQ будет легче масштабироваться в будущем. Добавление ноды в кластер не требует написания кода вообще.

                    В принципе, это применимо к любой message bus (AMQP, redis pub/sub, etc).

                    Но HTTP API тоже можно, если не планируется какой либо серьезной нагрузки.

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.