Rx. Постигаем retryWhen и repeatWhen на примерах из Android разработки

В сети очень много русско- и англоязычных статей по Rx операторам retryWhen и repeatWhen.
Несмотря на это, очень часто встречаю нежелание их использовать (ввиду сложного синтаксиса и непонятных диаграмм).

Приведу несколько примеров как можно с их помощью эффективно перезапускать участки цепи и делегировать обработку перезапусков при ошибках и завершениях потока.

В примерах будет Java код с лямбдами (Retrolamda), но переписать его на Kotlin или чистую Java не составит труда.

Императивный способ перезапуска цепи


Предположим, мы используем Retrofit и загрузку начинаем в методе load(). Repository.getSomething() возвращает Single<Something>().

@NonNull
private Subscription loadingSubscription = Subscriptions.unsubscribed();

private void load() {
    subscription.unsubscribe();
    subscription = repository
             .getSomething()
             .subscribe(result -> {}, err -> {});
}

private void update() {
    load();
}

Из какого-нибудь листенера обновлений (e.g. PullToRefreshView) мы вызываем метод update(), который, в свою очередь, вызовет метод load(), где с нуля будет создана подписка.

Предлагаю ко вниманию вариант использования более реактивного, на мой взгляд, способа с вышеупомянутым оператором repeatWhen().

Реактивный способ перезапуска цепи — repeatWhen


Создадим объект PublishSubject updateSubject и передадим в оператор лямбду
repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable())

