RxJava. Убираем магию

Я долго боялся использовать RxJava в production. Её назначение и принцип работы оставались для меня загадкой. Чтение исходного кода не добавляло ясности, а статьи только путали. Под катом попытка ответить на вопросы: «Какие задачи эта технология решает лучше аналогов?» и «Как это работает?» с помощью аналогий с классической Java и простых метафор.

image

Применение


RxJava отлично заменяет Streams API из Java 8 на более ранних версиях Java. Так как Android Java 8 поддерживается далеко не с 4.0, Rx будет оптимальным решением. В статье RxJava рассматривается именно с этого ракурса, так как, по-моему, он наиболее понятный и по-настоящему реактивное приложение под Android с помощью чистой Rx реализовать сложно.

Emitter


Всем нам знаком паттерн Iterator.

interface Iterator<T> {
    T next();
    boolean hasNext();
}

За интерфейсом скрывается какой-нибудь источник данных, причём совершенно не важно, какой. Iterator полностью скрывает все детали реализации, предоставляя всего два метода:

next — получить следующий элемент
hasNext — узнать, есть ли ещё данные в источнике

У этого паттерна есть одна особенность: потребитель запрашивает данные и ждёт («зависает»), пока источник не выдаст их. Поэтому в качестве источника обычно выступает конечная, часто заранее сформированная коллекция.

Проведём небольшой рефакторинг.

interface Iterator<T> {
    T getNext();
    boolean isComplete();
}

Думаю, вы уже поняли, к чему я. Интерфейс Emitter из RxJava (для потребителей он дублируется в Observer (Subscriber в RxJava 1)):

interface Emitter<T> {
    void onNext(T value);
    void onComplete();
    void onError(Throwable error);
}

Он похож на Iterator, но работает в обратную сторону: источник сообщает потребителю о том, что появились новые данные.

Это позволяет разрешить все проблемы с многопоточностью на стороне источника и, например, если вы проектируете UI, то вы сможете рассчитывать на то, что весь код, отвечающий за графический интерфейс — последовательный. Невероятно удобно. Прощайте, каллбэки! Скучать не буду.

Аналогия с Iterator взята из [1]

Sources


Теперь немного о самих источниках. Они бывают множества типов: Observable, Single, Maybe… И все они похожи на капусту (и монады, но это не так важно).

image

Потому что создав один источник, можно заворачивать его в другой источник, который можно ещё раз завернуть в ещё один источник и так до OutOfMemory. (Но так как обычный источник весит меньше 100 байт, скорее, пока заряд не кончится.)

Давайте завернём в источник ответ на тот самый вопрос.

Observable.just(42)

Как мы знаем, получение ответа — довольно долгая операция. Поэтому завернём в источник, который выполнит вычисления в специальном потоке:

Observable.just(42)
                .subscribeOn(computation())

А ещё мы хотим, чтобы приложение не упало при ответе. Заворачиваем в источник, который вернёт ответ в главном потоке:

Observable.just(42)
                .subscribeOn(computation())
                .observeOn(mainThread())

И, наконец, запускаем:

Observable.just(42)
                .subscribeOn(computation())
                .observeOn(mainThread())
                .subscribe(new DisposableObserver<Integer>() {
                    @Override
                    public void onNext(Integer answer) {
                        System.out.print(answer);
                    }
                    @Override public void onComplete() {}
                    @Override public void onError(Throwable e) {}
                });

В консоль вывелся ответ, но что же произошло?

Метод subscribe определён в Observable. Он делает проверки и подготовку, а затем вызывает метод subscribeActual, который уже по-разному определён для разных источников.

В нашем случае метод subscribe вызвал метод subscribeActual у ObservableObserveOn, который вызывает метод subscribe завёрнутого в него источника, уточнив, в какой поток нужно вернуть результат.

В ObservableObserveOn лежит ObservableSubscribeOn. Его subscribeActual запускает subscribe завёрнутого в заданном потоке.

И, наконец, в ObservableSubscribeOn завёрнут ObservableJust, который просто выдаёт в onNext своё значение.

