RxJava - 快速指南
RxJava - 概述
RxJava 是基于 Java 的 ReactiveX 扩展。它提供 Java 中的实现或 ReactiveX 项目。以下是 RxJava 的主要特征。
扩展观察者模式。
支持数据/事件序列。
提供运算符以声明方式组合序列。
内部处理线程、同步、线程安全和并发数据结构。
什么是 ReactiveX?
ReactiveX 是一个旨在为各种编程语言提供反应式编程概念的项目。反应式编程是指程序在数据出现时做出反应的场景。它是一个基于事件的编程概念,事件可以传播到寄存器观察者。
根据 Reactive,它们结合了观察者模式、迭代器模式和函数模式的优点。
观察者模式做得很好。ReactiveX 结合了观察者模式、迭代器模式和函数编程的最佳理念。
函数编程
函数编程围绕使用纯函数构建软件。纯函数不依赖于先前的状态,并且始终对传递的相同参数返回相同的结果。纯函数有助于避免与共享对象、可变数据和多线程环境中常见的副作用相关的问题。
反应式编程
反应式编程是指事件驱动编程,其中数据流以异步方式进入并在到达时进行处理。
函数式反应式编程
RxJava 同时实现了这两个概念,其中流的数据随时间而变化,消费者函数做出相应的反应。
反应式宣言
反应式宣言是一份在线文档,阐述了应用软件系统的高标准。根据宣言,以下是反应式软件的关键属性 −
响应式 −应始终及时响应。
消息驱动 − 应在组件之间使用异步消息传递,以便它们保持松散耦合。
弹性 − 即使在高负载下也应保持响应。
弹性 − 即使任何组件发生故障也应保持响应。
RxJava 的关键组件
RxJava 有两个关键组件:可观察对象和观察者。
可观察对象 −它表示一个类似于 Stream 的对象,可以发出零个或多个数据,可以发送错误消息,在发出一组数据时可以控制其速度,可以发送有限数据和无限数据。
观察者 − 它订阅 Observable 的序列数据并对每个可观察项做出反应。每当 Observable 发出数据时,观察者都会收到通知。观察者逐个处理数据。
如果项目不存在或前一个项目未返回回调,观察者永远不会收到通知。
RxJava - 环境设置
本地环境设置
RxJava 是一个 Java 库,因此首要要求是在您的机器上安装 JDK。
系统要求
JDK | 1.5 或更高版本。 |
---|---|
内存 | 无最低要求。 |
磁盘空间 | 无最低要求。 |
操作系统 | 无最低要求。 |
步骤 1 - 验证您的 Java 安装机器
首先,打开控制台并根据您正在使用的操作系统执行 java 命令。
OS | 任务 | 命令 |
---|---|---|
Windows | 打开命令控制台 | c:\> java -version |
Linux | 打开命令终端 | $ java -version |
Mac | 打开终端 | machine:< joseph$ java -version |
让我们验证所有操作系统的输出 −
OS | 输出 |
---|---|
Windows | java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101) |
Linux | java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101) |
Mac | java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101) |
如果您的系统上未安装 Java,请从以下链接下载 Java 软件开发工具包 (SDK) https://www.oracle.com。我们假设 Java 1.8.0_101 为本教程的安装版本。
第 2 步 - 设置 JAVA 环境
设置 JAVA_HOME 环境变量以指向您的计算机上安装 Java 的基本目录位置。例如。
OS | 输出 |
---|---|
Windows | 将环境变量 JAVA_HOME 设置为 C:\Program Files\Java\jdk1.8.0_101 |
Linux | export JAVA_HOME = /usr/local/java-current |
Mac | export JAVA_HOME = /Library/Java/Home |
将 Java 编译器位置附加到系统路径。
OS | 输出 |
---|---|
Windows | 将字符串 C:\Program Files\Java\jdk1.8.0_101\bin 附加到系统变量 Path 的末尾。 |
Linux | export PATH = $PATH:$JAVA_HOME/bin/ |
Mac | not required |
使用命令 java -version 验证 Java 安装,如上所述。
步骤 3 - 下载 RxJava2 存档
从 RxJava @ MVNRepository 下载最新版本的 RxJava jar 文件及其依赖项 Reactive Streams @ MVNRepository 。在撰写本教程时,我们已经下载了 rxjava-2.2.4.jar、reactive-streams-1.0.2.jar 并将其复制到 C:\>RxJava 文件夹中。
OS | 存档名称 |
---|---|
Windows | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Linux | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Mac | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
第 4 步 - 设置 RxJava 环境
设置 RX_JAVA 环境变量以指向 RxJava jar 存储在您机器上的基本目录位置。假设我们已将 rxjava-2.2.4.jar 和 react-streams-1.0.2.jar 存储在 RxJava 文件夹中。
Sr.No | 操作系统和说明 |
---|---|
1 | Windows 将环境变量 RX_JAVA 设置为 C:\RxJava |
2 | Linux export RX_JAVA = /usr/local/RxJava |
3 | Mac export RX_JAVA = /Library/RxJava |
第 5 步 - 设置 CLASSPATH 变量
设置 CLASSPATH 环境变量以指向 RxJava jar 位置。
Sr.No | OS & 说明 |
---|---|
1 | Windows 将环境变量 CLASSPATH 设置为 %CLASSPATH%;%RX_JAVA% xjava-2.2.4.jar;%RX_JAVA% eactive-streams-1.0.2.jar;.; |
2 | Linux export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. |
3 | Mac export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. |
第 6 步 - 测试 RxJava 设置
创建一个类 TestRx.java,如下所示−
import io.reactivex.Flowable; public class TestRx { public static void main(String[] args) { Flowable.just("Hello World!").subscribe(System.out::println); } }
第 7 步 - 验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac Tester.java
验证输出。
Hello World!
RxJava - Observable 的工作原理
Observable 表示数据源,而 观察者(订阅者) 则监听它们。简而言之,Observable 发出项目,然后订阅者使用这些项目。
Observable
一旦订阅者开始监听,Observable 就会提供数据。
Observable 可以发出任意数量的项目。
Observable 可以只发出完成信号,也可以不发出任何项目。
Observable 可以成功终止。
Observable 可能永远不会终止。例如按钮可以被点击任意次。
Observable 可能在任何时间点抛出错误。
订阅者
Observable 可以有多个订阅者。
当 Observable 发出一个项目时,每个订阅者的 onNext() 方法都会被调用。
当 Observable 完成发出项目时,每个订阅者的 onComplete() 方法都会被调用。
如果 Observable 发出错误,每个订阅者的 onError() 方法都会被调用。
RxJava - 创建 Observable
以下是创建 Observable 的基类。
Flowable − 0..N 流,发出 0 或 n 个项目。支持 Reactive-Streams 和背压。
Observable − 0..N 流,但没有背压。
Single − 1 个项目或错误。可以视为方法调用的响应版本。
Completable − 没有项目发出。用作完成或错误的信号。可以视为 Runnable 的响应版本。
MayBe − 没有项目或发出 1 个项目。可以视为 Optional 的反应版本。
以下是在 Observable 类中创建可观察对象的便捷方法。
just(T item) − 返回一个 Observable,该 Observable 发出给定(常量引用)项的信号,然后完成。
fromIterable(Iterable source) − 将 Iterable 序列转换为发出序列中项的 ObservableSource。
fromArray(T... items) − 将 Array 转换为发出 Array 中项的 ObservableSource。
fromCallable(Callable supplier) −返回一个 Observable,当观察者订阅它时,它会调用您指定的函数,然后发出该函数返回的值。
fromFuture(Future future) − 将 Future 转换为 ObservableSource。
interval(long initialDelay, long period, TimeUnit unit) − 返回一个 Observable,它在 initialDelay 之后发出 0L,此后每个时间段后发出不断增加的数字。
RxJava - 单一可观察对象
单一类表示单一值响应。单一可观察对象只能发出单个成功值或错误。它不会发出 onComplete 事件。
类声明
以下是 io.reactivex.Single<T> 类的声明 −
public abstract class Single<T> extends Object implements SingleSource<T>
协议
以下是 Single Observable 操作的顺序协议 −
onSubscribe (onSuccess | onError)?
单个示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Single; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create the observable Single<String> testSingle = Single.just("Hello World"); //Create an observer Disposable disposable = testSingle .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Hello World
RxJava - MayBe Observable
MayBe 类表示延迟响应。MayBe observable 可以发出单个成功值或不发出任何值。
类声明
以下是 io.reactivex.Single<T> 类的声明 −
public abstract class Maybe<T> extends Object implements MaybeSource<T>
协议
以下是 MayBe Observable 操作的顺序协议 −
onSubscribe (onSuccess | onError | OnComplete)?
MayBe 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Maybe.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Hello World
RxJava - Completable Observable
Completable 类表示延迟响应。Completable observable 可以指示成功完成或错误。
类声明
以下是 io.reactivex.Completable 类的声明 −
public abstract class Completable extends Object implements CompletableSource
协议
以下是 Completable Observable 操作的顺序协议 −
onSubscribe (onError | onComplete)?
Completable 示例
使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Completable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableCompletableObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Completable.complete() .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableCompletableObserver() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onStart() { System.out.println("Started!"); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Started! Done!
RxJava - 使用 CompositeDisposable
CompositeDisposable 类表示一个可以容纳多个一次性用品的容器,并提供添加和删除一次性用品的 O(1) 复杂度。
类声明
以下是 io.reactivex.disposables.CompositeDisposable 类的声明 −
public final class CompositeDisposable extends Object implements Disposable, io.reactivex.internal.disposables.DisposableContainer
CompositeDisposable 示例
使用您选择的任何编辑器(例如,在 C:\> RxJava 中)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Maybe; import io.reactivex.Single; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { CompositeDisposable compositeDisposable = new CompositeDisposable(); //Create an Single observer Disposable disposableSingle = Single.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); //Create an observer Disposable disposableMayBe = Maybe.just("Hi") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); compositeDisposable.add(disposableSingle); compositeDisposable.add(disposableMayBe); //start observing compositeDisposable.dispose(); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Hello World Hi
RxJava - 创建运算符
以下是用于创建 Observable 的运算符。
Sr.No. | 运算符和说明 |
---|---|
1 | Create 从头开始创建 Observable 并允许观察者方法以编程方式调用。 |
2 | Defer 在观察者订阅之前不要创建 Observable。为每个观察者创建一个新的可观察对象。 |
3 | Empty/Never/Throw 创建一个具有有限行为的可观察对象。 |
4 | From 将对象/数据结构转换为可观察对象。 |
5 | Interval 创建一个按指定时间间隔按顺序发射整数的可观察对象。 |
6 | Just 将对象/数据结构转换为 Observable,以发出相同或相同类型的对象。 |
7 | Range 创建一个 Observable,按给定范围的顺序发出整数。 |
8 | Repeat 创建一个 Observable,按顺序重复发出整数。 |
9 | Start 创建一个 Observable,以发出函数。 |
10 | Timer 创建一个 Observable,在给定延迟后发出单个项目。 |
创建运算符示例
使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //使用 fromArray 运算符创建 Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
ABCDEFG
RxJava - 转换运算符
以下是用于转换从 Observable 发出的项目的运算符。
Sr.No. | 运算符和说明 |
---|---|
1 | Buffer 定期将 Observable 中的项目收集到包中,然后发出包而不是项目。 |
2 | FlatMap 用于嵌套的可观察对象。将项目转换为可观察对象。然后将项目展平为单个 Observable。 |
3 | GroupBy 将一个 Observable 划分为按键组织的 Observable 集合,以发出不同的项目组。 |
4 | Map 将函数应用于每个发出的项目以对其进行转换。 |
5 | Scan 按顺序将函数应用于每个发出的项目,然后发出连续的值。 |
6 | Window 定期将 Observable 中的项目收集到 Observable 窗口中,然后发出窗口而不是项目。 |
转换运算符示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //使用 map 操作符转换 Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
ABCDEFG
RxJava - 过滤运算符
以下是用于从 Observable 中选择性发出项目的运算符。
Sr.No. | 运算符 &描述 |
---|---|
1 | Debounce 仅在发生超时时发出项目,而不发出其他项目。 |
2 | Distinct 仅发出唯一项目。 |
3 | ElementAt 仅发出 Observable 发出的位于 n 索引处的项目。 |
4 | Filter 仅发出通过给定谓词函数。 |
5 | First 发出第一个项目或第一个通过给定条件的项目。 |
6 | IgnoreElements 不从 Observable 发出任何项目但标记完成。 |
7 | Last 从 Observable 发出最后一个元素。 |
8 | Sample 以给定的时间间隔发出最新的项目。 |
9 | Skip 跳过 Observable 中的前 n 个项目。 |
10 | SkipLast 跳过 Observable 中的后 n 个项目。 |
11 | Take 从 Observable 中获取前 n 个项目。 |
12 | TakeLast 从 Observable 中获取最后 n 个项目。 |
过滤运算符示例
使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //使用 take 运算符过滤 Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .take(2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
ab
RxJava - 组合运算符
以下是用于从多个 Observable 创建单个 Observable 的运算符。
Sr.No. | 运算符 &描述 |
---|---|
1 |
And/Then/When
使用 Pattern 和 Plan 中介合并项目集。 |
2 |
CombineLatest
通过指定函数合并每个 Observable 发出的最新项目并发出结果项目。 |
3 |
Join
如果在第二个 Observable 发出项目的时间范围内发出,则合并两个 Observable 发出的项目。 |
4 |
Merge
合并 Observables 发出的项目。 |
5 |
StartWith
在开始从源 Observable 发出项目之前,发出指定的项目序列 |
6 |
Switch
发出 Observable 发出的最新项目。 |
7 |
Zip
根据功能组合 Observable 的项目并发出结果项目。 |
组合运算符示例
使用您的任何编辑器创建以下 Java 程序选择,例如,C:\> RxJava。
ObservableTester.java
import io.reactivex.Observable; //使用combineLatest运算符组合Observables public class ObservableTester { public static void main(String[] args) { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.combineLatest(observable1, observable2, (a,b) -> a + b) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
g1g2g3g4g5g6
RxJava - 实用运算符
以下是通常对 Observable 有用的运算符。
Sr.No. | 运算符 &描述 |
---|---|
1 | Delay 注册操作以处理 Observable 生命周期事件。 |
2 | Materialize/Dematerialize 表示发出的项目和发送的通知。 |
3 | ObserveOn 指定要观察的调度程序。 |
4 | Serialize 强制 Observable 进行序列化调用。 |
5 | Subscribe 对 Observable 发出的项目和通知(如完成)进行操作 |
6 | SubscribeOn 指定订阅 Observable 时要使用的调度程序。 |
7 | TimeInterval 将 Observable 转换为发出两次发射之间经过的时间量的指示。 |
8 | Timeout 如果在指定时间内没有发出任何项目,则发出错误通知。 |
9 | Timestamp 将时间戳附加到发出的每个项目。 |
10 |
Using 创建一次性资源或与 Observable 相同的生命周期。 |
实用程序运算符示例
使用您选择的任何编辑器在 C:\> 中创建以下 Java 程序RxJava。
ObservableTester.java
import io.reactivex.Observable; //使用订阅运算符订阅 Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable.subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
abcdefg
RxJava - 条件运算符
以下是评估一个或多个可观察对象或发出的项目的运算符。
Sr.No. | 运算符 &描述 |
---|---|
1 |
All 评估所有发出的项目是否符合给定条件。 |
2 |
Amb 仅当给定多个 Observable 时,才从第一个 Observable 发出所有项目。 |
3 |
Contains 检查 Observable 是否发出特定项目。 |
4 |
DefaultIfEmpty 如果 Observable 未发出任何内容,则发出默认项目。 |
5 |
SequenceEqual 检查两个 Observable 是否发出相同的项目序列。 |
6 |
SkipUntil 丢弃第一个 Observable 发出的项目,直到第二个 Observable 发出一个项目。 |
7 |
SkipWhile 丢弃 Observable 发出的项目,直到给定条件变为 false。 |
8 |
TakeUntil 在第二个 Observable 发出项目或终止后丢弃 Observable 发出的项目。 |
9 |
TakeWhile 在指定条件变为 false 后丢弃 Observable 发出的项目。 |
条件运算符示例
创建以下 Java 程序使用您选择的任何编辑器,例如,C:\> RxJava。
ObservableTester.java
import io.reactivex.Observable; //使用 defaultIfEmpty 运算符对 Observable 进行操作 public class ObservableTester { public static void main(String[] args) { final StringBuilder result = new StringBuilder(); Observable.empty() .defaultIfEmpty("No Data") .subscribe(s -> result.append(s)); System.out.println(result); String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result1 = new StringBuilder(); Observable.fromArray(letters) .firstElement() .defaultIfEmpty("No data") .subscribe(s -> result1.append(s)); System.out.println(result1); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
No Data a
RxJava - 数学运算符
以下是操作 Observable 发出的整个项目的运算符。
Sr.No. | 运算符 &描述 |
---|---|
1 | Average 计算所有项目的平均值并发出结果。 |
2 | Concat 从多个 Observable 中发出所有项目,不进行交错。 |
3 | Count 计算所有项目并发出结果。 |
4 | Max 计算所有项目中的最大值并发出结果。 |
5 | Min 计算所有项目中最小值的项目并发出结果。 |
6 | Reduce 对每个项目应用一个函数并返回结果。 |
7 | Sum 计算所有项目的总和并发出结果。 |
数学运算符示例
使用您选择的任何编辑器在 C:\> 中创建以下 Java 程序RxJava。
ObservableTester.java
import io.reactivex.Observable; //使用 concat 运算符对多个 Observable 进行操作 public class ObservableTester { public static void main(String[] args) throws InterruptedException { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.concat(observable1, observable2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
abcdefg123456
RxJava - 可连接运算符
以下是可以更精确控制订阅的运算符。
Sr.No. | 运算符 &描述 |
---|---|
1 | Connect 指示可连接的 Observable 向其订阅者发出项目。 |
2 | Publish 将 Observable 转换为可连接的 Observable。 |
3 | RefCount 将可连接的 Observable 转换为普通 Observable。 |
4 | Replay 确保相同的顺序每个订阅者可以看到已发出的项目,即使在 Observable 已开始发出项目并且订阅者稍后订阅之后也是如此。 |
Connectable 运算符示例
使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.observables.ConnectableObservable; //在 ConnectableObservable 上使用 connect 运算符 public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); ConnectableObservable<String> connectable = Observable.fromArray(letters).publish(); connectable.subscribe(letter -> result.append(letter)); System.out.println(result.length()); connectable.connect(); System.out.println(result.length()); System.out.println(result); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
0 7 abcdefg
RxJava - 主题
根据 Reactive,主题既可以充当可观察对象,也可以充当观察者。
主题是一种桥梁或代理,在 ReactiveX 的某些实现中可用,既可以充当观察者,也可以充当可观察对象。因为它是一个观察者,所以它可以订阅一个或多个可观察对象,并且因为它是一个可观察对象,所以它可以通过重新发射它观察到的项目来传递它们,并且它还可以发射新的项目。
有四种类型的主题 −
Sr.No. | 主题 &描述 |
---|---|
1 | PublishSubject 仅发出订阅后发出的项目。 |
2 |
ReplaySubject
发出源 Observable 发出的所有项目,无论它何时订阅了 Observable。 |
3 | BehaviorSubject 订阅后,发出最新项目,然后继续发出源 Observable 发出的项目。 |
4 | AsyncSubject 在源 Observable 完成发射后,发射其发射的最后一项。 |
RxJava - PublishSubject
PublishSubject 向当前订阅的观察者发送项目,并向当前或后期观察者发送终止事件。
类声明
以下是 io.reactivex.subjects.PublishSubject<T> 类的声明 −
public final class PublishSubject<T> extends Subject<T>
PublishSubject 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.PublishSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); PublishSubject<String> subject = PublishSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //输出为 abcd System.out.println(result1); //输出仅为 d //在 c 项目发出后进行订阅。 System.out.println(result2); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
abcd d
RxJava - BehaviorSubject
BehaviorSubject 向每个订阅的观察者发出它观察到的最新项目,然后将所有后续观察到的项目发送给每个订阅的观察者。
类声明
以下是 io.reactivex.subjects.BehaviorSubject<T> 类的声明 −
public final class BehaviorSubject<T> extends Subject<T>
BehaviorSubject 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.BehaviorSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); BehaviorSubject<String> subject = BehaviorSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //输出为 abcd System.out.println(result1); //输出将是 cd 成为 BehaviorSubject //(c is last item emitted before subscribe) System.out.println(result2); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
abcd cd
RxJava - ReplaySubject
ReplaySubject 将事件/项目重播给当前和后期观察者。
类声明
以下是 io.reactivex.subjects.ReplaySubject<T> 类的声明 −
public final class ReplaySubject<T> extends Subject<T>
ReplaySubject 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.ReplaySubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); ReplaySubject<String> subject = ReplaySubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //输出为 abcd System.out.println(result1); //输出将是 abcd abcdbeing ReplaySubject //因为 ReplaySubject 发出了所有的项目 System.out.println(result2); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
abcd abcd
RxJava - AsyncSubject
AsyncSubject 向观察者发出唯一的最后一个值,然后是完成事件或收到的错误。
类声明
以下是 io.reactivex.subjects.AsyncSubject<T> 类的声明 −
public final class AsyncSubject<T> extends Subject<T>
AsyncSubject 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects. AsyncSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //输出将是 d,这是最后发出的项目 System.out.println(result1); //输出将是 d,这是最后发出的项目 System.out.println(result2); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
d d
RxJava - Schedulers 调度程序
调度程序用于多线程环境中,与 Observable 运算符配合使用。
根据 Reactive,调度程序用于安排运算符链如何应用于不同的线程。
默认情况下,Observable 和您应用于它的运算符链将在调用其 Subscribe 方法的同一线程上完成其工作并通知其观察者。SubscribeOn 运算符通过指定 Observable 应在其上运行的不同调度程序来更改此行为。 ObserveOn 操作符指定 Observable 将用来向其观察者发送通知的不同 Scheduler。
RxJava 中有以下类型的 Scheduler −
Sr.No. | Scheduler & 说明 |
---|---|
1 | Schedulers.computation() 创建并返回用于计算工作的 Scheduler。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。 |
2 | Schedulers.io() 创建并返回用于 IO 绑定工作的 Scheduler。线程池可以根据需要进行扩展。 |
3 | Schedulers.newThread() 创建并返回一个 Scheduler,该 Scheduler 为每个工作单元创建一个新的线程。 |
4 | Schedulers.trampoline() 创建并返回一个 Scheduler,该 Scheduler 将当前线程上的工作排队,以便在当前工作完成后执行。 |
4 | Schedulers.from(java.util.concurrent.Executor executor) 将 Executor 转换为新的 Scheduler 实例。 |
RxJava - Trampoline Scheduler
Schedulers.trampoline() 方法创建并返回一个 Scheduler,该 Scheduler 将工作排队到当前线程上,以便在当前工作完成后执行。
Schedulers.trampoline() 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.trampoline())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread main Receiver Thread main, Item length 1 Processing Thread main Receiver Thread main, Item length 2 Processing Thread main Receiver Thread main, Item length 3
RxJava - NewThread Scheduler
Schedulers.newThread() 方法创建并返回一个 Scheduler,该 Scheduler 为每个工作单元创建一个新的线程。
Schedulers.newThread() 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.newThread())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread RxNewThreadScheduler-1 Receiver Thread RxNewThreadScheduler-1, Item length 1 Processing Thread RxNewThreadScheduler-2 Receiver Thread RxNewThreadScheduler-2, Item length 2 Processing Thread RxNewThreadScheduler-3 Receiver Thread RxNewThreadScheduler-3, Item length 3
RxJava - Computation Scheduler
Schedulers.computation() 方法创建并返回用于计算工作的 Scheduler。要调度的线程数取决于系统中现有的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。
Schedulers.computation() 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.computation())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread RxComputationThreadPool-1 Receiver Thread RxComputationThreadPool-1, Item length 1 Processing Thread RxComputationThreadPool-2 Receiver Thread RxComputationThreadPool-2, Item length 2 Processing Thread RxComputationThreadPool-3 Receiver Thread RxComputationThreadPool-3, Item length 3
RxJava - IO Scheduler
Schedulers.io() 方法创建并返回一个用于 IO 绑定工作的 Scheduler。线程池可以根据需要扩展。最适合 I/O 密集型操作。
Schedulers.io() 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.io())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 1 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 2 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 3
RxJava - From Scheduler
Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。
Schedulers.from(Executor) 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import java.util.concurrent.Executors; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3)))) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread pool-1-thread-1 Processing Thread pool-3-thread-1 Receiver Thread pool-1-thread-1, Item length 1 Processing Thread pool-4-thread-1 Receiver Thread pool-4-thread-1, Item length 3 Receiver Thread pool-3-thread-1, Item length 2
RxJava - 缓冲
缓冲运算符允许将 Observable 发出的项目收集到列表或包中,并发出这些包而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓冲,3 个项目将一起发出。
Buffering 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import java.util.List; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9); observable.subscribeOn(Schedulers.io()) .delay(2, TimeUnit.SECONDS, Schedulers.io()) .buffer(3) .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { System.out.println("Subscribed"); } @Override public void onNext(List<Integer> integers) { System.out.println("onNext: "); for (Integer value : integers) { System.out.println(value); } } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onComplete() { System.out.println("Done! "); } }); Thread.sleep(3000); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Subscribed onNext: 1 2 3 onNext: 4 5 6 onNext: 7 8 9 Done!
RxJava - 窗口化
窗口化运算符的工作原理类似于缓冲区运算符,但它允许将 Observable 发出的项目收集到另一个 Observable 中,而不是集合中,并发出这些 Observable 而不是集合。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用窗口运算符,3 个 Observable 将一起发出。
Windowing 示例
使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9); observable.subscribeOn(Schedulers.io()) .delay(2, TimeUnit.SECONDS, Schedulers.io()) .window(3) .subscribe(new Observer<Observable<Integer>>() { @Override public void onSubscribe(Disposable d) { System.out.println("Subscribed"); } @Override public void onNext(Observable<Integer> integers) { System.out.println("onNext: "); integers.subscribe(value -> System.out.println(value)); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onComplete() { System.out.println("Done! "); } }); Thread.sleep(3000); } }
验证结果
使用 javac 编译器编译类,如下所示 −
C:\RxJava>javac ObservableTester.java
现在运行 ObservableTester,如下所示 −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Subscribed onNext: 1 2 3 onNext: 4 5 6 onNext: 7 8 9 Done!