[:ru]Лекция 3 по архитектуре андроид-приложений. Часть 2. RxJava в Android [:en]Lecture 3 on the architecture of android applications. Part 2. RxJava on Android[:]

[:ru]Вторая часть Лекции 3 Курса по архитектуре клиент-серверных андроид-приложений.

Введение

RxJava

  1. Введение в RxJava
  2. Создание Observable
  3. Основные операторы
  4. Преобразование потоков данных

RxJava в Android

Дополнительно – проблема Backpressure

  1. Использование специальных операторов
  2. Отказ от использования Observable.create

Практические задания[wpanchor id=”1″]

Ссылки и полезные ресурсы

RxJava в Android

Поскольку RxJava – это фреймворк для Java, нет ничего удивительного в том, что он полностью поддерживается в Android. Мы уже убедились, что RxJava – это мощный инструмент для управления асинхронными потоками данных, разобрали основные операторы, и это все, разумеется, можно применить в разработке под Android.

Вероятно, самая главная причина широкой популярности RxJava в Android – это поддержка RxJava в Retrofit. Да, самая популярная библиотека для сетевых запросов позволяет не только возвращать данные с сервера, но и оборачивать их в Observable, чтобы удобно управлять этими данными и работать с асинхронностью.

Давайте разберем, что нужно сделать, чтобы использовать RxJava для сетевых запросов с помощью Retrofit. На самом деле, практически ничего! Нужно изменить сервис для сетевых запросов, чтобы он возвращал Observable:

public interface MovieService {



@GET("popular/")

Observable<MoviesResponse> popularMovies();

}

И добавить адаптер для Retrofit при создании этих сервисов:

@NonNull

private static MovieService createService() {

return new Retrofit.Builder()

.baseUrl(BuildConfig.API_ENDPOINT)

.client(getClient())

.addConverterFactory(GsonConverterFactory.create())

.addCallAdapterFactory(RxJavaCallAdapterFactory.create())

.build()

.create(MovieService.class);

}

И это все! Теперь мы можем асинхронно выполнять запросы, обрабатывать и получать данные, а также обрабатывать ошибки:

ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(this::showMovies, throwable -> showError());

Добавим также показ прогресс бара при старте загрузки и его скрытие после окончания запроса:

ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

       .doOnSubscribe(mLoadingView::showLoadingIndicator)

       .doAfterTerminate(mLoadingView::hideLoadingIndicator)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(this::showMovies, throwable -> showError());

Что мы можем добавить еще? Разумеется, кэширование данных. Это очень легко исправить – преобразуем Observable так, чтобы он сохранял список элементов:

ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

.flatMap(movies -> {

Realm.getDefaultInstance().executeTransaction(realm -> {

realm.delete(Movie.class);

realm.insert(movies);

});

return Observable.just(movies);

})

Есть и еще возможность улучшить код с использованием RxJava и показать способ обработки ошибок. Предположим, что мы не смогли получить данные с сервера, но у нас есть закэшированные данные. Тогда в случае ошибки мы можем отобразить их. Это сделать с помощью метода onErrorResumeNext. Этот метод в случае возникновения ошибки в исходном потоке данных меняет его на другой поток данных, который передается в параметре. В нашем случае это будет поток данных, основанный на закэшированных элементах:

.onErrorResumeNext(throwable -> {

Realm realm = Realm.getDefaultInstance();

RealmResults<Movie> results = realm.where(Movie.class).findAll();

return Observable.just(realm.copyFromRealm(results));

})

Конечно, сложность кода уже значительно выросла, но если вспомнить, что при этом мы реализовали и асинхронность запроса, и показ прогресса, и кэширование, и обработку ошибок, то нельзя не признать, что с RxJava все выглядит достаточно неплохо. К тому же обычно код для Observable распределяется так, чтобы UI-классы не работали с кэшированием, поэтому все становится еще проще. Но об этом в последующих лекциях.

Разумеется, нужно быть справедливым и сказать про недостатки, которые есть в RxJava применительно к Android. Основной недостаток – это наша любимая проблема с жизненным циклом. RxJava – это фреймворк для Java, которая ничего не знает про проблемы Android. Поэтому решений “из коробки” здесь нет. Но есть масса других возможностей.

