[:ru]Третья лекция Курса по архитектуре клиент-серверных android-приложений, в которой мы познакомимся с RxJava и основными операторами, а также узнаем, как создавать Observable, преобразовывать потоки данных, работать с RxJava в Android и решать проблему Backpressure.
Ссылки на исходный код ваших решений вы можете оставлять в комментариях. Делитесь вашими решениями с сообществом, получайте обратную связь и конструктивную критику. Лучшие решения будут опубликованы на нашем канале и сайте fandroid.info с указанием авторства победителей!

Введение

RxJava

  1. Введение в RxJava
  2. Создание Observable
  3. Основные операторы
  4. Преобразование потоков данных

RxJava в Android

Дополнительно – проблема Backpressure

  1. Использование специальных операторов
  2. Отказ от использования Observable.create

Практические задания

Ссылки и полезные ресурсы

Введение

Сейчас мы знаем немало способов выполнять какую-то работу в фоновых потоках. Под работой здесь чаще всего понимаются серверные запросы. Но насколько все хорошо с этими способами? Нужно признать, что у них немало недостатков.

Во-первых, эти способы не гибкие. Допустим, нужно выполнить два запроса параллельно, а потом, дождавшись, когда оба завершат работу, выполнить третий запрос с использованием результатов предыдущих запросов. Если вы попробуете реализовать такую задачу с использованием лоадеров, то скорее всего у вас получится большое количество булевских флагов, полей для сохранения результата и много кода. К проблемам гибкости можно также отнести и то, что сложно хорошо реализовать периодическое обновление данных.

Во-вторых, неудобно обрабатывать ошибки. Опять же, в случае лоадеров мы можем вернуть только один результат. Конечно, можно использовать специальные классы, которые будут служить как для передачи данных, так и для передачи ошибок, но это неудобно.

Именно поэтому нам нужно рассмотреть другие возможности для выполнения сетевых запросов и для работы с данными. И в первую очередь такой возможностью является популярный фреймворк RxJava.

RxJava

Фреймворк RxJava позволяет использовать парадигму функционального реактивного программирования (FRP) в Android. Понять, что это значит, весьма непросто, поэтому требуется немало пояснений. Во-первых, слово функциональное означает то, что в FRP основным понятием являются функции и в этом эта парадигма схожа с обычным функциональным программированием. Конечно, в Android весьма затруднительно использовать полноценное функциональное программирование, но все же с помощью RxJava мы смещаем приоритет от объектов к функциям. Во-вторых, реактивное программирование означает программирование с асинхронными потоками данных. Проще всего это пояснить на практике: поток данных – это любой ваш запрос на сервер, данные из базы, да и обычный ввод данных от пользователя (если говорить совсем уж откровенно, то создать поток данных можно абсолютно из чего угодно), который чаще всего выполняется в фоновом потоке, то есть асинхронно. Если объединить два объяснения, то получим, что функциональное реактивное программирование – это программирование с асинхронными потоками данных, которыми можно манипулировать с помощью различных функций.

Определение звучит красиво, но только совсем непонятно, зачем это может понадобиться. Оказывается, еще как может. RxJava позволяет решить почти все проблемы, которые были озвучены во введении. В качестве основных преимуществ RxJava выделяются следующие:

  • Обеспечение многопоточности. RxJava позволяет гибко управлять асинхронностью выполнения запросов, а также переключать выполнение операций в различные потоки. Кроме того, что немаловажно для Android, RxJava также позволяет легко обрабатывать результат в главном потоке приложения.
  • Управление потоками данных. Это позволяет преобразовывать данные в потоке, применять операции к данным в потоке (к примеру, сохранять их данные из потока в базу), объединять несколько потоков в один, изменять поток в зависимости от результата другого и многое другое.
  • Обработка ошибок. Это еще одно очень важное преимущество RxJava, которое позволяет обрабатывать различные ошибки, возникающее в потоке, повторять серверные запросы в случае ошибки и передавать ошибки подписчикам.

И самое приятное, что все преимущества выше достигаются буквально за пару строчек кода!

