Pull to refresh
0

Передаем файлы на Java с помощью ØMQ и JZMQ

Reading time 11 min
Views 14K
Приветствую, мы являемся небольшой компанией единомышленников которые разрабатывает продукт предназначенный для управления данными, вне зависимости от их формата и метода хранения — ArkStore, в нашем блоге мы попытаемся поделится опытом, который мы накопили, в ходе, его уже почти двухлетней разработки. Первую статью я решил посветить IO слою и продукту под названием ØMQ (или ZeroMQ). Я попытаюсь рассказать как начать пользоваться ØMQ и как с его помощью можно передавать достаточно большой объем данных.

Почему ØMQ

В ходе разработки нашего продукта перед нами стояла задача — «как обеспечить надежную передачу большего объема данных?», при этом желательно что бы весь IO слой был полностью асинхронным, не поедал большое количество памяти и был достаточно простым. Изначально, так как вся наша архитектура была построена с использованием Akka, мы использовали Spray IO (или Akka IO). Но столкнулись с рядом проблем для которых не было адекватного решения, например обнаруженный мной баг заставлял нас создавать дополнительные Heartbeat сообщения или передавать большое количество служебной информации.

В итоге мы решили посмотреть в сторону брокеров сообщений. ActiveMQ, RabbitMQ и ØMQ. В принципе все брокеры решали поставленную перед нами задачу, но мы остановились на ØMQ. ActiveMQ показался слишком тяжеловесным, а RabbitMQ вносил мастер ноду в изначально распределенную архитектуру (без явного лидера).

ØMQ поддерживает три основных шаблона передачи данных, это:
  • Request-Reply — самый простой распространенный шаблон, мы отправили запрос на сервер и получили ответ. Классическая клиент-серверная модель.
  • Publish-Subscribe — в данном случае, сервер (publisher) периодически публикует информацию для своих подписчиков. Примером может служить, датчик, который постоянно публикует данные измерений для всех заинтересованных в них клиентов.
  • Push-Pull — или Parallel Pipeline, позволяет нам проводить параллельные вычисления. Сервер равномерно рассылает сообщения содержащие работу (Push), машинам, которые могут произвести вычисления (workers), машины забирают эти сообщения (Pull), производят вычисления и отдают результаты клиенту (Push), заинтересованному в результатах вычислений (Pull). Картинка

Более подробно про эти шаблоны и ØMQ можно узнать из замечательного руководства, доступного официальном сайте.

В рамках данной статьи мы рассмотрим только первый шаблон (Request-Reply).

Собираем ØMQ и JZMQ

Перед тем как мы перейдем к коду, нам необходимо собрать сам ØMQ и библиотеки привязки (Java Bindings).

Linux

Для CentOS процесс будет выглядеть следующим образом (должен слабо отличатся для других *nix ОС).

Убедимся что у нас стоят все необходимые библиотеки:
yum install libtool autoconf automake gcc-c++ make e2fsprogs

Заберем и разархивируем последнюю стабильную версию библиотеки ØMQ с сайта разработчика (на момент написания 3.2.4).
wget http://download.zeromq.org/zeromq-3.2.4.tar.gz
tar -xzvf zeromq-3.2.4.tar.gz
cd zeromq-3.2.4

Собираем и устанавливаем ØMQ, библиотеки попадут в директорию /usr/local/lib, это понадобится нам в будущем.
./configure
make
sudo make install
sudo ldconfig

После того как мы собрали ØMQ нам необходимо собрать JZMQ. Для этого выкачиваем последнюю версию с GIT репозитория (master или tag, последний tag на момент написания — 2.2.2).
wget https://github.com/zeromq/jzmq/archive/v2.2.2.zip
unzip jzmq-2.2.2.zip
cd jzmq-2.2.2
./autogen.sh
./configure
make
sudo make install
sudo ldconfig

