RxJS - 快速指南

RxJS - 概述

本章介绍有关 RxJS 的功能、优点和缺点的信息。在这里,我们还将学习何时使用 RxJS。

RxJS 的全称是 Reactive Extension for Javascript。它是一个使用可观察对象进行反应式编程的 JavaScript 库,用于处理异步数据调用、回调和基于事件的程序。RxJS 可以与其他 JavaScript 库和框架一起使用。它受 JavaScript 和 TypeScript 支持。

什么是 RxJS?

根据 RxJS 官方网站,它被定义为使用可观察序列编写异步和基于事件的程序的库。它提供了一种核心类型,即 Observable、附属类型(Observer、Schedulers、Subjects)和受 Array#extras 启发的运算符(map、filter、reduce、every 等),以允许将异步事件作为集合处理。

RxJS 的功能

在 RxJS 中,以下概念负责处理异步任务 −

Observable

Observable 是一个函数,它创建一个观察者并将其附加到需要值的源,例如,点击、来自 dom 元素的鼠标事件或 Http 请求等。

Observer

它是一个具有 next()、error() 和 complete() 方法的对象,当与可观察对象进行交互时,即源进行交互,例如按钮点击、Http 请求,等等。

订阅

当创建可观察对象时,要执行可观察对象,我们需要订阅它。它还可用于取消执行。

操作符

操作符是一个纯函数,它将可观察对象作为输入,输出也是一个可观察对象。

主题

主题是一个可以多播的可观察对象,即与许多观察者对话。考虑一个带有事件监听器的按钮,每次用户单击按钮时,都会调用使用 addlistener 附加到事件的函数,类似的功能也适用于主题。

调度程序

调度程序控制订阅何时启动和通知的执行。

何时使用 RxJS?

如果您的项目包含大量异步任务处理,那么 RxJS 是一个不错的选择。它默认随 Angular 项目一起加载。

使用 RxJS 的优势

以下是使用 RxJS 的优势 −

  • RxJS 可以与其他 Javascript 库和框架一起使用。它受 javascript 和 typescript 支持。一些例子是 Angular、ReactJS、Vuejs、nodejs 等。

  • 在处理异步任务方面,RxJS 是一个很棒的库。RxJS 使用可观察对象来处理处理异步数据调用、回调和基于事件的程序的反应式编程。

  • RxJS 提供了大量的数学、转换、过滤、实用程序、条件、错误处理、连接类别的运算符集合,使与反应式编程一起使用时变得轻松。

使用 RxJS 的缺点

以下是使用 RxJS 的缺点 −

  • 使用可观察对象调试代码有点困难。

  • 当您开始使用可观察对象时,您最终可以将整个代码包装在可观察对象下。

RxJS - 环境设置

在本章中,我们将安装 RxJS。要使用 RxJS,我们需要以下设置 −

  • NodeJS
  • Npm
  • RxJS 包安装

NODEJS 和 NPM 安装

使用 npm 安装 RxJS 非常容易。您需要在系统上安装 nodejs 和 npm。要验证您的系统上是否安装了 NodeJS 和 npm,请尝试在命令提示符中执行以下命令。

E:\>node -v && npm -v
v10.15.1
6.4.1

如果您获取了版本,则表示您的系统上安装了 nodejs 和 npm,并且系统上的当前版本为 10 和 6。

如果它没有打印任何内容,请在您的系统上安装 nodejs。要安装 nodejs,请转到 nodejs 主页 https://nodejs.org/en/download/,然后根据您的操作系统安装软件包。

nodejs 的下载页面将如下所示 −

NodeJS

根据您的操作系统,安装所需的软件包。安装 nodejs 后,npm 也会随之安装。要检查 npm 是否已安装,请在终端中输入 npm –v。它应该显示 npm 的版本。

RxJS 包安装

要开始安装 RxJS,首先创建一个名为 rxjsproj/ 的文件夹,我们将在其中练习所有 RxJS 示例。

创建文件夹 rxjsproj/ 后,运行命令 npm init,进行项目设置,如下所示

E:\>mkdir rxjsproj
E:\>cd rxjsproj
E:
xjsproj>npm init

Npm init 命令在执行过程中会询问几个问题,只需按 Enter 并继续。npm init 执行完成后,它将在 rxjsproj/ 中创建 package.json,如下所示 −

rxjsproj/
    package.json

现在您可以使用以下命令安装 rxjs −