Во-первых, мы, разумеется, можем воспользоваться всем арсеналом, который у нас появился за счет прошлых лекций: retain фрагменты, лоадеры, сервисы. Да, это может быть неудобно, но с помощью них мы можем как решить проблему жизненного цикла, так и сохранить мощь RxJava.

Во-вторых, мы можем воспользоваться средствами самой RxJava. В первую очередь можно вспомнить, что метод subscribe у Observable возвращает объект Subscription, который позволяет отписываться от потока данных. Это не до конца решает проблему обработки жизненного цикла, но, по крайней мере, позволяет нам отписываться от запросов при выходе с экрана:

@Nullable

private Subscription mMoviesSubscription;

//..

 

mMoviesSubscription = ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

//...



@Override

protected void onPause() {

if (mMoviesSubscription != null) {

mMoviesSubscription.unsubscribe();

}

super.onPause();

}

Также у Observable есть метод cache, который позволяет выполнять запрос только один раз. При повторной подписке будет возвращен старый результат. Но при этом нужно каким-то образом сохранять Observable (к примеру, в лоадере).

.doOnSubscribe(mLoadingView::showLoadingIndicator)

.doAfterTerminate(mLoadingView::hideLoadingIndicator)

.cache()

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(this::showMovies, throwable -> showError());

И в-третьих, есть некоторые библиотеки, которые созданы для решения проблемы жизненного цикла при использовании RxJava. К примеру, это библиотека RxLifecycle от Trello. То есть такие проблемы можно решить, и это ничуть не сложнее чем при использовании других средств для выполнения запросов.

Таким образом, RxJava для нас – это крайне удобный инструмент для управления запросами, многопоточностью и обработкой ошибок. Кроме того, нельзя забывать про то, что существует большое количество библиотек, которые позволяют использовать различные компоненты Android в реактивном стиле. Вероятно, самой популярной из таких библиотек является библиотека RxBindings, которая помогает работать с View в реактивном стиле.[wpanchor id=”2″]

Применять RxJava в Android-разработке, конечно, необязательно, но ее использование может очень сильно упростить решений многих задач, поэтому мы будем использовать RxJava на протяжении всех будущих лекций.

 

Дополнительно – проблема Backpressure

Как мы уже поняли, RxJava – это очень мощный инструмент, но им в то же время надо пользоваться очень осторожно. Одна из таких предосторожностей – это отказ от использования Observable.create(). Мы уже видели, что это не делает код очевиднее, при этом нам приходится еще и следить за тем, что подписчик не отписался, иначе ему нельзя передавать данные.

Но тут есть и более серьезная проблема, которой подвержено немалое количество приложений. Эта проблема связана с тем, что Observable может отдавать данные быстрее, чем подписчик сможет их обрабатывать, и она известна под названием Backpressure.

Такая ситуация вполне реальна, ведь данные обычно грузятся в фоне, а подписчик работает в главном потоке приложения, который может быть заблокирован какой-то операцией. К тому же, код для работы вашего подписчика может сам по себе быть не быстрым, к примеру, перерисовывать UI.

Давайте приведем минимальный код, который позволит нам воспроизвести проблему Backpressure:

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

for (int i = 0; i < 1000; i++) {

subscriber.onNext(i + "");

}

}

});



observable.observeOn(Schedulers.computation())

.subscribe(System.out::println, throwable -> {

System.out.println("error: " + throwable);

});

Казалось бы, что это самый обычный код, который вы можете встретить в любом приложении, и, тем не менее, он приводит к крашу (в данном случае в обработчик onError). Проблема заключается в том, что вызов System.out.println все-таки ощутимо медленнее, чем [wpanchor id=”3″]обычный вызов onNext у подписчика. И с такой проблемой вы можете столкнуться в любой ситуации. Проблема изначально заключается в методе Observable.create. И не зря его просят объявить как deprecated. Но раз такая проблема есть, давайте рассмотрим два различных способа, как защититься от нее.

1. Использование специальных операторов

Во-первых, есть большая группа операторов, позволяющих использовать только один элемент из всех, которые пришли за определенное время / количество элементов. К примеру, оператор sample позволяет получать только самый последний элемент из всех элементов во временном периоде. Схема его работы выглядит следующим образом:

[:ru]Лекция 3 по архитектуре андроид-приложений. Часть 2. RxJava в Android [:en]Lecture 3 on the architecture of android applications. Part 2. RxJava on Android[:]

