首页 > 代码库 > 云计算设计模式(十五)——管道和过滤器模式

云计算设计模式(十五)——管道和过滤器模式

云计算设计模式(十五)——管道和过滤器模式


分解,执行复杂处理成一系列可重复使用分立元件的一个任务这种模式可以允许执行的处理进行部署和独立缩放任务元素提高性能,可扩展性和可重用性

背景和问题


一个应用程序可能需要执行各种关于它处理信息不同复杂任务。一个简单,但不灵活的方式来实施这个应用程序可以执行此处理单一模块。然而,这种方法有可能减少用于重构代码对其进行优化或者重新使用它,如果在应用程序中其他地方所需要的相同的处理部件机会。

图1通过使用单片式的方式示出了与处理数据问题一个应用程序接收并处理来自两个来源的数据进行处理从每个源数据是由执行一系列任务来转换该数据,并传递结果给应用程序的业务逻辑之前的独立模块进行处理。

图1  - 使用单一模块实现的解决方案


部分单片模块执行的任务在功能上是非常相似的,但模块已被分开设计的实现该任务的代码被紧密模块内耦合并且此代码已开发具有很少或没有给定重新使用或可伸缩性的思想

然而由每个模块或每个任务的部署要求执行的处理任务,可能会改变,因为业务需求进行修改。有些任务可能是计算密集型的,并可能受益于强大的硬件上运行其他人可能并不需要如此昂贵的资源。此外,额外的处理可能需要在将来顺序,其中由所述处理执行的任务可能会改变一个解决方案是必需的,解决了这些问题,并且增加的可能性代码重用。

解决方案


分解需要为每个数据流转换为一组离散的元件或过滤器)的处理,其中每一个执行单任务通过标准化每个组件接收发射的数据的格式这些过滤器可以组合在一起成为一个管道这有助于避免重复代码并且可以很容易地移除,替换或集成额外的组件,如果处理要求改变图2显示了这种结构的一个例子。

 

图2 - 通过使用管道和过滤器实现的解决方案


处理一个请求所花费的时间取决于最慢的过滤器管道中的速度。这可能是一个或多个滤波器可能被证明是一个瓶颈,尤其是如果出现一个特定的数据源的数据流的大量请求。流水线结构一个关键优点是它提供了机会,运行速度慢的过滤器并联情况下,使系统能够分散负载提高吞吐量。

可以独立缩放组成一个管道可以在不同的机器上运行过滤器,使他们可以利用弹性,许多云计算环境提供优势。过滤器计算密集型可以在高性能的硬件上运行而其他要求不高的过滤器可以对商品便宜)的硬件来承载过滤器甚至不需要是在同一数据中心或地理位置,它允许一个管道中的每个元素的环境下接近它需要的资源来运行

图3示出了从源1施加到管道中的数据的一个例子。

图3  - 一个管道负载平衡组件


如果一个滤波器的输入输出被构造一个,它可能是能够进行的处理并行的每个过滤器在流水线的第一个过滤器可以开始工作,并开始发射结果它们会直接传递到序列中的下一个过滤器之前第一过滤器已经完成它的工作。

另一个好处灵活性,这种模式可以提供如果一个过滤器发生故障或者其上运行机器不再可用时,管道可能能够重新安排滤波器所执行的工作,并指示工作到组件的另一个实例。单个过滤器故障不会必然导致整个管道故障。

使用管道和过滤器补偿交易模式相结合的模式可以提供一种替代的方法来实现分布式事务分布式事务可以被分解成单独的的任务,每个都可以通过使用一个过滤器,也实现了补偿事务图案来实现。一个管道中的过滤器可以在运行接近它们保持数据被实现为单独的托管工作。

 

问题和注意事项


在决定如何实现这个模式时,您应考虑以下几点:
?复杂性增加的灵活性,这种模式提供了还可以引入复杂性,特别是如果分布在不同的服务器上在管道的过滤器。
?可靠性使用一个基础结构,可以确保在管道中的过滤器之间流动的数据也不会丢失
?幂等性如果在管道中的过滤失败接收到消息,任务被重新调度到过滤器的另一个实例,所述部分工作可能已经完成。如果这个工作更新的全局状态的某些方面存储在数据库中的信息,同样更新可以重复如果公布结果,在管道中的下一个过滤器后,过滤器出现故障,但在此之前表示,该公司已经成功地完成了它的工作可能会出现类似的问题。在这些情况下,相同的工作可以由过滤器的另一个实例被重复,导致相同的结果要贴两次。这可能导致在管道随后过滤两次处理相同的数据因此,一个管道的过滤器应该被设计为幂等欲了解更多信息,请参见乔纳森·奥利弗的博客幂等模式
?重复的消息如果管道中的过滤器可以发布一个消息给流水线的下一个阶段之后发生故障时,过滤器的另一个实例,可以执行幂等考虑以上所描述的,并且将发布相同消息的拷贝到流水线可能导致同样的信息的两个实例被传递到下一个过滤器为了避免这种情况管道应检测并消除重复的消息。

