RxPY - 使用调度程序的并发
RxPy 的一个重要特性是并发性,即允许任务并行执行。 为实现这一点,我们有两个运算符 subscribe_on() 和 observe_on() 将与调度程序一起工作,这将决定订阅任务的执行。
这是一个工作示例,显示了对 subscibe_on()、observe_on() 和调度程序的需求。
示例
import random import time import rx from rx import operators as ops def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) input("Press any key to exit\n")
在上面的示例中,我有 2 个任务:任务 1 和任务 2。任务的执行是按顺序进行的。 第二个任务仅在第一个任务完成时开始。
输出
E:\pyrx>python testrx.py From Task 1: 1 From Task 1: 2 From Task 1: 3 From Task 1: 4 From Task 1: 5 Task 1 complete From Task 2: 1 From Task 2: 2 From Task 2: 3 From Task 2: 4 Task 2 complete
RxPy 支持多种 Scheduler,这里我们将使用 ThreadPoolScheduler。 ThreadPoolScheduler 主要尝试管理可用的 CPU 线程。
在示例中,我们之前已经看到,我们将使用一个多处理模块,它将为我们提供 cpu_count。 该计数将提供给 ThreadPoolScheduler,后者将设法根据可用线程使任务并行工作。
这是一个工作示例 −
import multiprocessing import random import time from threading import current_thread import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as ops # calculate cpu count, using which will create a ThreadPoolScheduler thread_count = multiprocessing.cpu_count() thread_pool_scheduler = ThreadPoolScheduler(thread_count) print("Cpu count is : {0}".format(thread_count)) def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) input("Press any key to exit\n")
在上面的例子中,我有 2 个任务,cpu_count 是 4。因为任务是 2,我们可用的线程是 4,所以两个任务可以并行启动。
输出
E:\pyrx>python testrx.py Cpu count is : 4 Press any key to exit From Task 1: 1 From Task 2: 1 From Task 1: 2 From Task 2: 2 From Task 2: 3 From Task 1: 3 From Task 2: 4 Task 2 complete From Task 1: 4 From Task 1: 5 Task 1 complete
如果您看到输出,则两个任务已并行启动。
现在,考虑一个场景,其中任务超过 CPU 数量,即 CPU 数量为 4,任务为 5。在这种情况下,我们需要检查任务完成后是否有线程空闲,以便将其分配给队列中可用的新任务。
为此,我们可以使用 observe_on() 运算符,它会观察调度程序是否有任何线程空闲。 这是一个使用 observe_on() 的工作示例
示例
import multiprocessing import random import time from threading import current_thread import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as ops # calculate cpu count, using which will create a ThreadPoolScheduler thread_count = multiprocessing.cpu_count() thread_pool_scheduler = ThreadPoolScheduler(thread_count) print("Cpu count is : {0}".format(thread_count)) def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) #Task 3 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 3: {0}".format(s)), lambda e: print(e), lambda: print("Task 3 complete") ) #Task 4 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 4: {0}".format(s)), lambda e: print(e), lambda: print("Task 4 complete") ) #Task 5 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.observe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 5: {0}".format(s)), lambda e: print(e), lambda: print("Task 5 complete") ) input("Press any key to exit\n")
输出
E:\pyrx>python testrx.py Cpu count is : 4 From Task 4: 1 From Task 4: 2 From Task 1: 1 From Task 2: 1 From Task 3: 1 From Task 1: 2 From Task 3: 2 From Task 4: 3 From Task 3: 3 From Task 2: 2 From Task 1: 3 From Task 4: 4 Task 4 complete From Task 5: 1 From Task 5: 2 From Task 5: 3 From Task 3: 4 Task 3 complete From Task 2: 3 Press any key to exit From Task 5: 4 Task 5 complete From Task 1: 4 From Task 2: 4 Task 2 complete From Task 1: 5 Task 1 complete
如果你看到输出,当任务 4 完成时,线程被分配给下一个任务,即任务 5 并开始执行。