Например, в следующем примере оператор sample позволит использовать только одно значение один раз в 10 микросекунд:

createBackpressureObservable()

.sample(10, TimeUnit.MICROSECONDS)

.observeOn(Schedulers.computation())

.subscribe(System.out::println, throwable -> {

System.out.println("error: " + throwable);

});

В качестве результата мы получим примерно такой вывод (который будет отличаться на разных запусках):

233

419

536

658

797

946

Такой оператор удобно использовать, если, к примеру, ваши данные обновляются очень часто, но при этом вам нужно только последнее значение. Вы не хотите слишком часто обновлять свой UI, поэтому можете настроить sample на секунду, к примеру.

Еще одним оператором из такой группы является оператор debounce, который отдает данные подписчику только в том случае, если уже достаточно долго (время ожидания является параметром) нет поступления новых данных. Схема оператора debounce:

[:ru]Лекция 3 по архитектуре андроид-приложений. Часть 2. RxJava в Android [:en]Lecture 3 on the architecture of android applications. Part 2. RxJava on Android[:]

Самый типичный пример использования оператора debounce – ввод данных пользователем. Если вы хотите подождать, пока пользователь закончит ввод данных, прежде чем выполнить запрос, стоит обратиться к оператору debounce.

Предыдущие операторы всегда отдавали только один элемент из группы, при этом остальные элементы пропадали. Такой подход удобен, но он не всегда применим. Иногда нам нужно обработать все поступающие данные. Логичным решением для такого случая будет использование какого-то буфера, который будет накапливать данные, а потом отдаст данные в подписчик, который обработает все данные в списке разом. Для этого существует оператор buffer, который собирает данные, пока не пройдет достаточного количества времени либо пока не наберется достаточного элемента (с этой возможностью нужно быть крайне аккуратным, чтобы не попасть в ситуацию, когда нужного количества элементов никогда не наберется). Схема оператора buffer:

[:ru]Лекция 3 по архитектуре андроид-приложений. Часть 2. RxJava в Android [:en]Lecture 3 on the architecture of android applications. Part 2. RxJava on Android[:]

Можно применить его и для решения проблемы из нашего примера:

createBackpressureObservable()

.buffer(100)

.observeOn(Schedulers.computation())

.subscribe(System.out::println, throwable -> {

System.out.println("error: " + throwable);

});

[wpanchor id=”4″]Теперь мы выводим не каждый элемент, а порциями по 100 элементов – вероятность получения MissingBackpressureException стремится к нулю.

2. Отказ от использования Observable.create

На самом деле, проблеме Backpressure подвержен только способ создания Observable через метод create. Самое простое решение – отказаться от его использования. Сейчас это возможно. Рассмотрим несколько ситуаций, где вы можете заменить использование create с помощью других операторов.

Во-первых, если у вас есть несколько уже готовых элементов или их список, вы можете использовать методы just, from и другие.

Во-вторых, если у вас есть какой-то тяжелый код, возвращающий один элемент, используйте fromCallable:

Observable.fromCallable(new Callable<List<Integer>>() {

@Override

public List<Integer> call() throws Exception {

//some long-running operation

       return getUserIdsFromDatabase();

}

});

И, наконец, самая главная причина использования метода create – “завернуть” какой-то асинхронный вызов в Observable, чтобы его дальше можно было использовать в реактивном стиле. Один из примеров – использование различных сенсоров. Корректная реализация такого подхода с использованием метода create выглядит следующим образом:

return Observable.create(new Observable.OnSubscribe<SensorEvent>() {

@Override

public void call(final Subscriber<? super SensorEvent> subscriber) {

SensorEventListener sensorEventListener = new SensorEventListener() {

@Override

public void onSensorChanged(SensorEvent event) {

if (subscriber != null && !subscriber.isUnsubscribed()) {

subscriber.onNext(event);

}

}



@Override

public void onAccuracyChanged(Sensor sensor, int accuracy) {

// do nothing

           }

};

sensorManager.registerListener(sensorEventListener, sensor, samplingPeriodUs);

subscriber.add(new MainThreadSubscription() {

@Override

protected void onUnsubscribe() {

sensorManager.unregisterListener(sensorEventListener);

}

});

}

});