注意:

如果要实现管道使用消息队列微软的Azure服务总线队列消息队列基础设施可以提供自动重复消息检测和清除


?上下文状态。在管道中每个过滤器主要运行在孤立和不应该这件事是如何被调用的任何假设。意味着,每一个过滤器必须具有足够的上下文与它能够执行它的工作提供这种情况下可包含相当数量的状态信息

何时使用这个模式


使用这种模式
?由一个应用程序所需的处理可以很容易地被分解成一组离散的独立的步骤。
?由应用程序执行的处理步骤具有不同的可扩展性要求

注意:

它可能会过滤器扩展一起相同的过程。欲了解更多信息,请参阅计算资源整合模式


?灵活性是必需的,以允许通过一个应用程序能力进行添加和删除步骤中的处理步骤重新排序。
?该系统可以受益于分配处理不同服务器的步骤。
?一个可靠的解决方案是必需的,当数据正在被处理的最小化一个步骤失败的影响。

这种模式可能不适合
?通过应用程序执行的处理步骤并不是独立的或者他们必须共同作为同一事务的一部分来执行。
?在一个步骤所需上下文或状态的信息量使得这种方法效率很低。它可能会持续状态信息到数据库代替,但不要使用此策略,如果在数据库上的额外负载会导致过度竞争

例子


可以使用消息队列一个序列,以提供执行流水线所需的基础设施最初的消息队列接收未处理的消息实现为过滤器的任务侦听队列的消息的组件,它执行其工作然后投递转化的消息序列中的下一个队列另一个过滤器的任务可以侦听在这个队列中的消息对其进行处理后的结果到另一个队列,依此类推,直到完全转化的数据出现在队列中的最后一个消息。

4 - 通过使用消息队列实现管道


如果你正在构建一个解决方案,在Azure上,你可以使用服务总线队列提供了可靠的,可扩展的排队机制。下面所示的ServiceBusPipeFilter提供了一个例子它演示了如何实现接收从队列中输入消息处理这些邮件的过滤器张贴结果到另一个队列

注意:

ServiceBusPipeFilterPipesAndFilters解决方案PipesAndFilters.Shared项目定义此示例代码都可以可以下载本指导意见

public class ServiceBusPipeFilter
{
  ...
  private readonly string inQueuePath;
  private readonly string outQueuePath;
  ...
  private QueueClient inQueue;
  private QueueClient outQueue;
  ...

  public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
  {
     ...
     this.inQueuePath = inQueuePath;
     this.outQueuePath = outQueuePath;
  }

  public void Start()
  {
    ...
    // Create the outbound filter queue if it does not exist.
    ...
    this.outQueue = QueueClient.CreateFromConnectionString(...);
    
    ...
    // Create the inbound and outbound queue clients.
    this.inQueue = QueueClient.CreateFromConnectionString(...);
  }

  public void OnPipeFilterMessageAsync(
    Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...) 
  {
    ...

    this.inQueue.OnMessageAsync(
      async (msg) =>
    {
      ...
      // Process the filter and send the output to the 
      // next queue in the pipeline.
      var outMessage = await asyncFilterTask(msg);

      // Send the message from the filter processor 
      // to the next queue in the pipeline.
      if (outQueue != null)
      {
        await outQueue.SendAsync(outMessage);
      }

      // Note: There is a chance that the same message could be sent twice 
      // or that a message may be processed by an upstream or downstream 
      // filter at the same time.
      // This would happen in a situation where processing of a message was
      // completed, it was sent to the next pipe/queue, and then failed 
      // to complete when using the PeekLock method.
      // Idempotent message processing and concurrency should be considered 
      // in a real-world implementation.
    },
    options);
  }

  public async Task Close(TimeSpan timespan)
  {
    // Pause the processing threads.
    this.pauseProcessingEvent.Reset();

    // There is no clean approach for waiting for the threads to complete
    // the processing. This example simply stops any new processing, waits
    // for the existing thread to complete, then closes the message pump 
    // and finally returns.
    Thread.Sleep(timespan);

    this.inQueue.Close();
    ...
  }

