Pull to refresh

Microsoft Robotics. Параллельная обработка данных

Reading time7 min
Views7.1K
Один из продуктов Microsoft — Microsoft Robotics включает библиотеку Concurrent and Coordination Runtime. Библиотека очень нужна роботам для организации параллельных вычислений при обработке звука и изображений (да и не только их). В посте я расскажу как с помощью данной библиотеки создать параллельный алгоритм умножения матрицы A (m × n элементов) на вектор B (1 × n элементов) по следующей формуле: . Алгоритм вычисления вектора C (m × 1 элементов) легко распараллеливается, так как значение i-го элемента вектора не зависит от значений других его элементов. Перед запуском примера из исходников рекомендуется установить Microsoft Robotics.

1. Краткое введение в Microsoft Robotics


В состав Microsoft Robotics входит несколько компонент:
  • библиотека Concurrent and Coordination Runtime (CCR) – предназначена для организации обработки данных с помощью параллельно и асинхронно выполняющихся методов. Взаимодействие между такими методами организуется на основе сообщений. Рассылка сообщений основана на использовании портов;
  • Decentralized Software Services (DSS) – среда, которая позволяет запускать алгоритмы обработки данных на разных ЭВМ, организовывать асинхронное взаимодействие процессов управления различными подсистемами робота;
  • Visual Simulation Environment (VSE) – среда визуализации, которая позволяет экспериментировать с моделями роботов, тестировать алгоритмы управления роботами;
  • Visual Programming Language (VPL) – язык, предназначенный для разработки программ управления роботами. Программа на таком языке представляется в виде последовательности блоков, которые выполняют обработку данных, и связей между ними.

Основные понятия Microsoft Robotics:
  • сервис – приложение, разрабатываемое в Microsoft Robotics;
  • сообщение – экземпляр любого типа данных;
  • порт – очередь сообщений типа FIFO (First-In-First-Out), сообщение остаётся в порте пока не будут извлечено из очереди порта получателем.
    Определение порта:
    Port<int> p = new Port<int>();
    

    Отправка сообщения в порт:
    p.Post(1);
    

  • получатель – структура, которая выполняет обработку сообщений. Данная структура объединяет:
    1. один или несколько портов, в которые отправляются сообщения;
    2. метод (или методы), которые используются для обработки сообщений (такой метод называется задачей);
    3. логическое условие, определяющее ситуации, в которых активизируется тот или иной получатель.

    Описание получателя:
    Arbiter.Receive(<тип_получателя>, <порт>, <выполняемый_делегат>);
    Пример:
    Activate(
        Arbiter.Receive(false, intPort,
            delegate(int n)
            { Console.WriteLine("Получено сообщение: " + n.ToString()); }
    
        )
    );
    

    Делегат, входящий в получатель, выполнится, когда в порт intPort придёт сообщение.
    Получатели сообщений бывают двух типов: временные и постоянные (в примере получатель – временный). Временный получатель, обработав сообщение (или несколько сообщений), удаляется из списка получателей сообщений данного порта.
  • процессом запуска задач управляет диспетчер. После выполнения условий активации задачи (одним из условий активации может быть получение портом сообщения) диспетчер назначает задаче поток из пула потоков, в котором она будет выполняться.
    Описание диспетчера с двумя потоками в пуле:
    Dispatcher d = new Dispatcher(2, "MyPool");
    

    Описание очереди диспетчера, в которую задачи ставятся на выполнение:
    DispatcherQueue dq = new DispatcherQueue("MyQueue", d);
    


2. Создание проекта сервиса


Проект сервиса создаётся в Visual Studio на закладке Microsoft Robotics (см. рис. 1).


Рис. 1. Создание проекта сервиса

После создания проекта нужно настроить сервис: указать для него имя, пространство имён, домен организации, для которой разрабатывается сервис, а также год и месяц создания сервиса. Домен и дата используются для создания уникального идентификатора сервиса (в данном проекте этот идентификатор непосредственно нужен не будет).


Рис. 2. Настройка сервиса

Проект сервиса состоит из нескольких файлов:
  • ParallelProcessing.cs – ядро сервиса;
  • ParallelProcessingTypes.manifest.xml – манифест, который используется DSS для загрузки сервиса;
  • ParallelProcessingTypes.cs – содержит описание типов, которые используются сервисом.

После компиляции сервиса генерируется набор dll библиотек и манифест. Манифест – файл в XML формате, содержащий ссылки на сервисы, с которыми взаимодействует разрабатываемый сервис. Настройки проекта сервиса установлены так, что после компиляции сервиса созданные dll библиотеки копируются в каталог \bin.
Запуск сервиса выполняется с помощью программы dsshost, которая загружает манифест и все сервисы, которые в нём указаны.

3. Структура сервиса


В структуру кода сервиса входит метод Start, который вызывается, когда DSS узел (dsshost.exe) запускает сервис (поэтому в метод Start обычно помещают действия по инициализации данных):
protected override void Start()
{
    base.Start();
    // команды инициализации 
    // сервиса добавляются ниже
}

После компиляции проекта генерируются три dll библиотеки:
  • ParallelProcessing.Y2013.M02.dll – библиотека реализации сервиса, формируемая на основе файлов исходного кода, включённых в проект сервиса;
  • библиотека ParallelProcessing.Y2013.M02.proxy.dll позволяет использовать твой сервис другими сервисами;
  • библиотека ParallelProcessing.Y2013.M02.transform.dll содержит описание соответствия между типами, определенными в реализации сервиса и реализации proxy библиотеки. Данная библиотека загружается автоматически с помощью среды выполнения DSS.

