Реализация небольшого асинхронного сервера

    Целью публикации данного топика является представление аудитории Хабрахабра кода небольшого асинхронного сервера, написанного на Питоне с использованием практически «голых» сокетов.

    Мне приходилось писать достаточно много приложений, работающих в качестве сетевых сервисов. Эти сервисы были написаны на разных языках, под разную нагрузку и каждый раз реализация нового сервиса чем-то отличалась от предыдущей. Под хабракатом я привожу пример довольно удачной, на мой вгляд, реализации «учебного» сервера, сопровождая код своими комментариями по мере необходимости.

    Вводное слово


    Итак, в начале несколько слов о том, что собственно делает сервер. Так как он является учебным примером, его задача достаточно проста: при запуске создать сокет, прослушивающий порт; при подключении клиента начать передавать ему информацию. В качестве информации клиенты получают нескончаемый поток цифр числа Пи, вычисляемых на сервере. Сервер генерирует цифры только тогда, когда к нему подключен хотя бы один клиент, запоминая уже сгенерированную последовательность. При отключении всех клиентов генерация приостанавливается, сервер ждет нового подключения.

    Реализация сервера является однопоточной. Преимуществом такого подхода является отсутствие дополнительного оверхеда в виде отдельного потока или даже отдельного экземпляра приложения, который бы возник при использовании других часто применяемых техник.

    По большому счету это отличие трудно назвать определяющим, т.к. Питон все-таки является интерпретируемым языком и вносит свои ограничения на область использования, однако в перспективе «сохраненные» при такой архитектуре потоки мы сможем использовать в приложении для выполнения каких-либо распараллеленных вычислительных задач.

    Перед тем как продолжить, настоятельно рекомендую скачать код и бегло с ним ознакомиться. Также код можно посмотреть онлайн на Шоумикод.
    Для запуска сервера пользователи UNIX-подобных систем могут выполнить следующие команды:
    chmod +x ./pypi.py
    ./pypi.py

    Для подключения к серверу можно использовать такую команду:
    telnet 127.0.0.1 45067


    Разбор кода


    Ядром приложения является функция main(), запускающая цикл диспетчеризации модуля asyncore:
    def main():
        try:
            asyncore.loop(0.1True, Server('127.0.0.1'))
        except KeyboardInterrupt:
            print '\nBye :-*'
            sys.exit()
     
    if __name__ == '__main__':
        main()
     

    Модуль asyncore предоставляет функцию loop, принимающую четыре опциональных аргумента: (1) таймаут, (2) флаг предпочтения механизма poll обычному select, (3) словарь дескрипторов сокетов, (4) количество итераций. Важное значение для нас имеет третий параметр, которым мы передали функции свежесозданный объект сервера.

    Благодаря «особой питоновской магии» класс Server пронаследован как от класса dispatcher из модуля asyncore, так и от класса словаря dict, что позволяет ему выступать одновременно сокетом сервера и хранилищем дескрипторов сокетов всех подключенных клиентов.

    Начало объявления класса Server выглядит таким образом:
    class Server(dispatcher, dict):
        writable = lambda x: False
     
        def __init__(self, host = None, port = 0xB00B):
            dispatcher.__init__(self)
     
            self.create_socket(AF_INET, SOCK_STREAM)
            dict.__init__(self{self.fileno()self})
     
            self.set_reuse_addr()
            self.bind((host, port))
            self.listen(0xA)
     
            self.dataSource = PiGenerator()

    В конструкторе объект сначала инициализируется как обработчик серверного сокета, а затем как словарь, состоящий из одной записи — дескриптора сокета самого сервера, указывающего на объект сервера. Важно, что сокет создается функцией create_socket до инициализации словаря, т.к. до создания сокета мы не смогли бы получить его дескриптор. Затем происходит привязка серверного сокета к порту на указанном хосте, запускается прослушка и создается генератор цифр числа Пи, который в дальнейшем будет использоваться для генерации потока данных клиенту.

    После того, как цикл диспетчеризации запущен основная доля работы ложится на модуль asyncore, который при подключении нового клиента вызовет метод handle_accept объекта сервера для обработки входящего подключения:
        def handle_accept(self):
            sock, (host, port) = self.accept()
            print 'new client from %s:%d' % (host, port)
     
            stream = Stream(self.dataSource)        
            self[sock.fileno()] = Client(sock, self, stream)

    Внутри метода-обработчика происходит непосредственно принятие нового клиента с помощью функции accept, которая возвращает вновь созданный сокет для общения с клиентом и пару хост-порт, с которых произошло подключение. Получив сокет клиента сервер создает новый поток данных (реализуемый классом Stream) для чтения данных из генератора. После этого к списку клиентов добавляется новый объект клиента, инициализируемый только что созданным потоком данных.

    Чтение данных клиентом из потока осуществляется внутри метода writable():
        def writable(self):
            data = self.stream.read()
            if data == None:
                print 'client finished reading'
                self.close()
                return False
     
            self.buffer += data
            return len(self.buffer) > 

    Метод writable вызывается модулем asyncore для каждого сокета перед очередной итерацией цикла диспетчеризации, чтобы узнать необходимо ли проверять для данного сокета доступость на запись. Мы пользуемся этим, чтобы попробовать прочитать данные из потока и сообщить о необходимости записи, если данные в потоке есть. Если поток возвращает None это значит, что в нем больше не будет данных и сокет закрывается. В данном примере такой ситуации проиcходить не должно, т.к. цифры генерируются бесконечно.

    Узнав о доступности операции записи для клиентского сокета asyncore вызывает метод handle_write(), который отправляет данные, ранее прочитанные из потока, через сокет:
        def handle_write(self):                
            sent = self.send(self.buffer)
            self.buffer = self.buffer[sent:]

    Генератор и поток тесно связаны между собой реализуя паттерн наблюдателя. Генератор выступает наблюдаемым объектом и предоставляет методы subscribe и unsubscribe соответственно для подписки на события и отписки от них:
    class PiGenerator(list):    
        def subscribe(self, obj):  
            self.lock.acquire()
            try:     
                self.append(obj)
                self._notify(obj=obj)
            finally:
                self.lock.release()            
     
            if not self.calculator:
                self.calculator = PiCalcThread(selfself.lock)
                self.calculator.start()
            else:
                if len(self) > :
                    self._resumeCalculator()
     
        def unsubscribe(self, obj):
            self.lock.acquire()
            self.remove(obj)   
            self.lock.release()
     
            if len(self) <:
                self._pauseCalculator()

    Непосредственно генерация цифр реализована отдельным классом PiCalcThread, который производит вычисления в отдельном потоке, поэтому для синхронизации операций добавления и удаления подписки используется механизм блокировки. С помощью того же объекта блокировки осуществляется приостановка потока при отсутствии подписавшихся потоков. При подписке нового потока с помощью метода _notify() ему отправляются цифры, посчитанные и сохраненные до этого, если он является не первым подписавшимся потоком.

    Метод _notify() пробегает по подписанным потокам и вызывает метод update передавая в поток новые цифры:
        def _notify(self, digits = None, obj = None):
            objs = [obj] if obj else self
            digits = digits or self.digits
     
            for obj in objs:
                obj.update(digits)

    Метод update() потока просто добавляет новые данные к уже имеющимся:
        def update(self, data):
            self.data += data

    Класс потока генерации цифр числа Пи использует алгоритм, предложенный Джереми Гиббонсом (Jeremy Gibbons) в документе Unbounded Spigot Algorithm for the Digits of Pi:
    class PiCalcThread(Thread):
        def __init__(self, buffer, lock):
            Thread.__init__(self)
            self.buffer = buffer
            self.lock = lock
     
        def run(self):
            q,r,t,k,n,l = 1,,1,1,3,3
     
            while True:
                if 4*q+r-t < n*t:
                    self.lock.acquire()
                    self.buffer.newDigits(str(n))
                    self.lock.release()
     
                    q,r,t,k,n,l = (10*q,10*(r-n*t),t,k,(10*(3*q+r))/t-10*n,l)
                else:
                    q,r,t,k,n,l = (q*k,(2*q+r)*l,t*l,k+1,(q*(7*k+2)+r*l)/(t*l),l+2)
     
                time.sleep(0.001)

    Метод run() бесконечно вычисляет очередную цифру числа Пи и опять же с использованием блокировки сообщает ее генератору, который в свою очередь передает ее потокам данных, подписавшимся на генератор. Искусственная задержка добавлена для того, чтобы сервер не генерировал слишком большой поток данных в единицу времени.

    Для подсветки синтаксиса при подготовке материала использовался ресурс http://highlight.hohli.com/. Очень надеюсь, что кому-то данная статья окажется полезной, хотя описание и получилось сумбурным при достаточно большом объеме.
    Метки:
    Поделиться публикацией
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 26
    • +1
      Если клиент обрывает соединение — сервер падает:

      > ./pypi.py
      new client from 127.0.0.1:56071
      client dropped connection
      warning: unhandled close event
      Client error: [Errno 9] Bad file descriptor
      Traceback (most recent call last):
        File "/local/python/lib/python2.6/asyncore.py", line 105, in readwrite
          obj.handle_close()
        File "/local/python/lib/python2.6/asyncore.py", line 491, in handle_close
          self.close()
        File "./pypi.py", line 160, in close
          self.server.removeClient(self)
        File "./pypi.py", line 123, in removeClient
          del self[client.fileno()]
        File "<string>", line 1, in fileno
        File "/local/python/lib/python2.6/socket.py", line 165, in _dummy
          raise error(EBADF, 'Bad file descriptor')
      error: [Errno 9] Bad file descriptor
      
      • 0
        Что-то у меня не получается воспроизвести ошибку. Вы через телнет подключаетесь? Как закрываете соединение?
        • 0
          В telnet — ctrl+], «quit», enter. Linux. Воспроизводится стабильно.
          Python 2.6 — может в версии дело?
          • 0
            Действительно, проверил на 2.6 версии — такая же ошибка. Получилось исправить добавив пустой метод handle_close(self): pass. В версии 2.5 у меня вызывается handle_expt при такой ошибке, а до handle_close дело не доходит. Вероятно, разработчики что-то пересмотрели.
            • +1
              Оперативно! Вам бы суппорте работать (:
      • НЛО прилетело и опубликовало эту надпись здесь
        • 0
          Зачем, для этого есть замечательная asyncore :)
          Или что вы понимаете под «диспетчиньем»?
          • НЛО прилетело и опубликовало эту надпись здесь
            • 0
              Кажется мне, что либэв здесь вообще никаким местом не прикрутить. Да и не стоило бы три строчки менять на целую библиотеку нестандартную :)
              • НЛО прилетело и опубликовало эту надпись здесь
        • +2
          А почему не на Twisted? Есть ли какие-то примущества в реализации на голых сокетах?
          • +2
            меньше зависимостей. да и это уже не голые сокеты. это простая абстракция, встроенная в питон.
            • 0
              На твистеде я пишу в настоящее время довольно замудренный REST-сервис, впечатления исключительно положительные. Можно сказать, что он меня и натолкнул на мысль написания этого маленького сервиса. Только здесь я решил воспользоваться возможностями, встроенными в язык, без лишних усложнений.
              • 0
                И все же, интереса ради, для production рекомендуете Twisted? Простота разработки, надежность, etc?
                • +2
                  Простота разработки приходит только после определенного опыта, потому что фреймворк все-таки оправдывает свое название :)
                  А архитектурно и по производительности он меня целиком устраивает, когда привыкнешь к идеологии становится очень комфортно с ним работать.
            • 0
              если это сервер, то как его остановить? ;)
              • 0
                Т.к. это учебный сервер, достаточно нажать ^C :)
                • 0
                  а зачем учить плохому? ;)
                  (я этот тред затеял к тому, что обработка вызовов — далеко не самая морочливая и сложная задача, которая возникает при разработке серверов. Гораздо больше возьни с запуском/остановкой, контекстами и т.п. вещами — вот об этом и интересно было бы почитать.
                  sidenote: Тут, например, может помочь библиотека python-daemon, однако и она решает только часть проблем)
                  • 0
                    Все это во многом зависит от задач, накладываемых на сервер, поэтому и волшебной таблетки нет. Хотя я как-то и не сталкивался с особыми проблемами при старте/остановке.
                    • 0
                      а в чем, по вашему, заключается особая специфичность в зависимости от задач?
                      • 0
                        В том, что зачастую серверу необходимо инициализировать и финализировать разнообразные и разнородные ресурсы. Это в свою очередь может накладывать определенные ограничения на процессы запуска/остановки.
                        • 0
                          и что, это никак нельзя обобщить? ресурсы сервера — это обычно дескрипторы (файлов, сокетов) — методы работы с ними стандартны. А какие еще ресурсы вы имеете в виду?
                          • 0
                            Я имею в виду кэш для прокси-сервера, базу данных для сервера отчетов, спул-директорию для сервера печати.
                            • 0
                              вы простите, конечно, мое занудство, но что будет делать при остановке прокси-сервер с кешем, сервер отчетов с БД, а сервер печати со спул-директорией?
                              • 0
                                Да все что угодно, в том то и дело :) Сжимать кэш, ждать отключения клиентов, и т.д.

                                А если под стартом/остановкой вы имели в виду просто отключение от терминала, форк в фон и прочие мелочи, то я не вижу в этом никаких подводных камней. Есть ли они?
                                • 0
                                  да, именно это.
                                  Оказывается есть (почитайте rationale к тому же python-daemon). Особенно проявляются в связи с остановом, особенно когда начинаем иметь дело с потоками (например, попробуйте ка убить thread)

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.