RxJava - 快速指南

RxJava - 概述

RxJava 是基于 Java 的 ReactiveX 扩展。它提供 Java 中的实现或 ReactiveX 项目。以下是 RxJava 的主要特征。

  • 扩展观察者模式。

  • 支持数据/事件序列。

  • 提供运算符以声明方式组合序列。

  • 内部处理线程、同步、线程安全和并发数据结构。

什么是 ReactiveX?

ReactiveX 是一个旨在为各种编程语言提供反应式编程概念的项目。反应式编程是指程序在数据出现时做出反应的场景。它是一个基于事件的编程概念,事件可以传播到寄存器观察者。

根据 Reactive,它们结合了观察者模式、迭代器模式和函数模式的优点。

观察者模式做得很好。ReactiveX 结合了观察者模式、迭代器模式和函数编程的最佳理念。

函数编程

函数编程围绕使用纯函数构建软件。纯函数不依赖于先前的状态,并且始终对传递的相同参数返回相同的结果。纯函数有助于避免与共享对象、可变数据和多线程环境中常见的副作用相关的问题。

反应式编程

反应式编程是指事件驱动编程,其中数据流以异步方式进入并在到达时进行处理。

函数式反应式编程

RxJava 同时实现了这两个概念,其中流的数据随时间而变化,消费者函数做出相应的反应。

反应式宣言

反应式宣言是一份在线文档,阐述了应用软件系统的高标准。根据宣言,以下是反应式软件的关键属性 −

  • 响应式 −应始终及时响应。

  • 消息驱动 − 应在组件之间使用异步消息传递,以便它们保持松散耦合。

  • 弹性 − 即使在高负载下也应保持响应。

  • 弹性 − 即使任何组件发生故障也应保持响应。

RxJava 的关键组件

RxJava 有两个关键组件:可观察对象和观察者。

  • 可观察对象 −它表示一个类似于 Stream 的对象,可以发出零个或多个数据,可以发送错误消息,在发出一组数据时可以控制其速度,可以发送有限数据和无限数据。

  • 观察者 − 它订阅 Observable 的序列数据并对每个可观察项做出反应。每当 Observable 发出数据时,观察者都会收到通知。观察者逐个处理数据。

如果项目不存在或前一个项目未返回回调,观察者永远不会收到通知。

RxJava - 环境设置

本地环境设置

RxJava 是一个 Java 库,因此首要要求是在您的机器上安装 JDK。

系统要求

JDK 1.5 或更高版本。
内存 无最低要求。
磁盘空间 无最低要求。
操作系统 无最低要求。

步骤 1 - 验证您的 Java 安装机器

首先,打开控制台并根据您正在使用的操作系统执行 java 命令。

OS 任务 命令
Windows 打开命令控制台 c:\> java -version
Linux 打开命令终端 $ java -version
Mac 打开终端 machine:< joseph$ java -version

让我们验证所有操作系统的输出 −

OS 输出
Windows

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

Linux

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

Mac

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

如果您的系统上未安装 Java,请从以下链接下载 Java 软件开发工具包 (SDK) https://www.oracle.com。我们假设 Java 1.8.0_101 为本教程的安装版本。

第 2 步 - 设置 JAVA 环境

设置 JAVA_HOME 环境变量以指向您的计算机上安装 Java 的基本目录位置。例如。

OS 输出
Windows 将环境变量 JAVA_HOME 设置为 C:\Program Files\Java\jdk1.8.0_101
Linux export JAVA_HOME = /usr/local/java-current
Mac export JAVA_HOME = /Library/Java/Home

将 Java 编译器位置附加到系统路径。

OS 输出
Windows 将字符串 C:\Program Files\Java\jdk1.8.0_101\bin 附加到系统变量 Path 的末尾。
Linux export PATH = $PATH:$JAVA_HOME/bin/
Mac not required

使用命令 java -version 验证 Java 安装,如上所述。

步骤 3 - 下载 RxJava2 存档

RxJava @ MVNRepository 下载最新版本的 RxJava jar 文件及其依赖项 Reactive Streams @ MVNRepository 。在撰写本教程时,我们已经下载了 rxjava-2.2.4.jar、reactive-streams-1.0.2.jar 并将其复制到 C:\>RxJava 文件夹中。

OS 存档名称
Windows rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Linux rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Mac rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

第 4 步 - 设置 RxJava 环境