npm install ---save-dev rxjs
E:
xjsproj>npm install --save-dev rxjs
npm notice created a lockfile as package-lock.json. You should commit this file.

npm WARN rxjsproj@1.0.0 No description
npm WARN rxjsproj@1.0.0 No repository field.

+ rxjs@6.5.3
added 2 packages from 7 contributors and audited 2 packages in 21.89s
found 0 vulnerabilities

我们已经完成了 RxJS 的安装。现在让我们尝试使用 RxJS,为此在 rxjsproj/ 内创建一个文件夹 src/

因此,现在我们将拥有如下所示的文件夹结构 −

rxjsproj/
   node_modules/
   src/
   package.json

src/ 中创建一个文件 testrx.js,并写入以下代码 −

testrx.js

import { of } from 'rxjs;
import { map } from 'rxjs/operators';

map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`Output is: ${v}`));

当我们在命令提示符中使用命令 − node testrx.js 执行上述代码时,它会显示导入错误,因为 nodejs 不知道如何处理导入。

要使导入与 nodejs 一起工作,我们需要使用 npm 安装 ES6 模块包,如下所示 −

E:
xjsproj\src>npm install --save-dev esm
npm WARN rxjsproj@1.0.0 No description
npm WARN rxjsproj@1.0.0 No repository field.

+ esm@3.2.25
added 1 package from 1 contributor and audited 3 packages in 9.32s
found 0 vulnerabilities

一旦包安装完毕,我们现在可以执行testrx.js文件,如下所示 −

E:
xjsproj\src>node -r esm testrx.js
Output is: 1
Output is: 4
Output is: 9

我们现在可以看到输出,显示 RxJS 已安装并可供使用。上述方法将帮助我们在命令行中测试 RxJS。如果您想在浏览器中测试 RxJS,我们需要一些额外的包。

在浏览器中测试 RxJS

在 rxjsproj/ 文件夹中安装以下包 −

npm install --save-dev babel-loader @babel/core @babel/preset-env webpack webpack-cli webpack-dev-server
E:
xjsproj>npm install --save-dev babel-loader 
@babel/core @babel/preset-env webpack webpack-cli webpack-dev-server

