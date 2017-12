Почему реактивное программирование?

Если вы новичок в общении с RxJava или пытались разобраться в этом, но не довели дело до конца, то ниже вы найдете для себя кое-что новое.Нам в GO-JEK требуется выполнять большое количество асинхронных операций в приложениях и мы не можем позволить себе идти на компромиссы в ущерб скорости работы и плавности пользовательского интерфейса.Написание сложных многопоточных Android приложений может быть достаточно трудоемким процессом, который время от времени будет вас сильно перегружать из-за необходимости заботиться о большом количестве связанных друг с другом вещей. Это и многие другие причины убедили нас использовать RxJava в разрабатываемых Android приложениях.В этой статье мы поговорим о том как мы использовали реальные возможности работы с многопоточностью в RxJava для того, чтобы сделать процесс разработки приложения максимально простым, легким и веселым. Во всех примерах кода ниже будет использоваться RxJava 2, но описанные концепции можно будет применять и в других реактивных расширениях Каждая статья о реактивном программировании начинается с такого обязательного блока и мы не нарушим эту традицию. Существует несколько преимуществ использования реактивного подхода к построению Android приложений, обратим внимание на те, которые вамнужны.Если вы давно разрабатываете под Android, то, должно быть, заметили, как быстро вещи становятся через чур сложными и неподконтрольными с использованием вложенных обратных вызовов.Это происходит, когда вы выполняете несколько асинхронных операций последовательно и хотите, чтобы дальнейшие действия зависели от результата предыдущих операций. Почти сразу код становится слишком перегруженным и сложным для поддержки.В императивном мире, в ситуации когда выполняется множество сложных асинхронных операций, ошибки могут возникать в большом количестве мест. И в каждом месте вы должны обрабатывать эти ошибки, в результате появляется много повторяющегося шаблонного кода, методы становятся громоздкими.Все мы знаем (и тайно признаем) насколько иногда сложной может быть работа с многопоточностью в Java. Например, выполнение части кода в фоновом потоке и возврат результата обратно в главный поток. Это только звучит просто, но на практике появляется много подводных камней, которые нужно обходить.на ваш выбор, заботясь о корректной синхронизации и позволяя без проблем переключаться между потоками.Преимущества RxJava бесконечны. Мы можем говорить об этом часами и адски утомить вас, но вместо этого давайте копнем глубже и начнем изучать реальную работу с многопоточностью в RxJava.RxJava по умолчанию не многопоточна в любом случае. Определение, данное для RxJava на официальном сайте, выглядит примерно следующим образом:Увидев слово «асинхронных», многие люди ошибочно полагают, что RxJava многопоточна по умолчанию. Да, RxJava поддерживает многопоточность, предлагает множество мощных возможностей для легкой работы с асинхронными операциями, но это не значит что поведение RxJava по умолчанию многопоточно.Если вы уже немного работали с RxJava, то её знаете базовые конструкции:Если вы запустите данный пример кода, то ясно увидите, что все действия выполняются в основном потоке приложения (проследите за именами потоков в логе в консоли). Этот пример показывает, чтоВсё выполняется в том же потоке, в котором вызван код.Интересно, что же делает? Это не что иное, какоператор. Он помогает внедряться в цепочку объектови выполнять грязныеоперации. Например, внедрять дополнительный код в цепочке вызовов для отладки. Прочитать больше можно здесь Для того, чтобы начать работать с многопоточностью с применением RxJava необходимо познакомиться с базовыми классами и методами, такими какДавайте рассмотрим один из самых простых примеров. Допустим, мы хотим получить список объектовсетевым запросом и показать его в основном потоке приложения. Довольно общий и понятный пример для начала.Здесь мы видим метод, который осуществляет сетевой вызов и собирает список книг для нас. Сетевой вызов занимает время (несколько миллисекунд или секунд), поэтому мы используеми указываем планировщикдля выполнения операции в потоке ввода-вывода.Также мы используем операторвместе с планировщикомдля того, чтобы обрабатывать результат в основном потоке и показать список книг в пользовательском интерфейсе приложения.Не волнуйтесь, скоро мы перейдем к более продвинутым вещам. Этот пример был предназначен только для того, чтобы вспомнить базовые понятия, прежде чем погрузиться глубже.RxJava предоставляет мощный набор планировщиков. Вы не можете получить прямой доступ к потокам или управлять ими. Если вам нужно работать с потоками, то необходимо воспользоваться встроенными планировщиками.Можете представлять планировщики какГоворя проще, если вам нужно выполнить задачу в отдельном потоке — необходимо использовать верный планировщик, который возьмёт поток из своего пула доступных потоков и выполнит в нём задачу.В RxJava доступны несколько типов планировщиков. Самая сложная часть — выбрать верный планировщик для вашей задачи. Задача никогда не будет выполняться оптимально, если вы не выберете верный планировщик. Давайте разберем каждый планировщик.Этот планировщик, например, доступ к файловой системе, выполнение сетевых вызовов, доступ к базе данных и так далее. Количество потоков в этом планировщике неограничено и может расти по мере необходимости.Этот планировщик, такой как обработка больших объемов данных, изображений и так далее. ПланировщикТак как этот планировщик подходит только для интенсивной работы с ЦП — количество его потоков ограничено. Сделано это для того, чтобы потоки не конкурировали за процессорное время и не простаивали.Этот планировщикВ данном случае использование пула потоков не принесет никакой выгоды. Потоки очень затратно создавать и уничтожать. Вы должны быть осторожны и не злоупотреблять чрезмерным созданием потоков, так как это может привести в замедлению работы системы и переполнению памяти.В идеале вы должны использовать этот планировщик довольно редко, в основном для выведения в отдельный поток долго работающих частей программы.Этот планировщикОн может быть очень полезен, когда у вас есть набор фоновых заданий в разных местах вашего приложения, но нельзя допустить одновременного выполнения более чем одного из этих заданий.Этот планировщик будет основываться на вашем собственном. Может возникнуть ситуация, в которой необходимо будет выполнять определенные задачи в планировщике на основании собственной логики распределения потоков.Допустим, вы хотите ограничить число параллельных сетевых вызовов, которые делает ваше приложение. Можно создать собственный планировщик, который будет работать на базе ограниченного в размерах пула потоков () и использовать его во всех местах, связанных с сетевыми вызовами.Это специальный планировщик, который недоступен в библиотеке RxJava. Необходимо использовать расширяющую библиотеку RxAndroid для доступа к этому планировщику. Этот планировщикПо умолчанию этот планировщик ставит задания в очередь в, связанный с основным потоком, но есть возможность переопределения:Будьте осторожны в использовании планировщиков, основанных на неограниченных пулах потоков, таких как. Всегда есть риск бесконечного роста количества потоков.Теперь, когда у вас есть представление о типах планировщиков, разберемв деталях.Вы должны глубоко разбираться в том, как эти два оператора работают по отдельности и вместе, чтобы профессионально работать с многопоточностью в RxJava.Простыми словами,. Вы должны уяснить важность слова. Когда у вас цепь наблюдаемых элементов, источник— это всегда корневой элемент или верхняя часть цепи, откуда происходит создание событий.Как вы уже видели, если не использовать, то все события происходят в том потоке, в котором произошел вызов кода (в нашем случае —поток).Давайте перенаправим события в вычислительный поток с помощьюи планировщика. Когда вы запустите нижеследующий пример кода, то увидите, что события происходят в одном из вычислительных потоков, доступных в пуле —В целях сокращения кода мы не будем полностью переопределять все методы, так как нам не нужно переопределять. Воспользуемсяи лямбдами.Не важно в каком месте в цепочке вызовов вы используете. Он работает только с наблюдаемым источником, и контролирует в какой поток наблюдаемый источник передает события.В нижеследующем примере после observable-источника создаются другие объекты observable (методами), а операторпомещен в конце цепочки вызовов. Но как только вы запустите этот код, то заметите, что все события будут возникать в потоке, указанном в. Это станет более понятным при добавлениив цепь вызовов. И даже если мы разместимниже, то логика работы не изменится.работает только с наблюдаемым источникомТакже важно понять, что нельзя использоватьнесколько раз в одной цепочке вызовов. Можно, конечно, написать ещё раз, но никаких изменений это не повлечет. В примере ниже мы последовательно вызываем три различных планировщика, можете ли вы догадаться, какой планировщик сработает при запуске?Если вы ответили, то вы правы! Даже если делать вызов многократно —Стоит потратить ещё немного времени на более подробное изучение рассмотренного примера. Почему срабатывает только планировщик? Обычно все думают, что сработает, так как этот вызов находится в конце цепочки.Необходимо понять, что в RxJava подписка создаётся после обратного вызова всех экземпляров. Код ниже поможет нам разобраться в этом. Это ранее рассмотренный пример, но расписанный подробнее.Для того, чтобы понять как всё работает — начнем разбирать всё с последней строки примера. В ней целевой подписчик, вызывает методу observable объекта, который затем делает неявный вызову своего родительского observable объекта. Реализация наблюдателя, предоставляемая объектом, умножает переданные числа на 10.Процесс повторяется инеявно вызываету объекта, передавая реализацию наблюдателя, которая позволяет обрабатывать только четные числа. Теперь мы достигли корневого элемента (), у которого нет родителя для последующего вызова. На этом этапе завершается цепочка наблюдаемыхэлементов, после чего observable-источник начинает передаватьэлементы.Теперь для вас должна быть понятна концепция работы подписок в RxJava. К настоящему времени у вас должно появиться понимание того, как формируются цепочки наблюдаемыхобъектов и как события распространяются, начиная с observable-источника.Как мы уже видели,указывает observable-источнику передавать элементы в определенный поток и этот поток будет отвечать за продвижение элементов вплоть до подписчика. Поэтому, по умолчанию, подписчик получает обработанные элементы в этом же потоке.Но это может быть не то поведение, которого вы ожидаете. Предположим, вы хотите получить некие данные из сети и отобразить их в пользовательском интерфейсе.Нужно выполнить две вещи:У вас будет, который осуществляет сетевой вызов в потоке ввода-вывода и передает результат подписчику. Если вы используете только, то целевой подписчик будет обрабатывать результат в том же потоке ввода-вывода. И нам не повезло, так как работать с пользовательским интерфейсом в Android можно только в основном потоке.Теперь нам крайне необходимо переключить потоки и мы используем для этого. Когдавстречается в цепочке вызовов, то передаваемые observable-источником элементы незамедлительно перебрасываются в поток, указанный вВ этом придуманном примере мы наблюдаем получение целых чисел из сети и их дальнейшую передачу из observable-источника. В реальных примерах это может быть любая другая асинхронная операция, например, чтение большого файла, выборка данных из базы данных и т.д. Вы можете запустить данный пример и посмотреть на результаты, просто следите за логами в консоли.Теперь рассмотрим более сложный пример, в которомбудет вызываться несколько раз для переключения потоков в процессе обработки данных.В примере выше observable-источник передаёт элементы в цепочку обработчиков в потоке ввода вывода, так как мы использоваливместе с. Далее мы хотим преобразовать каждый элемент, используя оператор, но сделать это нужно в вычислительном потоке. Для этого используемвместе сперед вызовомдля переключения потока и передачи элементов в целевой вычислительный поток.Следующим шагом отфильтруем некоторые элементы и по какой-то причине мы хотим выполнить эту операцию в новом потоке для каждого из элементов. Используем снова, но уже в паре сперед вызовом операторадля передачи каждого элемента в новый поток.В итоге мы хотим, чтобы подписчик получил результат обработки в потоке пользовательского интерфейса. Для этого используемвместе с планировщикомНо что произойдет, если использоватьнесколько раз последовательно? В примере ниже в каком потоке подписчик получит результат?Если запустите пример, то увидите, что подписчик получит элементы в вычислительном потоке. Это значит, что сработал последний вызванный. Интересно почему?Возможно вы уже догадались. Как мы знаем, подпискавызывается после обратного обхода всех, но с передачей событийвсё происходит наоборот, то есть в обычном порядке, как записан код. Вызов происходит от observable-источника и далее вниз по цепочке вызова вплоть до подписчика.Операторвсегда работает в прямом порядке, поэтому последовательно происходит переключение потоков и последним происходит переключение на вычислительный поток (). Итак, когда нужно переключить поток для обработки данных в новом потоке, то просто сначала вызовите, а далее обрабатывайте элементы. Синхронизация, исключение состояния гонки, всё это и многие другие сложности многопоточности RxJava обрабатывает за вас.Сейчас у вас должно быть достаточно хорошее представление о том, как правильно использовать RxJava для написания многопоточных приложений, обеспечивающих быструю и плавную работу пользовательского интерфейса.Если понимание не пришло сразу, ничего страшного. Прочитайте статью ещё раз, поэкспериментируйте с примерами кода. Здесь достаточно много нюансов для понимания, не торопитесь.