Pull to refresh

Несколько примеров практического использования RxJava

Reading time 11 min
Views 32K
RxJava — это реализация ReactiveX для Java — библиотеки для асинхронной обработки потоков данных. Паттерн observable на стероидах, как они сами пишут. В интернете, в том числе на Хабре, есть много «введений в RxJava». Я хочу привести несколько примеров реальных задач. Они не очень сложные, но возможно кто-то увидит какие-то сходства со своими и задумается.

Собственно, задачи:

1. Простое клиентское TCP-соединение. Есть протокол поверх TCP/IP, нужно сформировать сообщение, подключиться к удаленному узлу, если еще не подключился, передать сообщение и прочитать ответ. Плюс обработка ошибок, проверка таймаутов, повтор отправки в случае неудачи. Жестких требований к производительности нет, трафик не большой.

2. Есть двигатель и некоторый датчик. Нужно произвести сканирование — пройтись двигателем по заданной траектории: послать двигатель к точке, дождаться, когда он к ней приедет, снять показания датчика, отобразить точку на графике (в GUI потоке), поехать к следующей точке…

3. Полученные после сканирования данные нужно обработать (условно длительный вычислительный процесс) и засунуть в pdf-отчет (условно длительный процесс ввода-вывода) вместе с изображением графика и данными введенными пользователем (GUI поток).

1. Простое клиентское TCP-соединение


Предположим, есть некоторый протокол обмена сообщениями. Сообщения могут содержать заголовок, контрольную сумму или что-нибудь еще. На каждое сообщение должен быть ответ от сервера. В простейшем виде решение может выглядеть как-то так:

public String send(String command) {
    try {
        if (!isConnected()) {
            connect();
        }
        byte[] bytes = command.getBytes();
        bytes = addHeader(bytes);
        sendBytes(bytes);
        return readAnswer();
    } catch (IOException e) {
        // паника
    }
}

Детали реализации я не описываю, но вкратце: connect() создает java.net.Socket и подключается к серверу, sendBytes() пишет в output-поток сокета, readAnswer() читает из input-потока сокета. Помимо addHeader() могут быть еще методы, добавляющие контрольную сумму, кодирование и прочее.

Проблемы этого кода: блокирующие запись/чтение и неудобная обработка ошибок — не понятно, что делать с исключением: то ли наверх пробрасывать, то ли тут что-то сделать (рекурсивно повторить отправку?). Как раз эти две проблемы и решает RxJava. Перепишем:

public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .map(result -> readAnswer());
}

Применение:

connection.send("echo 123")
            .subscribe(
                    answer -> { /*обработать ответ*/ },
                    throwable -> { /*обработать ошибку*/ }
            );

В общем-то, получилось то же самое, только в виде монады, набора операторов и с рядом нюансов.

Во-первых, метод sendBytes() теперь возвращает boolean. RxJava работает с потоками данных, а если кто-то возвращает void вместо данных, то потока как-бы уже и нет. Поэтому нужно либо добавить возвращаемый результат в метод (хотя бы return true), либо вместо map использовать doOnNext — этот оператор возвращает то же, что и получил.

Во-вторых, метод send() теперь возвращает Observable, а не сам String. Поэтому нужен отдельный обработчик ответа (или лямбда, как в примере). С исключением то же самое. Тут нужно, как говорится, начать мыслить асинхронно. Вместо самого результата, мы получаем объект, который потом когда-нибудь предоставит нам результат, а мы должны предоставить ему того, что этот результат получит. Вот только этот код все еще блокирующий, поэтому это асинхронное мышление не имеет смысла. Можно, правда, сделать обертку для String и вытащить результат из монады через замыкание этой обертки, но это уже грязные хаки, которые нарушают принципы функционального программирования.

Улучшим этот код. Начнем с обработки ошибок. RxJava отлавливает исключения, возникающие в операторах, и передает их подписчику. Второй аргумент метода subscribe() — это функциональный интерфейс Action1 — он как раз и отвечает за обработку исключения. Если какой-то из методов раньше мог кидать IOException или какое-то еще checked исключение, то теперь больше нельзя. Такие исключения нужно ловить руками и что-то с ними делать. Например, оборачивать в RuntimeException, чтобы предоставить дальнейшие решения RxJava. Но Action1 не сильно отличается от обычного try-catch подхода. У RxJava есть операторы для работы с ошибками: doOnError(), onErrorReturn(), onErrorResumeNext() и onExceptionResumeNext(). А еще есть банальный retry(), который тут как раз и нужен. Если возникла какая-то ошибка с подключением, то можно просто повторить отправку n-раз.

