Python и Twisted — Заметки о параллельной обработке данных (мультипроцессности)

    imageTwisted — это фреймворк на Python для разработки сетевых приложений, который среди многих других применений, может быть использован и для параллельной обработки данных — мультипроцессности. Это замечательно, но мне пришлось попотеть для того, чтобы найти то, что мне нужно.

    Я листал документацию Twisted и книгу O'Reilly Twisted. Существует также рецепт в Python Cookbook. Однако, самое интересное я нашел в статье Брюса Эккель — Параллельность с Python, Twisted и Flex. Также стоит прочитать первоначальные статьи Брюса Эккель про Twisted: Grokking Twisted.

    Вот мои замечания о текущем примере Брюса.

    Я убрал Flex — отчасти потому, что мне это не нужно и я ничего не хочу знать об этом. В примере запускается контроллер, который инициализирует ряд отдельных параллельных процессов-вычислителей, в которых уже запускаются какие-то сложные действия (эти процессы называют solvers). Также тут имеется взаимодействие между контроллером и вычислителями. Хотя этот пример запускается только на одной машине, те принципы, о которых говориться в статье — не трудно распространить и на систему из нескольких компьютеров.

    Для хорошего примера, как это работает, пожалуйста, смотрите оригинал статьи .

    Вот solver.py который скопирован с оригинала. Настоящая «работа» происходит в методе step(). Я только добавил некоторую отладочную информацию для себя.
    """
    solver.py
    Original version by Bruce Eckel
    Solves one portion of a problem, in a separate process on a separate CPU
    """
    import sys, random, math
    from twisted.spread import pb
    from twisted.internet import reactor
    
    class Solver(pb.Root):
    
        def __init__(self, id):
            print "solver.py %s: solver init" % id
            self.id = id
    
        def __str__(self): # String representation
            return "Solver %s" % self.id
    
        def remote_initialize(self, initArg):
            return "%s initialized" % self
    
        def step(self, arg):
            print "solver.py %s: solver step" % self.id
            "Simulate work and return result"
            result = 0
            for i in range(random.randint(1000000, 3000000)):
                angle = math.radians(random.randint(0, 45))
                result += math.tanh(angle)/math.cosh(angle)
            return "%s, %s, result: %.2f" % (self, str(arg), result)
    
        # Alias methods, for demonstration version:
        remote_step1 = step
        remote_step2 = step
        remote_step3 = step
    
        def remote_status(self):
            print "solver.py %s: remote_status" % self.id
            return "%s operational" % self
    
        def remote_terminate(self):
            print "solver.py %s: remote_terminate" % self.id
            reactor.callLater(0.5, reactor.stop)
            return "%s terminating..." % self
    
    if __name__ == "__main__":
        port = int(sys.argv[1])
        reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1])))
        reactor.run()
    



    Вот controller.py. Он также скопирован из оригинальной статьи, но я убрал Flex и создал сигналы start и terminate в классе контроллера. Я не уверен, что это имеет смысл, но, по крайней мере, это позволило мне нормально использовать пример. Я также перенес метод terminate из FlexInterface в Controller.
    """
    Controller.py
    Original version by Bruce Eckel
    Starts and manages solvers in separate processes for parallel processing.
    """
    import sys
    from subprocess import Popen
    from twisted.spread import pb
    from twisted.internet import reactor, defer
    
    START_PORT = 5566
    MAX_PROCESSES = 2
    
    class Controller(object):
    
        def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage):
            print "controller.py: broadcasting..."
            deferreds = [solver.callRemote(remoteMethodName, arguments) 
                         for solver in self.solvers.values()]
            print "controller.py: broadcasted"
            reactor.callLater(3, self.checkStatus)
    
            defer.DeferredList(deferreds, consumeErrors=True).addCallbacks(
                nextStep, self.failed, errbackArgs=(failureMessage))
        
        def checkStatus(self):
            print "controller.py: checkStatus"
            for solver in self.solvers.values():
                solver.callRemote("status").addCallbacks(
                    lambda r: sys.stdout.write(r + "\n"), self.failed, 
                    errbackArgs=("Status Check Failed"))
                                                         
        def failed(self, results, failureMessage="Call Failed"):
            print "controller.py: failed"
            for (success, returnValue), (address, port) in zip(results, self.solvers):
                if not success:
                    raise Exception("address: %s port: %d %s" % (address, port, failureMessage))
    
        def __init__(self):
            print "controller.py: init"
            self.solvers = dict.fromkeys(
                [("localhost", i) for i in range(START_PORT, START_PORT+MAX_PROCESSES)])
            self.pids = [Popen(["python", "solver.py", str(port)]).pid
                         for ip, port in self.solvers]
            print "PIDS: ", self.pids
            self.connected = False
            reactor.callLater(1, self.connect)
    
        def connect(self):
            print "controller.py: connect"
            connections = []
            for address, port in self.solvers:
                factory = pb.PBClientFactory()
                reactor.connectTCP(address, port, factory)
                connections.append(factory.getRootObject())
            defer.DeferredList(connections, consumeErrors=True).addCallbacks(
                self.storeConnections, self.failed, errbackArgs=("Failed to Connect"))
    
            print "controller.py: starting parallel jobs"
            self.start()
    
        def storeConnections(self, results):
            print "controller.py: storeconnections"
            for (success, solver), (address, port) in zip(results, self.solvers):
                self.solvers[address, port] = solver
            print "controller.py: Connected; self.solvers:", self.solvers
            self.connected = True
    
        def start(self):
            "controller.py: Begin the solving process"
            if not self.connected:
                return reactor.callLater(0.5, self.start)
            self.broadcastCommand("step1", ("step 1"), self.step2, "Failed Step 1")
    
        def step2(self, results):
            print "controller.py: step 1 results:", results
            self.broadcastCommand("step2", ("step 2"), self.step3, "Failed Step 2")
    
        def step3(self, results):
            print "controller.py: step 2 results:", results
            self.broadcastCommand("step3", ("step 3"), self.collectResults, "Failed Step 3")
    
        def collectResults(self, results):
            print "controller.py: step 3 results:", results
            self.terminate()
            
        def terminate(self):
            print "controller.py: terminate"
            for solver in self.solvers.values():
                solver.callRemote("terminate").addErrback(self.failed, "Termination Failed")
            reactor.callLater(1, reactor.stop)
            return "Terminating remote solvers"
    
    if __name__ == "__main__":
        controller = Controller()
        reactor.run()
    



    Чтобы запустить программу, положите оба файла в одну папку и запустите
    python controller.py
    


    Вы должны увидеть, как загрузка двух процессоров (если их, конечно, у вас — 2 ;-) ) поднимется до 100%. А вот и вывод скрипта на экран:
    controller.py: init
    PIDS:  [12173, 12174]
    solver.py 5567: solver init
    solver.py 5566: solver init
    controller.py: connect
    controller.py: starting parallel jobs
    controller.py: storeconnections
    controller.py: Connected; self.solvers: {('localhost', 5567): , ('localhost', 5566): }
    controller.py: broadcasting...
    controller.py: broadcasted
    solver.py 5566: solver step
    solver.py 5567: solver step
    controller.py: checkStatus
    solver.py 5566: remote_status
    Solver 5566 operational
    solver.py 5567: remote_status
    controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
    controller.py: broadcasting...
    controller.py: broadcasted
    Solver 5567 operational
    solver.py 5566: solver step
    solver.py 5567: solver step
    controller.py: checkStatus
    solver.py 5566: remote_status
    Solver 5566 operational
    solver.py 5567: remote_status
    controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
    controller.py: broadcasting...
    controller.py: broadcasted
    Solver 5567 operational
    solver.py 5566: solver step
    solver.py 5567: solver step
    controller.py: checkStatus
    solver.py 5566: remote_status
    Solver 5566 operational
    solver.py 5567: remote_status
    controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
    controller.py: terminate
    Solver 5567 operational
    solver.py 5566: remote_terminate
    solver.py 5567: remote_terminate
    




    Оригинал
    Поделиться публикацией
    Похожие публикации
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама
    Комментарии 10
    • +11
      Рискуя показаться ворчливым, но:
      Михаил, на хабре есть специальный вид статей: статья-перевод.

      А основное замечание:
      вы ухудшили читаемость статьи ну так в два раза. Вот сравните:



      Большая картинка


      Вы убрали всё форматирование, так, что не понятно, добавилили вы лично что-нибудь к англоязычной статье от 12 сентября 2008 года.
      Я три раза смотрел и не понял.

      А если уж быть вообще честным — то есть такие статьи-ссылки. Статья-ссылка была бы уместна. Перевод двух абзацев не так уж важен, как потеря читаемости.

      Извините за ворчливость.
      Успехов вам и хороших статей.
      Плюсанул, потому, что тема-то хорошая. Twisted.
      • 0
        >Рискуя показаться ворчливым, но:

        я могу быть не менее ворчливым ;-) на хабре межстрочный интервал больше в 1.5-2 раза + другой шрифт

        >Извините за ворчливость.

        Не проблема

        >А если уж быть вообще честным — то есть такие статьи-ссылки. Статья-ссылка была бы уместна. Перевод двух абзацев не так уж важен, как потеря читаемости.

        По twisted вообще немного хорошей документации, а русской — вообще «днем с огнем» (с). Перевожу то, с чем сталкиваюсь в работе и «рою» сам.

        >Успехов вам и хороших статей.

        Спасибо.
        • 0
          Думаю, Yfka хотел обратить внимание на подсветку синтаксиса
          • 0
            С удовольствием бы разметил, но Хабр съедает отступы в коде.
      • 0
        >Рискуя показаться ворчливым, но:

        >Михаил, на хабре есть специальный вид статей: статья-перевод.

        ок, спасибо

        А основное замечание:
        вы ухудшили читаемость статьи ну так в два раза. Вот сравните:
        • +1
          А ещё в перспективе в Twisted-е нас ждёт (когда-нибудь), в пару с уже имеющимся методом deferToThread(), метод deferToProcess() (см. также twisted.internet.processes).
          • 0
            >А ещё в перспективе

            Именно, что в ПЕРСПЕКТИВЕ

            Прорабатывал я их, но twisted.internet.processes у меня на 2.5 не завелся почему-то, прекрасно работая на 2.6. Разбираться было лениво, так как нашел переведенный вариант, который для моих нужд подходит лучше — нужно постоянно держать процессы в памяти.
            • +1
              Не multiprocessing ли захотел (а какие, впрочем, варианты...)? Так разумеется, у Python 2.6 модуль multiprocessing встроенный, а для 2.5 есть порт, python-multiprocessing.
              • 0
                Да, действительно он, спасибо — наврал

                В лоб на 2.5 почему-то порт не завелся, не понял почему — ну и отложил в долгий ящик.

                А статья выше на необходимый функционал легла
          • +1
            Я очень люблю параллельное программирование, но когда мне выкатывают метровые листинги кода без комментариев и практически без объяснений — меня это в нокаут посылает. А ведь я честно сидел и вкуривал эти коды, что бы понять что умеет этот твистед.

            Вы бы хоть комментов наставили.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.