Библиотеки так же попадут в директорию /usr/lib/local. Почему это важно? Дело в том, что для того что бы использовать нативные библиотеки, Java должна знать где мы можем их найти, для этого при запуске программы мы должны указать параметр java.library.path. Для этого есть несколько способов, мы можем указать его в момент запуска приложения -Djava.library.path="/usr/lib/local", или установить его прямо во время работы программы. Мы так же можем использовать значение java.library.path, которое установлено по умолчанию. Что бы узнать, какие значение установлены по умолчанию, нужно выполнить следующую команду:

java -XshowSettings:properties

В моем случае это:
java.library.path = /usr/java/packages/lib/amd64
        /usr/lib64
        /lib64
        /lib
        /usr/lib

Для того что бы Java смогла найти нативные библиотеки нам достаточно перенести наши библиотеки по одному из данных адресов или залинковать их. Какой подход выбрать, выбирать лично вам.

Что бы узнать куда были установлены библиотеки после make install, достаточно выполнить команды:
whereis libzmq
whereis libjzmq

Windows

Собранные dll файлы для libzmq можно скачать с официального сайта, а тут можно найти руководство по сбору JZMQ под Windows. К сожалению, собрать библиотеки с помощью CMAKE у меня не получилось, пришлось собирать libzmq и jzmq с помощью Visual Studio 2013. При этом, важно что бы сами библиотеки были собраны под архитектуру соответствующую вашей JVM (32 или 64 битная),

Если libzmq.dll и jzmq.dll добавлены в PATH, то JVM должна их найти автоматически.

Программа

Ух, мы наконец смогли поставить и настроить ØMQ и JZMQ на нашем компьютере! Настало время пустить его в дело. В качестве примера, мы попробуем с помощью реализовать протокол передачи файлов описанный в руководстве и немного улучшить его.

Для начала опишем требования для нашего протокола:
  • Протокол должен передавать данные асинхронно, не дожидаясь каждый раз ответа от клиента о получении данных.
  • Сервер должен уметь работать с большим количеством клиентов (уметь идентифицировать клиента)
  • При этом протокол должен аккуратно обращаться с памятью сервера, не держа большое количество данных в памяти, это важно как для клиента так и для сервера, например если клиент будет обрабатывать данные намного медленней чем их получает.
  • Протокол должен поддерживать отмену передачи данных.
  • Дать возможность клиенту выбирать размер кусков данных (chunks) которые он принимает.
  • Поддерживать рестарт передачи данных (например если произошла ошибка и мы хотим возобновить передачу данных с определенного места, не передавая весь файл заново).

Подготовим один гигабайт «гарантированно случайных»™ данных.
dd if=/dev/urandom of=testdata bs=1M count=1024

Замерим сколько времени ОС требуется для копирования данных. Данные цифры довольно приблизительны, но хотя бы мы будем иметь какую то точку сравнения.
echo 3 > /proc/sys/vm/drop_caches
time cp testdata testdata2
real    0m7.745s
user    0m0.011s
sys     0m1.223s

Приступим к коду. Клиент:
package com.coldsnipe.example;

import org.zeromq.ZFrame;
import org.zeromq.ZMQ;

public class ZMQClient implements Runnable {
    // Сколько сообщений мы отправим в "кредит"
    private static final int PIPELINE = 10;

    // Размер 1го куска данных, 250кб.
    private static final int CHUNK_SIZE = 250000;

    private final ZMQ.Context zmqContext;

    public ZMQClient(ZMQ.Context zmqContext) {
        this.zmqContext = zmqContext;
    }