npm WARN rxjsproj@1.0.0 No description
npm WARN rxjsproj@1.0.0 No repository field.
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: fsevents@1.2.9
(node_modules\fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@
1.2.9: wanted {"os":"darwin","arch":"any"} (current: {"os":"win32","arch":"x64"})

+ webpack-dev-server@3.8.0
+ babel-loader@8.0.6
+ @babel/preset-env@7.6.0
+ @babel/core@7.6.0
+ webpack-cli@3.3.8
+ webpack@4.39.3
added 675 packages from 373 contributors and audited 10225 packages in 255.567s
found 0 vulnerabilities

要启动服务器来执行我们的 Html 文件,我们将使用 webpack-server。package.json 中的命令"publish"将帮助我们启动并使用 webpack 打包所有 js 文件。打包的 js 文件(即我们最终要使用的 js 文件)保存在路径 /dev 文件夹中。

要使用 webpack,我们需要运行 npm run publish 命令,该命令添加到 package.json 中,如下所示 −

Package.json

{
   "name": "rxjsproj",
   "version": "1.0.0",
   "description": "",
   "main": "index.js",
   "scripts": {
      "publish":"webpack && webpack-dev-server --output-public=/dev/",
      "test": "echo \"Error: no test specified\" && exit 1"
   },
   "author": "",
   "license": "ISC",
   "devDependencies": {
      "@babel/core": "^7.6.0",
      "@babel/preset-env": "^7.6.0",
      "babel-loader": "^8.0.6",
      "esm": "^3.2.25",
      "rxjs": "^6.5.3",
      "webpack": "^4.39.3",
      "webpack-cli": "^3.3.8",
      "webpack-dev-server": "^3.8.0"
   }
}

要使用 webpack,我们必须首先创建一个名为 webpack.config.js 的文件,其中包含 webpack 工作的配置详细信息。

文件中的详细信息如下 −

var path = require('path');

module.exports = {
   entry: {
      app: './src/testrx.js'
   },
   output: {
      path: path.resolve(__dirname, 'dev'),
      filename: 'main_bundle.js'
   },
   mode:'development',
   module: {
      rules: [
         {
            test:/\.(js)$/,
            include: path.resolve(__dirname, 'src'),
            loader: 'babel-loader',
            query: {
               presets: ['@babel/preset-env']
            }
         }
      ]
   }
};

文件的结构如上所示。它以提供当前路径详细信息的路径开始。

var path = require('path'); //提供当前路径

接下来是 module.exports 对象,它具有属性 entry、output 和 module。Entry 是起点。这里,我们需要提供我们想要编译的启动 js 文件。

entry: {
    app: './src/testrx.js'
},

path.resolve(_dirname, 'src/testrx.js') - 将在目录中查找 src 文件夹并在该文件夹中查找 testrx.js。

输出

output: {
    path: path.resolve(__dirname, 'dev'),
    filename: 'main_bundle.js'
},

输出是一个包含路径和文件名详细信息的对象。path 将保存编译文件所在的文件夹,文件名将告诉您 .html 文件中使用的最终文件的名称。

模块

module: {
   rules: [
      {
         test:/\.(js)$/,
         include: path.resolve(__dirname, 'src'),
         loader: 'babel-loader',
         query: {
            presets: ['@babel/preset-env']
         }
      }
   ]
}

Module 是具有规则详细信息的对象,其属性包括 test、include、loader、query。测试将保存所有以 .js 和 .jsx 结尾的 js 文件的详细信息。它具有在给定入口点末尾查找 .js 的模式。

Include 告诉用于查看文件的文件夹。

Loader 使用 babel-loader 来编译代码。

Query 具有属性 presets,它是一个值为"@babel/preset-env"的数组。它将根据您需要的 ES 环境转换代码。

最终文件夹结构如下 −

rxjsproj/
   node_modules/
   src/
      testrx.js
   index.html
   package.json
   webpack.config.js

运行命令

npm run publish 将创建包含 main_bundle.js 文件的 dev/ 文件夹。服务器将启动,您可以在浏览器中测试 index.html,如下所示。

运行命令

打开浏览器并点击网址 − http://localhost:8080/

Main Bundle

输出显示在控制台中。

RxJS - 最新更新

我们在本教程中使用 RxJS 版本 6。RxJS 通常用于处理响应式编程,并且更常与 Angular、ReactJS 一起使用。 Angular 6 默认加载 rxjs6。

与版本 6 相比,RxJS 版本 5 的处理方式有所不同。如果您将 RxJS 5 更新为 6,代码将会中断。在本章中,我们将了解处理版本更新的方式的不同。

如果您要将 RxJS 更新到 6 并且不想进行代码更改,您也可以这样做,并且必须安装以下软件包。

npm install --save-dev rxjs-compact

此软件包将负责提供向后兼容性,旧代码将与 RxJS 版本 6 配合良好。如果您想进行与 RxJS 6 配合良好的代码更改,以下是需要进行的更改。

运算符、可观察对象、主题的软件包已重组,因此,主要更改涉及导入,并对其进行了解释如下。

运算符的导入

根据版本 5,对于运算符,应包含以下导入语句 −

import 'rxjs/add/operator/mapTo'
import 'rxjs/add/operator/take'
import 'rxjs/add/operator/tap'
import 'rxjs/add/operator/map'

在 RxJS 版本 6 中,导入将如下所示 −

import {mapTo, take, tap, map} from "rxjs/operators"

导入方法以创建可观察对象

按照版本 5,在使用可观察对象时,应包括以下导入方法 −

import "rxjs/add/observable/from";
import "rxjs/add/observable/of";
import "rxjs/add/observable/fromEvent";
import "rxjs/add/observable/interval";

在 RxJS 版本 6 中,导入将如下所示 −

import {from, of, fromEvent, interval} from 'rxjs';

导入 Observables

在 RxJS 版本 5 中,使用 Observables 时,应包含以下导入语句 −

import { Observable } from 'rxjs/Observable'

在 RxJS 版本 6 中,导入将如下所示 −

import { Observable } from 'rxjs'

导入 Subject

在 RxJS 版本 5 中,应包含以下 subject −

import { Subject} from 'rxjs/Subject'

在 RxJS 版本 6 中,导入将如下所示 −

import { Subject } from 'rxjs'

如何在 RxJS 6 中使用运算符?

pipe() 方法 可用于创建的可观察对象。它从版本 5.5 开始添加到 RxJS。现在,使用 pipe(),您可以按顺序同时使用多个运算符。这是 RxJS 版本 5 中使用运算符的方式。

示例

import "rxjs/add/observable/from";
import 'rxjs/add/operator/max'

let list1 = [1, 6, 15, 10, 58, 2, 40];
from(list1).max((a,b)=>a-b).subscribe(x => console.log("最大值为 "+x));

从 RxJS 5.5 版开始,我们必须使用 pipe() 来执行运算符 −

示例

import { from } from 'rxjs';
import { max } from 'rxjs/operators';

from(list1).pipe(max((a,b)=>a-b)).subscribe(x => console.log(
"最大值为 "+x)
);

运算符已重命名

在重组软件包期间,一些运算符被重命名,因为它们与 javascript 关键字冲突或匹配。列表如下 −

操作符 重命名为
do() tap()
catch() catchError()
switch() switchAll()
finally() finalize()
throw() throwError()

RxJS - 可观察对象

可观察对象是一个函数,它创建一个观察者并将其附加到源,源可以从中获取值,例如,点击、来自 dom 元素的鼠标事件或 Http 请求等。

观察者是一个具有回调函数的对象,当与可观察对象发生交互时,即源已与按钮点击、Http 请求等发生交互时,将调用该函数。

我们将在本章中讨论以下主题 −

  • 创建可观察对象
  • 订阅可观察对象
  • 执行可观察对象

创建可观察对象

可以使用可观察对象构造函数创建可观察对象,也可以使用可观察对象创建方法,并将订阅函数作为参数传递给它,如下所示 −

testrx.js

import { Observable } from 'rxjs';

var observable = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

我们创建了一个可观察对象,并使用 Observable 内部可用的 subscriber.next 方法添加了一条消息"我的第一个可观察对象"。

我们还可以使用 Observable.create() 方法创建可观察对象,如下所示 −

testrx.js

import { Observable } from 'rxjs';
var observer = Observable.create(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

订阅可观察对象

您可以按如下方式订阅可观察对象 −

testrx.js

import { Observable } from 'rxjs';

var observer = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);
observer.subscribe(x => console.log(x));

当观察者订阅时,它将开始执行可观察对象。

这是我们在浏览器控制台中看到的 −

订阅可观察对象

执行可观察对象

可观察对象在订阅时执行。观察者是一个具有三种通知方法的对象,

next() − 此方法将发送数字、字符串、对象等值。

complete() − 此方法不会发送任何值并指示可观察对象已完成。

error() −如果有错误,此方法将发送错误。

让我们创建包含所有三个通知的可观察对象并执行相同的操作。

testrx.js

import { Observable } from 'rxjs';
var observer = new Observable(
   function subscribe(subscriber) {
      try {
         subscriber.next("My First Observable");
         subscriber.next("Testing Observable");
         subscriber.complete();
      } catch(e){
         subscriber.error(e);
      }
   }
);
observer.subscribe(x => console.log(x), (e)=>console.log(e), 
   ()=>console.log("Observable is complete"));

在上面的代码中,我们添加了next、complete和error方法。

try{
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
} catch(e){
   subscriber.error(e);
}

要执行 next、complete 和 error,我们必须调用 subscribe 方法,如下所示 −

observer.subscribe(x => console.log(x), (e)=>console.log(e),
    ()=>console.log("Observable is complete"));

只有出现错误时才会调用 error 方法。

这是浏览器中看到的输出 −

Execute Observable

RxJS - 运算符

运算符是 RxJS 的重要组成部分。运算符是一个纯函数,它将可观察对象作为输入,输出也是一个可观察对象。

使用运算符

运算符是一个纯函数,它将可观察对象作为输入,输出也是一个可观察对象。

要使用运算符,我们需要一个 pipe() 方法。

使用 pipe() 的示例

let obs = of(1,2,3); // 一个可观察对象
obs.pipe(
   operator1(),
   operator2(),
   operator3(),
   operator3(),
)

在上面的例子中,我们使用 of() 方法创建了一个可观察对象,该方法接受值 1、2 和 3。现在,在这个可观察对象上,您可以使用 pipe() 方法使用任意数量的运算符执行不同的操作,如上所示。运算符的执行将在给定的可观察对象上按顺序进行。

下面是一个工作示例 −

import { of } from 'rxjs';
import { map, reduce, filter } from 'rxjs/operators';

let test1 = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
let case1 = test1.pipe(
   filter(x => x % 2 === 0),
   reduce((acc, one) => acc + one, 0)
)
case1.subscribe(x => console.log(x));

Output

30

在上面的例子中,我们使用了过滤运算符来过滤偶数,接下来我们使用了 reduce() 运算符来添加偶数值并在订阅时给出结果。

下面是我们将要讨论的 Observable 列表。

  • 创建
  • 数学
  • 连接
  • 转换
  • 过滤
  • 实用程序
  • 条件
  • 多播
  • 错误处理

创建运算符

以下是我们将在创建运算符类别 − 中讨论的运算符

Sr.No 操作符和说明
1 ajax

此操作符将对给定的 URL 发出 ajax 请求。

2 from

此操作符将从数组、类似数组的对象、承诺、可迭代对象或类似可观察对象创建可观察对象。

3 fromEvent

此运算符将输出可观察对象,用于发出事件的元素,例如按钮、点击等。

4 fromEventPattern

此运算符将从用于注册事件处理程序的输入函数创建一个可观察对象。

5 interval

此运算符将为每个时间创建一个可观察对象给定..

6 of

此运算符将接受传递的参数并将其转换为可观察变量。

7 range

此运算符将创建一个可观察变量,它将根据提供的范围为您提​​供一系列数字。

8 throwError

此运算符将创建一个可观察对象,用于通知错误。

9 timer

此操作符将创建一个可观察对象,该可观察对象将在超时后发出值,并且该值将在每次调用后不断增加。

10 iif

此操作符将决定订阅哪个可观察对象。

数学运算符

以下是我们将要在数学运算符类别中讨论的运算符 −

Sr.No 运算符 &描述
1 Count

count() 运算符接收带有值的 Observable,并将其转换为将提供单个值的 Observable

2 Max

Max 方法将接收带有所有值的 Observable,并返回带有最大值的 Observable

3 Min

Min 方法将接受包含所有值的可观察变量并返回包含最小值的可观察变量。

4 Reduce

在 Reduce 运算符中,累加器函数用于输入可观察变量,累加器函数将以可观察变量的形式返回累积值,并将可选的种子值传递给累加器函数。

reduce() 函数将接受 2 个参数,一个是累加器函数,第二个是种子值。

连接运算符

以下是我们使用的运算符将在 Join 运算符类别中讨论。

Sr.No 运算符 &描述
1 concat

此运算符将按顺序发出作为输入的 Observable,然后继续执行下一个。

2 forkJoin

此运算符将作为输入放入数组或字典对象中,并等待 Observable 完成并返回从给定 Observable 发出的最后值。

3 merge

此运算符将接收输入可观察对象,并将发出可观察对象中的所有值并发出一个输出可观察对象。

4 race

它将返回一个可观察对象,该可观察对象将是第一个源可观察对象的镜像副本。

转换运算符

以下是我们将在转换运算符类别中讨论的运算符。

Sr.No 操作符和说明
1 buffer

缓冲区对可观察对象进行操作,并将参数作为可观察对象。它将开始在数组中缓冲其原始可观察对象上发出的值,并在作为参数的可观察对象发出时发出相同的值。一旦作为参数的可观察对象发出,缓冲区就会重置并再次开始在原始对象上缓冲,直到输入的可观察对象发出,并重复相同的场景。

2 bufferCount

对于 buffercount() 运算符,它将从调用它的可观察对象中收集值,并在给定给 buffercount 的缓冲区大小匹配时发出相同的值。

3 bufferTime

这类似于 bufferCount,因此在这里,它将从调用它的可观察对象中收集值并发出bufferTimeSpan 已完成。它接受 1 个参数,即 bufferTimeSpan

4 bufferToggle

对于 bufferToggle(),它接受 2 个参数,openings 和 closingSelector。打开的参数是可订阅的或承诺启动缓冲区,第二个参数 closingSelector 再次可订阅或承诺指示关闭缓冲区并发出收集的值。

5 bufferWhen

此运算符将以数组形式提供值,它接受一个参数作为函数,该函数将决定何时关闭、发出和重置缓冲区。

6 expand

expand 运算符接受一个函数作为参数,该函数以递归方式应用于源可观察对象以及输出observable。最终值是 observable。

7 groupBy

在 groupBy 运算符中,输出根据特定条件分组,这些组项作为 GroupedObservable 发出。

8 map

在 map 运算符的情况下,对源 Observable 上的每个值应用投影函数,并将相同的输出作为 Observable 发出。

9 mapTo

每次源 Observable 发出一个值时,都会随 Observable 一起给出一个常量值作为输出。

10 mergeMap

在 mergeMap 运算符的情况下,将对每个源值应用一个投影函数,并将其输出与输出 Observable 合并。

11 switchMap

在 switchMap 运算符的情况下,将对每个源值应用一个投影函数,并将其输出与输出 Observable 合并,并且给出的值是最新投影的 Observable。

12 window

它接受一个参数 windowboundaries,它是一个可观察对象,并在给定的 windowboundaries 发出时返回一个嵌套的可观察对象

过滤运算符

以下是我们将在过滤运算符类别中讨论的运算符。

Sr.No 运算符 &描述
1 debounce

一段时间后从源 Observable 发出的值,发射由作为 Observable 或承诺给出的另一个输入决定。

2 debounceTime

仅在时间完成后才会从源 Observable 发出值。

3 distinct

此运算符将给出源可观察对象中与前一个值不同的所有值。

4 elementAt

此运算符将根据给定的索引给出源可观察对象的单个值。

5 filter

此运算符将根据谓词函数过滤源可观察对象中的值给出。

6 first

此运算符将给出源 Observable 发出的第一个值。

7 last

此运算符将给出源 Observable 发出的最后一个值。

8 ignoreElements

此运算符将忽略所有值来自源 Observable 并仅执行对完成或错误回调函数的调用。

9 sample

此运算符将提供来自源 Observable 的最新值,输出将取决于传递给它的参数。

10 skip

此运算符将返回一个可观察对象,该可观察对象将跳过作为输入的 count 项的第一次出现。

11 throttle

此运算符将输出并忽略源可观察值,时间由作为参数的输入函数确定,并将重复相同的过程。

实用运算符

以下是我们将在实用运算符类别中讨论的运算符。

Sr.No 运算符和说明
1 tap

此运算符将具有与源可观察值相同的输出,并可用于将可观察值记录给用户。主要值,如果有错误或任务已完成,则为错误。

2 delay

此运算符根据给定的超时延迟从源 Observable 发出的值。

3 delayWhen

此运算符根据作为输入的另一个 Observable 的超时延迟从源 Observable 发出的值。

4 observeOn

此运算符基于输入的调度程序,将从源 Observable 重新发出通知。

5 subscribeOn

此运算符有助于根据作为输入的调度程序异步订阅源 Observable。

6 timeInterval

此运算符将返回一个对象,其中包含当前值以及使用调度程序输入计算出的当前值与上一个值之间的时间间隔已采取。

7 timestamp

返回时间戳以及源 Observable 发出的值,该值告知发出该值的时间。

8 timeout

如果源 Observable 在给定的超时后未发出值,此运算符将引发错误。

9 toArray

累积来自 Observable 的所有源值,并在源完成时将它们作为数组输出。

条件运算符

以下是我们将在条件运算符类别中讨论的运算符。

Sr.No 运算符 &描述
1 defaultIfEmpty

如果源可观察对象为空,此运算符将返回默认值。

2 every

它将根据输入函数是否满足源可观察对象上每个值的条件返回一个可观察对象。

3 find

当源 Observable 的第一个值满足作为输入的谓词函数的条件时,这将返回可观察对象。

4 findIndex

此基于输入调度程序的运算符将重新发出来自源 Observable 的通知。

5 isEmpty

如果输入可观察对象进行完整回调,此运算符将使输出为 true不发出任何值,如果输入可观察对象发出任何值,则返回 false。

多播运算符

以下是我们将在多播运算符类别中讨论的运算符。

Sr.No 操作符和说明
1 multicast

多播操作符与其他订阅者共享创建的单个订阅。多播接收的参数是主题或工厂方法,它返回具有 connect() 方法的 ConnectableObservable。要订阅,必须调用 connect() 方法。

2 publish

此操作符返回 ConnectableObservable,需要使用 connect() 方法订阅可观察对象。

3 publishBehavior

publishBehaviour 使用 BehaviourSubject,并返回 ConnectableObservable。必须使用 connect() 方法来订阅创建的可观察对象。

4 publishLast

publishBehaviour 使用 AsyncSubject,并返回 ConnectableObservable。必须使用 connect() 方法来订阅创建的可观察对象。

5 publishReplay

publishReplay 使用行为主体,它可以缓冲值并将其重播给新订阅者并返回 ConnectableObservable。必须使用 connect() 方法来订阅创建的可观察对象。

6 share

它是 mutlicast() 运算符的别名,唯一的区别是您不必手动调用 connect () 方法来启动订阅。

错误处理运算符

以下是我们将在错误处理运算符类别中讨论的运算符。

Sr.No 运算符 &描述
1 catchError

此操作符通过返回新的 Observable 或错误来捕获源 Observable 上的错误。

2 retry

如果出现错误,此操作符将负责在源 Observable 上重试,并且将根据给定的输入计数进行重试。

RxJS - 使用订阅

当创建可观察对象时,要执行可观察对象,我们需要订阅它。

count() 运算符

下面是如何订阅可观察对象的简单示例。

示例 1

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
final_val.subscribe(x => console.log("计数为 "+x));

输出

计数为 6

订阅有一个名为 unsubscribe() 的方法。调用 unsubscribe() 方法将删除用于该可观察对象的所有资源,即可观察对象将被取消。以下是使用 unsubscribe() 方法的一个实际示例。

示例 2

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
let test = final_val.subscribe(x => console.log("计数为 "+x));
test.unsubscribe();

订阅存储在变量 test 中。我们已使用 test.unsubscribe() 可观察对象。

输出

计数为 6

RxJS - 使用主题

主题是可以进行多播的可观察对象,即与许多观察者对话。考虑一个带有事件侦听器的按钮,每次用户单击按钮时,都会调用使用添加侦听器附加到事件的函数,类似的功能也适用于主题。

我们将在本章中讨论以下主题 −

  • 创建主题
  • 可观察对象和主题之间的区别是什么?
  • 行为主题
  • 重放主题
  • AsyncSubject

创建主题

要使用主题,我们需要导入主题,如下所示 −

import { Subject } from 'rxjs';

您可以按如下方式创建主题对象 −

const subject_test = new Subject();

该对象是一个观察者,具有三个方法 −

  • next(v)
  • error(e)
  • complete()

订阅主题

您可以按如下方式在主题上创建多个订阅 −

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});