public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .map(result -> readAnswer())
            .doOnError(throwable -> disconnect())
            .retry(MAX_RETRY_COUNT);
    }

Обработчик исключения, передаваемый в subscribe() будет вызван только в том случае, если все повторы закончатся с ошибкой. Для надежности еще вызываем disconnect() перед повторной попыткой, чтобы закрыть и обнулить сокет. Иначе в checkConnection() внутри при вызове isConnected() можем получить ложно положительное срабатывание, и все повторные попытки опять приведут к ошибке. Например, если сервер убил подключение по таймауту, то метод Socket.isConnected() на стороне клиента все еще будет возвращать true — со стороны клиента сокет же подключен, все норм.

Можно еще добавить таймаут на случай, если серверу поплохело, и клиент заблокировался на записи в сокет:

public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .map(result -> readAnswer())
            .doOnError(throwable -> disconnect())
            .retry(MAX_RETRY_COUNT);
}

Оператор timeout кидает исключение, если в течение заданного времени от Observable не поступило ни одного элемента. А исключения мы уже умеем обрабатывать.

Теперь вторая проблема — у нас все еще блокирующие операции, поэтому если вызывать send() из потока GUI, то можно получить подвисания интерфейса. Нужно просто сказать RxJava, чтобы все эти действия выполнялись в другом потоке.

Для этого есть операторы observeOn() и subscribeOn(). У многих людей возникают проблемы с пониманием, чем отличаются эти операторы — есть куча статей на эту тему и вопросов на stackoverflow. Давайте вновь поднимем эту тему и вместе подумаем, что нам нужно сейчас использовать. Вот что пишут в официальной документации:

SubscribeOn — specify the Scheduler on which an Observable will operate.
ObserveOn — specify the Scheduler on which an observer will observe this Observable.

Observable — это тот, кто поставляет данные. Observer — это тот, кто получает данные и что-то с ними делает. Нам нужно, чтобы все выполнялось в другом потоке. Вернее, нам нужно, чтобы наш Observable поставлял данные изначально в другом потоке. А раз данные поставляются в другом потоке, то и все observer'ы будут их обрабатывать в другом потоке. Это по определению subscribeOn() — он определяет планировщика для Observable, которого мы создали в самом начале:

public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .map(result -> readAnswer())
            .doOnError(throwable -> disconnect())
            .retry(MAX_RETRY_COUNT)
            .subscribeOn(Schedulers.io());
    }

Теперь операторы будут выполняться в потоке, который им предоставит планировщик io. Если несколько раз подряд вызвать send(), не дожидаясь ответа, то могут возникнуть проблемы с синхронизацией. По-хорошему, функции переданные в операторы, должны быть чистыми (без побочных эффектов), но в случае с сокетом это проблематично. Чистые функции вообще не очень дружат с вводом/выводом. Нужно синхронизировать обращения к сокету или реализовать что-то типа ConnectionPool'а — тут нужно исходить из задачи.

Стоит иметь в виду, что тогда и обработка ответа подписчиком (он тоже observer) будет осуществляться в другом потоке, а это не всегда хорошо. Например, если мы хотим отобразить ответ в графическом интерфейсе, то скорее всего получим исключение, что мы это делаем не в главном потоке. Для этого нужно поместить обработчик в очередь событий фреймворка, отвечающего за графический интерфейс. В разных фреймворках это делается по-разному. В JavaFX для этого есть метод Platform.runLater(runnable). Можно вызывать его напрямую в обработчике ответа, а можно написать свой планировщик:

public final class FxScheduler extends Scheduler {

    private final static FxScheduler m_instance = new FxScheduler();

    private FxScheduler() {}

    public static FxScheduler getInstance() {
        return m_instance;
    }

