0,0
рейтинг
25 октября 2013 в 12:21

Разработка → Пишем numpy-модуль для ускорения математических функций с помощью SIMD-инструкций из песочницы tutorial

Пакеты numpy и scipy предоставляют прекрасные возможности для быстрого решения различных вычислительных задач. Концепция универсальных функций (ufunc), работающих как со скалярными значениями, так и с массивами различных размерностей, позволяет получить высокую производительность при сохранении присущей языку Python простоты и элегантности. Универсальная функция обычно используются для выполнения одной операции над большим массивом данных, что идеально подходит для оптимизации с помощью SIMD-инструкций, однако мне не удалось найти готового решения, основанного на свободном программном обеспечении и позволяющего использовать SIMD для вычисления в numpy таких математических функций, как синус, косинус и экспонента. Реализовывать алгоритмы вычисления этих функций с нуля совсем не хотелось, но к счастью в интернете нашлось несколько свободных библиотек на языке «С». Преодолев лень сомнения, я решил написать собственный numpy-модуль, предлагающий универсальные функции для синуса, косинуса и экспоненты. За подробностями и результатами тестов добро пожаловать под кат.


Немного о SIMD-инструкциях


SIMD-инструкции позволяют одновременно производить один и тот же набор операций над несколькими числами, записанными в одном регистре. Таким образом, за один такт можно обрабатывать сразу несколько чисел и потенциально увеличить производительность в несколько раз.
Например, набор SIMD-инструкций Advanced Vector Extensions (AVX) позволяет выполнять операции с 256-битными регистрами, каждый из которых может включать восемь 32-разрядных чисел с плавающей точкой (числа одинарной точности), либо четыре 64-разрядных (числа двойной точности). Набор операций довольно скромный, в основном это сложение, вычитание, умножение и деление. Точная и быстрая реализация тригонометрических функций с помощью этих операций — задача нетривиальная и, чтобы не изобретать велосипед, стоит использовать какую-нибудь готовую библиотеку. Помимо проприетарной Intel MKL(которая уже умеет работать с numpy) вариантов нашлось всего три (раз, два, три).
Первый вариант представляет собой заголовочный файл с очень скромным набором функций, практически без документации и тестов. Второй вариант — С++ библиотека vecmathlib, которая у меня почему-то упорно отказывалась компилироваться, несмотря на использование рекомендованного компилятора GCC-4.7. Вариант перспективный, но пока еще похоже сырой. Третий вариант — библиотеку SLEEF — удалось найти именно благодаря vecmathlib, которая использует его кодовую базу. Вариант мне сразу понравился простотой и ясностью кода, а также обилием тестов.

Небольшой тест для мотивации


Чтобы получить достаточную мотивацию для написания модуля, а заодно разобраться с использованием SLEEF, я решил сравнить скорость вычисления синуса на языке «C» при использовании SLEEF со стандартной math.h. Естественно, речь идет о поэлементном вычислении синуса для большого массива данных.
К сожалению, документации и примеров в SLEEF практически нет, но зато есть вполне понятно написанные тесты, так что разобраться с использованием библиотеки было не сложно. Исходный код SLEEF состоит из четырех директорий: java, purec, simd и tester. Кроме этого, там лежит файл README с кратким описанием библиотеки и общий Makefile, дергающий Makefile из перечисленных директорий. Меня естественно больше всего заинтересовала директория simd, в которой, как можно догадаться из названия, содержались оптимизированные с помощью SIMD-инструкций функции.
Из Makefile в директории simd понятно, что поддерживаются 4 варианта SIMD-инструкций: SSE2, AVX, AVX2 и FMA4. Прототипы функций определены в файле sleefsimd.h, а нужный набор инструкций выбирается при компиляции с помощью флагов -DENABLE_SSE2, -DENABLE_AVX, -DENABLE_AVX2 или -DENABLE_FMA4. Makefile собирает исполняемые файлы для тестирования функций с использованием каждого из наборов инструкций: iutsse2, iutavx, iutavx2 или iutfma4. Эти файлы вызываются из универсальной программы tester (из директории tester) и осуществляют выполнение получаемых от tester команд. Реализация команд находится в файле iut.c, откуда становится очевидным способ использования библиотеки.
Функция тестирования синуса из файла simd/iut.c исходного кода SLEEF
double xxsin(double d) { 
  double s[VECTLENDP];
  int i;
  for(i=0;i<VECTLENDP;i++) {
    s[i] = random()/(double)RAND_MAX*20000-10000;
  }
  int idx = random() & (VECTLENDP-1);
  s[idx] = d; 

  vdouble a = vloadu(s);
  a = xsin(a);
  vstoreu(s, a);

  return s[idx];
}


