RxJS - 使用主题

主题是可以进行多播的可观察对象,即与许多观察者对话。考虑一个带有事件侦听器的按钮,每次用户单击按钮时,都会调用使用添加侦听器附加到事件的函数,类似的功能也适用于主题。

我们将在本章中讨论以下主题 −

  • 创建主题
  • 可观察对象和主题之间的区别是什么?
  • 行为主题
  • 重放主题
  • AsyncSubject

创建主题

要使用主题,我们需要导入主题,如下所示 −

import { Subject } from 'rxjs';

您可以按如下方式创建主题对象 −

const subject_test = new Subject();

该对象是一个观察者,具有三个方法 −

  • next(v)
  • error(e)
  • complete()

订阅主题

您可以按如下方式在主题上创建多个订阅 −

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});

订阅已注册到主题对象,就像我们之前讨论的 addlistener 一样。

将数据传递给主题

您可以使用 next() 方法将数据传递给创建的主题。

subject_test.next("A");

数据将传递给主题上添加的所有订阅。

示例

这里是主题的一个工作示例 −

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");

通过调用 new Subject() 创建 subject_test 对象。subject_test 对象引用了 next()、error() 和 complete() 方法。上述示例的输出如下所示 −

输出

Passing Data

我们可以使用 complete() 方法停止 subject 执行,如下所示。

示例

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
    next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
    next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");

一旦我们调用 complete,后面调用的下一个方法就不会被调用。

输出

传递数据方法

现在让我们看看如何调用 error() 方法。

示例

下面是一个工作示例 −

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));

输出

传递数据错误

可观察对象和主题之间的区别是什么?

可观察对象将与订阅者一对一对话。每当您订阅可观察对象时,执行都会从头开始。使用 ajax 进行 Http 调用,2 个订阅者调用可观察对象。您将在浏览器网络选项卡中看到 2 个 HttpHttp 请求。

示例

以下是相同的工作示例 −

import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
letsubscriber1 = final_val.subscribe(a => console.log(a));
letsubscriber2 = final_val.subscribe(a => console.log(a));

输出

Observable Observable Ex

现在,问题是,我们希望共享相同的数据,但不能以 2 次 Http 调用为代价。我们希望进行一次 Http 调用并在订阅者之间共享数据。

使用 Subjects 可以实现这一点。它是一个可以进行多播的可观察对象,即与许多观察者对话。它可以在订阅者之间共享值。

示例

这是一个使用主题的工作示例 −

import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(v)
});
subject_test.subscribe({
   next: (v) => console.log(v)
});

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);

输出

Observable possible

现在你只能看到一个 Http 调用,并且相同的数据在调用的订阅者之间共享。

Observable 订阅者

行为主题

行为主题将在调用时为你提供最新值。

你可以创建行为主题,如下所示 −

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject");
// 使用值初始化行为主体:测试行为主体

示例

以下是使用行为主体的一个工作示例 −

import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("测试行为主体");
// 0 是初始值

behavior_subject.subscribe({
   next: (v) => console.log(`observerA: ${v}`)
});

behavior_subject.next("Hello");
behavior_subject.subscribe({
   next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");

输出

行为主体

重放主体

重放主体类似于行为主体,其中,它可以缓冲值并将其重放给新订阅者。

示例

以下是重放主体的一个工作示例 −

import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2);
// 缓冲 2 个值但新订阅者

replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});

replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});

replay_subject.next(5);

重放主题使用的缓冲值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。

输出

Replay Subject

AsyncSubject

对于 AsyncSubject,调用的最后一个值将传递给订阅者,并且只有在调用 complete() 方法后才会完成。

示例

以下是相同的工作示例 −

import { AsyncSubject } from 'rxjs';

const async_subject = new AsyncSubject();

async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject A: ${v}`)
});

async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject B: ${v}`)
});

此处,在调用 complete 之前,传递给主题的最后一个值是 2,并且将其提供给订阅者。

输出

Async Subject