    @Override
    public void run() {
        // Создаем ZMQ.Socket, в данном случае это DEALER socket
        try (ZMQ.Socket socket = zmqContext.socket(ZMQ.DEALER)) {

            // В случае если мы закрываем сокет но у нас есть сообщения
            // в очереди, мы ждем 1 секунды до того как скинем (drop) эти сообщения
            socket.setLinger(1000);

            // Соединяемся с сервером используя TCP протокол
            socket.connect("tcp://127.0.0.1:6000");

            // Текущий кредит, сколько Chunks мы еще можем принять
            int credit = PIPELINE;

            // Количество переданных байт
            long total = 0;
            // Количество переданных кусков (Chunks)
            int chunks = 0;

            // Наш отступ от начала файла
            long offset = 0;

            while (!Thread.currentThread().isInterrupted()) {

                // Пока у нас есть "кредит", отправляем запросы на получение
                // кусков данных
                while (credit > 0) {
                    socket.sendMore("fetch");
                    socket.sendMore(Long.toString(offset));
                    socket.send(Integer.toString(CHUNK_SIZE));

                    offset += CHUNK_SIZE;
                    credit--;
                }

                // Получаем фрейм данных от нашего сокета
                ZFrame zFrame = ZFrame.recvFrame(socket);

                // Если фрейм пустой, прерываем цикл
                if (zFrame == null) {
                    break;
                }

                chunks++;
                credit++;
                int size = zFrame.getData().length;
                total += size;
                zFrame.destroy();

                // Если размер куска меньше чем размер по умолчанию
                // значит мы получили последний кусок данных
                if (size < CHUNK_SIZE) {
                    break;
                }
            }

            System.out.format("%d chunks received, %d bytes %n", chunks, total);
        }
    }
}

Клиент подключается к серверу и отправляет запросы на получение данных. Каждое сообщение состоит из нескольких частей.
  1. Команда — то что мы хотим от сервера, в данном случае команда всего одна, fetch — получить данные.
  2. Параметры команды (если присутствуют) — в случае с fetch это отступ от начала файла и размер куска данных.

Клиент отправляет эти команды в «кредит», это значит что клиент отправит столько fetch команд, сколько у него осталось «кредита». Кредит увеличивается только в том случае, если клиент успешно обработал полученные данные. В примере клиент ничего не делает с данными, но мы можем добавить обработчик для сохранения данных или имитировать работу с помощью sleep, даже если клиент будет очень медленно обрабатывать данные, в его очереди не будет более десяти кусков данных, по 250кб каждый. Таким образом, клиент не будет простаивать в ожидании новых данных от сервера.

package com.coldsnipe.example;

import org.zeromq.ZFrame;
import org.zeromq.ZMQ;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

public class ZMQServer implements Runnable {
    private final ZMQ.Context zmqContext;

    // Путь до файла который мы будем отдавать клиенту
    private final Path filePath;

    public ZMQServer(ZMQ.Context zmqContext, Path filePath) {
        this.zmqContext = zmqContext;
        this.filePath = filePath;
    }