Как видно, нам приходится бороться и с проблемой того, что подписчик может отписаться, и с тем, чтобы корректно отписаться от событий сенсора, и с проблемой Backpressure (так как сенсоры могут быть очень чувствительными и отдавать данные очень часто), которую мы рассмотрели ранее. При этом это единственный корректный способ, который содержит много возможностей для ошибок и к тому же очень длинный.

Кажется, что без create здесь не обойтись, но способ есть, он реализуется за счет операции fromEmitter и выглядит следующим образом:

return Observable.fromEmitter(new Action1<AsyncEmitter<SensorEvent>>() {

@Override

public void call(final AsyncEmitter<SensorEvent> sensorEventAsyncEmitter) {

final SensorEventListener sensorListener = new SensorEventListener() {

@Override

public void onSensorChanged(SensorEvent sensorEvent) {

sensorEventAsyncEmitter.onNext(sensorEvent);

}



@Override

public void onAccuracyChanged(Sensor originSensor, int i) {

// do nothing

           }

};



sensorEventAsyncEmitter.setCancellation(() -> sensorManager.unregisterListener(sensorListener, sensor));

sensorManager.registerListener(sensorListener, sensor, samplingPeriodUs);

}

}, AsyncEmitter.BackpressureMode.LATEST);

В чем заключается разница между этим кодом и предыдущим? Во-первых, он автоматически требует решать проблему Backpressure за счет последнего параметра в методе fromEmitter. Возможные значения этого параметра покрывают все случаи:

enum BackpressureMode {

NONE,

ERROR,

BUFFER,

DROP,

LATEST

}

Таким образом вы можете (и должны) полностью отказаться от метода Observable.create![wpanchor id=”5″]

Пактические примеры

Скачайте проект Проект RxJavaSamples

1)Поток строк в поток чисел с предикатом

2)Получение уникальных данных из потока до выполнения условия

3)Сумма всех чисел в потоке данных

4)Переключение между потоками данных

5)Поток из наибольших общих делителей для элементов из входных потоков

6)Поток с долгим вычислением

Практическое задание

Скачать Проект PopularMovies

1) Условие задачи в классе ru.gdgkazan.popularmovies.screen.details.MovieDetailsActivity

2)Выполнить два запроса (на получение трейлеров и отзывов) параллельно

3)Отображать процесс загрузки данных[wpanchor id=”6″]

4)Сохранить данные в базу и использовать закэшированные данные в случае ошибки

5)Обрабатывать пересоздание Activity

Ссылки и полезные ресурсы

  1. Приложения из репозитория:
    1. RxJavaSamples – примеры кода с RxJava и практические задачки.
    2. PopularMovies – пример использования RxJava в Android для выполнения сетевых запросов и практическое задание.
    3. BackpressureProblem – демонстрация примеров MissingBackpressureException и способов решения этой проблемы.
  2. Хорошее введение в функциональное реактивное программирование.
  3. Документация по Rx.
  4. Вики по RxJava.
  5. Большой список различных статей по Rx и RxJava.
  6. Вводные статьи про RxJava и RxAndroid и их перевод на хабре.
  7. Вводные видео про RxJava.
  8. Документация по операторам из Rx.
  9. Интерактивный список операторов из Rx.
  10. Хорошая статья про использование RxJava для загрузки данных из нескольких источников.
  11. Доклад про использования RxJava в Яндекс.
  12. Книга RxJava Essentials.
  13. Документация про Subjects.
  14. Проблема Backpressure: описание, способы борьбы с ней и ответ на SO, статья про fromEmitter.

Продолжение:

Лекция 4 по архитектуре андроид приложения. Clean Arcitecture

[:en]The second part Lectures 3 Course on the architecture of client-server android applications.

Introduction

RxJava

  1. Introduction to RxJava
  2. Creating an Observable
  3. Basic Operators
  4. Converting Data Streams

RxJava on Android

Additional – Backpressure problem

  1. Using special operators
  2. Abandon the use of Observable.create

Practical assignments[wpanchor id=”1″]

Links and useful resources

RxJava on Android

Since RxJava is a Java framework, there is nothing surprising in that it is fully supported in Android. We have already seen that RxJava is a powerful tool for managing asynchronous data flows, disassembled the main operators, and this, of course, can be applied in development for Android.

Probably the most important reason for RxJava’s widespread popularity in Android is the support for RxJava in Retrofit. Yes, the most popular library for network requests allows not only to return data from the server, but also wrap them in Observable, so that it is convenient to manage this data and work with asynchrony.

