Pull to refresh

RxJava. Борьба с вызовами суровой действительности

Reading time10 min
Views35K
Реактивное программирование — очень модный тренд в мобильной разработке на данный момент. Если говорить про android разработку, то реактивность представлена в основном библиотекой RxJava.
В сети все больше статей, обучающих видео, презентаций, записей с конференций, в которых рассказывается про данную тематику. Но подавляющее большинство представленного материала содержит теоретические аспекты и весьма тривиальные примеры, на которых разработчик не может оценить реальный профит от использования Rx. А ведь так хочется увидеть реальный и сложный пример из жизни, который очень наглядно покажет всю мощь, красоту и простоту реактивного программирования.
Поэтому данная статья будет посвящена подробному разбору реального и довольно таки сложного примера. И я очень надеюсь, что после прочтения, вы по-настоящему откроете для себя мир реактивного программирования.


Немного о теории


В начале я все-таки хотел бы уделить время для теории, так как при рассмотрении примера вы уже должны более-менее четко себе представлять, что такое Observable, Subscriber, Subscription, операторы Rx и что значат диаграммы для каждого оператора. Как я писал выше, материалов сейчас хватает. Поэтому приведу список ссылок на самые важные и интересные из них.

Самый главный ресурс — это wiki по RX
Особое бы внимание обратил я на этот пункт
В нем перечислены все ссылки на довольно хорошие и качественные статьи.
Лично мне для вступления нравится статья
В ней автор, по-моему мнению, пытается донести до слушателя основную мысль — нужно мыслить не объектами, а Потоками (Stream, не Thread! Многопоточность в данном случае не при чем). Понимание всего процесса Rx, именно как взаимодействие потоков, существенно облегчает дальнейшую работу.
Также для вступления есть хорошая презентация
Блог компании Futurice. Одна из статей
Блог, содержащий много полезных статей, посвященных Rx и не только.
Некоторые из статей — Part 1, Part 2, Part 3, Part 4, Don't break the chain, Loading data from multiple sources with RxJava
Что-то уже есть и на русском:
1. Переводы статей Grokking RxJava Part 1, Part 2, Part 3, Part 4
2. Самостоятельная статья
Если вы используете в своем проекте RxJava, то без RetroLambda дальнейшее ваше существование не имеет смысла :)
Хорошая статья по лямбде

И немало практики


Ну а теперь рассмотрим реальный случай из нашей не простой разработческой жизни.
Нам нужно было получить список выписок (это модель StatementRUR) за конкретный период времени. Скажем за месяц.
Что у нас есть? Запрос (метод statementRURList) с параметрами offset и limit, а также параметром StatementListParameters, в котором мы задаем фильтр (с такой-то даты по такую-то) и порядок сортировки. То есть получение списка выписок за месяц становится уже несколько нетривиальной задачкой в реализации.
Давайте немножко упростим и представим, что метод statementRURList возвращает сразу List<StatementRUR/>.
Тогда стандартный код будет выглядеть приблизительно так:
int i = 0;
List<StatementRUR> commonList = new ArrayList<>();
while (true) {
    List<StatementRUR> list = statementRURList(int i * limit, int limit, String accountId, int periodDaysCount, Date docInfoDocDate);
    commonList.addAll(list);
    if (list.size() < limit) {
        break;  
    }
    i++;
}

В результате получим полный список выписок за месяц.
Но имеем то мы дело с запросами, которые нужно выполнять не UI потоке, и которые нужно слать строго друг за другом, и обрабатывать ответы с сервера в таком же порядке, дабы не сбить сортировку. А еще нужно корректно сложить все результаты, ведь могут же возникнуть проблемы с многопоточностью. И еще помним о корректной обработке ошибок.
С помощью асинхронности про реализацию подобного алгоритма «просто, красиво и надежно» можно забыть. А вот RxJava — это то, что нам нужно!

Уделим время описанию способа связи с сервером и моделей, с которыми будем мы работать.
Вообще для связи с сервером мы используем замечательную библиотеку RetroFit. Еще более замечательное в ней то, что она взаимодействует с Rx. Таким образом наш запрос в сеть на получение списка выписок выглядит следующим образом:
Observable<ResponseApiModel<List<StatementRUR>>> statementRURList
(int offset, int limit, String accountId, int periodDaysCount, Date docInfoDocDate);

, где ResponseApiModel — это единая модель, по которой сервер возвращает ответ на любой наш запрос.
Ниже привожу полный код данной модели:
public class ResponseApiModel<T> {

    @SerializedName("result")
    private T result;
    @SerializedName("errors")
    private List<ErrorResponseApiModel> errors;
    @SerializedName("state")
    private Object state;

    public ResponseApiModel(T result, List<ErrorResponseApiModel> errors) {
        this.result = result;
        this.errors = errors;
    }