Использовать RxJava непросто, а использовать ее правильно еще сложнее, и это требует достаточно долгого изучения. RxJava – это очень большой фреймворк, чтобы правильно работать с ним (и в частности с парадигмой функционального реактивного программирования), нужно очень много изучать и практиковаться. RxJava достойна отдельного курса, по ней существует огромное количество статей, и их число увеличивается с каждым днем. По RxJava написана уже не одна книга, поэтому нельзя надеяться, что можно хорошо изучить этот фреймворк в рамках одной лекции. Но мы постараемся рассмотреть основные возможности RxJava и то, как ее можно применить для Android.

Мы немного рассмотрели суть RxJava и функционального реактивного программирования, но чтобы продвинуться дальше, нам нужно знать основные элементы этого фреймворка, к примеру, что подразумевается под потоком данных и как его создать, что такое подписчик и как его использовать и так далее.

RxJava в качестве своей основы использует паттерн Observer. Как выглядит этот паттерн в классическом виде? Основными сущностями в нем являются объект, значение которого может изменяться, и подписчик, который может подписаться на эти изменения (каждый раз при изменении значения объекта у подписчика будет вызван определенный метод). Схематично это можно представить следующим образом:

 

Суть RxJava почти такая же, только вместо одного объекта подписчики используют целый поток данных. Подписчик может подписаться на поток данных, и тогда он будет получать информацию о каждом новом элементе в потоке, о произошедших ошибках, а также о завершении потока.

Тогда схема для RxJava будет такой:

С подписчиком все более менее понятно, а что же такое поток данных? Поток данных – это всего лишь набор каких-то элементов (необязательно конечный), которые передаются подписчику. В качестве потока данных могут выступать как простые объекты и последовательности, так и бесконечные, последовательности и различные события, к примеру, ввод данных.

Рассмотрим один из примеров, который поможет понять, где именно встречаются потоки данных и как в работе с ними может помочь RxJava. Допустим, стоит задача искать людей с определенным именем, когда пользователь вводит текст в поле поиска. Ее можно решить следующими простыми строчками кода:

Мы детально разберем каждый оператор далее, но пока только объясним, что происходит в коде, чтобы понять, насколько мощным инструментом является RxJava.

Во-первых, мы превращаем ввод текста в поток данных, за которым мы будем наблюдать. Наш поток может выглядеть, например, так: “J”, “Jo”, “Joh”, “John”, … Это поток данных.

Как уже говорилось, мы можем управлять данными в потоке. Допустим, мы не хотим, чтобы запрос выполнялся слишком часто. Для этого используется оператор debounce, который передает данные подписчику только тогда, когда между поступающими данным в потоке произошла пауза (500 миллисекунд в нашем примере). Тогда, к примеру, наш поток данных может стать таким: “J”, “John”, …

Далее с помощью оператора map все данные в потоке превращаются в строки нижнего регистра. После этого поток выглядит следующим образом: “j”, “john”, …

И после этого выполняется запрос на сервер для поиска человека с таким именем. В результате наш поток данных из строк превращается в поток данных из людей и может выглядеть вот так:

И более того, все это выполняется в фоновом потоке, а результат возвращается в главный поток приложения. Поразительный результат за 7 строчек кода!

Введение в RxJava

После краткого объяснения сути RxJava перейдем непосредственно к коду и способам работы с этим фреймворком. В RxJava в качестве потока данных выступает класс Observable. И как уже говорилось выше, поток данных может быть асинхронным, а также над каждым элементом в потоке или даже над всем потоком могут выполняться различные операции преобразования.

Рассмотрим простейший способ создания потока данных (Observable) из нескольких элементов с помощью метода just:

Все крайне просто. Мы создали поток данных из 3 элементов. Теперь осталось только подписаться на него и, к примеру, вывести в лог все элементы:

Сложно не заметить, что сейчас все резко перестало быть простым и компактным. Но это мы исправим, а пока все же поясним, что здесь написано. Как видно, в качестве подписчика на поток выступает реализация интерфейса Observer. В нем определено 3 метода. Метод onNext вызывается, когда подписчику передается следующий элемент из потока. Методы onError и onCompleted вызываются, когда в потоке данных происходит ошибка и когда поток данных завершается соответственно.

Теперь давайте изменим наш код, чтобы он стал короче и приятнее для чтения. Видно, что мы никак не используем методы onError и onCompleted, поэтому они нам не нужны. К счастью, у метода subscribe есть множество разных форм, и они позволяют использовать только нужные методы. К примеру, таким образом можно только обрабатывать вызов onNext:

А вот этот код уже сильно короче и понятнее. И мы к тому же не лишаемся возможности обрабатывать ошибки, нужно только вторым параметром передать этот обработчик:

Разумеется, ситуация с onComplete полностью аналогична.

А вот теперь очень интересный и справедливый вопрос. Это, конечно, круто, что мы использовали потоки данных, подписчики и мощь RxJava, но мы же только вывели данные в лог. Я могу сделать это немного проще:

И это, конечно, правильно. Чтобы показать превосходство RxJava, давайте устроим небольшое соревнование, где каждая задача будет реализовываться как с помощью RxJava, так и с помощью циклов.

Задача 1: Дан набор степеней двойки от 1 до 64 включительно, каждый их них нужно вывести в лог как в предыдущем примере. При этом вызов String.valueOf смотрится некрасиво, нужно переделать так, чтобы данные в потоке / массиве превращались в строки.

Какого-то заметного преимущества RxJava тут не заметно. Так что идем дальше.

Задача 2: Нужно изменить данные так, чтобы в лог попали только числа от 13 и выше:

И хоть заметной разницы в количестве кода нет, нужно сказать, что код с RxJava выглядит чище, логичнее и понятнее. Давайте добьем способы работы через циклы следующей задачей.

Задача 3: все операции над данными должны выполняться в фоновом потоке, а вывод данных в лог – в главном потоке приложения:

Решение предыдущей задачи в 2 строки – вот она, истинная мощь RxJava при работе с потоками (при этом код остается в разы чище и понятнее). А при использовании циклов нам пришлось создавать Executor для работы в фоне и Handler для передачи данные в главный поток приложения.

Даже такие игрушечные примеры показывают то, что у RxJava есть преимущества, которыми нужно пользоваться. Хотя, конечно, если говорить прямо, то RxJava и циклы – это вещи, которые нельзя сравнивать, так как это совершенно разные парадигмы.

Давайте продолжим знакомство с RxJava и ее различными операторами. И начнем с рассмотрения способов создания потоков данных.

Создание Observable

Мы уже видели использование метода just для создания Observable. Примерно аналогичным является метод from, который позволяет создать поток данных из списка элементов:

На самом деле все методы создания потоков данных в итоге вызывают стандартный метод Observable.create. Поэтому рассмотрим и этот способ. По сути, при использовании такого подхода мы сами управляем тем, какие данные передавать подписчику, и сами вызываем методы onNext, onError и onCompleted. Простой пример создания Observable через create:

Как видно в примере, мы вручную передаем данные в подписчик. При этом такой поток данных будет отдан каждому подписчику, который подпишется на созданный Observable.

Использовать метод create напрямую разработчикам не рекомендуется. Немного скажем об этом сейчас, а подробнее будет в дополнительной лекции по проблеме Backpressure. В этой же лекции будут рассмотрены и другие способы создания Observable.

Чем же неудобен метод create? Во-первых, вам нужно корректно обрабатывать все потенциальные ошибки и передавать их в подписчик самостоятельно. Во-вторых, нужно всегда следить за тем, что подписчик еще подписан на поток данных. К примеру, рассмотрим код подписки на Observable, который был описан выше:

В этом коде подписчик подписывается на получение данных и сразу от них отписывается. А код Observable ждет 300мс перед тем, как передать подписчику последний элемент, и в момент передачи подписчик уже отписался от получения данных. Поэтому происходит ошибка. Поэтому код в методе create нужно модифицировать, добавив проверку на то, что подписчику еще нужны данные из потока:

При этом такую проверку нужно добавлять перед каждым вызовом onNext, onError и onCompleted (здесь это опущено для удобства), что неудобно и часто приводит к ошибкам. Поэтому метод create использовать не нужно.

Кроме непосредственного создания потока данных, важно знать, как выполнять операции над данными в фоновом потоке, а также как обрабатывать результат в определенном потоке. Для этого служат операторы subscribeOn и observeOn, в которых очень часто путаются разработчики, только начинающие свое знакомство с RxJava.