Let’s look at what you need to do to use RxJava for network queries with Retrofit. In fact, almost nothing! You need to change the service for network requests so that it returns Observable:

public interface MovieService {



@GET("popular/")

Observable<MoviesResponse> popularMovies();

}

И добавить адаптер для Retrofit при создании этих сервисов:

@NonNull

private static MovieService createService() {

return new Retrofit.Builder()

.baseUrl(BuildConfig.API_ENDPOINT)

.client(getClient())

.addConverterFactory(GsonConverterFactory.create())

.addCallAdapterFactory(RxJavaCallAdapterFactory.create())

.build()

.create(MovieService.class);

}

And it’s all! Now we can asynchronously execute requests, process and receive data, and also handle errors:

ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(this::showMovies, throwable -> showError());

Добавим также показ прогресс бара при старте загрузки и его скрытие после окончания запроса:

ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

       .doOnSubscribe(mLoadingView::showLoadingIndicator)

       .doAfterTerminate(mLoadingView::hideLoadingIndicator)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(this::showMovies, throwable -> showError());

What can we add more? Of course, data caching. This is very easy to fix – we convert Observable so that it keeps a list of elements:

ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

.flatMap(movies -> {

Realm.getDefaultInstance().executeTransaction(realm -> {

realm.delete(Movie.class);

realm.insert(movies);

});

return Observable.just(movies);

})

There is also the opportunity to improve the code using RxJava and show how to handle errors. Suppose we could not get data from the server, but we have cached data. Then in case of error, we can display them. This is done using the onErrorResumeNext method. This method, in the event of an error in the source data stream, changes it to another data stream that is transmitted in the parameter. In our case, this will be a data stream based on cached elements:

.onErrorResumeNext(throwable -> {

Realm realm = Realm.getDefaultInstance();

RealmResults<Movie> results = realm.where(Movie.class).findAll();

return Observable.just(realm.copyFromRealm(results));

})

Of course, the complexity of the code has already grown significantly, but if we remember that while we implemented asynchronous query, showing progress, and caching, and error handling, we can not but admit that everything looks good with RxJava. In addition, usually the code for Observable is distributed so that UI classes do not work with caching, so everything becomes even easier. But about this in later lectures.

Of course, you need to be fair and say about the shortcomings that are in RxJava with reference to Android. The main drawback is our favorite problem with the life cycle. RxJava is a Java framework that does not know anything about Android problems. Therefore, decisions “out of the box” are not here. But there are many other possibilities.

First, of course, we can take advantage of the whole arsenal, which we had at the expense of past lectures: retain fragments, loaders, services. Yes, it can be inconvenient, but with the help of them we can both solve the problem of the life cycle, and preserve the power of RxJava.

Secondly, we can use the resources of RxJava itself. First of all, you can remember that the subscribe method of Observable returns a Subscription object that allows you to unsubscribe from the data stream. This does not fully solve the problem of processing the life cycle, but at least it allows us to unsubscribe from queries when exiting the screen:

@Nullable

private Subscription mMoviesSubscription;

//..

 

mMoviesSubscription = ApiFactory.getMoviesService()

.popularMovies()

.map(MoviesResponse::getMovies)

//...



@Override

protected void onPause() {

if (mMoviesSubscription != null) {

mMoviesSubscription.unsubscribe();

}

super.onPause();

}

Also, Observable has a cache method that allows you to execute the query only once. When you resubscribe, the old result will be returned. But you need to somehow save Observable (for example, in a loader).

.doOnSubscribe(mLoadingView::showLoadingIndicator)

.doAfterTerminate(mLoadingView::hideLoadingIndicator)

.cache()

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(this::showMovies, throwable -> showError());

And thirdly, there are some libraries that are created to solve the life cycle problem when using RxJava. For example, this is a library RxLifecycle

From Trello. That is, such problems can be solved, and this is not at all more difficult than using other means to perform queries.

Thus, RxJava for us is an extremely convenient tool for managing queries, multithreading and error handling. In addition, we must not forget that there are a large number of libraries that allow you to use various components of Android in a reactive style. Probably the most popular of these libraries is the library RxBindings, which helps to work with View in a reactive style. [Wpanchor id = “2”]

Of course, it’s not necessary to use RxJava in Android development, but its use can greatly simplify the solutions of many tasks, so we will use RxJava throughout all future lectures.

