RxPY - 转换运算符
buffer
此运算符将从源可观察对象中收集所有值,并在满足给定边界条件后定期发出它们。
语法
buffer(boundaries)
参数
boundaries: 输入是可观察的,它将决定何时停止以便发出收集的值。
返回值
返回值是 observable 的,所有的值都是从源 observable 收集的,持续时间由输入的 observable 决定。
示例
from rx import of, interval, operators as op from datetime import date test = of(1, 2,3,4,5,6,7,8,9,10) sub1 = test.pipe( op.buffer(interval(1.0)) ) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python test1.py The elements are [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ground_by
此运算符将根据给定的 key_mapper 函数对来自源可观察对象的值进行分组。
语法
group_by(key_mapper)
参数
key_mapper: 此函数将负责从源可观察对象中提取密钥。
返回值
它返回一个可观察值,其值根据 key_mapper 函数分组。
示例
from rx import from_, interval, operators as op test = from_(["A", "B", "C", "D"]) sub1 = test.pipe( op.group_by(lambda v: v[0]) ) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python testrx.py The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6550> The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E65C0> The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6588> The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6550>
map
此运算符将根据给定的 mapper_func 的输出将源可观察对象中的每个值更改为新值。
语法
map(mapper_func:None)
参数
mapper_func: (可选)它将根据来自此函数的输出更改源可观察值的值。
示例
from rx import of, interval, operators as op test = of(1, 2,3,4,5,6,7,8,9,10) sub1 = test.pipe( op.map(lambda x :x*x) ) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python testrx.py The element is 1 The element is 4 The element is 9 The element is 16 The element is 25 The element is 36 The element is 49 The element is 64 The element is 81 The element is 100
scan
此运算符将对来自源 observable 的值应用累加器函数,并返回具有新值的 observable。
语法
scan(accumulator_func, seed=NotSet)
参数
accumulator_func: 此函数应用于源可观察对象的所有值。
seed:(可选)要在 accumular_func 中使用的初始值。
返回值
此运算符将返回一个具有新值的可观察对象,该新值基于应用于源可观察对象的每个值的累加器函数。
示例
from rx import of, interval, operators as op test = of(1, 2,3,4,5,6,7,8,9,10) sub1 = test.pipe( op.scan(lambda acc, a: acc + a, 0)) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python testrx.py The element is 1 The element is 3 The element is 6 The element is 10 The element is 15 The element is 21 The element is 28 The element is 36 The element is 45 The element is 55