订阅已注册到主题对象,就像我们之前讨论的 addlistener 一样。

将数据传递给主题

您可以使用 next() 方法将数据传递给创建的主题。

subject_test.next("A");

数据将传递给主题上添加的所有订阅。

示例

这里是主题的一个工作示例 −

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");

通过调用 new Subject() 创建 subject_test 对象。subject_test 对象引用了 next()、error() 和 complete() 方法。上述示例的输出如下所示 −

输出

Passing Data

我们可以使用 complete() 方法停止 subject 执行,如下所示。

示例

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
    next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
    next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");

一旦我们调用 complete,后面调用的下一个方法就不会被调用。

输出

传递数据方法

现在让我们看看如何调用 error() 方法。

示例

下面是一个工作示例 −

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));

输出

传递数据错误

可观察对象和主题之间的区别是什么?

可观察对象将与订阅者一对一对话。每当您订阅可观察对象时,执行都会从头开始。使用 ajax 进行 Http 调用,2 个订阅者调用可观察对象。您将在浏览器网络选项卡中看到 2 个 HttpHttp 请求。

示例

以下是相同的工作示例 −

import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
letsubscriber1 = final_val.subscribe(a => console.log(a));
letsubscriber2 = final_val.subscribe(a => console.log(a));

输出

Observable Observable Ex

