Angular 8 - 响应式编程

反应式编程是一种处理数据流和变化传播的编程范例。 数据流可以是静态的或动态的。 静态数据流的一个示例是数据的数组或集合。 它会有一个初始数量,并且不会改变。 动态数据流的一个例子是事件发射器。 每当事件发生时,事件发射器就会发出数据。 最初,可能没有事件,但随着时间的推移,事件发生并且它会被发出。

响应式编程使数据流能够从一个名为 Observable 的源发出,并且发出的数据流可以由名为 Observer 的其他源通过称为订阅的过程捕获。 这种可观察/观察者模式或简单的观察者模式极大地简化了编程上下文中复杂的更改检测和必要的更新。

JavaScript 没有对反应式编程的内置支持。 RxJs 是一个 JavaScript 库,它支持 JavaScript 中的反应式编程。 Angular 广泛使用 RxJs 库来实现下面提到的高级概念 −

  • 组件之间的数据传输。
  • HTTP 客户端。
  • 路由器。
  • 反应式表单。

让我们在本章中使用 RxJs 库学习响应式编程。

可观察

正如之前所了解的,Observable 是数据源,它们可以是静态的或动态的。 Rxjs 提供了很多从常见 JavaScript 对象创建 Observable 的方法。 让我们看看一些常见的方法。

of −按序列发出任意数量的值,最后发出完整的通知。

const numbers$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

这里,

  • numbers$ 是一个 Observable 对象,订阅后将按顺序发出 1 到 10 的数据。

  • 变量末尾的美元符号($)用于标识该变量是Observable。

range − 按顺序发出一系列数字。

const numbers$ = range(1,10)

from − 发出数组、promise 或 iterable。

const numbers$ = from([1,2,3,4,5,6,7,8,9,10]);

ajax − 通过 AJAX 获取 url,然后发出响应。

const api$ = ajax({ url: 'https://httpbin.org/delay/1', method: 'POST', headers: { 'Content-Type': 'application/text' }, body: "Hello" });

这里,

https://httpbin.org 是一个免费的 REST API 服务,它将以 JSON 格式返回提供的正文内容,如下所示 −

{ 
   "args": {}, 
   "data": "Hello", 
   "files": {}, 
   "form": {}, 
   "headers": { 
      "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", 
      "Accept-Encoding": "gzip, deflate, br", 
      "Accept-Language": "en-US,en;q=0.9", 
      "Host": "httpbin.org", "Sec-Fetch-Dest": "document", 
      "Sec-Fetch-Mode": "navigate", 
      "Sec-Fetch-Site": "none", 
      "Upgrade-Insecure-Requests": "1", 
      "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36", 
      "X-Amzn-Trace-Id": "Root=1-5eeef468-015d8f0c228367109234953c" 
   }, 
   "origin": "ip address", 
   "url": "https://httpbin.org/delay/1" 
}

fromEvent − 侦听 HTML 元素的事件,然后在侦听事件触发时发出该事件及其属性。

const clickEvent$ = fromEvent(document.getElementById('counter'), 'click');

Angular 在内部广泛使用该概念来提供组件之间的数据传输以及反应式表单。

订阅流程

订阅 Observable 非常简单。 每个Observable对象都会有一个方法,subscribe用于订阅过程。 Observer需要实现三个回调函数来订阅Observable对象。 它们如下 −

  • next − 接收并处理从 Observable 发出的值

  • error − 错误处理回调

  • complete − 当来自 Observable 的所有数据发出时调用的回调函数。

一旦定义了三个回调函数,就必须按如下指定调用 Observable 的 subscribe 方法 −

const numbers$ = from([1,2,3,4,5,6,7,8,9,10]); 
// observer 
const observer = { 
   next: (num: number) => {      this.numbers.push(num); this.val1 += num }, 
      error: (err: any) => console.log(err), 
      complete: () => console.log("Observation completed") 
}; 
numbers$.subscribe(observer);

这里,

  • next − 方法获取发出的数字,然后将其推入局部变量 this.numbers

  • next − 方法还将数字添加到局部变量 this.val1.

  • error − 方法只是将错误消息写入控制台。

  • complete − 方法还将完成消息写入控制台。

我们可以跳过errorcomplete方法,只写next方法,如下所示 −

number$.subscribe((num: number) => { this.numbers.push(num); this.val1 += num; });

操作

Rxjs 库提供了一些操作符来处理数据流。 一些重要的运算符如下:−

filter − 启用使用回调函数过滤数据流。

