23 июня 2010 в 13:03

Python и Twisted — Заметки о параллельной обработке данных (мультипроцессности)

imageTwisted — это фреймворк на Python для разработки сетевых приложений, который среди многих других применений, может быть использован и для параллельной обработки данных — мультипроцессности. Это замечательно, но мне пришлось попотеть для того, чтобы найти то, что мне нужно.

Я листал документацию Twisted и книгу O'Reilly Twisted. Существует также рецепт в Python Cookbook. Однако, самое интересное я нашел в статье Брюса Эккель — Параллельность с Python, Twisted и Flex. Также стоит прочитать первоначальные статьи Брюса Эккель про Twisted: Grokking Twisted.

Вот мои замечания о текущем примере Брюса.

Я убрал Flex — отчасти потому, что мне это не нужно и я ничего не хочу знать об этом. В примере запускается контроллер, который инициализирует ряд отдельных параллельных процессов-вычислителей, в которых уже запускаются какие-то сложные действия (эти процессы называют solvers). Также тут имеется взаимодействие между контроллером и вычислителями. Хотя этот пример запускается только на одной машине, те принципы, о которых говориться в статье — не трудно распространить и на систему из нескольких компьютеров.

Для хорошего примера, как это работает, пожалуйста, смотрите оригинал статьи .

Вот solver.py который скопирован с оригинала. Настоящая «работа» происходит в методе step(). Я только добавил некоторую отладочную информацию для себя.
"""
solver.py
Original version by Bruce Eckel
Solves one portion of a problem, in a separate process on a separate CPU
"""
import sys, random, math
from twisted.spread import pb
from twisted.internet import reactor

class Solver(pb.Root):

    def __init__(self, id):
        print "solver.py %s: solver init" % id
        self.id = id

    def __str__(self): # String representation
        return "Solver %s" % self.id

    def remote_initialize(self, initArg):
        return "%s initialized" % self

    def step(self, arg):
        print "solver.py %s: solver step" % self.id
        "Simulate work and return result"
        result = 0
        for i in range(random.randint(1000000, 3000000)):
            angle = math.radians(random.randint(0, 45))
            result += math.tanh(angle)/math.cosh(angle)
        return "%s, %s, result: %.2f" % (self, str(arg), result)

    # Alias methods, for demonstration version:
    remote_step1 = step
    remote_step2 = step
    remote_step3 = step

    def remote_status(self):
        print "solver.py %s: remote_status" % self.id
        return "%s operational" % self

    def remote_terminate(self):
        print "solver.py %s: remote_terminate" % self.id
        reactor.callLater(0.5, reactor.stop)
        return "%s terminating..." % self

if __name__ == "__main__":
    port = int(sys.argv[1])
    reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1])))
    reactor.run()



Вот controller.py. Он также скопирован из оригинальной статьи, но я убрал Flex и создал сигналы start и terminate в классе контроллера. Я не уверен, что это имеет смысл, но, по крайней мере, это позволило мне нормально использовать пример. Я также перенес метод terminate из FlexInterface в Controller.
"""
Controller.py
Original version by Bruce Eckel
Starts and manages solvers in separate processes for parallel processing.
"""
import sys
from subprocess import Popen
from twisted.spread import pb
from twisted.internet import reactor, defer

START_PORT = 5566
MAX_PROCESSES = 2

class Controller(object):

    def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage):
        print "controller.py: broadcasting..."
        deferreds = [solver.callRemote(remoteMethodName, arguments) 
                     for solver in self.solvers.values()]
        print "controller.py: broadcasted"
        reactor.callLater(3, self.checkStatus)

        defer.DeferredList(deferreds, consumeErrors=True).addCallbacks(
            nextStep, self.failed, errbackArgs=(failureMessage))
    
    def checkStatus(self):
        print "controller.py: checkStatus"
        for solver in self.solvers.values():
            solver.callRemote("status").addCallbacks(
                lambda r: sys.stdout.write(r + "\n"), self.failed, 
                errbackArgs=("Status Check Failed"))
                                                     
    def failed(self, results, failureMessage="Call Failed"):
        print "controller.py: failed"
        for (success, returnValue), (address, port) in zip(results, self.solvers):
            if not success:
                raise Exception("address: %s port: %d %s" % (address, port, failureMessage))

    def __init__(self):
        print "controller.py: init"
        self.solvers = dict.fromkeys(
            [("localhost", i) for i in range(START_PORT, START_PORT+MAX_PROCESSES)])
        self.pids = [Popen(["python", "solver.py", str(port)]).pid
                     for ip, port in self.solvers]
        print "PIDS: ", self.pids
        self.connected = False
        reactor.callLater(1, self.connect)

    def connect(self):
        print "controller.py: connect"
        connections = []
        for address, port in self.solvers:
            factory = pb.PBClientFactory()
            reactor.connectTCP(address, port, factory)
            connections.append(factory.getRootObject())
        defer.DeferredList(connections, consumeErrors=True).addCallbacks(
            self.storeConnections, self.failed, errbackArgs=("Failed to Connect"))

        print "controller.py: starting parallel jobs"
        self.start()

    def storeConnections(self, results):
        print "controller.py: storeconnections"
        for (success, solver), (address, port) in zip(results, self.solvers):
            self.solvers[address, port] = solver
        print "controller.py: Connected; self.solvers:", self.solvers
        self.connected = True

    def start(self):
        "controller.py: Begin the solving process"
        if not self.connected:
            return reactor.callLater(0.5, self.start)
        self.broadcastCommand("step1", ("step 1"), self.step2, "Failed Step 1")

    def step2(self, results):
        print "controller.py: step 1 results:", results
        self.broadcastCommand("step2", ("step 2"), self.step3, "Failed Step 2")

    def step3(self, results):
        print "controller.py: step 2 results:", results
        self.broadcastCommand("step3", ("step 3"), self.collectResults, "Failed Step 3")

    def collectResults(self, results):
        print "controller.py: step 3 results:", results
        self.terminate()
        
    def terminate(self):
        print "controller.py: terminate"
        for solver in self.solvers.values():
            solver.callRemote("terminate").addErrback(self.failed, "Termination Failed")
        reactor.callLater(1, reactor.stop)
        return "Terminating remote solvers"