现在,问题是,我们希望共享相同的数据,但不能以 2 次 Http 调用为代价。我们希望进行一次 Http 调用并在订阅者之间共享数据。

使用 Subjects 可以实现这一点。它是一个可以进行多播的可观察对象,即与许多观察者对话。它可以在订阅者之间共享值。

示例

这是一个使用主题的工作示例 −

import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(v)
});
subject_test.subscribe({
   next: (v) => console.log(v)
});

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);

输出

Observable possible

现在你只能看到一个 Http 调用,并且相同的数据在调用的订阅者之间共享。

Observable 订阅者

行为主题

行为主题将在调用时为你提供最新值。

你可以创建行为主题,如下所示 −

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject");
// 使用值初始化行为主体:测试行为主体

示例

以下是使用行为主体的一个工作示例 −

import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("测试行为主体");
// 0 是初始值

behavior_subject.subscribe({
   next: (v) => console.log(`observerA: ${v}`)
});

behavior_subject.next("Hello");
behavior_subject.subscribe({
   next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");

输出

行为主体

重放主体

重放主体类似于行为主体,其中,它可以缓冲值并将其重放给新订阅者。

示例

以下是重放主体的一个工作示例 −

import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2);
// 缓冲 2 个值但新订阅者

replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});

replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});