Additional – Backpressure problem

As we have already understood, RxJava is a very powerful tool, but at the same time it should be used very carefully. One such precaution is the failure to use Observable.create (). We have already seen that this does not make the code more obvious, but we also have to watch for the fact that the subscriber did not unsubscribe, otherwise he can not transmit data.

But there is a more serious problem, which is subject to a considerable number of applications. This problem is due to the fact that Observable can give data faster than the subscriber can process it, and it is known as Backpressure.

This situation is quite real, because the data is usually loaded in the background, and the subscriber is working in the main thread of the application, which can be blocked by some operation. In addition, the code for the operation of your subscriber may itself not be fast, for example, to redraw the UI.

Let’s give a minimal code that will allow us to reproduce the problem of Backpressure:

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

for (int i = 0; i < 1000; i++) {

subscriber.onNext(i + "");

}

}

});



observable.observeOn(Schedulers.computation())

.subscribe(System.out::println, throwable -> {

System.out.println("error: " + throwable);

});

It would seem that this is the most common code that you can find in any application, and, nevertheless, it leads to the crash (in this case, the onError handler). The problem is that the call to System.out.println is still noticeably slower than [wpanchor id = “3”] the usual onNext call on the subscriber. And you can face such a problem in any situation. The problem is initially the Observable.create method. And knowingly it is asked to declare it as deprecated. But since there is such a problem, let’s look at two different ways how to protect ourselves from it.

1. Using special operators

First, there is a large group of operators that allow you to use only one element of all that came in a certain time / number of elements. For example, the sample operator allows you to get only the most recent element from all elements in the time period. The scheme of his work is as follows:

[:ru]Лекция 3 по архитектуре андроид-приложений. Часть 2. RxJava в Android [:en]Lecture 3 on the architecture of android applications. Part 2. RxJava on Android[:]

For example, in the following example, the sample operator allows only one value to be used once every 10 microseconds:

createBackpressureObservable()

.sample(10, TimeUnit.MICROSECONDS)

.observeOn(Schedulers.computation())

.subscribe(System.out::println, throwable -> {

System.out.println("error: " + throwable);

});

As a result, we get about such a conclusion (which will differ on different starts):

233

419

536

658

797

946

This operator is convenient to use if, for example, your data is updated very often, but you only need the last value. You do not want to update your UI too often, so you can set the sample for a second, for example.

Another operator from such a group is the debounce operator, which gives data to the subscriber only if it is already long enough (waiting time is a parameter) that no new data is received. The debounce statement schema:

[:ru]Лекция 3 по архитектуре андроид-приложений. Часть 2. RxJava в Android [:en]Lecture 3 on the architecture of android applications. Part 2. RxJava on Android[:]

The most typical example of using the debounce operator is user input. If you want to wait for the user to complete the data entry, before you execute the query, you should contact the debounce operator.

Previous operators always gave only one element from the group, with the remaining elements disappearing. This approach is convenient, but it is not always applicable. Sometimes we need to process all incoming data. The logical decision for such a case will be the use of some buffer that will accumulate data, and then give the data to the subscriber, which will process all the data in the list at once. To do this, there is a buffer operator that collects the data until it has elapsed enough time or until a sufficient element is found (with this possibility one needs to be extremely careful not to get into a situation where the required number of elements is never typed). Buffer operator’s scheme:

[:ru]Лекция 3 по архитектуре андроид-приложений. Часть 2. RxJava в Android [:en]Lecture 3 on the architecture of android applications. Part 2. RxJava on Android[:]

You can apply it to solve the problem from our example:

createBackpressureObservable()

.buffer(100)

.observeOn(Schedulers.computation())

.subscribe(System.out::println, throwable -> {

System.out.println("error: " + throwable);

});

[wpanchor id=”4″]Now we do not display every element, but in 100-element batches – the probability of getting MissingBackpressureException tends to zero.

2. Abandon the use of Observable.create

In fact, only the way to create Observable through the create method is affected by the Backpressure problem. The simplest solution is to refuse to use it. Now it’s possible. Consider several situations where you can replace the use of create with the help of other operators.

First, if you have several ready-made elements or their list, you can use the methods just, from and others.

Secondly, if you have some heavy code that returns one element, use fromCallable:

Observable.fromCallable(new Callable<List<Integer>>() {

@Override

public List<Integer> call() throws Exception {

//some long-running operation

       return getUserIdsFromDatabase();

}

});

And, finally, the most important reason for using the create method is to wrap some asynchronous call in the Observable so that it can be used in a reactive style. One example is the use of different sensors. The correct implementation of this approach using the create method is as follows:

return Observable.create(new Observable.OnSubscribe<SensorEvent>() {

@Override

public void call(final Subscriber<? super SensorEvent> subscriber) {

SensorEventListener sensorEventListener = new SensorEventListener() {

@Override

public void onSensorChanged(SensorEvent event) {

if (subscriber != null && !subscriber.isUnsubscribed()) {

subscriber.onNext(event);

}

}



@Override

public void onAccuracyChanged(Sensor sensor, int accuracy) {

// do nothing

           }

};

sensorManager.registerListener(sensorEventListener, sensor, samplingPeriodUs);

subscriber.add(new MainThreadSubscription() {

@Override

protected void onUnsubscribe() {

sensorManager.unregisterListener(sensorEventListener);

}

});

}

});

Apparently, we have to deal with the problem that the subscriber can unsubscribe, and in order to correctly unsubscribe from the sensor events, and with the problem of Backpressure (since the sensors can be very sensitive and give data very often), which we considered earlier . In doing so, this is the only correct way, which contains many opportunities for errors and is also very long.

It seems that you can not do without create, but there is a way, it is implemented through the operation fromEmitter and looks like this:

return Observable.fromEmitter(new Action1<AsyncEmitter<SensorEvent>>() {

@Override

public void call(final AsyncEmitter<SensorEvent> sensorEventAsyncEmitter) {

final SensorEventListener sensorListener = new SensorEventListener() {

@Override

public void onSensorChanged(SensorEvent sensorEvent) {

sensorEventAsyncEmitter.onNext(sensorEvent);

}



@Override

public void onAccuracyChanged(Sensor originSensor, int i) {

// do nothing

           }

};



sensorEventAsyncEmitter.setCancellation(() -> sensorManager.unregisterListener(sensorListener, sensor));

sensorManager.registerListener(sensorListener, sensor, samplingPeriodUs);

}

}, AsyncEmitter.BackpressureMode.LATEST);

What is the difference between this code and the previous one? First, it automatically requires to solve the problem of Backpressure due to the last parameter in the method fromEmitter. Possible values of this parameter cover all cases:

enum BackpressureMode {

NONE,

ERROR,

BUFFER,

DROP,

LATEST

}

So you can (and should) completely abandon the Observable.create method![wpanchor id=”5″]

Practical examples

Download the project Project RxJavaSamples 

1) Stream of strings in a stream of numbers with a predicate

2) Obtaining unique data from the stream until condition

3) The sum of all the numbers in the data stream

4) Switching between data streams

5) Flow from the greatest common divisors for elements from input streams

6) Flow with a long calculation

Practical assignment

Download Project PopularMovies

1) The condition of the problem in the class ru.gdgkazan.popularmovies.screen.details.MovieDetailsActivity

2) Carry out two requests (to receive trailers and reviews) in parallel

3) Display the process of loading data [wpanchor id = “6”]

4) Save the data to the database and use the cached data in case of an error

5) Process re-creation Activity

  1.  Applications from the repository:
    1. RxJavaSamples are examples of code with RxJava and practical tasks.
    2. Popular Movies – an example of using RxJava in Android to perform network requests and a practical task.
    3. BackpressureProblem – demonstration of examples of MissingBackpressureException and ways of the decision of this problem.
  2. A good introduction to functional reactive programming.
  3. Documentation for Rx.
  4. Wiki on RxJava.
  5. A large list of various articles on Rx and RxJava.
  6. Introductory articles about RxJava and RxAndroid and their translation on the Habr.
  7. Introductory video about RxJava.
  8. Documentation for operators from Rx.
  9. Interactive list of operators from Rx.
  10. A good article about using RxJava to load data from several sources.
  11. Report on the use of RxJava in Yandex.
  12. The book RxJava Essentials.
  13. Documentation about Subjects.
  14. The problem of Backpressure: the description, the ways to combat it and the answer to SO, the article about fromEmitter.

Continuation:

Lecture 4 on the architecture of the android application. Clean Architecture

[:]

Коментарі: 1
Додати коментар