Пользователь
0,0
рейтинг
29 января 2013 в 16:30

Разработка → Параллельное программирование в Python при помощи multiprocessing и shared array из песочницы

Введение.


Python замечательный язык. Связка Python + NumPy + Matplotlib, на мой взгляд, сейчас одна из лучших для научных расчётов и быстрого прототипирования алгоритмов. Но у каждого инструмента есть свои светлые и тёмные стороны. Одной из самых дискутируемых особенностей Python является GIL – Global Interpreter Lock. Я бы отнёс эту особенность к тёмной стороне инструмента. Хотя многие со мной не согласятся.

Если кратко, то GIL не позволяет в одном интерпретаторе Python эффективно использовать больше одного потока. Защитники GIL утверждают, что однопоточные программы при наличии GIL работают намного эффективнее. Но наличие GIL означает, что параллельные вычисления с использованием множества потоков и общей памяти невозможны. А это достаточно сильное ограничения в современном многоядерном мире.

Один из способов преодоления GIL при помощи потоков на C++ был недавно рассмотрен в статье: Использование Python в многопоточном приложении на C++. Я же хочу рассмотреть другой способ преодоления ограничений GIL, основанный на multiprocessing и shared array. На мой взгляд, этот способ позволяет достаточно просто и эффективно использовать процессы и разделяемую память для прозрачного параллельного программирования в стиле множества потоков и общей памяти.

Задача.


В качестве примера рассмотрим следующую задачу. В трёхмерном пространстве заданы N точек v0, v1, …, vN. Требуется для каждой пары точек вычислить функцию, зависящую от расстояния между ними. Результат будет представлять собой матрицу NxN со значениями этой функции. В качестве функции возьмем следующую: f = r^3 / 12 + r^2 / 6. Этот тест, на самом деле, не такой уж и синтетический. На вычислении таких функций от расстояния основана RBF интерполяция, которая используется во многих областях вычислительной математики.

Способ параллелизации.


image
В данной задаче каждая строка матрицы может вычисляться независимо. Из каждых нескольких строк матрицы сформируем независимые работы и поместим их в очередь заданий (см. рис. 1). Запустим несколько процессов. Каждый процесс будет брать из очереди следующее задание на выполнение, пока не встретит специальное задание с кодом «end». В этом случае процесс заканчивает свою работу.

Реализация на Python.


В реализации на Python у нас будут два главных метода: mpCalcDistance(nodes) и
mpCalcDistance_Worker(nodes, queue, arrD). Метод mpCalcDistance(nodes) принимает на вход список узлов, создает область общей памяти, подготавливает очередь заданий и запускает процессы. Метод mpCalcDistance_Worker(nodes, queue, arrD) это вычислительный метод, работающий в собственном потоке. Он принимает на вход список узлов, очередь заданий и область общей памяти. Рассмотрим реализацию этих методов подробнее.

Метод mpCalcDistance(nodes)

Создаем область общей памяти:
    nP = nodes.shape[0]    
    nQ = nodes.shape[0]

    arrD = mp.RawArray(ctypes.c_double, nP * nQ)

Создаем очередь заданий. Каждое задание не что иное, как просто диапазон строчек матрицы. Специальное значение None это признак завершения работы процесса:
    nCPU = 2
    nJobs = nCPU * 36
   
    q = nP / nJobs
    r = nP % nJobs
 
    jobs = []
    firstRow = 0
    for i in range(nJobs):
        rowsInJob = q
        if (r > 0):
            rowsInJob += 1
            r -= 1
        jobs.append((firstRow, rowsInJob))
        firstRow += rowsInJob

    queue = mp.JoinableQueue()
    for job in jobs:
        queue.put(job)
    for i in range(nCPU):
        queue.put(None)

Создаем процессы и ждем пока не закончится очередь заданий:
    workers = []
    for i in range(nCPU):
        worker = mp.Process(target = mpCalcDistance_Worker,
                            args = (nodes, queue, arrD))
        workers.append(worker)
        worker.start()

    queue.join()

Из области общей памяти формируем матрицу с результатами:
    D = np.reshape(np.frombuffer(arrD), (nP, nQ))
    return D


Метод mpCalcDistance_Worker(nodes, queue, arrD)

Оборачиваем область общей памяти в матрицу:
    nP = nodes.shape[0]
    nQ = nodes.shape[0]

    D = np.reshape(np.frombuffer(arrD), (nP, nQ))

Пока не закончилась очередь заданий берем следующее задание из очереди и выполняем расчет:
    while True:
        job = queue.get()
        if job == None:
            break

        start = job[0]
        stop = job[0] + job[1]
          
        # components of the distance vector
        p = nodes[start:stop]
        q = nodes.T
        
        Rx = p[:, 0:1] - q[0:1]
        Ry = p[:, 1:2] - q[1:2]
        Rz = p[:, 2:3] - q[2:3]

        # calculate function of the distance
        L = np.sqrt(Rx * Rx + Ry * Ry + Rz * Rz)
        D[start:stop, :] = L * L * L / 12 + L * L / 6
        
        queue.task_done()