Метод subscribeOn служит для указания потока, в котором выполняется код для создания данных в Observable. Если смотреть с точки зрения кода, то код в методе call в интерфейсе onSubscribe будет выполнен в том потоке, который был передан в метод subscribeOn. Метод observeOn указывает поток, в котором данные должны обрабатываться подписчиком. Можно запоминать так, что подписчик – это наблюдатель (observer) и вы указываете, где именно ему нужно наблюдать – метод observeOn. А оставшийся метод служит для указания, где должен работать поток данных.

Не совсем корректно говорить, что в методах observeOn и subscribeOn мы указываем потоки, нет – мы передаем в них экземпляры Scheduler, которые в том числе берут на себя работу по планированию задач. Можно создавать свои Scheduler, но обычно хватает использования стандартных:

  • io() – выполнение задач, которые не сильно нагружают процессор, но являются долгими. Это, к примеру, обращения к серверу или к базе данных. Размер пула потоков не ограничен.
  • computation() – выполнение вычислительных задач. Размер этого пула эквивалентен количеству ядер процессора.
  • newThread() – создает новый поток для каждой задачи.
  • immediate() – выполнение задачи в том же потоке, откуда вызывается Observable. Чаще всего его используют для тестирования.

Кроме того, в библиотеке RxAndroid есть AndroidSchedulers.mainThread(), который переносит выполнение кода подписчика в главный поток.

Исходя из объяснений понятно, что чаще всего вы будете использовать Schedulers.io() для запросов к серверу и базе данных, а в AndroidSchedulers.mainThread() будете обрабатывать результат. Поэтому код для большинства Observable будет выглядеть вот так:

Основные операторы

Рассмотрим основные операторы, которые вы можете использовать для преобразования данных в потоке. Каждый из этих операторов принимает в качестве значения какую-то функцию – в этом и реализуется парадигма функционального реактивного программирования.

Какие-то из операторов мы уже видели. Разумеется, это в первую очередь оператор map, который преобразовывает элемент в потоке в любой другой элемент. Его работу можно наглядно представлять с помощью следующей схемы:

Мы уже применяли оператор map ранее для преобразования числа в строки. Это не единственное применение, вы можете преобразовывать что угодно во что угодно, главное, чтобы это нужно было по логике вашего приложения.

Нужно сказать еще одно замечание: если для преобразования одного объекта в другой требуется несколько операций, лучше использовать несколько операторов map, чем писать несколько операций в одном операторе. Это сильно улучшит читаемость вашего кода. То есть вместо такого кода:

Второй вариант кода не является единственно правильным или более быстрым, но он проще для понимания и лучше согласуется с принципами функционального реактивного программирования.

Большинство из этих операторов либо просты, либо не очень часто употребляются, поэтому для следующих операторов мы приведем только краткое описание со схемой и без примеров кода.

Также популярным является оператор filter, который оставляет в потоке только данные, удовлетворяющие переданному в качестве параметра условию.

Также есть и другие методы, предназначенные для фильтрации данных в потоке, к примеру, skip, take, first и другие:

Существует еще очень большое количество различных операторов, полное описание и схему работы которых можно найти в документации, поэтому смысла перечислять их здесь нет.

В классе Observable определено много стандартных и полезных операторов. Но что делать, если вы хотите каким-то образом преобразовать Observable, а стандартные операторы позволяют сделать это за несколько действий или лишними строчками кода? К примеру, для каждого сетевого запроса вы наверняка будете писать следующий код для управления потоками:

И это, подчеркнем еще раз, будет для каждого запроса. Было бы логично, если бы могли использовать свой оператор для этой задачи. Для таких целей служит интерфейс Transformer. Создадим свой трансформер для потока данных:

И теперь мы можем использовать этот трансформер универсальным образом:

 

Преобразование потоков данных

Кроме непосредственно преобразования данных в потоке, вы можете управлять и самим потоком: к примеру, при наступлении определенного условия мы можете заменить один поток на другой, или же объединить выполнение нескольких потоков вместе или последовательно. Для этого есть другая большая группа операторов.

Чтобы выполнить последовательно несколько Observable, можно использовать метод concat, схема которого выглядит следующим образом:

И в коде этот метод можно использовать примерно так:

Аналогично, если вам нужно выполнить несколько Observable не последовательно, а параллельно (что часто бывает нужно, чтобы ускорить загрузку при выполнении нескольких запросов), то вы можете использовать оператор merge:

При этом порядок поступления элементов не определен. Этот метод используется аналогично методу concat:

Нужно также сказать, что и метод concat, и метод merge требуют, чтобы данные в Observable были одного типа, что не всегда удобно.

Есть и более интересный метод для параллельного выполнения запросов, который к тому же потом позволяет обработать результаты всех потоков вместе и преобразовать данные нужным образом. Это метод zip, который принимает на вход список Observable, которые будут выполняться параллельно, а также функцию для преобразования данных из всех запросов:

И его использование в коде:

В этом примере создается один поток данных из имен людей, еще один из возрастов, после этого эти потоки выполняются параллельно и преобразовываются в поток людей, используя данные из обоих исходных потоков.

Но, вероятно, одним из самых популярных операторов является оператор flatMap. Он похож на оператор map, но с небольшим исключением. Если map предназначен для преобразования объекта в объект, то flatMap каждый объект в потоке данных преобразует в Observable, а потом соединяет все получившиеся потоки данных. Схема этого оператора выглядит следующим образом:

Оператор flatMap часто используется для различных преобразований над потоками, к примеру, чтобы вернуть ошибку, в случае если пришли некорректные данные или что-то пошло не так, как задумывалось. Мы и сами использовали такой пример в прошлой лекции, когда обрабатывали изменение статуса запроса. Сейчас мы можем разобрать этот фрагмент кода с большим пониманием:

В первую очередь нужно заметить, что мы получаем только один элемент в Observable со статусами, и на основании него изменяем поток данных.

Во-первых, если статус запроса IN_PROGRESS, то нам больше не нужно продолжать выполнение запроса, поэтому мы возвращаем пустой поток данных. Во-вторых, если произошла ошибка (вот это и есть пример, когда никакого явного Exception не возникает, однако логически произошла ошибка), мы возвращаем Observable с ошибкой. И наконец, если запрос выполнился успешно, то мы меняем поток данных со статусом запрос на поток данных с прогнозом погоды. Здесь оператор flatMap демонстрирует всю свою мощь.

Это, вероятно, основные и наиболее часто используемые операторы. Однако снова нужно сказать, что существует еще очень большое количество различных операторов, каждый из которых хорошо подходит для определенного случая.

Продолжение: RxJava в Android

 [:en]The third lecture on the architecture of client-server android-applications, in which we will get acquainted with RxJava and the main operators, and also learn how to create Observable, convert data streams, work with RxJava on Android and solve the problem of Backpressure.
Links to the source code of your solutions you can leave in the comments. Share your decisions with the community, get feedback and constructive criticism. The best solutions will be published on our channel and the website fandroid.info with the authorship of the winners!

Introduction

RxJava

  1. Introduction to RxJava
  2. Creating an Observable
  3. Basic Operators
  4. Converting Data Streams

RxJava in Android

Additionally — Backpressure problem

  1. Using special operators
  2. Refusal to use Observable.create

Practical assignments

Links and useful resources

Introduction

Now we know a lot of ways to do some work in background streams. Under the work here is most often understood server requests. But how good is it with these methods? We must admit that they have many shortcomings.

First, these methods are not flexible. Let’s say you need to execute two requests in parallel, and then wait for both to complete the work, execute the third query using the results of previous queries. If you try to implement such a task using loaders, then most likely you will get a large number of boolean flags, fields to save the result and a lot of code. To the problems of flexibility can also be attributed to the fact that it is difficult to implement a periodic update of data.

Secondly, it is inconvenient to process errors. Again, in the case of loaders, we can return only one result. Of course, you can use special classes that will serve both for data transfer and error transmission, but this is inconvenient.

That’s why we need to consider other possibilities for performing network requests and for working with data. And first of all such possibility is the popular framework RxJava.

RxJava

The RxJava framework allows you to use the functional reactive programming paradigm (FRP) in Android. It is very difficult to understand what this means, therefore, many explanations are required. First, the word functional means that in FRP the basic concept is functions and in this this paradigm is similar to the usual functional programming. Of course, in Android it is very difficult to use full functional programming, but still with RxJava we shift the priority from objects to functions. Second, reactive programming means programming with asynchronous data flows. The easiest way to explain this is in practice: the data stream is any of your requests for a server, data from the database, and the usual input of data from the user (if you speak quite frankly, you can create a data stream from absolutely anything), which is most often performed In the background thread, that is, asynchronously. If we combine two explanations, then we get that functional reactive programming is programming with asynchronous data streams that can be manipulated using various functions.

