首页 > 代码库 > 使用调度者
使用调度者
调度程序控制订阅何时开始以及何时发布通知。它由三个组件组成。它首先是一个数据结构。当计划要完成的任务时,它们被放入调度器以基于优先级或其他标准进行排队。它还提供了一个执行上下文,它表示在哪里执行任务(例如,在线程池,当前线程或另一个应用程序域中)。最后,它有一个时钟为自己提供时间的概念(通过访问调度器的Now属性)。在特定调度程序上调度的任务将遵守仅由该时钟表示的时间。
调度器还引入了虚拟时间的概念(由VirtualScheduler类型表示),其与在我们的日常生活中使用的实际时间不相关。例如,指定要花费100年时间完成的序列可以安排在仅仅5分钟内在虚拟时间内完成。这将在测试和调试可观察序列主题中介绍。
调度程序类型
Rx提供的各种调度器类型都实现了IScheduler接口。可以通过使用Scheduler类型的静态属性来创建和返回这些对象。 ImmediateScheduler(通过访问静态Immediate属性)将立即开始指定的操作。 CurrentThreadScheduler(通过访问静态CurrentThread属性)将调度在执行原始调用的线程上执行的操作。操作不会立即执行,而是放在队列中,并且只在当前操作完成后执行。 DispatcherScheduler(通过访问静态Dispatcher属性)将调度对当前调度程序的操作,这对使用Rx的Silverlight开发人员有利。然后将指定的操作委派给Silverlight中的Dispatcher.BeginInvoke()方法。 NewThreadScheduler(通过访问静态NewThread属性)在新线程上调度操作,并且是调度长时间运行或阻塞操作的最佳选择。 TaskPoolScheduler(通过访问静态TaskPool属性)在特定任务工厂调度操作。 ThreadPoolScheduler(通过访问静态ThreadPool属性)调度线程池上的操作。两个池调度程序都针对短时运行操作进行了优化。
使用计划程序
您可能已经在Rx代码中使用了调度程序,而没有明确说明要使用的调度程序的类型。这是因为所有处理并发的Observable操作符都有多个重载。如果不使用将调度程序作为参数的重载,Rx将使用最小并发性原则选择一个默认调度程序。这意味着选择引入满足运营商需求的最少并发性的调度器。例如,对于返回具有有限和少量消息的observable的运算符,Rx调用Immediate。对于返回潜在大或无限数量的消息的操作符,调用CurrentThread。对于使用定时器的操作符,使用ThreadPool。
因为Rx使用最小并发性调度程序,如果您想为性能目的引入并发性,或者遇到线程相关性问题,则可以选择不同的调度程序。前者的一个例子是,当你不想阻塞一个特定的线程,在这种情况下,你应该使用ThreadPool。后者的一个示例是,当您想要在UI上运行计时器时,在这种情况下,您应该使用Dispatcher。要指定特定的调度器,可以使用那些接受调度器的运算符重载,例如Timer(TimeSpan.FromSeconds(10),Scheduler.DispatcherScheduler())。
在以下示例中,源可观察序列以疯狂的速度生成值。 Timer运算符的默认重载将在ThreadPool上放置OnNext消息。
Observable.Timer(Timespan.FromSeconds(0.01))
.Subscribe(…);
这将在观察者上快速排队。我们可以通过使用ObserveOn运算符来改进此代码,这允许您指定要用于将推送通知(OnNext)发送给观察者的上下文。默认情况下,ObserveOn运算符确保OnNext将在当前线程上调用尽可能多的次数。您可以使用其重载并将OnNext输出重定向到不同的上下文。此外,您可以使用SubscribeOn运算符返回将操作委派给特定调度程序的代理observable。例如,对于UI密集型应用程序,您可以委派所有后台操作在后台运行的调度程序上使用SubscribeOn并传递给它一个ThreadPoolScheduler。为了接收被推出并且访问任何UI元素的通知,您可以将DispatcherScheduler的实例传递给ObserveOn运算符。
以下示例将在当前调度程序上计划任何OnNext通知,以便在UI线程上发送任何推出的值。这对使用Rx的Silverlight开发人员特别有利。
Observable.Timer(Timespan.FromSeconds(0.01))
.ObserveOn(Scheduler.DispatcherScheduler)
.Subscribe(…);
而不是使用ObserveOn运算符来更改observable序列生成消息的执行上下文,我们可以在正确的位置创建并发开始。 当运算符通过提供调度程序参数重载来参数化并发性引入时,传递合适的调度程序将导致使用ObserveOn运算符的位置减少。 例如,我们可以通过更改源使用的调度器来解除阻塞观察者并直接订阅UI线程,如下面的示例所示。 在这段代码中,通过使用Timer重载,它需要一个调度器,并提供Scheduler.Dispatcher实例,从这个可观察序列推出的所有值都将来自UI线程。
Observable.Timer(Timespan.FromSeconds(0.01), Scheduler.DispatcherScheduler)
.Subscribe(…);
您还应该注意,通过使用ObserveOn运算符,将为通过原始可观察序列的每个消息计划一个操作。 这可能改变定时信息以及对系统施加额外的压力。 如果你有一个查询,组成在许多不同的执行上下文运行的各种可观察序列,并且在查询中进行过滤,最好在查询中稍后放置ObserveOn。 这是因为查询可能会过滤掉大量消息,并且将ObserveOn运算符放在查询中较早的位置会对将被过滤掉的消息执行额外的工作。 在查询结束时调用ObserveOn运算符将产生最小的性能影响。
明确指定调度程序类型的另一个优点是,您可以为性能目的引入并发性,如以下代码所示。
seq.GroupBy(...)
.Select(x=>x.ObserveOn(Scheduler.NewThread))
.Select(x=>expensive(x)) // perform operations that are expensive on resources
使用调度者