设置 RX_JAVA 环境变量以指向 RxJava jar 存储在您机器上的基本目录位置。假设我们已将 rxjava-2.2.4.jar 和 react-streams-1.0.2.jar 存储在 RxJava 文件夹中。

Sr.No 操作系统和说明
1

Windows

将环境变量 RX_JAVA 设置为 C:\RxJava

2

Linux

export RX_JAVA = /usr/local/RxJava

3

Mac

export RX_JAVA = /Library/RxJava

第 5 步 - 设置 CLASSPATH 变量

设置 CLASSPATH 环境变量以指向 RxJava jar 位置。

Sr.No OS & 说明
1

Windows

将环境变量 CLASSPATH 设置为 %CLASSPATH%;%RX_JAVA% xjava-2.2.4.jar;%RX_JAVA% eactive-streams-1.0.2.jar;.;

2

Linux

export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

3

Mac

export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

第 6 步 - 测试 RxJava 设置

创建一个类 TestRx.java,如下所示−

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!").subscribe(System.out::println);
   }
}

第 7 步 - 验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac Tester.java

验证输出。

Hello World!

RxJava - Observable 的工作原理

Observable 表示数据源,而 观察者(订阅者) 则监听它们。简而言之,Observable 发出项目,然后订阅者使用这些项目。

Observable

  • 一旦订阅者开始监听,Observable 就会提供数据。

  • Observable 可以发出任意数量的项目。

  • Observable 可以只发出完成信号,也可以不发出任何项目。

  • Observable 可以成功终止。

  • Observable 可能永远不会终止。例如按钮可以被点击任意次。

  • Observable 可能在任何时间点抛出错误。

订阅者

  • Observable 可以有多个订阅者。

  • 当 Observable 发出一个项目时,每个订阅者的 onNext() 方法都会被调用。

  • 当 Observable 完成发出项目时,每个订阅者的 onComplete() 方法都会被调用。

  • 如果 Observable 发出错误,每个订阅者的 onError() 方法都会被调用。

RxJava - 创建 Observable

以下是创建 Observable 的基类。

  • Flowable − 0..N 流,发出 0 或 n 个项目。支持 Reactive-Streams 和背压。

  • Observable − 0..N 流,但没有背压。

  • Single − 1 个项目或错误。可以视为方法调用的响应版本。

  • Completable − 没有项目发出。用作完成或错误的信号。可以视为 Runnable 的响应版本。

  • MayBe − 没有项目或发出 1 个项目。可以视为 Optional 的反应版本。

以下是在 Observable 类中创建可观察对象的便捷方法。

  • just(T item) − 返回一个 Observable,该 Observable 发出给定(常量引用)项的信号,然后完成。

  • fromIterable(Iterable source) − 将 Iterable 序列转换为发出序列中项的 ObservableSource。

  • fromArray(T... items) − 将 Array 转换为发出 Array 中项的 ObservableSource。

  • fromCallable(Callable supplier) −返回一个 Observable,当观察者订阅它时,它会调用您指定的函数,然后发出该函数返回的值。

  • fromFuture(Future future) − 将 Future 转换为 ObservableSource。

  • interval(long initialDelay, long period, TimeUnit unit) − 返回一个 Observable,它在 initialDelay 之后发出 0L,此后每个时间段后发出不断增加的数字。

RxJava - 单一可观察对象

单一类表示单一值响应。单一可观察对象只能发出单个成功值或错误。它不会发出 onComplete 事件。

类声明

以下是 io.reactivex.Single<T> 类的声明 −

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

协议

以下是 Single Observable 操作的顺序协议 −

onSubscribe (onSuccess | onError)?

单个示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Hello World

RxJava - MayBe Observable

MayBe 类表示延迟响应。MayBe observable 可以发出单个成功值或不发出任何值。

类声明

以下是 io.reactivex.Single<T> 类的声明 −

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

协议

以下是 MayBe Observable 操作的顺序协议 −

onSubscribe (onSuccess | onError | OnComplete)?

MayBe 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Hello World

RxJava - Completable Observable

Completable 类表示延迟响应。Completable observable 可以指示成功完成或错误。

类声明

以下是 io.reactivex.Completable 类的声明 −

public abstract class Completable
extends Object
implements CompletableSource

协议

以下是 Completable Observable 操作的顺序协议 −