if __name__ == "__main__":
    controller = Controller()
    reactor.run()



Чтобы запустить программу, положите оба файла в одну папку и запустите
python controller.py


Вы должны увидеть, как загрузка двух процессоров (если их, конечно, у вас — 2 ;-) ) поднимется до 100%. А вот и вывод скрипта на экран:
controller.py: init
PIDS:  [12173, 12174]
solver.py 5567: solver init
solver.py 5566: solver init
controller.py: connect
controller.py: starting parallel jobs
controller.py: storeconnections
controller.py: Connected; self.solvers: {('localhost', 5567): , ('localhost', 5566): }
controller.py: broadcasting...
controller.py: broadcasted
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
controller.py: broadcasting...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
controller.py: broadcasting...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
controller.py: terminate
Solver 5567 operational
solver.py 5566: remote_terminate
solver.py 5567: remote_terminate




Оригинал
Михаил Нерадков @MichaelXIII
карма
17,0
рейтинг 0,0
Похожие публикации
Самое читаемое Разработка

Комментарии (10)

  • +11
    Рискуя показаться ворчливым, но:
    Михаил, на хабре есть специальный вид статей: статья-перевод.

    А основное замечание:
    вы ухудшили читаемость статьи ну так в два раза. Вот сравните:



    Большая картинка


    Вы убрали всё форматирование, так, что не понятно, добавилили вы лично что-нибудь к англоязычной статье от 12 сентября 2008 года.
    Я три раза смотрел и не понял.

    А если уж быть вообще честным — то есть такие статьи-ссылки. Статья-ссылка была бы уместна. Перевод двух абзацев не так уж важен, как потеря читаемости.

    Извините за ворчливость.
    Успехов вам и хороших статей.
    Плюсанул, потому, что тема-то хорошая. Twisted.
    • 0
      >Рискуя показаться ворчливым, но:

      я могу быть не менее ворчливым ;-) на хабре межстрочный интервал больше в 1.5-2 раза + другой шрифт

      >Извините за ворчливость.

      Не проблема

      >А если уж быть вообще честным — то есть такие статьи-ссылки. Статья-ссылка была бы уместна. Перевод двух абзацев не так уж важен, как потеря читаемости.

      По twisted вообще немного хорошей документации, а русской — вообще «днем с огнем» (с). Перевожу то, с чем сталкиваюсь в работе и «рою» сам.

      >Успехов вам и хороших статей.

      Спасибо.
      • 0
        Думаю, Yfka хотел обратить внимание на подсветку синтаксиса
        • 0
          С удовольствием бы разметил, но Хабр съедает отступы в коде.
  • 0
    >Рискуя показаться ворчливым, но:

    >Михаил, на хабре есть специальный вид статей: статья-перевод.

    ок, спасибо

    А основное замечание:
    вы ухудшили читаемость статьи ну так в два раза. Вот сравните:
  • +1
    А ещё в перспективе в Twisted-е нас ждёт (когда-нибудь), в пару с уже имеющимся методом deferToThread(), метод deferToProcess() (см. также twisted.internet.processes).
    • 0
      >А ещё в перспективе

      Именно, что в ПЕРСПЕКТИВЕ

      Прорабатывал я их, но twisted.internet.processes у меня на 2.5 не завелся почему-то, прекрасно работая на 2.6. Разбираться было лениво, так как нашел переведенный вариант, который для моих нужд подходит лучше — нужно постоянно держать процессы в памяти.
      • +1
        Не multiprocessing ли захотел (а какие, впрочем, варианты...)? Так разумеется, у Python 2.6 модуль multiprocessing встроенный, а для 2.5 есть порт, python-multiprocessing.
        • 0
          Да, действительно он, спасибо — наврал

          В лоб на 2.5 почему-то порт не завелся, не понял почему — ну и отложил в долгий ящик.

          А статья выше на необходимый функционал легла
  • +1
    Я очень люблю параллельное программирование, но когда мне выкатывают метровые листинги кода без комментариев и практически без объяснений — меня это в нокаут посылает. А ведь я честно сидел и вкуривал эти коды, что бы понять что умеет этот твистед.

    Вы бы хоть комментов наставили.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.