replay_subject.next(5);

重放主题使用的缓冲值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。

输出

Replay Subject

AsyncSubject

对于 AsyncSubject,调用的最后一个值将传递给订阅者,并且只有在调用 complete() 方法后才会完成。

示例

以下是相同的工作示例 −

import { AsyncSubject } from 'rxjs';

const async_subject = new AsyncSubject();

async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject A: ${v}`)
});

async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject B: ${v}`)
});

此处,在调用 complete 之前,传递给主题的最后一个值是 2,并且将其提供给订阅者。

输出

Async Subject

RxJS - 使用 Scheduler

调度程序控制订阅何时启动和通知的执行。

要使用调度程序,我们需要以下内容 −

import { Observable, asyncScheduler } from 'rxjs';
import { observerOn } from 'rxjs/operators';

这是一个工作示例,其中,我们将使用决定执行的调度程序。

示例

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

var observable = new Observable(function subscribe(subscriber) {
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
}).pipe(
   observeOn(asyncScheduler)
);
console.log("Observable Created");
observable.subscribe(
   x => console.log(x),
   (e)=>console.log(e),
   ()=>console.log("Observable is complete")
);

console.log('Observable Subscribed');

输出

Scheduler

如果没有调度程序,输出将如下所示 −

Scheduler Controls