The definition sounds nice, but it’s completely unclear why it might be needed. It appears, still as can. RxJava allows you to solve almost all the problems that were voiced in the introduction. The main advantages of RxJava are the following:

  • Ensuring multithreading. RxJava allows you to flexibly manage the asynchronous execution of queries, as well as switch the execution of operations in different threads. In addition, which is important for Android, RxJava also allows you to easily process the result in the main stream of the application.
  • Flow control. This allows you to convert data in a stream, apply operations to data in a stream (for example, save their data from a stream to a database), merge several threads into one, change the stream depending on the result of the other, and much more.
  • Error processing. This is another very important advantage of RxJava, which allows you to handle various errors that occur in the thread, repeat server requests in case of errors, and pass errors to subscribers.

And the most pleasant thing is that all the advantages above are achieved literally in a couple of lines of code!

It’s not easy to use RxJava, and using it correctly is even more difficult, and this requires a fairly long study. RxJava is a very large framework for correctly working with it (and in particular with the functional programming paradigm), it is necessary to study and practice a lot. RxJava is worthy of a separate course, there are a huge number of articles on it, and their number is increasing every day. RxJava has already written more than one book, so you can not hope that you can study this framework well in one lecture. But we will try to consider the main features of RxJava and how it can be applied to Android.

We took a little look at the essence of RxJava and functional reactive programming, but in order to move on, we need to know the basic elements of this framework, for example, what is meant by the flow of data and how to create it, what is the subscriber and how to use it, and so on.

RxJava uses the Observer pattern as its basis. What does this pattern look like in classical form? The main entities in it are an object whose value can change, and a subscriber who can subscribe to these changes (each time the value of the object is changed, a certain method will be called from the subscriber). Schematically it can be represented as follows:

The essence of RxJava is almost the same, but instead of one object subscribers use the whole data stream. A subscriber can subscribe to the data stream, and then he will receive information about each new element in the stream, about the errors that occurred, and about the completion of the thread.

Then the scheme for RxJava will be:

With a subscriber, it’s all the less clear, but what is a data stream? The data flow is just a collection of some elements (not necessarily the final one) that are transmitted to the subscriber. As the data stream can act as simple objects and sequences, and infinite, sequences and various events, for example, data entry.

Let’s consider one of the examples that will help to understand exactly where data flows meet and how RxJava can help with working with them. Let’s say the task is to search for people with a specific name when the user enters text in the search field. It can be solved by the following simple lines of code:

We’ll look at each statement in detail, but for now, just explain what’s happening in the code to see how powerful RxJava is.

First, we turn text input into a data stream, which we will observe. Our thread might look like this: «J», «Jo», «Joh», «John», … This is a data stream.

As already mentioned, we can manage the data in the stream. Let’s say we do not want the query to run too often. To do this, use the debounce operator, which transmits data to the subscriber only when there is a pause between incoming data in the stream (500 milliseconds in our example). Then, for example, our data stream can become like this: «J», «John», …

Next, using the map operator, all the data in the stream turns into lowercase letters. After that the thread looks like this: «j», «john», …

And then a request is made to the server to search for a person with that name. As a result, our data stream of strings turns into a data stream from people and can look like this:

Moreover, all this is done in the background thread, and the result is returned to the main application thread. Striking result for 7 lines of code!

Introduction to RxJava

After a brief explanation of the essence of RxJava, let’s go directly to the code and how to work with this framework. In RxJava, the Observable class acts as the data stream. And, as mentioned above, the data stream can be asynchronous, and also various conversion operations can be performed on each element in the stream or even over the entire flow.

Consider the simplest way to create a data stream (Observable) from several elements using the method just:

Everything is extremely simple. We created a data stream of 3 elements. Now it remains only to subscribe to it and, for example, to output to the log all the elements:

It’s hard not to notice that now everything has suddenly ceased to be simple and compact. But we will correct this, but for the time being we will explain what is written here. As you can see, the implementation of the Observer interface acts as a subscriber to the stream. It defines 3 methods. The onNext method is called when the next element from the stream is transferred to the subscriber. The onError and onCompleted methods are called when an error occurs in the data stream and when the data flow ends, respectively.