Выполнить созданный сервис можно не только через F5 в Visual Studio, но и через командную строку. Для этого нужно выбрать пункт «DSS Command Prompt» в меню Пуск\MRDS. В результате, запустится окно командного интерпретатора, будет установлен корневой каталог и переменные окружения. В открывшемся окне нужно запустить на выполнение следующую команду:
dsshost /p:50000 /t:50001 /m:"<путь к файлу манифеста сервиса>"

4. Оценка времени выполнения


Время выполнения вычислений будем определять с помощью класса Stopwatch:
Stopwatch sWatch = new Stopwatch();
sWatch.Start();
<выполняемый код>
sWatch.Stop();
Console.WriteLine(sWatch.ElapsedMilliseconds.ToString());

5. Реализация вычислений


5.1. Объявление структур данных

Матрицу A, векторы B и C, переменные для хранения их размеров определим глобально:
int[,] A;
int[] B;
int[] C;
int m;
int n;

Метод TestFunction() запускает вычисления. Сначала в методе выполняется умножение матрицы на вектор с помощью последовательного алгоритма, затем – та же задача решается с помощью параллельных вычислений. Рассмотрим этот метод.
Выполняется инициализация структур данных:
nc = 2; // количество ядер
m = 11000; // количество строк матрицы
n = 11000; // количество столбцов матрицы
A = new int[m, n];
B = new int[n];
C = new int[m];

Генерируется матрица A и вектор B:
Random r = new Random();
for (int i = 0; i < m; i++)
{
    for (int j = 0; j < n; j++)
        A[i, j] = r.Next(100);
}
for (int j = 0; j < n; j++)
    B[j] = r.Next(100);

5.2. Последовательный алгоритм перемножения

Рассмотрим метод SequentialMul:
Stopwatch sWatch = new Stopwatch();
sWatch.Start(); 
for (int i = 0; i < m; i++)
{
    C[i] = 0;
    for (int j = 0; j < n; j++)
        C[i] += A[i, j] * B[j];
}
sWatch.Stop();
Console.WriteLine("Последовательный алгоритм = {0} мс.", sWatch.ElapsedMilliseconds.ToString());

5.3. Параллельный алгоритм перемножения

Параллельная обработка выполняется с помощью запуска нескольких копий вычислительного метода. Каждая копия метода выполняет обработку определённой части исходных данных. Для описания задания для каждого метода используется класс InputData:
public class InputData
{
    public int start; // начало диапазона
    public int stop;  // конец диапазона
}

Поля start / stop класса хранят номер начальной / конечной строки вектора C, которые рассчитываются с помощью экземпляра вычислительного метода.
Рассмотрим метод ParallelMul:
// создание массива объектов для хранения параметров
InputData[] ClArr = new InputData[nc];
for (int i = 0; i < nc; i++)
    ClArr[i] = new InputData

Далее, задаются исходные данные для каждого экземпляра вычислительного метода:
// делим количество строк а матрице на nc частей
int step = (Int32)(m / nc);
// заполняем массив параметров
int c = -1;
for (int i = 0; i < nc; i++)
{
    ClArr[i].start = c + 1;
    ClArr[i].stop = c + step;
    c = c + step;
}

Создаётся диспетчер с пулом из двух потоков:
Dispatcher d = new Dispatcher(nc, "Test Pool");
DispatcherQueue dq = new DispatcherQueue("Test Queue", d);

Описывается порт, в который каждый экземпляр метода Mul() отправляет сообщение после завершения вычислений:
Port<int> p = new Port<int>();

Метод Arbiter.Activate помещает в очередь диспетчера две задачи (два экземпляра метода Mul):
for (int i = 0; i < nc; i++)
    Arbiter.Activate(dq, new Task<InputData, Port<int>>(ClArr[i], p, Mul));

Первый параметр метода Arbiter.Activate – очередь диспетчера, который будет управлять выполнением задачи, второй параметр – запускаемая задача.
С помощью метода Arbiter.MultipleItemReceive запускается задача (приёмник), которая обрабатывает получение двух сообщений портом p:
Arbiter.Activate(Environment.TaskQueue, Arbiter.MultipleItemReceive(true, p, nc, 
    delegate(int[] array)
{
    Console.WriteLine("Вычисления завершены");
}));

Приёмник используется для определения момента окончания вычислений. Он сработает, только после того, как в порт p придёт два сообщения. В делегат, описанный в приёмнике, можно включить действия, которые должны быть выполнены после завершения процесса умножения.
Метод Mul() выполняет перемножение части матрицы на вектор:
void Mul(InputData data, Port<int> resp)
{
    Stopwatch sWatch = new Stopwatch();
    sWatch.Start();
    for (int i = data.start; i < data.stop; i++)
    {
        C[i] = 0;
        for (int j = 0; j < n; j++)
            C[i] += A[i, j] * B[j];
    }
    sWatch.Stop();
    Console.WriteLine("Поток № {0}: Параллельный алгоритм = {1} мс.", 
        Thread.CurrentThread.ManagedThreadId, sWatch.ElapsedMilliseconds.ToString());
    resp.Post(1);
}

Метод Mul() имеет два параметра:
1) индекс, хранящий значение элемента массива, который определяет параметры, передаваемые на вход метода;
2) порт завершения, в который отправляется целое число после завершения вычислений.
После завершения вычислений метод Mul отправляет в порт p целое значение (отправляемое значение может быть любым).
Результат вычислений показан на рис. 3.


Рис. 3. Результаты вычислений

Ссылка на исходник
Tags:
Hubs:
+8
Comments0

Articles

Change theme settings