Типичное использование Observable объектов в Angular 4

    Представляю вашему вниманию типичные варианты использования Observable объектов в компонентах и сервисах Angular 4.



    Подписка на параметр роутера и мапинг на другой Observable


    Задача: При открытии страницы example.com/#/users/42, по userId получить данные пользователя.


    Решение: При инициализации компоненты UserDetailsComponent мы подписываемся на параметры роутера. То есть если userId будет меняться — будер срабатывать наша подписка. Используя полученный userId, мы из сервиса userService получаем Observable с данными пользователя.


    // UserDetailsComponent
    
    ngOnInit() {
      this.route.params
        .pluck('userId') // получаем userId из параметров
        .switchMap(userId => this.userService.getData(userId))
        .subscribe(user => this.user = user);
    }




    Подписка на параметр роутера и строку запроса


    Задача: При открытии страницы example.com/#/users/42?regionId=13 нужно выполнить функцию load(userId, regionId). Где userId мы получаем из роутера, а regionId — из параметров запроса.


    Решение: У нас два источника событий, поэтому воспользуемся функцией Observable.combineLatest, которая будет срабатывать, когда каждый из источников генерирует событие.


    ngOnInit() {
      Observable.combineLatest(this.route.params, this.route.queryParams)
        .subscribe(([params, queryParams]) => { // полученный массив деструктурируем
          const userId = params['userId'];
          const regionId = queryParams['regionId'];
          this.load(userId, regionId);
        });
    }
    

    Обратите внимание, что созданные подписки на роутер, при разрушении объекта удалятся, за этим следит ангуляр, поэтому отписываться от параметров роутера не нужно:


    The Router manages the observables it provides and localizes the subscriptions. The subscriptions are cleaned up when the component is destroyed, protecting against memory leaks, so we don't need to unsubscribe from the route params Observable. Mark Rajcok

    Остановка анимации загрузки после окончания выполнения подписки


    Задача: Показать значок загрузки после начала сохранения данных и скрыть его, когда данные сохранятся или произойдет ошибка.


    Решение: За отображение загрузчика у нас отвечает переменная loading, после нажатия на кнопку, установим ее в true. А для установки ее в false воспользуемся Observable.finally функций, которая выполняется после завершения подписки или если произошла ошибка.


    save() {
      this.loading = true;
      this.userService.save(params)
        .finally(() => this.loading = false)
        .subscribe(user => {
          // Успешно сохранили
        }, error => {
          // Ошибка сохранения
        });
    }

    Создание собственного источника событий


    Задача: Создать переменную lang$ в configService, на которую другие компоненты будут подписываться и реагировать, когда язык будет меняться.


    Решение: Воспользуемся классом BehaviorSubject для создания переменной lang$;


    Отличия BehaviorSubject от Subject:


    1. BehaviorSubject должен инициализироваться с начальным значением;
    2. Подписка возвращает последнее значение Subjectа;
    3. Можно получить последнее значение напрямую через функцию getValue().

    Создаём переменную lang$ и сразу инициализируем. Так же добавляем функцию setLang для установки языка.


    // configService
    lang$: BehaviorSubject<Language> = new BehaviorSubject<Language>(DEFAULT_LANG);
    setLang(lang: Language) {
      this.lang$.next(this.currentLang); // тут мы поставим
    }

    Подписываеся на изменение языка в компоненте. Переменная lang$ является "горячим" Observable объектом, то есть подписка требует отписки при разрушении объекта.


    private subscriptions: Subscription[] = [];
    ngOnInit() {
      const langSub = this.configService.lang$
        .subscribe(() => {
          // ...
        });
      this.subscriptions.push(langSub);
    }
    ngOnDestroy() {
      this.subscriptions
        .forEach(s => s.unsubscribe());
    }

    Использование takeUntil для отписки


    Отписываться можно и более изящным вариантом, особенно если в компоненте присутствует больше двух подписок:


    private ngUnsubscribe: Subject<void> = new Subject<void>();
    
    ngOnInit() {
      this.configService.lang$
        .takeUntil(this.ngUnsubscribe) // отписка по условию
        .subscribe(() => {
          // ...
        });
    }
    
    ngOnDestroy() {
      this.ngUnsubscribe.next();
      this.ngUnsubscribe.complete();
    }

    То есть, чтобы не терять память на горячих подписках, компонента будет работать до тех пор, пока значение ngUnsubscribe не изменится. А изменится оно, когда вызовется ngOnDestroy. Плюсы этого варианта в том, что в каждую из подписок достаточно добавить всего одну строчку, чтобы отписка сработала вовремя.


    Использование Observable для автокомплита или поиска


    Задача: Показывать предложения страниц при вводе данных на форме


    Решение: Подпишемся на изменение данных формы, возьмём только меняющиеся данные инпута, поставим небольшую задержку, чтобы событий не было слишком много и отправим запрос в википедию. Результат выведем в консоль. Интересный момент в том, что switchMap отменит предыдущий запрос, если пришли новые данные. Это очень полезно, для избегания нежалательных эффектов от медленных запросов, если, к например, предпоследний запрос выполнялся 2 секунды, а последий 0.2 секунды, то в консоль выведется результат именно последнего запроса.


    ngOnInit() {
      this.form.valueChanges
        .takeUntil(this.ngUnsubscribe)      // отписаться после разрушения
        .map(form => form['search-input'])  // данные инпута
        .distinctUntilChanged()             // брать измененные данные
        .debounceTime(300)                  // реагировать не сразу
        .switchMap(this.wikipediaSearch)    // переключить Observable на запрос в Вики
        .subscribe(data => console.log(data));
    }
    
    wikipediaSearch = (text: string) => {
      return Observable
        .ajax('https://api.github.com/search/repositories?q=' + text)
        .map(e => e.response);
    }

    Кеширование запроса


    Задача: Необходимо закешировать Observable запрос


    Решение: Воспользуемся связкой publishReplay и refCount. Первая функция закеширует одно значение функции на 2 секунды, а вторая будет считать созданные подписки. То есть, Observable завершится, когда все подписки будут выполнены. Тут можно прочитать подробнее.


    // tagService
    
    private tagsCache$ = this.getTags()
      .publishReplay(1, 2000) // кешируем одно значение на 2 секунды
      .refCount()             // считаем ссылки
      .take(1);               // берем 1 значение
    
    getCachedTags() {
      return tagsCache$;
    }

    Последовательный combineLatest


    Задача: Критическая ситуация на сервере! Backend команда сообщила, что для корректного обновления продукта нужно выполнять строго последовательно:


    1. Обновление данных продукта (заголовок и описание);
    2. Обновление списка тегов продукта;
    3. Обновление списка категорий продукта;

    Решение: У нас есть 3 Observable, полученных из productService. Воспользуемся concatMap:


    const updateProduct$ = this.productService.update(product);
    const updateTags$ = this.productService.updateTags(productId, tagList);
    const updateCategories$ = this.productService.updateCategories(productId, categoryList);
    
    Observable
      .from([updateProduct$, updateTags$, updateCategories$])
      .concatMap(a => a)  // выполняем обновление последовательно
      .toArray()          // Возвращает массив из последовательности
      .subscribe(res => console.log(res)); // res содержит массив результатов запросов

    Загадка на посошок


    Если у вас есть желание немного поупражняться, решите предыдущую задачу, но для создания продукта. То есть сначала создаём продукт, потом обновляем теги, а только потом — категории.


    Полезные ссылки


    • Заворожённо посмотреть на шарики: rxviz.com
    • Потаскать шарики мышкой: rxmarbles.com
    Поделиться публикацией
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 12
    • +3
      Огромное Вам, человеческое, спасибо! Весьма годная подборка для начинающего. На русском материала по RxJS практически нет. Вы осветили крайне полезные (по крайней мере, для меня) юзкейсы.
      • +3
        Можно же последний снипет так
        Observable.concat(updateProduct$, updateTags$, updateCategories$).toArray().subscribe(res => console.log(res))
        Ну или в случае с from
        Observable.from([updateProduct$, updateTags$, updateCategories$]).concatAll().toArray().subscribe(res => console.log(res))
        Ну или с of
        Observable.of(updateProduct$, updateTags$, updateCategories$).concatAll().toArray().subscribe(res => console.log(res))
        • 0
          Вопрос такой — а есть ли аналогично takeUntil оператор что то вроде restartWhen?

          Задача такая — UI получает observable из метода getSmth, соответственно хотелось бы в методе saveSmth вызвать next и чтобы все подписки сходили еще раз на бакенд и получили новые данные
          Я пока что реализовал примерно то что мне надо руками, но вдруг уже есть готовое решение.
          • 0

            Думаю что вам нужно посмотреть в сторону retryWhen.

          • 0

            Ужасы какие :-) Давайте я вам лучше покажу, как то же самое реализуется с использованием ОРП...


            При открытии страницы example.com/#/users/42, по userId получить данные пользователя.

            ФРП:


            ngOnInit() {
              this.route.params
                .pluck('userId') // получаем userId из параметров
                .switchMap(userId => this.userService.getData(userId))
                .subscribe(user => this.user = user);
            }

            ОРП:


            // example.com/#user=123
            user() {
                return this.user_serice().data( $mol_state_arg.value( 'user' ) )
            }

            Можно поддержать и формат урлов из задачи, но суть не поменяется.


            При открытии страницы example.com/#/users/42?regionId=13 нужно выполнить функцию load(userId, regionId). Где userId мы получаем из роутера, а regionId — из параметров запроса.

            ФРП:


            ngOnInit() {
              Observable.combineLatest(this.route.params, this.route.queryParams)
                .subscribe(([params, queryParams]) => { // полученный массив деструктурируем
                  const userId = params['userId'];
                  const regionId = queryParams['regionId'];
                  this.load(userId, regionId);
                });
            }

            ОРП:


            // example.com/#user=123/region=456
            data() {
                return this.load( $mol_state_arg.value( 'user' ) , $mol_state_arg.value( 'region' ) )
            }

            Можно поддержать и формат урлов из задачи, но опять же суть не поменяется.


            Показать значок загрузки после начала сохранения данных и скрыть его, когда данные сохранятся или произойдет ошибка.

            ФРП:


            save() {
              this.loading = true;
              this.userService.save(params)
                .finally(() => this.loading = false)
                .subscribe(user => {
                  // Успешно сохранили
                }, error => {
                  // Ошибка сохранения
                });
            }

            ОРП:


            @ $mol_mem
            save() {
                return this.user_service().save( params )
            }

            Да, реально, больше ничего делать не надо — анимация сама начнётся, когда начнётся запрос, закончится, когда запрос завершится, и будет нарисована ошибка в случае ошибки.


            Создать переменную lang$ в configService, на которую другие компоненты будут подписываться и реагировать, когда язык будет меняться.

            ФРП:


            lang$: BehaviorSubject<Language> = new BehaviorSubject<Language>(DEFAULT_LANG);
            setLang(lang: Language) {
              this.lang$.next(this.currentLang); // тут мы поставим
            }

            private subscriptions: Subscription[] = [];
            ngOnInit() {
              const langSub = this.configService.lang$
                .subscribe(() => {
                  // ...
                });
              this.subscriptions.push(langSub);
            }
            ngOnDestroy() {
              this.subscriptions
                .forEach(s => s.unsubscribe());
            }

            ОПР:


            @ $mol_mem
            lang( next = 'en' ) { return next }
            
            title() {
                return this.language_service().text( this.lnag() , 'title' )
            }

            Да-да, подписки/отписки — не наша забота, всё будет работать как надо.


            Показывать предложения страниц при вводе данных на форме

            ФРП:


            ngOnInit() {
              this.form.valueChanges
                .takeUntil(this.ngUnsubscribe)      // отписаться после разрушения
                .map(form => form['search-input'])  // данные инпута
                .distinctUntilChanged()             // брать измененные данные
                .debounceTime(300)                  // реагировать не сразу
                .switchMap(this.wikipediaSearch)    // переключить Observable на запрос в Вики
                .subscribe(data => console.log(data));
            }
            
            wikipediaSearch = (text: string) => {
              return Observable
                .ajax('https://api.github.com/search/repositories?q=' + text)
                .map(e => e.response);
            }

            ОРП:


            suggests() {
                return $mol_http.resource( 'https://api.github.com/search/repositories?q=' + this.query() ) ).json().items
            }

            О подписках/отписках заботиться не надо, актуальное значение всегда есть, при изменении запроса возвращаться будут только данные для актуального запроса, а предыдущие запросы будут автоматически прибиваться. Задержки разве что тут нет, но она реализуется не тут, а компоненте поля ввода.


            Необходимо закешировать Observable запрос

            ФРП:


            private tagsCache$ = this.getTags()
              .publishReplay(1, 2000) // кешируем одно значение на 2 секунды
              .refCount()             // считаем ссылки
              .take(1);               // берем 1 значение
            
            getCachedTags() {
              return tagsCache$;
            }

            ОРП:


            @ $mol_mem
            tags() {
                const resource = $mol_http.resource( '/tags' )
            
                // сбрасываем кеш через 2 секунды
                setTimeout( ()=> resource.json( undefined , $mol_atom_force ) , 2000 )
            
                return resource.json()
            }

            Так как кеширование происходит по умолчанию, то задача своидится к противоположной — сбросить кэш через 2 секунды.


            Критическая ситуация на сервере! Backend команда сообщила, что для корректного обновления продукта нужно выполнять строго последовательно: Обновление данных продукта (заголовок и описание); Обновление списка тегов продукта; Обновление списка категорий продукта.

            ФРП:


            const updateProduct$ = this.productService.update(product);
            const updateTags$ = this.productService.updateTags(productId, tagList);
            const updateCategories$ = this.productService.updateCategories(productId, categoryList);
            
            Observable
              .from([updateProduct$, updateTags$, updateCategories$])
              .concatMap(a => a)  // выполняем обновление последовательно
              .toArray()          // Возвращает массив из последовательности
              .subscribe(res => console.log(res)); // res содержит массив результатов запросов

            ОРП:


            update_product() { return this.product_service().update( this.product() ) }
            update_tags() { return this.product_service().update_tags( this.product() , this.tags() ) }
            
            update_categories() { return this.product_service().update_categories( this.product() , this.categories() ) }
            
            @ $mol_mem
            updating() {
                console.log(
                    this.update_product().valueOf() ,
                    this.update_tags().valueOf() ,
                    this.update_categories().valueOf() ,
                )
            }

            Без valueOf() все запросы пошли бы параллельно. Поэтому мы немедленно требуем результат, чтобы следующий запрос не начался до получения результата предыдущего.


            Загадка на посошок

            Ну и у меня для вас задачка:


            У меня есть список игрушек, у каждой игрушки есть свойства. Свойства могут меняться. Есть функция фильтрации, которая может быть сложной и тяжёлой и которая тоже может меняться динамически. Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит. Я так понимаю, каждое свойство должно быть стримом и надо как-то подписаться на заданные свойства всех игрушек. Как это лучше всего сделать?


            У меня пока получилось следующее:


            const ToysSource = new Rx.BehaviorSubject( [] )
            const Toys = ToysSource.distinctUntilChanged().debounce( 0 )
            
            const FilterSource = new Rx.BehaviorSubject( toy => toy.count > 0  )
            const Filter = FilterSource.distinctUntilChanged().debounce( 0 )
            
            const ToysFiltered = Filter
            .select( filter => {
              if( !filter ) return Toys
              return Toys.map( toys => toys.filter( filter ) )
            } )
            .switch()
            .distinctUntilChanged()
            .debounce( 0 )

            Но тут, очевидно, при любом изменении игрушек будет происходить повторная фильтрация. Например: фильтруем по числу остатков, а меняется цена — происходит повторная фильтрация, что не хорошо.

            • 0
              > Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит.

              А ничего, что это алгоритмически неразрешимая задача? filter toy = if arithmetic_is_consistent then test(toy.property1) else test(toy.property2)
              • 0

                Всмысле неразрешимая? ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене. Уверен такое можно сделать и на Rx через хитрую комбинацию операторов.

                • 0
                  > Всмысле неразрешимая?

                  Всмысле, нет никакого способа узнать, какие поля требуются для фильтра, не запустив сам фильтр. Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?

                  > ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене.

                  Либо не позволяет, либо я не понимаю, что вы имеете в виду.
                  • 0
                    Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?

                    Допустим у фильтра есть метаданные, в которых указано какие свойства он проверяет, тогда можно заранее сказать какие при изменении каких свойств нужно провести перефильтрацию.


                    Либо не позволяет, либо я не понимаю, что вы имеете в виду.

                    Во время предыдущей фильтрации отслеживается к каким свойствам было обращение. Изменение этих свойств или фильтра приведёт к перефильтрации. Изменение любых других — не приведёт.

                    • 0
                      > Допустим у фильтра есть метаданные

                      Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?

                      > Во время предыдущей фильтрации отслеживается к каким свойствам было обращение.

                      А если при том же фильтре в разных случаях обращение к разным полям?
                      • 0
                        Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?

                        Агась.


                        А если при том же фильтре в разных случаях обращение к разным полям?

                        Если там не Math.random(), а зависит от какого-либо реактивного свойства, то всё будет ок — при изменении этого свойства будет перефильтрация.

              • 0
                deleted

                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.