使用 RxJS 和 Angular

在本章中,我们将了解如何将 RxJs 与 Angular 结合使用。我们不会在这里介绍 Angular 的安装过程,要了解 Angular 安装,请参阅此链接 −https://www.tutorialspoint.com/angular7/angular7_environment_setup.htm

我们将直接使用一个示例,其中将使用 RxJS 中的 Ajax 来加载数据。

示例

app.component.ts

import { Component } from '@angular/core';
import { environment } from './../environments/environment';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators'

@Component({
   selector: 'app-root',
   templateUrl: './app.component.html',
   styleUrls: ['./app.component.css']
})
export class AppComponent {
   title = '';
   data;
   constructor() {
      this.data = "";
      this.title = "Using RxJs with Angular";
      let a = this.getData();
   }
   getData() {
      const response =
      ajax('https://jsonplaceholder.typicode.com/users')
         .pipe(map(e => e.response));
      response.subscribe(res => {
         console.log(res);
         this.data = res;
      });
   }
}

app.component.html

<div>
   <h3>{{title}}</h3>
   <ul *ngFor="let i of data">
      <li>{{i.id}}: {{i.name}}</li>
   </ul>
</div>

<router-outlet></router-outlet>

我们使用了 RxJS 中的 ajax,它将从此 url 加载数据 − https://jsonplaceholder.typicode.com/users