Результаты


Среда выполнения: двухядерный процессор, Ubuntu 12.04, 64bit.
image
image
На первой картинке показаны времена однопоточного и двухпоточного расчета для различных N. На второй картинке показано отношение времени однопоточного расчета к двухпоточному. Видно, что начиная с N = 500 мы получаем уже существенное ускорение расчета.

Очень интересный результат получается в районе числа N = 2000. В однопоточном варианте мы получаем заметный скачок времени расчета, а при многопоточном расчете коэффициент ускорения даже превышает 2. Я объясняю это эффектом кэша. В многопоточном варианте данные для каждой задачи полностью умещаются в кэш. А в однопоточном уже нет.

Так что использование процессов и общей памяти, на мой взгляд, достаточно просто позволяет обойти ограничения GIL.

Полный код всего скрипта:
""" Python multiprocessing with shared memory example.

This example demonstrate workaround for the GIL problem. Workaround uses
processes instead of threads and RawArray allocated from shared memory.

See also:
    [1] http://docs.python.org/2/library/multiprocessing.html
    [2] http://folk.uio.no/sturlamo/python/multiprocessing-tutorial.pdf
    [3] http://www.bryceboe.com/2011/01/28/the-python-multiprocessing-queue-and-large-objects/

"""

import time
import ctypes
import multiprocessing as mp
import numpy as np
import matplotlib.pyplot as plt

def generateNodes(N):
    """ Generate random 3D nodes
    """
    
    return np.random.rand(N, 3)

def spCalcDistance(nodes):
    """ Single process calculation of the distance function.
    """
    
    p = nodes
    q = nodes.T
    
    # components of the distance vector        
    Rx = p[:, 0:1] - q[0:1]
    Ry = p[:, 1:2] - q[1:2]
    Rz = p[:, 2:3] - q[2:3]
    
    # calculate function of the distance
    L = np.sqrt(Rx * Rx + Ry * Ry + Rz * Rz)
    D = L * L * L / 12 + L * L / 6
    
    return D
    
def mpCalcDistance_Worker(nodes, queue, arrD):
    """ Worker process for the multiprocessing calculations
    """

    nP = nodes.shape[0]
    nQ = nodes.shape[0]

    D = np.reshape(np.frombuffer(arrD), (nP, nQ))

    while True:
        job = queue.get()
        if job == None:
            break

        start = job[0]
        stop = job[0] + job[1]
          
        # components of the distance vector
        p = nodes[start:stop]
        q = nodes.T
        
        Rx = p[:, 0:1] - q[0:1]
        Ry = p[:, 1:2] - q[1:2]
        Rz = p[:, 2:3] - q[2:3]

        # calculate function of the distance
        L = np.sqrt(Rx * Rx + Ry * Ry + Rz * Rz)
        D[start:stop, :] = L * L * L / 12 + L * L / 6
        
        queue.task_done()
    queue.task_done()

def mpCalcDistance(nodes):
    """ Multiple processes calculation of the distance function.
    """

    # allocate shared array
    nP = nodes.shape[0]    
    nQ = nodes.shape[0]

    arrD = mp.RawArray(ctypes.c_double, nP * nQ)
   
    # setup jobs
    #nCPU = mp.cpu_count()
    nCPU = 2
    nJobs = nCPU * 36
   
    q = nP / nJobs
    r = nP % nJobs
 
    jobs = []
    firstRow = 0
    for i in range(nJobs):
        rowsInJob = q
        if (r > 0):
            rowsInJob += 1
            r -= 1
        jobs.append((firstRow, rowsInJob))
        firstRow += rowsInJob

    queue = mp.JoinableQueue()
    for job in jobs:
        queue.put(job)
    for i in range(nCPU):
        queue.put(None)

    # run workers
    workers = []
    for i in range(nCPU):
        worker = mp.Process(target = mpCalcDistance_Worker,
                            args = (nodes, queue, arrD))
        workers.append(worker)
        worker.start()

    queue.join()
   
    # make array from shared memory    
    D = np.reshape(np.frombuffer(arrD), (nP, nQ))
    return D

def compareTimes():
    """ Compare execution time single processing versus multiple processing.
    """
    nodes = generateNodes(3000)
    
    t0 = time.time()
    spD = spCalcDistance(nodes)
    t1 = time.time()
    print "single process time: {:.3f} s.".format(t1 - t0)

    t0 = time.time()
    mpD = mpCalcDistance(nodes)
    t1 = time.time()
    print "multiple processes time: {:.3f} s.".format(t1 - t0)
    
    err = np.linalg.norm(mpD - spD)
    print "calculate error: {:.2e}".format(err)
    