    @Override
    public Worker createWorker() {

        return new Worker() {

            private final CompositeSubscription m_subscription = new CompositeSubscription();

            @Override
            public Subscription schedule(Action0 action0) {
                Platform.runLater(action0::call);
                return m_subscription;
            }

            @Override
            public Subscription schedule(Action0 action0, long delay, TimeUnit timeUnit) {
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        Platform.runLater(action0::call);
                    }
                }, timeUnit.toMillis(delay));
                return m_subscription;
            }

            @Override
            public void unsubscribe() {
                m_subscription.unsubscribe();
            }

            @Override
            public boolean isUnsubscribed() {
                return m_subscription.isUnsubscribed();
            }
        };
    }
}

К слову, для Android существует AndroidSchedulers.mainThread() в RxAndroid — дополнении для RxJava. Пример отправки команды тогда имеет вид:

send("echo 123")
        .observeOn(FxScheduler.getInstance())
        .subscribe(
                answer -> { /*обработать ответ*/ },
                throwable -> { /*обработать ошибку*/ }
        );

Здесь мы используем уже observeOn() — нам нужно сообщить RxJava, что «следующий observer должен выполняться через такой-то планировщик».

RxJava предоставляет удобное управление конвейером операторов. Рядом с .map(bytes -> sendBytes(bytes)) можно добавить расчет контрольной суммы, потом прогнать байты через кодирование. Можно добавить вначале логгирование исходящей команды, а в конце — полученного ответа. В общем, идею вы поняли.

2. Сканирование с помощью двигателя и датчика


Есть набор точек — это могут быть углы поворота двигателя в градусах или позиции устройства, которое приводится в движение двигателем. В общем, есть какой-то актуатор. А еще есть внешний датчик, с которого можно получать значения. Нужно проехаться двигателем по набору точек, в каждой точке получить значение с датчика, построить кривую на графике. Повторить процедуру n раз (n кривых на графике). При этом двигатель работает не мгновенно, нужно ждать, когда он выйдет на позицию.

Итак, мы имеем набор точек, для каждой нужно что-то сделать (желательно в другом потоке), а результат обрабатывается в GUI потоке (добавить точку на LineChart, например). Похоже на типичную задачу для RxJava.

public Observable<Point> startScan(List<Double> trajectory, int iterationCount) {
    return Observable.from(trajectory)
            .subscribeOn(Schedulers.io())
            .doOnNext(this::moveMotor)
            .doOnNext(this::blockUntilTargetReached)
            .map(this::createResultPoint)
            .repeat(iterationCount);
}

Используем Schedulers.io(): управление двигателем и датчиком — это все-таки операции ввода-вывода. moveMotor() посылает команду двигателю (через написанный ранее Connection, например).

blockUntilTargetReached() запрашивает у двигателя его позицию, сравнивает с целевой и усыпляет поток на сколько-то миллисекунд, если двигатель еще не доехал. createResultPoint() запрашивает у датчика значение в возвращает объект класса Point, содержащий пару чисел — целевую позицию и значение с датчика. repeat() работает почти как retry() — он повторяет весь поток с самого начала каждый раз, а retry() только после ошибки.

Исходный Observable будет выдавать точки по одной. Следующую точку он выдаст только когда предыдущая пройдет все операторы вплоть до подписчика. Это соответствует функциональному подходу с его ленивыми вычислениями и потоковой обработкой. Таким же образом работают StreamAPI и LINQ. За счет этого сканирование будет идти по точкам по очереди, а не forEach(this::moveMotor), затем forEach(this::blockUntilTargetReached) и так далее.

Применение:

final List<Double> trajectory = ...;
final int n = ...;
startScan(trajectory, n)
        .observeOn(FxScheduler.getInstance())
        .subscribe(
                point -> processPoint(point),
                throwable -> processError(throwable),
                () -> processData()
        );

Проблема в том, что подписчик не отличает на каком именно повторе была получена точка. То есть вместо n кривых, мы получим одну кривую в n раз длиннее. Нужно как-то вручную отслеживать, что началось новое сканирование. Например, считать количество точек и начинать новую кривую, если значение счетчика превысило количество точек в траектории. Или сравнивать пришедшую точку с первой точкой траектории.

В subscribe() появился третий аргумент — это обработчик onComplete(), который вызывается, когда в Observable закончились элементы.