    public ResponseApiModel(T result, List<ErrorResponseApiModel> errors, Object state) {
        this.result = result;
        this.errors = errors;
        this.state = state;
    }
    public ResponseApiModel(List<ErrorResponseApiModel> errors, Object state) {
        this.errors = errors;
        this.state = state;
    }

    /**
     * if result == null && errors != null -> throw new ResponseAPIException!
     * @return result field
     */
    public T getResult() {
        if (result == null && errors != null) {
            throw new ResponseAPIException(errors);
        }
        return result;
    }

    public List<ErrorResponseApiModel> getErrors() {
        return errors;
    }

    public Object getState() {
        return state;
    }
}

Для нас наибольший интерес представляет поле T result, в котором содержатся запрашиваемые данные в случае успешного ответа сервера. Если же сервер возвращает ошибку в поле errors, то при попытке получения результата в методе T getResult() выкидывается исключение ResponseAPIException, которое потом перехватывается Subscriber.onError(Throwable e)
Параметры String accountId, int periodDaysCount, Date docInfoDocDate потом преобразуются в упомянутый выше StatementListParameters, но это уже детали.
В результате сервер в случае успеха возвращает нам List<StatementRUR/> (поле T result).

Ну а теперь самое интересное! Как это реализовать в Rx?
Ниже приводится подробная диаграмма и алгоритм работы.
Для диаграммы следующие допущения: на сервере 160 выписок, LIMIT = 50



  1. Для начала нужно запустить бесконечный цикл, который после каждой итерации будет инкрементировать свой счетчик (переменная i в коде выше).
    Для этой цели существует оператор range. В программном виде это будет так:
    Observable.range(0, Integer.MAX_VALUE - 1)
    

    Данный Observable «выпускает» инкрементирующееся каждый раз значение. То есть на выходе получается последовательность чисел: 0, 1, 2, 3...Integer.MAX_VALUE - 1.
  2. Используя значение счетчика, нужно делать запросы в сеть — метод statementRURList. Но при этом каждый следующий запрос должен слаться после получения ответа на предыдущий. Это необходимо для корректной работы RetroFit (чтобы не завалить его большим количество одновременных запросов) и для соблюдения сортировки.
    На помощь приходит оператор concatMap. Как видно из диаграммы, данный оператор создает новый Observable, на вход которому поступают элементы с предыдущего Observable, и который «излучает» новые элементы. Но главное отличие concatMap от похожего с виду flatMap заключается в том, что новый Observable соблюдает такую же последовательность элементов, что и предыдущий. То есть, если на вход оператору поступают нулевой, первый и второй элементы, новый Observable выпустит переработанные значения этих элементов в точно таком же порядке.
    Но лучше один раз увидеть :) Внимание на диаграмму, а именно на первые два Потока («горизонтальные стрелки», Stream)

    И программный код теперь:
    Observable
            // get All statements from current date for periodDaysCount (with offset, limit)
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(new Func1<Integer, Observable<ResponseApiModel<List<StatementRUR>>>>() {
                @Override
                public Observable<ResponseApiModel<List<StatementRUR>>> call(Integer increment) {
                    return statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate);
                }
            }
    );
    

    Как-то многовато кода, не находите? Может используем lambda? И тогда получим:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate));
    

    Вот так гораздо лучше :)
  3. Новый Observable «излучает» элементы ResponseApiModel<List<StatementRUR/>>. Но нам то нужен список выписок или, в данном контексте, поле T result модели ResponseApiModel. Для получения поля result в модели есть метод T getResult(). Таким образом необходимо преобразовать элементы, выпущенные последним Observable. Для этого существует оператор map.
    Диаграмма. Третий поток.

    Программный код:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(new Func1<ResponseApiModel<List<StatementRUR>>, List<StatementRUR>>() {
                @Override
                public List<StatementRUR> call(ResponseApiModel<List<StatementRUR>> listResponseApiModel) {
                    return listResponseApiModel.getResult();
                }
            }
    );
    

    И с RetroLambda
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult);
    

  4. Теперь у нас есть бесконечный цикл отправки запросов на сервер (с последовательно увеличивающимся offset) и приема в точно такой же последовательности ответов. И наша задача на данном этапе — остановить Землю цикл.
    Как уже писалось выше, если в ответе количество выписок меньше значения limit, значит слать больше запросы не имеет смысла.
    Воспользуемся оператором takeUntil, который задает условие прекращения получения новых элементов. При достижении данного условия, все вышестоящие observable также прекращают свою работу.
    Что под этим подразумевается? Взглянем на диаграмму. Четвертый поток.

    Код:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(new Func1<List<StatementRUR>, Boolean>() {
                @Override
                public Boolean call(List<StatementRUR> list) {
                    return list.size() < LIMIT;
                }
           }
    );
    

    Код с RetroLambda:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT);
    

  5. На данном этапе необходимо объединить полученные результаты и выдать полноценный список выписок за месяц с сортировкой по дате. Текущий наш observable «выпускает» элементы, которые представляют собой списки выписок (каждый список — это ответ на запрос на сервер). Эти списки нужно объединить. В этом помогут оператор toList, который объединит все «выпущенные» элементы в единый список, то есть в итоге мы получим список списков выписок, и оператор map, который преобразует список списков выписок в необходимый нам список выписок.
    Диаграмма. Пятый и шестой поток.

    Код. Позвольте сразу писать с RetroLambda:
    Observable
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT)
            .toList()
            .map(this::safeMerge);
    
    private <T> List<T> safeMerge(List<List<T>> lists) {
        List<T> list = new ArrayList<>();
        for (List<T> statementOperationRURList : lists) {
            list.addAll(statementOperationRURList);
        }
        return list;
    }
    

    Те, кто уже с Rx «не на Вы» могут возразить мне, что для аккумулирования «выпущенных» элементов существует специальный оператор scan. Зачем же городить огород из toList и map? Дело в том, что, если первый же запрос вернет нам пустой список (и тогда этот запрос будет единственным), scan выдаст ошибку:
    java.util.NoSuchElementException: Sequence contains no elements at 
    rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:131) at 
    rx.internal.operators.OperatorTakeLastOne$ParentSubscriber.onCompleted(OperatorTakeLastOne.java:106) at 
    rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:123) at 
    rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:99) at 
    rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65) at 
    rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:171)
    ... 
    

    И обойти ее можно только таким, несколько окольным, путем.