def showTimePlot():    
    """ Generate execution time plot single processing versus multiple processing.
    """
    
    N = range(100, 4000, 4)
    spTimes = []
    mpTimes = []
    rates = []
    for i in N:
        print i
        nodes = generateNodes(i)
        
        t0 = time.time()
        spD = spCalcDistance(nodes)
        t1 = time.time()
        sp_tt = t1 - t0
        spTimes.append(sp_tt)
        
        t0 = time.time()
        mpD = mpCalcDistance(nodes)
        t1 = time.time()
        mp_tt = t1 - t0
        mpTimes.append(mp_tt)
        
        rates.append(sp_tt / mp_tt)
                
    plt.figure()
    plt.plot(N, spTimes)
    plt.plot(N, mpTimes)
    plt.xlabel("N")
    plt.ylabel("Execution time")
    
    plt.figure()
    plt.plot(N, rates)
    plt.xlabel("N")
    plt.ylabel("Rate")
    plt.show()

def main():
    compareTimes()
    #showTimePlot()

if __name__ == '__main__':
    main()
@wagant
карма
14,0
рейтинг 0,0
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама

Самое читаемое Разработка

Комментарии (15)

  • 0
    Если я не ошибаюсь у multiprocessing есть ограничение по организации файлов проекта. Что-то связанное с запускаемым файлом и __main__ или я ошибаюсь?
    • 0
      Требуется, что бы можно было импортировать пакет __main__ без side-effects.
    • 0
      Да, в Windows необходимо писать:

      from multiprocessing import Process
      
      def foo():
          print 'hello'
      
      if __name__ ==  '__main__':
          p = Process(target=foo)
          p.start()
      

      В Linux это не обязательно. Но так как писать межплатформенный код это правильно, то в своих скриптах я точку входа стараюсь оборачивать в if __name__ == '__main__'.
      • +1
        Я пот винду вообще не пишу, но оборачивать иполняемую часть модуля в «if __name__ == '__main__'» это хороший тон.
    • 0
      Вот здесь об этом более подробно написано:
      docs.python.org/2/library/multiprocessing.html#multiprocessing-programming
    • 0
      Под windows нет fork(), соотвественно запускается вторая инстанция самого python, которая выполняется до места «разветвления». И тут возникает необходимость в некоторых архитектурных решениях (если код сложнее 1-2 небольших файлов).
  • +1
    А как shared array внутри реализуется?
    • +1
      Я могу ошибаться. Но в Linux это вроде бы mmap через /dev/shm.
  • 0
    Я тут, знакомясь с Хоаровскими CSP нашел на занятную их реализацию на питоне — в частности, она позволяет писать один и тот же код с CSP-примитивами (параллельная композиция, всякие guarded alternative и т.д.) и исполнять его двумя с способами — поверх легковесных сопрограмм greenlet в одном процессе, что дает миллионы «нитей» на одной машине, либо с помощью полноценных процессов (в т.ч. уже связанных по TCP/IP). В продакшене не замечена, но на практикуме по параллельному программриованию, как я понял, студентами мучается уже не первый год.

    Кто-нибудь сталкивался?
  • +1
    Из Википедии:
    «GIL освобождается при исполнении кода большинства расширений, например, NumPy/SciPy, позволяя на время расчётов исполняться другому Python потоку.»
    • 0
      Спасибо за очень правильный вопрос! При ответе на него я буду пользоваться официальной информацией с сайта разработчиков SciPy о возможности параллелизации NumPy/SciPy. Там внизу есть интересная табличка сравнения времени выполнения кода с использованием нитей и потоков.

      Прежде всего да, вы абсолютно правы. GIL действительно освобождается при выполнении кода NumPy. Но! Есть два «но». Прежде всего оверхед при использовании нитей все-таки больше, чем при использовании процессов. Но это еще как-то можно было бы пережить.

      Второе «но» важнее. В случае если в нитке используются циклы, то GIL не освобождается и параллельное выполнение становится чуть ли не два раза дольше последовательного. А в реальных программах не всегда удается полностью векторизовать код. Часто остаются какие-либо мелкие циклы, векторизовать которые не слишком удобно. И в случае нитей параллельное программирование превращается в крайне неудобный процесс гадания, отпустится ли GIL или нет. И какой цикл на это влияет. При этом, что самое неприятное, параллельный код может быть в несколько раз медленне последовательного.

      В случае процессов мы избавлены от этих проблем и невекторизованный код проблемы не представляет. А точки зрения архитектуры мы все-равно работаем в парадигме нитей и общей памяти. Поэтому решение проблемы GIL при помощи процессов и shared memory мне кажется наиболее практичным.
  • 0
    Чем и как построены графики?
    • 0
      На Python с использование matplotlib. Стрелки уже сверху я нарисовал в Gimp. Полный код построения графиков приведен в конце поста в полном коде скрипта. Метод showTimePlot().
      • 0
        спасибо
  • 0
    Может быть кому-то будет интересно, но я в своей старой статье тоже рассматривал вопрос скорости выполнения кода через распараллеливание на основе OpenMP

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.