Now let’s change our code to make it shorter and more pleasant to read. It can be seen that we do not use onError and onCompleted methods, so we do not need them. Fortunately, the subscribe method has many different forms, and they allow you to use only the methods you need. For example, in this way, you can only handle the onNext call:

But this code is already much shorter and more understandable. And we, moreover, do not lose the ability to handle errors, we only need to pass this handler to the second parameter:

Of course, the situation with onComplete is completely analogous.

But now a very interesting and fair question. This, of course, is cool, that we used data streams, subscribers and RxJava’s power, but we just output the data to the log. I can make it a bit simpler:

And this, of course, is correct. To show the superiority of RxJava, let’s arrange a small competition, where each task will be implemented both with the help of RxJava, and with the help of loops.

Task 1: Given a set of powers of two from 1 to 64 inclusive, each of them must be output to the log as in the previous example. In this case, the call to String.valueOf looks ugly, it needs to be rewritten so that the data in the stream / array turns into strings.

There is no noticeable advantage to RxJava here. So we go further.

Task 2: It is necessary to change the data so that only numbers from 13 and above fall into the log:

And although there is no noticeable difference in the amount of code, I must say that the code with RxJava looks cleaner, more logical and understandable. Let’s get the ways of working through loops the next task.

Task 3: all data operations must be performed in the background thread, and the data output to the log is in the main application thread:

The solution of the previous task in 2 lines — here it is, the true power of RxJava when working with threads (while the code remains much cleaner and more understandable). And when using cycles, we had to create Executor to work in the background and Handler to transfer the data to the main application thread. [Wpanchor id = «4»]

Even such toy examples show that RxJava has the advantages to use. Although, of course, to put it bluntly, RxJava and cycles are things that can not be compared, since these are completely different paradigms.

Let’s continue our acquaintance with RxJava and its various operators. And start by considering how to create data streams.

Creating an Observable

We’ve already seen using the just method to create the Observable. Approximately similar is the from method, which allows you to create a data stream from the list of items:

In fact, all methods of creating data streams eventually call the standard method Observable.create. Therefore, we consider this method. In fact, when using this approach, we control which data to send to the subscriber, and we call the onNext, onError, and onCompleted methods ourselves. A simple example of creating Observable through create:

As you can see in the example, we manually transfer the data to the subscriber. In this case, such a data stream will be given to each subscriber who will subscribe to the created Observable.

Use the create method directly to developers is not recommended. Let’s say a few words about this now, and more will be in the extra lecture on the problem of Backpressure. In the same lecture, other ways of creating Observable will be considered.

Why is the create method inconvenient? First, you need to handle all potential errors correctly and transfer them to the subscriber yourself. Secondly, you must always ensure that the subscriber is still subscribed to the data stream. For example, consider the subscription code for Observable, which was described above:

In this code, the subscriber subscribes to receive data and immediately unsubscribes from them. And the Observable code waits 300ms before sending the last element to the subscriber, and at the time of the transfer, the subscriber has unsubscribed from receiving the data. Therefore, an error occurs. Therefore, the code in the create method needs to be modified by adding a check to the fact that the subscriber still needs data from the stream:

At the same time, this check should be added before each call onNext, onError and onCompleted (here it is omitted for convenience), which is inconvenient and often leads to errors. Therefore, you do not need to use the create method.

In addition to directly creating a data stream, it is important to know how to perform operations on data in the background thread, and how to process the result in a specific thread. For this purpose, the operators subscribeOn and observeOn serve, in which developers are often confused, just starting their acquaintance with RxJava.

The subscribeOn method is used to specify the thread in which the code is run to create data in the Observable. If you look from the point of view of the code, the code in the call method in the onSubscribe interface will be executed in the thread that was passed to the subscribeOn method. The observeOn method specifies the stream in which the data is to be processed by the subscriber. You can remember that the subscriber is an observer and you specify where exactly he needs to observe the observeOn method. And the remaining method is used to indicate where the data stream should work.