onSubscribe (onError | onComplete)?

Completable 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Started!
Done!

RxJava - 使用 CompositeDisposable

CompositeDisposable 类表示一个可以容纳多个一次性用品的容器,并提供添加和删除一次性用品的 O(1) 复杂度。

类声明

以下是 io.reactivex.disposables.CompositeDisposable 类的声明 −

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposable 示例

使用您选择的任何编辑器(例如,在 C:\> RxJava 中)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Hello World
Hi

RxJava - 创建运算符

以下是用于创建 Observable 的运算符。

Sr.No. 运算符和说明
1

Create

从头开始创建 Observable 并允许观察者方法以编程方式调用。

2

Defer

在观察者订阅之前不要创建 Observable。为每个观察者创建一个新的可观察对象。

3

Empty/Never/Throw

创建一个具有有限行为的可观察对象。

4

From

将对象/数据结构转换为可观察对象。

5

Interval

创建一个按指定时间间隔按顺序发射整数的可观察对象。

6

Just

将对象/数据结构转换为 Observable,以发出相同或相同类型的对象。

7

Range

创建一个 Observable,按给定范围的顺序发出整数。

8

Repeat

创建一个 Observable,按顺序重复发出整数。

9

Start

创建一个 Observable,以发出函数。

10

Timer

创建一个 Observable,在给定延迟后发出单个项目。

创建运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//使用 fromArray 运算符创建 Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

ABCDEFG

RxJava - 转换运算符

以下是用于转换从 Observable 发出的项目的运算符。

Sr.No. 运算符和说明
1

Buffer

定期将 Observable 中的项目收集到包中,然后发出包而不是项目。

2

FlatMap

用于嵌套的可观察对象。将项目转换为可观察对象。然后将项目展平为单个 Observable。

3

GroupBy

将一个 Observable 划分为按键组织的 Observable 集合,以发出不同的项目组。

4

Map

将函数应用于每个发出的项目以对其进行转换。

5

Scan

按顺序将函数应用于每个发出的项目,然后发出连续的值。

6

Window

定期将 Observable 中的项目收集到 Observable 窗口中,然后发出窗口而不是项目。

转换运算符示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//使用 map 操作符转换 Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

ABCDEFG

RxJava - 过滤运算符

以下是用于从 Observable 中选择性发出项目的运算符。

Sr.No. 运算符 &描述
1

Debounce

仅在发生超时时发出项目,而不发出其他项目。

2

Distinct

仅发出唯一项目。

3

ElementAt

仅发出 Observable 发出的位于 n 索引处的项目。

4

Filter

仅发出通过给定谓词函数。

5

First

发出第一个项目或第一个通过给定条件的项目。

6

IgnoreElements

不从 Observable 发出任何项目但标记完成。

7

Last

从 Observable 发出最后一个元素。

8

Sample

以给定的时间间隔发出最新的项目。

9

Skip

跳过 Observable 中的前 n 个项目。

10

SkipLast

跳过 Observable 中的后 n 个项目。

11

Take

从 Observable 中获取前 n 个项目。

12

TakeLast

从 Observable 中获取最后 n 个项目。

过滤运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//使用 take 运算符过滤 Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

ab

RxJava - 组合运算符

以下是用于从多个 Observable 创建单个 Observable 的运算符。

Sr.No. 运算符 &描述
1 And/Then/When

使用 Pattern 和 Plan 中介合并项目集。

2 CombineLatest

通过指定函数合并每个 Observable 发出的最新项目并发出结果项目。

3 Join

如果在第二个 Observable 发出项目的时间范围内发出,则合并两个 Observable 发出的项目。

4 Merge

合并 Observables 发出的项目。

5 StartWith

在开始从源 Observable 发出项目之前,发出指定的项目序列

6 Switch

发出 Observable 发出的最新项目。

7 Zip

根据功能组合 Observable 的项目并发出结果项目。

组合运算符示例

使用您的任何编辑器创建以下 Java 程序选择,例如,C:\> RxJava。

ObservableTester.java

import io.reactivex.Observable;
//使用combineLatest运算符组合Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

g1g2g3g4g5g6

RxJava - 实用运算符

以下是通常对 Observable 有用的运算符。

Sr.No. 运算符 &描述
1

Delay

注册操作以处理 Observable 生命周期事件。

2

Materialize/Dematerialize

表示发出的项目和发送的通知。