@NonNull
private final PublishSubject<Void> updateSubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .repeatWhen(repeatHandler ->
                                repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

Теперь для обновления загруженных данных нужно заэмитить null в updateSubject.

private void update() {
    updateSubject.onNext(null);
}

Нужно помнить, что работает такой реактивный способ только с Single, который вызывает onComplete() сразу после эмита единственного элемента (будет работать и с Observable, но только после завершения потока).

Реактивный способ обработки ошибок retryWhen


Подобным образом можно обрабатывать и ошибки. Предположим, у пользователя пропала сеть, что приведет к ошибке и вызову onError() внутри Single, который возвращается методом getUser().

В этот момент можно показать пользователю диалог с текстом «Проверьте соединение», а по нажатию кнопки OK вызвать метод retry().

@NonNull
private final PublishSubject<Void> retrySubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .doOnError(err -> showConnectionDialog())
            .retryWhen(retryHandler -> retryHandler.flatMap(nothing -> retrySubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

private void retry() {
    retrySubject.onNext(null);
}

По вызову retrySubject.onNext(null) вся цепочка выше retryWhen() переподпишется к источнику getUser(), и повторит запрос.

При таком подходе важно помнить, что doOnError() должен находиться выше в цепочке, чем retryWhen(), поскольку последний «поглощает» ошибки до эмита repeatHandler'а.

В данном конкретном случае выигрыша по производительности не будет, а кода стало даже чуть больше, но эти примеры помогут начать мыслить реактивными паттернами.

В следующем, бессовестно притянутом за уши, примере, в методе load() мы объединяем два источника оператором combineLatest.

Первый источник — repository.getSomething() загружает что-то из сети, второй, localStorage.fetchSomethingReallyHuge(), загружает что-то тяжелое из локального хранилища.

public void load() {
    Observable.combineLatest(repository.getSomething(),
                             localStorage.fetchSomethingReallyHuge(),
                             (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

При обработке ошибки императивным способом, вызывая load() на каждую ошибку, мы будем заново подписываться на оба источника, что, в данном примере, абсолютно ненужно. При сетевой ошибке, второй источник успешно заэмитит данные, ошибка произойдет только в первом. В этом случае императивный способ будет еще и медленней.

Посмотрим, как будет выглядеть реактивный способ.


public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retrySubject.asObservable())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retrySubject.asObservable())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

Прелесть такого подхода в том, что лямбда, переданная в оператор retryWhen() исполняется только после ошибки внутри источника, соответственно, если «ошибется» только один из источников, то и переподписка произойдет только на него, а оставшаяся цепочка ниже будет ожидать переисполнения.

А если ошибка произойдет внутри обоих источников, то один и тот же retryHandler сработает в двух местах.

Делегирование обработки ошибок


Следующим шагом можно делегировать обработку повторов некоему RetryManager. Перед этим еще можно немного подготовиться к переезду на Rx2 и убрать из наших потоков null объекты, которые запрещены в Rx2. Для этого можно создать класс:

public class RetryEvent {
}

Без ничего. Позже туда можно будет добавлять разные флаги, но это другая история. Интерфейс RetryManager может выглядеть как-то так:

interface RetryManager {

    Observable<RetryEvent> observeRetries(@NonNull Throwable error);

}

Реализация может проверять ошибки, показывать диалоги, снэкбар, устанавливать бесшумный таймаут — всё, что душе угодно. И слушать коллбэки от всех этих UI компонентов, чтобы в последствии заэмитить RetryEvent в наш retryHandler.

Предыдущий пример с использованием такого RetryManager будет выглядеть вот так:

//pass this through constructor, DI or use singleton (but please don't)
private final RetryManager retryManager;

public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retryManager.observeRetries())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retryManager.observeRetries())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}


Таким нехитрым образом обработка повторов при ошибках делегирована сторонней сущности, которую можно передавать как зависимость.

Надеюсь, эти примеры окажутся кому-то полезны и соблазнят попробовать repeatWhen() и retryWhen() в своих проектах.
Метки:
Поделиться публикацией
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама
Комментарии 12
  • 0
    Какую литературу/статьи можно почитать, чтобы также познать «реактивный подход»? )
    Во многих случаях мое использование Rx скатывается в императивный подход и при чтении подобных статей у меня возникает вопрос: «А что и тут так надо?»
    • +1
      Сложно посоветовать какую-то литературу, потому-что большинство материалов, вроде «RxJava for Android App Development» — K. Matt Dupree, покрывает только самый базовый уровень (хотя именно эту 30 страничную брошюру стоит пробежать, там популярная реактивная обёртка на SearchView продемонстрирована).
      Самым эффективным способом будет идти по официальной документации, брать каждый оператор и гуглить отдельно по нему небольшие статьи: «rx debounce usage android», «rx compose transformers usage android».
      Очень много годноты находится на хабре и медиуме.
      Позже, когда документация, диаграммы из неё и десятки прочитанных примеров врежутся в память, мозг сам начнет выдавать тонны (часто нерабочих) способов переделать собственный код и реализовать какую-то привычную штуку через Rx )
      • 0
        можете глянуть тут http://xgrommx.github.io/rx-book/content/resources/articles/index.html Если у кого-то есть время может сгруппировать по языкам
        • 0
          От языка зависит, для Java очень хорошо Reactive Programming with RxJava Creating Asynchronous, Event-Based Applications
          • 0
            Могу порекомендовать книгу «Нуркевич, Кристенсен: Реактивное программирование с использованием RxJava». В ней достаточно подробно описаны операторы + приведены примеры по их использованию.
          • 0
            Надо уточнить, что примеры работают только для версии 2.
            • 0
              Имеете ввиду Rx2? Как раз наоборот, работают на Rx1, для миграции на Rx2 нужно не эмитить нуллы, а создавать класс RetryEvent, о чем я написал в середине статьи. Подозреваю, что-то еще придется менять, так как примеры чисто на первой версии писал.
              • 0
                В версии 1.2.9 (видимо, крайняя на данный момент) у Single нет метода repeatWhen, только retryWhen.
                • 0
                  Для Single можно вызвать .toObservable() — результат будет такой же.
                  • 0
                    И еще
                    repeatWhen(repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
                    можно сократить до
                    repeatWhen(updateSubject)
                    • 0
                      Нeльзя, во-пeрвых, там сигнатура Func1<Ovservable, Observable>, поэтому как минимум repeatWhen(error -> repearSubject.asObservable()).
                      Во-вторых, спецификация обязывать возвращать подстрим данного notificationHandler'а. Пeрeданный туда сторонний статичный стрим работать нe будeт.
                      • 0
                        Можно-можно, у меня это работает.
                        PublishSubject наследник Observable, asObservable() делает явное приведение типа, но проканает и неявное.
                        А спецификация обязывает возвращать Observable. Другое дело, что handler служит как бы сигналом, что очередная последовательность закончилась, и по нему можно синхронизировать эмитирование из возвращаемого Observable. Но в этом случае можно использовать BehaviorSubject, который отдает последний эмит новому подписчику. Тогда даже если вызов update() произойдет пока getSomething() еще не вернул результат, то replayWhen(updateSubject) сразу приведет к переподписке на getSomething.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.