Естественно, просто с числом не интересно. Поэтому вот источник, который получает список товаров и узнаёт для них цены. Цены можно получать только по 20 штук (у InAppBilling API такое же ограничение).

github.com/a-dminator/rx-products-and-prices

Этот пример создан для демонстрации принципа работы, а не для использования в реальных проектах.

В RxJava огромное количество разных реализаций источников. Все они работают по одному принципу, а детали отлично описаны в документации. Поэтому не буду останавливаться на них.

Операции


Все операции над источниками делятся на 2 типа:

Не терминальные возвращают новый источник, который завернул исходный
Терминальные исполняют цепочку и получают данные (subscribe, map...)

И да, ничего не исполнится, пока не будет выполнена терминальная операция. Цепочка может сколько угодно лежать в памяти, не делая вообще ничего. И это хорошо, потому что если мы не получаем данные, то зачем их производить? (Ленивые вычисления без Haskell в комплекте!).

По аналогии со Streams API из [2]

Dispose (Unsubscribe в RxJava 1)


Исполнение цепочки можно прервать. Делается это вызовом dispose() у DisposableObserver (unsubscribe() у Subscriber в RxJava 1).

После этого RxJava прекратит исполнение цепочек, отпишет всех Observer'ов и вызовет iterrupt() у потоков, которые больше не нужны.

Так же можно узнать, не прервано ли исполнение из источников. Для этого у Emitter есть метод isDispose() (isUnsubscribe() для RxJava 1).

У этого есть логичная, но неприятная особенность: так как Observer отвечает за обработку ошибок, теперь все ошибки крашат приложение. Я пока не нашёл решения, о котором готов написать.

Заключение


RxJava:

— Позволяет легко компоновать запросы к сети, базе данных и т.д; организуя их асинхронное выполнение. Это означает, что ваши пользователи получат более быстрое и отзывчивое приложение.

— Не содержит в себе никакой магии. Только составление и исполнение цепочек источников.

— (Для меня) Решает больше проблем, чем создаёт!

Всем спасибо.