Скажите, ну, как вам код? Ну ведь правда просто, красиво и надежно!
А еще представьте такую ситуацию. Получили мы список выписок. А теперь еще дополнительно надо:
  1. Пройтись по каждой выписке
  2. По каждой выписке получить список Операций (StatementOperationRUR). Причем получаем мы с помощью запроса
    Observable<ResponseApiModel<List<StatementOperationRUR>>> statementOperationRURList(String id, int offset, int limit);
    

    , где параметр String id — это id Выписки, по которой запрашивается список операций.
    Таким образом, как и для выписок, чтобы получить список операций, нам заранее неизвестно, сколько нужно слать запросов на сервер из-за параметров offset, limit
  3. Сложить полученные операции в единый список

То есть, если нам нужен список операций за месяц, причем отсортированный, необходимо выполнить минимум 31 запрос (минимум один запрос на получение выписки и минимум 30 запросов на получение списка операций по каждой выписке), а точнее 31 цикл отправки запросов, прерывающихся по определенному условию. И ведь это все надо как-то соединить.
Но с RxJava… ну вы понимаете :)
Внимание на код.

private Observable<List<StatementRUR>> getAllRURStatements(String accountId, Date docInfoDocDate, int periodDaysCount) {
    return Observable
            // get All statements from current date for periodDaysCount (with offset, limit)
            .range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementRURList(LIMIT * increment, LIMIT, accountId, periodDaysCount, docInfoDocDate))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT)
            // merge all results from statement requests in one List
            .toList()
            .map(this::safeMerge);
}

// get All Operations from concrete statement (with offset, limit)
private Observable<List<StatementOperationRUR>> getOperationsFromRURStatement(StatementRUR statementRUR) {
    return Observable.range(0, Integer.MAX_VALUE - 1)
            .concatMap(increment -> statementOperationRURList(statementRUR.getId(), LIMIT * increment, LIMIT))
            .map(ResponseApiModel::getResult)
            .takeUntil(list -> list.size() < LIMIT)
            // merge all results from operations requests in one List
            .toList()
            .map(this::safeMerge);
}

public <List<StatementOperationRUR>> getAllRUROperations(String accountId, Date docInfoDocDate, int periodDaysCount) {
   return
        // get all statements
        getAllRURStatements(accountId, docInfoDocDate, daysLimit)
        // from Statement list to each item emitting
        .concatMap(Observable::from)
        // get All Operations from all statement
        .concatMap(this::getOperationsFromRURStatement)
        // merge all results from operation requests in one List
        .toList()
        .map(this::safeMerge);
}


Вроде тоже ничего сложного уже нет. В первых двух методах наблюдается дублирование кода, но я оставил его для большей понятности.

Я постарался максимально подробно объяснить первый пример, так как он, по-моему мнению, самый сложный в построении. Я потратил немало сил и времени, чтобы написать такой простой с виду код. По крайней мере в сети я не видел подобных решений.
Второй пример, по сути, есть результат использования первого примера плюс пару простых операторов. Но зато он демонстрирует, насколько удобна RX в плане конструирования сложных алгоритмов. Rx и есть конструктор.

С удовольствием жду комментариев, замечаний и предложений!
Tags:
Hubs:
Total votes 17: ↑17 and ↓0+17
Comments7

Articles