    @Override
    public void run() {
        try {
            File file = filePath.toFile();
            if (!file.exists()) {
                throw new RuntimeException("File does not exists: " + filePath);
            }
            // Открываем файл и сокет, Router сокет знает identity клиента,
            // таким образом мы можем различать клиенты между собой
            try (FileChannel fileChannel = FileChannel.open(filePath)) {
                try (ZMQ.Socket socket = zmqContext.socket(ZMQ.ROUTER)) {

                    // Пытаемся поднять наш сервер на localhost на 6000 порту
                    socket.bind("tcp://*:6000");
                    socket.setLinger(1000);

                    while (!Thread.currentThread().isInterrupted()) {

                        // Первый фрейм - всегда identity frame
                        ZFrame identity = ZFrame.recvFrame(socket);
                        assert identity != null;

                        // Дальше идет команда
                        String command = socket.recvStr();
                        if (command.equals("fetch")) {
                            // В случае если это fetch, мы получаем offset и желаемый размер куска
                            String offsetString = socket.recvStr();
                            long offset = Long.parseLong(offsetString);

                            String chunkSizeString = socket.recvStr();
                            int chunkSize = Integer.parseInt(chunkSizeString);

                            int currentChunkSize = chunkSize;

                            // Проверяем если offset + размер куска не больше нашего файла
                            // в случае если это так, мы уменьшаем размер куска от offset до конца файла
                            if (file.length() < (offset + chunkSize)) {
                                currentChunkSize = (int) (file.length() - offset);
                            }
                            if (currentChunkSize > 0) {
                                ByteBuffer byteBuffer = ByteBuffer.allocate(currentChunkSize);
                                fileChannel.read(byteBuffer, offset);
                                byteBuffer.flip();
                                byte[] bytes = new byte[currentChunkSize];
                                byteBuffer.get(bytes);

                                // Подготовляем фрейм с данными
                                ZFrame frameToSend = new ZFrame(bytes);

                                // Первым фреймом в ответе всегда должен быть identity
                                // клиента, которому мы отсылаем данные
                                identity.send(socket, ZFrame.MORE);
                                frameToSend.send(socket, 0);
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Наш сервер умеет передавать всего один файл (ссылку на который он получил при старте) и отвечать всего на одну команду — fetch. Он умеет различать клиентов, но клиенты могут получить только один единственный файл. Как это улучшить я напишу чуть ниже, а пока тест и результаты измерений.

package com.coldsnipe.example;

import org.junit.Test;
import org.zeromq.ZMQ;

import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ZMQExampleTest {
    @Test
    public void testDataExchange() throws Exception {
        // Создаем ZMQ.Context, нам необходим всего 1! контекст на всю нашу программу
        // в скобках указывается количество потоков которое будет отведено контексту
        ZMQ.Context zmqContext = ZMQ.context(1);

        // Находим наши тестовые данные
        final URL fileUrl = ZMQExampleTest.class.getClassLoader().getResource("testdata");
        assert fileUrl != null;
        Path filePath = Paths.get(fileUrl.toURI());

        // Замеряем время и стартуем наш клиент не сервер, порядок старта не важен
        // клиент может быть запущен раньше сервера
        long startTime = System.nanoTime();
        Thread clientThread = new Thread(new ZMQClient(zmqContext));
        Thread serverThread = new Thread(new ZMQServer(zmqContext, filePath));
        clientThread.start();
        serverThread.start();
        clientThread.join();
        long estimatedTime = System.nanoTime() - startTime;
        float timeForTest = estimatedTime / 1000000000F;

        System.out.format("Elapsed time: %fs%n", timeForTest);

        // Не забываем остановить контекст, данная операция является блокирующей
        // и не будет завершена если у нас есть открытые сокеты.
        zmqContext.term();
    }
}

Наш тест запускает ZMQ.Context, стартует клиент и сервер, и замеряет время необходимо для передачи данных. О контексте хочется сказать отдельно. Именно контекст является скрытым дирижером который управляет нашими сокетами внутри процесса и решает как и когда отправить данные. Поэтому отсюда вытекает простое правило — один контекст на один процесс.

Запустим тест и посмотрим на результат.
4295 chunks received, 1073741824 bytes 
Elapsed time: 1.429522s

Чтение одного гигабайта заняло 1.42 секунды. Мне тяжело сказать насколько хороший это показатель, но в сравнении с тем же Spray IO, ØMQ отрабатывает на 30-40% быстрей, при этом нагрузка на IO близка к 100% (у Spray 85-90), нагрузка на CPU ниже почти на треть.

Улучшаем протокол

Пока наш протокол знает всего одну команду, её достаточно для теста, но в реальных условиях мы хотим что бы наш сервер мог передавать множество разных файлов и предоставлял служебную информацию для клиента. Для этого введем две новых команды:
  • get — сообщение о том что клиент хочет получить данные, должен содержать какие данные мы хотим получить
  • end — сообщение посылаемое после того как клиент закончил принимать данные, свидетельствует о том что сервер может освободить ресурсы связанные с передаваемым файлом.

Обработчики сообщений в данном случае могут выглядеть следующим образом
else if(command.equals("get")) {
    // ID данных
    String id = socket.recvStr();

    // Проверяем запрашивал ли данный клиент заголовок
    BigInteger identityString = new BigInteger(identity.getData());
    DataBlob dataBlob = blobMap.get(identityString);
    if (dataBlob!= null){
        // В случае если запрос уже был, освобождаем текущий DataBlob
        dataBlob.closeSource();
        blobMap.remove(identityString);
    }

    // Получаем данные из хранилища
    DataBlob dataBlob = dataProvider.getBlob(id);

    if (dataBlob == null) {
        log.error("Received wrong get call on server socket, ID  [{}]", id);
    } else {
        // Создаем новый заголовок, который затем енкодим и отправляем его клиенту
        DataHeader dataHeader = new DataHeader(id, dataBlob.getSize());
        byte[] bytesToSend = FrameHeaderEncoder.encode(dataHeader);
        ZFrame frameToSend = new ZFrame(bytesToSend);

        // Кладем полученный DataBlob в коллекцию
        blobMap.put(new BigInteger(identity.getData()), dataBlob);

        // Отправляем заголовок клиенту
        identity.send(socket, ZFrame.MORE);
        frameToSend.send(socket, 0);
    }
} else if (command.equals("end")){
    BigInteger identityString = new BigInteger(identity.getData());

    // Удаляем DataBlob и освобождаем данные
    DataBlob dataBlob = blobMap.remove(identityString);
    if (dataBlob != null) {
        dataBlob.closeSource();
    }
}

Для того что бы мы могли работать с множеством файлов, мы дополнили сервер новыми объектами.
public interface DataProvider {
    public DataBlob getBlob(String dataId);
}

public abstract class DataBlob {
    private final long size;
    private final String dataId;

    protected DataBlob(long size, String dataId) {
        this.size = size;
        this.dataId= dataId;
    }
    public abstract byte[] getData(long position, int length);

    public abstract void closeSource();

    public long getSize() {
        return size;
    }

    public String getDataId() {
        return getDataId;
    }
}

Класс реализующий DataProvider управляет получением данных, метод getBlob возвращает нам новый DataBlob который по сути является ссылкой на ресурс.

Реализация DataBlob для файла может выглядит следующим образом:
public class FileDataBlob extends DataBlob {
    private final FileChannel fileChannel;

    public FileDataBlob(long size, String dataId, Path filePath) {
        super(size, dataId);

        try {
            this.fileChannel = FileChannel.open(filePath);
        } catch (IOException e) {
            throw new DataBlobException(e);
        }
    }

    @Override
    public byte[] getData(long position, int length) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(length);
        try {
            fileChannel.read(byteBuffer, position);
            byteBuffer.flip();
            byte[] bytes = new byte[length];
            byteBuffer.get(bytes);

            return bytes;
        } catch (IOException e) {
            throw new DataBlobException(e);
        }
    }

    @Override
    public void closeSource() {
        try {
            fileChannel.close();
        } catch (IOException e) {
            throw new DataBlobException(e);
        }
    }
}

Добавив эти два метода мы позволяем выбирать какие данные хочет получить клиент и информируем клиента о размере запрошенных данных. Мы можем и далее улучшать протокол, например добавить возможность пересылки множества файлов подряд, добавить Heartbeat для определения мертвых клиентов и освобождения ресурсов и т.д.

Заключение

В ходе данной статьи я хотел показать каким образом мы можем использовать ØMQ в Java. В данный момент в нашем проекте именно ØMQ является основным брокером сообщений, не только для файлов но и для метаданных, показывая довольно хорошие результаты (проблем, связанных именно с ним, пока не наблюдалось).

В следующих статьях я попытаюсь рассказать о других технологиях используемых в ArkStore, пока на очереди Akka и Semantic Web. Спасибо за внимание, надеюсь прочитанные было хоть кому то полезно!
Tags:
Hubs:
+5
Comments 4
Comments Comments 4

Articles

Information

Website
coldsnipe.com
Registered
Founded
Employees
2–10 employees
Location
Россия