反应式编程

反应式编程是一种处理数据流和变化传播的编程范式。这意味着当一个组件发出数据流时,变化将通过反应式编程库传播到其他组件。变化的传播将持续到到达最终接收者。事件驱动和反应式编程之间的区别在于,事件驱动编程围绕事件展开,而反应式编程围绕数据展开。

用于反应式编程的 ReactiveX 或 RX

ReactiveX 或 Raective Extension 是反应式编程最著名的实现。ReactiveX 的工作依赖于以下两个类 −

Observable 类

此类是数据流或事件的来源,它打包传入的数据,以便数据可以从一个线程传递到另一个线程。直到有观察者订阅它时,它才会提供数据。

观察者类

此类使用 可观察对象 发出的数据流。可观察对象可以有多个观察者,每个观察者将接收发出的每个数据项。观察者可以通过订阅可观察对象 − 接收三种类型的事件

  • on_next() 事件 − 它表示数据流中有一个元素。

  • on_completed() 事件 − 它表示发射结束,没有更多项目到来。

  • on_error() 事件 − 它还表示发射结束,但前提是 observable 抛出错误。

RxPY – 用于反应式编程的 Python 模块

RxPY 是一个可用于反应式编程的 Python 模块。我们需要确保已安装该模块。以下命令可用于安装 RxPY 模块 −

pip install RxPY

示例

以下是 Python 脚本,它使用 RxPY 模块及其类 ObservableObserve 进行反应式编程。基本上有两个类 −

  • get_strings() − 用于从观察者获取字符串。

  • PrintObserver() − 用于从观察者打印字符串。它使用观察者类的所有三个事件。它还使用 subscribe() 类。

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

输出

Received Ram
Received Mohan
Received Shyam
Finished

用于反应式编程的 PyFunctional 库

PyFunctional是另一个可用于反应式编程的 Python 库。它使我们能够使用 Python 编程语言创建函数式程序。它很有用,因为它允许我们使用链式函数运算符创建数据管道。

RxPY 和 PyFunctional 之间的区别

这两个库都用于反应式编程,并以类似的方式处理流,但它们之间的主要区别取决于数据的处理。RxPY 处理系统中的数据和事件,而 PyFunctional 专注于使用函数式编程范例进行数据转换。

安装 PyFunctional 模块

我们需要在使用此模块之前安装它。可以使用 pip 命令进行安装,如下所示 −

pip install py functional

示例

以下示例使用 PyFunctional 模块及其 seq 类,该类充当流对象,我们可以使用它进行迭代和操作。在此程序中,它使用 lamda 函数映射序列,该函数将每个值加倍,然后过滤 x 大于 4 的值,最后将序列简化为所有剩余值的总和。

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

输出

Result: 6