Для чисел двойной точности (double) нужно определить массив длиной VECTLENDP, заполнить аргументами интересующей нас функции и передать в функцию vloadu, которая скопирует их в необходимое для использования SIMD место и вернет значение типа vdouble. Значение vdouble мы передаем функции xsin, которая вычисляет значение синуса сразу для всех VECLENDP аргументов и возвращает опять же vdouble. Результат распаковывается в массив из double с помощью функции vstoreu.
Для желающих проверить SLEEF на своей машине привожу полный исходный код программы, которую написал для оценки потенциального ускорения от использования SIMD с помощью SLEEF.
Программа для оценки скорости вычисления синуса с помощью SLEEF
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>
#include "sleefsimd.h"

#define TESTSIZE (VECTLENDP*10000000)
double s[TESTSIZE];
double r1[TESTSIZE];
double r2[TESTSIZE];
#define COUNT 10
int main(int argc, char *argv[])
{
	int k, i;
    clock_t t1, t2;
    double time1, time2;
    double max, rmax;

    srandom(time(NULL));
	for(i = 0; i < TESTSIZE; i++) 
    {
		s[i] = random()/(double)RAND_MAX*20000-10000;
	}

	printf("Testing sin, %d values\n", TESTSIZE*COUNT);
	t1 = clock();
	for(k = 0; k < COUNT; k++)
	{
		for(i = 0; i < TESTSIZE; i++) 
        {
			r1[i] = sin(s[i]);
		}
	}
    t2 = clock();
    time1 = (double)(t2 - t1)/CLOCKS_PER_SEC;
	printf("Finish sin, spent time = %lg sec\n\n", time1);

	printf("Testing xsin\n");
	t1 = clock();
	for(k = 0; k < COUNT; k++)
	{
		for(i = 0; i < TESTSIZE; i += VECTLENDP) 
        {
			vdouble a = vloadu(s+i);
			a = xsin(a);
			vstoreu(r2+i, a);
		}
	}
    t2 = clock();
    time2 = (double)(t2 - t1)/CLOCKS_PER_SEC;
	printf("Finish xsin, spent time = %lg sec\n\n", time2);

	printf("Speed ratio: %lf\n", time1/time2);

	max = r1[0] - r2[0];
    rmax = (r1[0] - r2[0])/r1[0];
	for(i = 0; i < TESTSIZE; i++) 
    {
		double delta = (r1[i] - r2[i]);
		if(abs(delta) > abs(max)) max = delta;
        delta = (r1[i] - r2[i])/r1[i];
		if(abs(delta) > abs(max)) rmax = delta;
	}

	printf("Max absolute delta: %lg, relative delta %lg\n", max, rmax);
	return 0;
}


Самый продвинутый из поддерживаемых на моем компьютере набор команд — это AVX, поэтому я компилировал программу (записанную в файл simd/speedtest.c в исходниках SLEEF) следующей командой:
gcc -O3 -Wall -Wno-unused -Wno-attributes -DENABLE_AVX -mavx speedtest.c sleefsimddp.c sleefsimdsp.c -o speedtest -lm

Я ожидал ускорения примерно в 4 раза, но результат превзошел все мои ожидания:
Testing sin, 400000000 values
Finish sin, spent time = 14.95 sec

Testing xsin
Finish xsin, spent time = 1.31 sec

Speed ratio: 11.412214
Max absolute delta: 5.55112e-17, relative delta 1.58441e-16

Ускорение более чем в 10 раз, при относительной погрешности вычисления менее 2·10-16 (порядка точности самого double), на одном ядре процессора! Конечно, в реальном приложении ускорение будет меньше, но мотивации для написания своего numpy-модуля уже достаточно.

Пару слов об универсальных функциях


В numpy данные представляются в виде многомерных массивов. Универсальные функции поэлементно работают с массивами любой размерности, причем в случае нескольких параметров их размерность может не совпадать. Параметры универсальной функции сначала приводятся к одной размерности в соответствии со специальными правилами (это называется Broadcasting), а затем необходимые вычисления проводятся поэлементно. На выходе получается массив наибольшей из размерностей.
Например, одна и та же функция add (которая автоматически применяется при использовании оператора "+" для numpy-массивов) позволяет сложить как два числа или одномерных массива, так и добавить число к массиву или одномерный массив к двумерному.
Пример
>>> from numpy import array, add

>>> add(1, 2)
3

>>> add(array([1,2]), array([4,5])) # поэлементно складываем два массива одинаковой размерности
array([5, 7])