编译时显示内容如下所示 −

RxJs with Angular

使用 RxJS 和 ReactJS

在本章中,我们将了解如何将 RxJs 与 ReactJS 结合使用。我们不会在这里介绍 Reactjs 的安装过程,要了解 ReactJS 安装,请参阅此链接:/reactjs/reactjs_environment_setup.htm

示例

我们将直接使用下面的示例,其中将使用 RxJS 中的 Ajax 加载数据。

index.js

import React, { Component } from "react";
import ReactDOM from "react-dom";
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
class App extends Component {
   constructor() {
      super();
      this.state = { data: [] };
   }
   componentDidMount() {
      const response = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
      response.subscribe(res => {
         this.setState({ data: res });
      });
   }
   render() {
      return (
         <div>
            <h3>Using RxJS with ReactJS</h3>
            <ul>
               {this.state.data.map(el => (
                  <li>
                     {el.id}: {el.name}
                  </li>
               ))}
            </ul>
         </div>
      );
   }
}
ReactDOM.render(<App />, document.getElementById("root"));

index.html

<!DOCTYPE html>
<html>
   <head>
      <meta charset = "UTF-8" />
      <title>ReactJS Demo</title>
   <head>
   <body>
      <div id = "root"></div>
   </body>
</html>

我们使用了 RxJS 中的 ajax,它将从此 Url 加载数据 − https://jsonplaceholder.typicode.com/users

编译时,显示内容如下所示 −

RxJs with ReactJS