  ...
}


ServiceBusPipeFilterStart方法连接到一对输入和输出队列,以及关闭方法从输入队列断开OnPipeFilterMessageAsync方法执行消息的实际处理;asyncFilterTask参数这种方法指定要执行的处理。OnPipeFilterMessageAsync方法等待输入队列中收到的消息,因为它到达,张贴结果到输出队列通过运行在每个邮件的asyncFilterTask参数指定的代码。队列本身构造函数中指定

样品溶液的过滤器实现了一组工作角色每个工人的作用可独立进行调整,这取决于它执行业务处理的复杂性,或者它需要执行此处理的资源。此外,辅助角色的多个实例可以并行地运行,以提高吞吐量。

下面的代码显示了一个名为PipeFilterARoleEntry的Azure工作者角色,这是样品溶液中PipeFilterA项目定义

public class PipeFilterARoleEntry : RoleEntryPoint
{
  ...
  private ServiceBusPipeFilter pipeFilterA;

  public override bool OnStart()
  {
    ...
    this.pipeFilterA = new ServiceBusPipeFilter(
      ...,
      Constants.QueueAPath,
      Constants.QueueBPath);

    this.pipeFilterA.Start();
    ...
  }

  public override void Run()
  {
    this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
    {
      // Clone the message and update it.
      // Properties set by the broker (Deliver count, enqueue time, ...) 
      // are not cloned and must be copied over if required.
      var newMsg = msg.Clone();
      
      await Task.Delay(500); // DOING WORK

      Trace.TraceInformation("Filter A processed message:{0} at {1}", 
        msg.MessageId, DateTime.UtcNow);

      newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");

      return newMsg;
    });

    ...
  }

  ...
}


这个角色包含ServiceBusPipeFilter对象。角色OnStart方法连接到队列接收输入的信息并张贴输出消息队列名称在常量类中定义Run方法调用OnPipeFilterMessagesAsync方法接收到的本例中,该处理通过等待较短的时间模拟的)的每个消息执行某些处理何时处理完成时,一个新的消息被构造包含结果(在这种情况下,输入消息被简单地增加了一个自定义属性),并将该消息发送到输出队列

示例代码中包含一个名为PipeFilterBRoleEntryPipeFilterB项目的另一名工人的作用。这个角色类似于PipeFilterARoleEntry不同之处在于Run方法进行不同的处理在本例中的解决方案,这两种作用结合起来,构建一个管道;PipeFilterARoleEntry角色输出队列用于PipeFilterBRoleEntry角色输入队列。

样品溶液还提供了两个名为InitialSenderRoleEntryInitialSender项目FinalReceiverRoleEntryFinalReceiver项目),进一步的角色InitialSenderRoleEntry作用提供了在管道中的初始消息OnStart方法连接到单个队列运行方法的帖子的方法此队列这个队列所使用的PipeFilterARoleEntry作用,所以发布一条消息这个队列输入队列导致PipeFilterARoleEntry作用来接收处理消息经处理的信息,然后通过PipeFilterBRoleEntry作用传递

FinalReceiveRoleEntry角色输入队列用于PipeFilterBRoleEntry角色的输出队列Run方法FinalReceiveRoleEntry作用,如下图所示,接收到该消息,并且执行一些最后的处理然后将其写入过滤器管道跟踪输出添加自定义属性值。

public class FinalReceiverRoleEntry : RoleEntryPoint
{
  ...
  // Final queue/pipe in the pipeline from which to process data.
  private ServiceBusPipeFilter queueFinal;

  public override bool OnStart()
  {
    ...
    // Set up the queue.
    this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
    this.queueFinal.Start();
    ...
  }

  public override void Run()
  {
    this.queueFinal.OnPipeFilterMessageAsync(
      async (msg) =>
      {
        await Task.Delay(500); // DOING WORK

        // The pipeline message was received.
        Trace.TraceInformation(
          "Pipeline Message Complete - FilterA:{0} FilterB:{1}",
          msg.Properties[Constants.FilterAMessageKey],
          msg.Properties[Constants.FilterBMessageKey]);

        return null;
      });
    ...
  }

  ...
}


 

本文翻译自MSDN:http://msdn.microsoft.com/en-us/library/dn568100.aspx

云计算设计模式(十五)——管道和过滤器模式