>>> add(array([1,2]), 1) # добавляем к одномерному массиву число (т.е. массив нулевой размерности)
array([2, 3])

>>> add(array([[1,2],[3,4]]), array([1,2])) # добавляем к двумерному массиву одномерный (построчно)
array([[2, 4],
       [4, 6]])


Более подробную информацию по numpy на английском можно найти в официальной документации, на русском — например здесь.

Пишем свой numpy-модуль с универсальной функцией и SIMD-инструкциями


У numpy и skipy довольно удобный API и неплохая документация, в которой для желающих написать собственный numpy-модуль с универсальной функцией есть соответствующий tutorial. Сначала мы пишем C-функцию, в линейном цикле вычисляющую значение интересующей нас математической функции от скалярного аргумента:
Фукнция для вычисления синуса в numpy-модуле
static void double_xsin(char **args, npy_intp *dimensions,
                            npy_intp* steps, void* data)
{
    npy_intp i;
    npy_intp n = dimensions[0];
    char *in = args[0], *out = args[1];
    npy_intp in_step = steps[0], out_step = steps[1];
    double tmp[VECTLENDP];
    vdouble a;
    int slow_n = n % VECTLENDP;
    if(in_step != sizeof(double) || out_step != sizeof(double))
        slow_n = n;
    for(i = 0; i < slow_n; i += VECTLENDP)
    {
        int j;
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            tmp[j] = *(double *)in;
            in += in_step;            
        }
        a = vloadu(tmp);
        a = xsin(a);
        vstoreu(tmp, a);
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            *(double *)out = tmp[j];
            out += out_step;
        }        
    }
    if(n > slow_n)
    {
        double *in_array = (double *)in;
        double *out_array = (double *)out;
        for(i = 0; i < n - slow_n; i += VECTLENDP)
        {
            a = vloadu(in_array + i);
        	a = xsin(a);
	        vstoreu(out_array + i, a);    
        }
    }
}


Указатели на входные и выходные данные numpy передает нам в массиве args. В нашем случае у функции один вход и один выход, поэтому адрес входных данных args[0], выходных — args[1]. Количество элементов передается в dimensions[0]. Для перемещения по входным данным нужно увеличивать указатель на steps[0], по выходным — на steps[1] (важно, что указатель имеет тип char, поскольку в steps указаны значения в байтах). К сожалению, мне не удалось найти в документации numpy утверждения, что значения в steps должны быть равны размерам соответствующих типов данных, хотя эксперимент показал, что на моей системе для массивов ненулевой размерности это правило выполняется. В случае его нарушения вычисления будут медленнее, поскольку потребуется дополнительное копирование элементов в массив tmp и из него.
Одна и та же универсальная функция в numpy может работать с различными типами данных, но для каждого типа данных пишется отдельная C-функция. При регистрации универсальной функции мы указываем поддерживаемые типы и для каждой комбинации входных и выходных типов передаем указатель на С-функцию, которая будет с этой комбинацией работать:
static PyUFuncGenericFunction funcs[] = {&double_xsin};
static char types[] = {NPY_DOUBLE, NPY_DOUBLE};
static void *data[] = {NULL};

В массиве types указываются как входные, так и выходные типы, поэтому он длиннее массивов funcs и data. Массив указателей data позволяет для каждой комбинации типов указать свой дополнительный параметр, который будет передан в C-фунцкию в качестве последнего аргумента void* data. В частности, это можно использовать для реализации разных универсальных функций с помощью одной C-функции.
Чтобы зарегистрировать нашу универсальную функцию, нужно вызвать PyUFunc_FromFuncAndData и передать ей описанные выше массивы (funcs, data и types), а также количество входных и выходных аргументов универсальной функции, количество поддерживаемых комбинаций типов, имя функции в модуле и строку документации.
Полный исходный код модуля
#include "Python.h"
#include "numpy/ndarraytypes.h"
#include "numpy/ufuncobject.h"
#include "numpy/npy_3kcompat.h"
#include "sleef/sleefsimd.h"