3

ObserveOn

指定要观察的调度程序。

4

Serialize

强制 Observable 进行序列化调用。

5

Subscribe

对 Observable 发出的项目和通知(如完成)进行操作

6

SubscribeOn

指定订阅 Observable 时要使用的调度程序。

7

TimeInterval

将 Observable 转换为发出两次发射之间经过的时间量的指示。

8

Timeout

如果在指定时间内没有发出任何项目,则发出错误通知。

9

Timestamp

将时间戳附加到发出的每个项目。

10

Using

创建一次性资源或与 Observable 相同的生命周期。

实用程序运算符示例

使用您选择的任何编辑器在 C:\> 中创建以下 Java 程序RxJava。

ObservableTester.java

import io.reactivex.Observable;
//使用订阅运算符订阅 Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcdefg

RxJava - 条件运算符

以下是评估一个或多个可观察对象或发出的项目的运算符。

Sr.No. 运算符 &描述
1

All

评估所有发出的项目是否符合给定条件。

2

Amb

仅当给定多个 Observable 时,才从第一个 Observable 发出所有项目。

3

Contains

检查 Observable 是否发出特定项目。

4

DefaultIfEmpty

如果 Observable 未发出任何内容,则发出默认项目。

5

SequenceEqual

检查两个 Observable 是否发出相同的项目序列。

6

SkipUntil

丢弃第一个 Observable 发出的项目,直到第二个 Observable 发出一个项目。

7

SkipWhile

丢弃 Observable 发出的项目,直到给定条件变为 false。

8

TakeUntil

在第二个 Observable 发出项目或终止后丢弃 Observable 发出的项目。

9

TakeWhile

在指定条件变为 false 后丢弃 Observable 发出的项目。

条件运算符示例

创建以下 Java 程序使用您选择的任何编辑器,例如,C:\> RxJava。

ObservableTester.java

import io.reactivex.Observable;
//使用 defaultIfEmpty 运算符对 Observable 进行操作
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

No Data
a

RxJava - 数学运算符

以下是操作 Observable 发出的整个项目的运算符。

Sr.No. 运算符 &描述
1

Average

计算所有项目的平均值并发出结果。

2

Concat

从多个 Observable 中发出所有项目,不进行交错。

3

Count

计算所有项目并发出结果。

4

Max

计算所有项目中的最大值并发出结果。

5

Min

计算所有项目中最小值的项目并发出结果。

6

Reduce

对每个项目应用一个函数并返回结果。

7

Sum

计算所有项目的总和并发出结果。

数学运算符示例

使用您选择的任何编辑器在 C:\> 中创建以下 Java 程序RxJava。

ObservableTester.java

import io.reactivex.Observable;
//使用 concat 运算符对多个 Observable 进行操作
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcdefg123456

RxJava - 可连接运算符

以下是可以更精确控制订阅的运算符。

Sr.No. 运算符 &描述
1

Connect

指示可连接的 Observable 向其订阅者发出项目。

2

Publish

将 Observable 转换为可连接的 Observable。

3

RefCount

将可连接的 Observable 转换为普通 Observable。

4

Replay

确保相同的顺序每个订阅者可以看到已发出的项目,即使在 Observable 已开始发出项目并且订阅者稍后订阅之后也是如此。

Connectable 运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//在 ConnectableObservable 上使用 connect 运算符
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

0
7
abcdefg

RxJava - 主题

根据 Reactive,主题既可以充当可观察对象,也可以充当观察者。

主题是一种桥梁或代理,在 ReactiveX 的某些实现中可用,既可以充当观察者,也可以充当可观察对象。因为它是一个观察者,所以它可以订阅一个或多个可观察对象,并且因为它是一个可观察对象,所以它可以通过重新发射它观察到的项目来传递它们,并且它还可以发射新的项目。

有四种类型的主题 −

Sr.No. 主题 &描述
1

PublishSubject

仅发出订阅后发出的项目。

2 ReplaySubject

发出源 Observable 发出的所有项目,无论它何时订阅了 Observable。

3

BehaviorSubject

订阅后,发出最新项目,然后继续发出源 Observable 发出的项目。

4

AsyncSubject

在源 Observable 完成发射后,发射其发射的最后一项。

RxJava - PublishSubject

PublishSubject 向当前订阅的观察者发送项目,并向当前或后期观察者发送终止事件。