It’s not entirely correct to say that in the methods observeOn and subscribeOn we specify threads, no — we pass in them Scheduler instances, which, among other things, take on the task of scheduling tasks. You can create your own Scheduler, but you usually use the standard ones:

  • Io () — execution of tasks that do not heavily burden the processor, but are long. This, for example, calls to the server or to the database. The size of the thread pool is unlimited.
  • Computation () — execution of computational tasks. The size of this pool is equivalent to the number of processor cores.
  • NewThread () creates a new thread for each task.
  • Immediate () — execution of the task in the same thread from which the Observable is called. Most often it is used for testing.

In addition, the RxAndroid library has AndroidSchedulers.mainThread (), which transfers the execution of the subscriber code to the main thread.

Based on the explanations it is clear that most often you will use Schedulers.io () for queries to the server and database, and in AndroidSchedulers.mainThread () will process the result. Therefore, the code for most Observable will look like this:

Basic Operators

Consider the basic operators that you can use to convert data in a stream. Each of these operators takes a function as a value — in this, the paradigm of functional reactive programming is realized.

We have already seen some of the operators. Of course, this is primarily a map operator, which converts an element in a stream to any other element. Its work can be visually represented using the following scheme:

We already used the map operator earlier to convert a number to a string. This is not the only application, you can convert anything into anything, the main thing is that it should be according to the logic of your application.

I need to say one more thing: if several operations are required to convert one object to another, it is better to use several map operators than to write several operations in one statement. This will greatly improve the readability of your code. That is, instead of this code:

The second version of the code is not the only correct one or the faster one, but it is easier to understand and better consistent with the principles of functional reactive programming.

Most of these operators are either simple or not very often used, so for the following operators, we give only a brief description with a schema and without code examples.

Also popular is the filter operator, which leaves only data in the stream that satisfy the condition passed to the parameter.

There are also other methods for filtering data in the stream, for example, skip, take, first and others:

There is still a very large number of different operators, a full description and the scheme of which can be found in the documentation, so there is no sense in listing them here.

In the Observable class, many standard and useful operators are defined. But what if you want to somehow transform Observable, and standard operators allow you to do this in a few actions or with extra lines of code? For example, for each network request, you will probably write the following code to control the flow:

And this, we emphasize once again, will be for every request. It would be logical if we could use our operator for this task. For such purposes, the Transformer interface is used. Create your own transformer for the data stream:

And now we can use this transformer in a universal way:

Converting Data Streams

In addition to directly converting data in a stream, you can manage the flow itself: for example, when a certain condition occurs, you can replace one stream with another, or you can combine the execution of several threads together or sequentially. For this, there is another large group of operators.

To execute several Observable consecutively, you can use the concat method, whose schema looks like this:

And in code this method can be used like this:

Likewise, if you need to execute several Observables not sequentially, but in parallel (which is often necessary to speed up the load when running multiple requests), then you can use the merge operator:

In this case, the order of receipt of the elements is not defined. This method is used similarly to the concat method:

It must also be said that both the concat method and the merge method require that the data in the Observable be of the same type, which is not always convenient.

There is also a more interesting method for parallel execution of queries, which also allows you to process the results of all threads together and convert the data in the right way. This is a zip method that takes an input list of Observable, which will be executed in parallel, as well as a function for converting data from all queries:

And its use in the code:

In this example, one data stream is created from the names of people, one more of the ages, then these threads are executed in parallel and converted into a stream of people using data from both source streams.

But, probably, one of the most popular operators is the flatMap operator. It is similar to the map operator, but with a small exception. If map is for converting an object to an object, flatMap converts each object into a data stream into an Observable, and then merges all the resulting data streams. The scheme of this operator is as follows:

The flatMap operator is often used for various transformations over threads, for example, to return an error, if incorrect data came or something went wrong as intended. We ourselves used this example in the last lecture, when we processed the change in the status of the request. Now we can parse this code fragment with a lot of understanding:

First of all, it should be noted that we get only one element in the Observable with statuses, and on the basis of it we change the data flow.

First, if the status of the IN_PROGRESS request, then we no longer need to continue executing the query, so we return an empty data stream. Secondly, if an error occurred (this is an example where no explicit Exception occurs, but a logical error occurred), we return the Observable with an error. Finally, if the query succeeds, we change the data stream with the status of a request for a data stream with a weather forecast. Here, flatMap demonstrates all its power.

This is probably the main and most commonly used operators. However, again it must be said that there are still a very large number of different operators, each of which is well suited for a particular case.

Continuation: RxJava in Android

 [:]

Добавить комментарий