/* The loop definition must precede the PyMODINIT_FUNC. */
static void double_xsin(char **args, npy_intp *dimensions,
                            npy_intp* steps, void* data)
{
    npy_intp i;
    npy_intp n = dimensions[0];
    char *in = args[0], *out = args[1];
    npy_intp in_step = steps[0], out_step = steps[1];
    double tmp[VECTLENDP];
    vdouble a;
    int slow_n = n % VECTLENDP;
    if(in_step != sizeof(double) || out_step != sizeof(double))
        slow_n = n;
    for(i = 0; i < slow_n; i += VECTLENDP)
    {
        int j;
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            tmp[j] = *(double *)in;
            in += in_step;            
        }
        a = vloadu(tmp);
        a = xsin(a);
        vstoreu(tmp, a);
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            *(double *)out = tmp[j];
            out += out_step;
        }        
    }
    if(n > slow_n)
    {
        double *in_array = (double *)in;
        double *out_array = (double *)out;
        for(i = 0; i < n - slow_n; i += VECTLENDP)
        {
            a = vloadu(in_array + i);
        	a = xsin(a);
	        vstoreu(out_array + i, a);    
        }
    }
}

static PyMethodDef AvxmathMethods[] = {
        {NULL, NULL, 0, NULL}
};

static PyUFuncGenericFunction funcs[1] = {&double_xsin};
static char types[] = {NPY_DOUBLE, NPY_DOUBLE};
static void *data[] = {NULL};

void register_xsin(PyObject *module)
{
    PyObject *xsin, *d;
    import_array();
    import_umath();

    xsin = PyUFunc_FromFuncAndData(funcs, data, types, 1, 1, 1,
                                    PyUFunc_None, "sin",
                                    "AVX-accelerated sine calculation", 0);
    d = PyModule_GetDict(module);
    PyDict_SetItemString(d, "sin", xsin);
    Py_DECREF(xsin);
}

#if PY_VERSION_HEX >= 0x03000000
static struct PyModuleDef moduledef = {
    PyModuleDef_HEAD_INIT,
    "avxmath",
    NULL,
    -1,
    AvxmathMethods,
    NULL,
    NULL,
    NULL,
    NULL
};

PyMODINIT_FUNC PyInit_avxmath(void)
{
    PyObject *m;
    m = PyModule_Create(&moduledef);
    if (!m) {
        return NULL;
    }
    register_xsin(m);
    return m;
}
#else
PyMODINIT_FUNC initavxmath(void)
{
    PyObject *m;
    m = Py_InitModule("avxmath", AvxmathMethods);
    if (m == NULL) {
        return;
    }
    register_xsin(m);
}
#endif


Для сборки модуля я использовал стандартный setup.py из документации, заменив название модуля и добавив C-файлы библиотеки SLEEF, флаги компилятора и линковщика. Приведенный выше исходный код модуля я сохранил рядом с setup.py в файле с именем avxmath.c, директорию simd из исходного кода SLEEF переименовал в sleef и также положил рядом с setup.py.
Файл setup.py для сборки модуля avxmath
def configuration(parent_package='', top_path=None):
    import numpy
    from numpy.distutils.misc_util import Configuration

    config = Configuration('',
                           parent_package,
                           top_path)
    config.add_extension('avxmath', ['avxmath.c', 'sleef/sleefsimddp.c', 'sleef/sleefsimdsp.c'], 
                         extra_compile_args=['-O3', '-Wall', '-Wno-unused', '-Wno-attributes', '-DENABLE_AVX','-mavx'], 
                         extra_link_args=['-lm'])

    return config

if __name__ == "__main__":
    from numpy.distutils.core import setup
    setup(configuration=configuration)


Для компиляции без установки в систему нужно выполнить команду python setup.py build_ext --inplace, результатом успешного выполнения которой должен стать готовый модуль в файле avxmath.so. Теперь можно проверить работоспособность нашей функции. Запускаем python в той же директории, где находится файл avxmath.so, и проверяем:
>>> from numpy import array, pi
>>> import avxmath
>>> avxmath.sin(0)
0.0
>>> avxmath.sin(pi)
1.2246467991473532e-16
>>> avxmath.sin([0, pi/2, pi, 3*pi/2, 2*pi])
array([  0.00000000e+00,   1.00000000e+00,   1.22464680e-16,
        -1.00000000e+00,  -2.44929360e-16])
>>> 

Убедившись, что модуль avxmath импортируется и работает без ошибок, можно сделать небольшой тест производительности и точности новой функции.
Программа проверки функции sin модуля avxmath и результат ее выполнения
import numpy 
import avxmath
import time
from numpy import random, pi

COUNT=10

x = 2e4*random.random(40000000) - 1e4
t = time.clock()
for i in xrange(COUNT):
    y1 = numpy.sin(x)
duration1 = time.clock() - t
print "numpy.sin %f sec" % duration1

t = time.clock()
for i in xrange(COUNT):
    y2 = avxmath.sin(x)
duration2 = time.clock() - t
print "avxmath.sin %f sec" % duration2

delta = y2 - y1
rdelta = delta/y1
print "max absolute difference is %lg, relative %lg" % (
        delta[abs(delta).argmax()], rdelta[abs(rdelta).argmax()])