subscribe() возвращает объект, имеющий интерфейс Subscription. Если вызвать у него метод unsubscribe(), то у Observable больше не будет подписчика, принимающего данные, поэтому он просто перестанет их выдавать. Принцип ленивых вычислений — если данные никому не нужны, то не нужно их передавать. Побочных эффектов у операторов все равно не должно быть в соответствии с парадигмой функционального программирования, поэтому просто выполнять операторы без подписчика у Observable смысла нет. С помощью unsubscribe() можно реализовать отмену сканирования. Разве что еще двигателю нужно послать команду на останов движения — за это unsubscribe() не отвечает.

3. Обработка данных и отчет


Мы получили после сканирования много полезных данных, теперь их нужно обработать, рассчитать нужные значения и сгенерировать pdf-отчет.

В отчете так же должны быть значения некоторых полей из интерфейса (например, ФИО пользователя) и рисунок полученных графиков. В случае JavaFX рисунок можно получить методом snapshot(), который есть у каждого графического объекта. Так как это действия с объектами JavaFX, выполняться они должны в GUI-потоке. Для этого у нас уже есть FxScheduler.

class ReportMetaInfo {
    private String fileName;
    private String name;
    private WritableImage image;
}
final Observable<ReportMetaInfo> reportGuiData = Observable.just(m_reportInfoProvider)
        .subscribeOn(FxScheduler.getInstance())
        .map(provider -> {
            final ReportMetaInfo info = new ReportMetaInfo();
            info.fileName = provider.getFileName();
            info.name = provider.getName();
            info.image = provider.getChartSnapshot();
            return info;
        });

m_reportInfoProvider — это реализация интерфейса ReportInfoProvider — прослойки между моделью и представлением. По сути это вызов геттеров из TextView, но модели все равно — у нее просто интерфейс.

Для расчетов есть Schedulers.computation().

final Observable<ScanResult> reportComputationalData = Observable.just(scanData)
        .subscribeOn(Schedulers.computation())
        .map(data -> new ResultProcessor(data).calculateAll());

Теперь мы хотим объединить данные из формы и данные из расчетов и поместить все это в тяжелый pdf-файл. Для этого есть оператор zip() и Schedulers.io():

class ReportData {
    ReportMetaInfo metaInfo;
    ScanResult result;

    ReportData(ReportMetaInfo metaInfo, ScanResult result) {
        this.metaInfo = metaInfo;
        this.result = result;
    }
}
Observable.zip(
        reportGuiData,
        reportComputationalData,
        (reportInfo, scanResult) -> new ReportData(reportInfo, scanResult)
)
        .observeOn(Schedulers.io())
        .map(reportData -> ReportGenerator.createPdf(
                reportData.metaInfo.fileName,
                reportData.metaInfo.name,
                reportData.metaInfo.image,
                reportData.result
        )).subscribe(
        isOk -> { /* здесь, в общем-то, делать нечего */ },
        throwable -> { /* что-то пошло не так */ },
        () -> { /* здесь мы окажемся, если все прошло успешно */ }
);

zip() принимает до девяти разных Observable и соединяет элементы из них в кортежи. Функцию для соединения нужно предоставить самому как и результирующий тип для кортежей. В итоге получение данных из интерфейса (включая изображение графика) и обработка результатов сканирования проходят параллельно. Нужно ли распараллеливание таких действий, зависит от конкретных задач и объемов данных — я привел несколько упрощенный пример.

Стоит иметь в виду, что когда у нас несколько потоков данных, может проявляться backpressure. Это различные проблемы связанные с разной производительностью потоков и разной производительностью Observable и Observer. В общем, это ситуации, когда кто-то простаивает, а у кого-то уже через буфер переливается. Так что нужно быть аккуратным.

Заключение


Скорее всего для этих задач есть другие решения (и более эффективные) — если кто-то мне их укажет, я с радостью приму это к сведению и учту в работе. На примере этих задач я постарался показать некоторые особенности RxJava: обработка ошибок, отличие subscribeOn() и observeOn(), кастомные планировщики и получение результата в GUI-потоке, принцип ленивых вычислений и его применение для управления внешними устройствами, прерывание работы Observable, параллельная работа нескольких Observable и их объединение. Так что даже если эти задачи не совсем удачные для RxJava, сами рассмотренные принципы могут быть полезны для других.
Tags:
Hubs:
+24
Comments 3
Comments Comments 3

Articles