[1] Видео
[2] Видео
Метки:
Поделиться публикацией
Похожие публикации
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама
Комментарии 35
  • +6
    Сколько можно мусолить про Rx?) Хватит глянуть одного видео про это и все http://xgrommx.github.io/rx-book/content/resources/video/index.html (первое по списку)
    • +2
      Смею с Вами не согласиться — RxJava ( а в особенности — вторая ветка ) является очень даже глубокой темой для обсуждения, другое дело, что таких триалов как у автора в сети, равно как и на хабре, тысячи, а вот хороших комплексных юз-кейсов в продашне как раз-таки не хватает.
    • +1
      Я долго боялся использовать RxJava в production


      Скажу честно, мы вот полгода как выпустили продукт, который 100% Kotlin и на «Реактивной тяге».

      Почему вы боитесь Rx??? Вы просто не умеете его готовить!!! Он вкусный, надо просто научиться.

      PS. Никогда. Повторяюсь Никогда, не используйте конструкции «Зачем мне использовать новую технологию, если я могу писать стандартными методами».
      • +1
        А еще он очень простой) Правда могут быть у новичков запарки с Subjects ну и с подогревом или охлаждением Observable) Как кстати Kotlin в бою?
        • 0

          Kotlin для Android хорош, я долго ждал выхода. До этого пробовал писать под Android на Scala и даже Groovy. В Scala убивало то что компиляция долгая и официальный джар нельзя использовать из-за размера, приходилось обходными путями. Ну а Groovy тоже мороки навалом. Порог вхождения в язык Kotlin почти нулевой, да и генерация из Java помогает очень. Впечатления только положительные, ждем версию 1.1.

          • 0
            ну у скалы рантайм под андроид очень большой и иногда прогвард не справляется. А вот у Kotlin есть еще прикольная штука Anko
            • 0

              Да Anko буквально сегодня для себя открыл, вечерком начну колупать на очередном проекте.

              • 0
                Начиная с 2.12 у скалы размеры сильно сократились.
                • 0

                  И минимальная версия джавы стала 8.

                • 0
                  Попробуйте Kotlin Android Extensions
                • –1
                  А вообще тут много чего) https://kotlin.link/
                • 0
                  И с Вами также смею не согласиться, rx далеко не простой инструмент, многие по нескольку лет осваивают его. Как минимум понимание проблемы упомянутых Ваши Subject-ов доходит не сразу до человека, начавшего свой «реактивный» путь
                  • 0
                    Котлин в бою превосходен. Код более удобочитаем и понятен. Ушли вечные проблемы Java с NPE. Скажу честно. Год как пишу на Kotlin, возвращаться не Java нет желания. Всем советую попробовать.

                    А у «Новичков» проблемы будут не только с Hot-Cold-Observable (Это лишь малая часть, которую легко выучить), а скорее с правильной архитектурой на RxJava
                  • 0
                    Никогда. Повторяюсь Никогда, не используйте конструкции «Зачем мне использовать новую технологию, если я могу писать стандартными методами».

                    Вы так говорите, будто это что-то плохое. Лучше, конечно, использовать в продакшене первую попавшуюся хайповую библиотечку и фреймворк. В крайности бросаться не надо, многие (да что там, большинство) приложения жили без Rx, живут и вполне себе проживут дальше. И странно их за это осуждать.

                    • 0
                      Скорее всего мы с вами просто о разных вещах подумали.
                      Допустим мы вошли в какой нибудь старый проект интерпрайза, где уже оверхед костылей и есть свой кодстайл. Естественно, внедрять в него новомодные штуки-дрюки не имеет смысла, так как вероятнее всего это приведет к новым костылям.
                      Однако, если мы начинаем новый проект и у нас есть свобода выбора технического стека, то почему бы сразу не начать строить проект с использованием тех же самых Di, Rx, DataBinding?

                      Я скорее имел ввиду, что если мы начинаем как раз новый проект, то имеет смысл пробовать новые технологии, о которых все говорят.

                      • +2

                        Di — это вообще не о библиотеках, а принцип разработки inversion of control. И реализовать этот принцип вполне можно без даггера того же. Rx — опять же вкусовщина. Если кто-то выберет lightweight stream api + bolts, или еще что-то типа async job queue, я его осуждать не буду. Data binding — интересная штука, но в ограниченных случаях, на полноценное mvvm в андроиде не легло у нас.

                    • 0
                      Не то чтобы я спорил с вами, но вы не раскрыли чем обосновано ваше последнее утверждение.
                      • 0
                        Всё же, не зря боялся. На тестировании всплыло, например, что на некоторых устройствах приложение падает при ∽50 одновременных соединениях. Или то, что где-то InAppBilling API не цепляется к серверу, не выдавая никакой ошибки (даже timeout). А теперь на нём завязаны некоторые запросы.

                        Осторожность связна не столько с самим Rx, как бы он ни был хорош, сколько с совместимостью со всеми остальными технологиями, которые мы, порой вынужденно, используем
                      • +4

                        Здесь вы ошибаетесь, проводя параллели со streams api. Не нужно представлять rxJava как библиотеку для java <v8. Между streams api и реактивными расширениями есть одна существенная разница, которая заключается в том что streams по сути построены на pull а observable построены на push. Результат таков что на streams можно "подписаться" только один раз а у observable может быть сколько угодно подписчиков.


                        По мне так главная мешанина с rxJava начинается с того что люди мешают многопоточный и асинхронные подходы. Просто в Java из за отсутствия машины состояний(state machine) асинхронизм построен поверх многопоточных библиотек.


                        В Kotlin 1.1, кстати, обещают ко-рутины построенные поверх машины состояний(state machine).

                        • 0

                          А зачем вы дублируете машину состояний (state machine)? И что это значит?


                          Просто в Java из за отсутствия машины состояний(state machine) асинхронизм построен поверх многопоточных библиотек.
                          • 0

                            Я был не совсем уверен в переводе на русский язык как машина состояний. Проверил в википедии, называется абстрактный автомат. Ну после вашего комментария, мой уже не поправить.

                          • 0
                            Согласен, что Rx не заканчивается на замене streams, но применение этого, скорее, про сервера и Scala. Я пока не видел кейсов под Android, в которых долгоиграющий Observable или Flowable приносит больше пользы, чем проблем из-за сложного жизненного цикла UI-компонентов (потребителей данных), который нужно обслуживать. Буду рад, если вы приведёте пример
                            • 0

                              Я например был вдохновлен вот этим примером от команды Яндекса.

                              • 0
                                Спасибо, действительно интересный кейс
                              • 0
                                Пример — пагинация списка, здесь можно посмотреть мою реализацию с использованием RxJava, а также еще пару кейсов использования реактивного расширения в Android: https://gitlab.com/i.komarov/my-feed
                                • 0
                                  Спасибо за пример.

                                  Смущает использование Flowable, он может упасть из-за размера буфера и имеет больший оверхед, чем Observable. Так же не понимаю, зачем нужен synchoronized в BindableAdapter. Он тоже создаёт оверхед и не имеет смысла. RxJava позволяет писать код без синхронизаций.
                            • +1
                              А как в RxJava решается проблема, когда запрос к сети завершился, когда Activity было свернуто? В Rx же так же в onResume идет подписка и в onPause отписка? И как сделать так чтобы не было повторного запроса, если он еще выполняется, при пересоздании Activity (речь не про поворот экрана, а про то что Activity можно закрыть кнопкой назад, а потом вернуться обратно).
                              • 0

                                Можно по разному. Вот, например, не идеальный вариант, но вполне рабочий. В синглтоне все операции идут, а фрагмент/активити лишь подписываются для получения последней полученной информации:


                                Используем RxJava и Retrofit на Android, учитывая поворот экрана

                                • 0
                                  Дорогой мой друг, это решается построением человеческой архитектуры (допустим MVP) где при паузе происходит дроп View. Таким образом даже если Subscription завершит свою работу мы не получит ответ в пустой вью. Ну или отписку можно делать.

                                  Это нужно для того, что если пользователь сворачивает активити, некоторые процессы должны продолжаться. Для повторной переподписки на presenter.

                                  Извиняюсь за офтоп. Ушел чутка от темы)
                                  • 0
                                    Тут больше вопрос не к Rx, а к используемому шаблону проектирования.
                                    Используя MVP, вы делаете любую реализацию какого-нить «кеша» в презентере. А потом просто опрашиваете презентер, если данные есть, берете готовые, если нет загружаете новые. Если загрузка в процессе, то просто ждете конца.
                                    В Rx же так же в onResume идет подписка и в onPause отписка?

                                    Нет, Rx не привязан к контексту активити.
                                    • 0
                                      Тут bind-unbind самого View имел ввиду
                                    • 0
                                      Можно использовать паттерн MVP и делать запросы в presenter. Ну а чтобы повторно запрос не выполнялся, можно хранить состояние представления и перерисовывать при необходимости. Например как в moxy.
                                    • 0
                                      Observable.just(42)
                                                      .subscribeOn(computation())
                                      

                                      Если не ошибаюсь с оператором just subscribeOn бесполезен. Код все равно выполнится на вызывающем потоке. Для того чтобы все работало как Вы хотите необходимо, например, еще обернуть в defer
                                      • 0
                                        Да, верно, вычисление самого числа произойдёт в основном потоке, но исполнение ObservableJust — в потоке для вычислений. Здесь я использовал just просто чтобы не усложнять пример, а уже на github выложил более сложный кейс
                                      • 0
                                        Колупаю rx2, после использования rx я в затруднении :), прочитал про отсутствие backpressure в Observable и наличие в Flowable. При этом в пояснениях указано, что если вы имеете дело с генерацией событий в размере «около 1000», то используйте Obsrvable, в противном случае Flowable. Я так понимаю, что если Observable генерит события быстрее чем может обработать Subscriber, то он рухнет с исключением. В данном случае просто предполагается, что я в обязательном порядке реализую логику кэширования самостоятельно или я что то не понимаю в использовании Observable в rx2?

                                        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.