print "speedup is %lg" % (duration1/duration2)

numpy.sin 15.510000 sec
avxmath.sin 2.260000 sec
max absolute difference is 2.22045e-16, relative 2.63873e-16
speedup is 6.86283


Итак, мы получили ускорение более чем в 6 раз при точности вычислений не хуже 3·10-16! Заменив вызовы функции xsin на простое копирование памяти, нетрудно убедиться, что ускорение в 10 раз не получилось из-за того, что около 1 секунды из полученных нами 2.26 секунд выполнения ушло на накладные расходы. Аналогично, заменив функцию xsin на обычный синус из math.h, мы обнаружим, что времена вычислений с помощью avxmath.sin и numpy.sin в нашем тесте с высокой точностью совпадут.
Таким образом, с помощью SIMD инструкций можно добиться значительного ускорения выполняемых с помощью numpy и scipy вычислений, просто заменив импорты обычных функций на оптимизированные. Ну и конечно исходные коды несколько расширенного по сравнению с данной статьей модуля avxmath доступны на Github по ссылке.
Upd: Не используйте синус из SLEEF для значений аргумента порядка 1e10 и больше (см. комментарий)
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Спецпроект

Самое читаемое Разработка

Комментарии (4)

  • 0
    странно, что в numpy этого нет «из коробки».

    Насколько я знаю, то обвязку для C можно еще делать на Cython и синтаксис там будет привычнее для тех кто привык к python (как-то так наверное). Не думали глянуть такой вариант привязки? Ну а может и ваш и этот вариант, да сравнить потом.
    • 0
      странно, что в numpy этого нет «из коробки».

      Согласен, но тут видимо проблема архитектурной зависимости. Все-таки не на всех процессорах поддерживается AVX, а реализовать детектирование поддерживаемых SIMD-интрукций и выбор оптимальных функций для каждого типа данных при импорте модуля достаточно сложно.

      По поводу Cython — его наверное можно было бы использовать, но тут не просто вызов С-функции из python, а еще и работа по упаковыванию элементов numpy-массива в группы по 4 элемента и последующая распаковка обратно. В любом случае на Cython вряд ли получится быстрее, чем на C :)
  • 0
    Точность операций над числами с плавающей точкой нельзя оценить одним числом. На то и называется точка плавающей, что посчитать sin(2e20 * pi) возможно, но абсолютно бессмысленно.
    • 0
      Конечно, абсолютная точность у sin(2e20*pi) и sin(2*pi) будет разной (как впрочем она будет различаться для самих чисел double, содержащих 2*pi или 2e20*pi):
      xsin(1e+00*M_PI) = 1.22465e-16, sin(1e+00*M_PI) = 1.22465e-16
      xsin(1e+01*M_PI) = -1.22465e-15, sin(1e+01*M_PI) = -1.22465e-15
      xsin(1e+02*M_PI) = 1.96439e-15, sin(1e+02*M_PI) = 1.96439e-15
      xsin(1e+03*M_PI) = -3.21417e-13, sin(1e+03*M_PI) = -3.21417e-13
      xsin(1e+04*M_PI) = -4.85682e-13, sin(1e+04*M_PI) = -4.85682e-13
      xsin(1e+05*M_PI) = -3.39607e-11, sin(1e+05*M_PI) = -3.39607e-11
      xsin(1e+06*M_PI) = -2.23191e-10, sin(1e+06*M_PI) = -2.23191e-10xsin(1e+10*M_PI) = -8.96779e+183, sin(1e+10*M_PI) = -2.23936e-06
      xsin(1e+07*M_PI) = 5.62056e-10, sin(1e+07*M_PI) = 5.62056e-10
      xsin(1e+08*M_PI) = -3.90829e-08, sin(1e+08*M_PI) = -3.90829e-08
      xsin(1e+09*M_PI) = -3.32014e-08, sin(1e+09*M_PI) = -3.32014e-08
      

      Именно потому в статье я и привожу относительную погрешность в сравнении с обычным синусом (т.е. (xsin(x)-sin(x))/sin(x)). Показатель конечно не идеальный (а при нулевом знаменателе и вовсе неопределенный), но представление о погрешности дает. С большими числами есть другая пробелема — недавно я обнаружил, что синус в SLEEF для чисел порядка 1e10 и больше, дает расходимость:
      xsin(1e+10*M_PI) = -8.96779e+183, sin(1e+10*M_PI) = -2.23936e-06
      

      Так что Вы правы насчет того, что с большими числами нужно быть осторожнее.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.