const filterFn = filter( (num : number) => num > 5 ); 
const filteredNumbers$ = filterFn(numbers$); 
filteredNumbers$.subscribe( (num : number) => { 
this.filteredNumbers.push(num); this.val2 += num } );

map − 允许使用回调函数映射数据流并更改数据流本身。

const mapFn = map( (num : number) => num + num ); const mappedNumbers$ = mappedFn(numbers$);

pipe − 允许组合两个或多个运算符。

const filterFn = filter( (num : number) => num > 5 ); 
const mapFn = map( (num : number) => num + num ); const processedNumbers$ = numbers$.pipe(filterFn, mapFn); 
processedNumbers$.subscribe( (num : number) => { this.processedNumbers.push(num); this.val3 += num } );

让我们创建一个示例应用程序来尝试本章中学到的反应编程概念。

创建一个新的应用程序,使用以下命令进行响应 −

ng new reactive

将目录更改为我们新创建的应用程序。

cd reactive

运行应用程序。

ng serve

更改 AppComponent 组件代码 (src/app/app.component.ts),如下所示 −

import { Component, OnInit } from '@angular/core'; import { Observable, of, range, from, fromEvent } from 'rxjs'; 
import { ajax } from 'rxjs/ajax'; 
import { filter, map, catchError } from 'rxjs/operators'; 
@Component({ 
   selector: 'app-root', 
   templateUrl: './app.component.html', 
   styleUrls: ['./app.component.css'] 
}) 
export class AppComponent implements OnInit { 
   title = 'Reactive programming concept'; 
   numbers : number[] = []; 
   val1 : number = 0; 
   filteredNumbers : number[] = []; 
   val2 : number = 0; 
   processedNumbers : number[] = []; 
   val3 : number = 0; 
   apiMessage : string; 
   counter : number = 0; 
   ngOnInit() { 
      // Observable stream of data Observable<number>
      // const numbers$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
      // const numbers$ = range(1,10); 
      const numbers$ = from([1,2,3,4,5,6,7,8,9,10]); 
      // observer 
      const observer = { 
         next: (num: number) => {this.numbers.push(num); this.val1 += num }, 
         error: (err: any) => console.log(err), 
         complete: () => console.log("Observation completed") 
      }; 
      numbers$.subscribe(observer); 
      const filterFn = filter( (num : number) => num > 5 ); 
      const filteredNumbers = filterFn(numbers$); 
      filteredNumbers.subscribe( (num : number) => {this.filteredNumbers.push(num); this.val2 += num } ); 
      const mapFn = map( (num : number) => num + num ); 
      const processedNumbers$ = numbers$.pipe(filterFn, mapFn); 
      processedNumbers$.subscribe( (num : number) => {this.processedNumbers.push(num); this.val3 += num } ); 
      const api$ = ajax({ 
         url: 'https://httpbin.org/delay/1', 
         method: 'POST', 
         headers: {'Content-Type': 'application/text' }, 
         body: "Hello" 
      }); 
      api$.subscribe(res => this.apiMessage = res.response.data ); 
      const clickEvent$ = fromEvent(document.getElementById('counter'), 'click'); 
      clickEvent$.subscribe( () => this.counter++ ); 
   } 
}

这里,

  • 使用 of、range、from、ajax 和 fromEvent 方法来创建 Observable。
  • 使用过滤器、映射和管道运算符方法来处理数据流。
  • 回调函数捕获发出的数据,对其进行处理,然后将其存储在组件的局部变量中。

按如下指定更改 AppComponent 模板 (src/app/app.component.html)

<h1>{{ title }}</h1> 
<div> 
   The summation of numbers ( <span *ngFor="let num of numbers"> {{ num }} </span> ) is {{ val1 }} 
</div> 
<div> 
   The summation of filtered numbers ( <span *ngFor="let num of filteredNumbers"> {{ num }} </span> ) is {{ val2 }} 
</div> 
<div> 
   The summation of processed numbers ( <span *ngFor="let num of processedNumbers"> {{ num }} </span> ) is {{ val3 }} 
</div> 
<div> 
   The response from the API is <em>{{ apiMessage }}</em> </div> 
<div> 
   <a id="counter" href="#">Click here</a> to increment the counter value. The current counter value is {{ counter }} 
<div>

这里,

显示了Observer回调函数处理的所有局部变量。

打开浏览器,http://localhost:4200。

管道

点击Click here链接五次。 对于每个事件,事件将被发出并转发给Observer。 观察者回调函数将被调用。 回调函数每次点击都会增加计数器,最终结果如下所示 −

Observer