类声明

以下是 io.reactivex.subjects.PublishSubject<T> 类的声明 −

public final class PublishSubject<T>
extends Subject<T>

PublishSubject 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //输出为 abcd
      System.out.println(result1);
      //输出仅为 d
      //在 c 项目发出后进行订阅。
      System.out.println(result2);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcd
d

RxJava - BehaviorSubject

BehaviorSubject 向每个订阅的观察者发出它观察到的最新项目,然后将所有后续观察到的项目发送给每个订阅的观察者。

类声明

以下是 io.reactivex.subjects.BehaviorSubject<T> 类的声明 −

public final class BehaviorSubject<T>
extends Subject<T>

BehaviorSubject 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //输出为 abcd
      System.out.println(result1);
      //输出将是 cd 成为 BehaviorSubject
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcd
cd

RxJava - ReplaySubject

ReplaySubject 将事件/项目重播给当前和后期观察者。

类声明

以下是 io.reactivex.subjects.ReplaySubject<T> 类的声明 −

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //输出为 abcd
      System.out.println(result1);
      //输出将是 abcd abcdbeing ReplaySubject
      //因为 ReplaySubject 发出了所有的项目
      System.out.println(result2);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcd
abcd

RxJava - AsyncSubject

AsyncSubject 向观察者发出唯一的最后一个值,然后是完成事件或收到的错误。

类声明

以下是 io.reactivex.subjects.AsyncSubject<T> 类的声明 −

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //输出将是 d,这是最后发出的项目
      System.out.println(result1);
      //输出将是 d,这是最后发出的项目     
      System.out.println(result2);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

d
d

RxJava - Schedulers 调度程序

调度程序用于多线程环境中,与 Observable 运算符配合使用。

根据 Reactive,调度程序用于安排运算符链如何应用于不同的线程。

默认情况下,Observable 和您应用于它的运算符链将在调用其 Subscribe 方法的同一线程上完成其工作并通知其观察者。SubscribeOn 运算符通过指定 Observable 应在其上运行的不同调度程序来更改此行为。 ObserveOn 操作符指定 Observable 将用来向其观察者发送通知的不同 Scheduler。

RxJava 中有以下类型的 Scheduler −

Sr.No. Scheduler & 说明
1

Schedulers.computation()

创建并返回用于计算工作的 Scheduler。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。

2

Schedulers.io()

创建并返回用于 IO 绑定工作的 Scheduler。线程池可以根据需要进行扩展。

3

Schedulers.newThread()

创建并返回一个 Scheduler,该 Scheduler 为每个工作单元创建一个新的线程。

4

Schedulers.trampoline()

创建并返回一个 Scheduler,该 Scheduler 将当前线程上的工作排队,以便在当前工作完成后执行。

4

Schedulers.from(java.util.concurrent.Executor executor)

将 Executor 转换为新的 Scheduler 实例。

RxJava - Trampoline Scheduler

Schedulers.trampoline() 方法创建并返回一个 Scheduler,该 Scheduler 将工作排队到当前线程上,以便在当前工作完成后执行。

Schedulers.trampoline() 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava - NewThread Scheduler

Schedulers.newThread() 方法创建并返回一个 Scheduler,该 Scheduler 为每个工作单元创建一个新的线程。

Schedulers.newThread() 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava - Computation Scheduler

Schedulers.computation() 方法创建并返回用于计算工作的 Scheduler。要调度的线程数取决于系统中现有的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。

Schedulers.computation() 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava - IO Scheduler

Schedulers.io() 方法创建并返回一个用于 IO 绑定工作的 Scheduler。线程池可以根据需要扩展。最适合 I/O 密集型操作。

Schedulers.io() 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava - From Scheduler

Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。

Schedulers.from(Executor) 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava - 缓冲

缓冲运算符允许将 Observable 发出的项目收集到列表或包中,并发出这些包而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓冲,3 个项目将一起发出。

Buffering 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done! 

RxJava - 窗口化

窗口化运算符的工作原理类似于缓冲区运算符,但它允许将 Observable 发出的项目收集到另一个 Observable 中,而不是集合中,并发出这些 Observable 而不是集合。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用窗口运算符,3 个 Observable 将一起发出。

Windowing 示例

使用您选择的任何编辑器(例如,C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

验证结果

使用 javac 编译器编译类,如下所示 −

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示 −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!