首页 > 代码库 > 任务执行引擎的工程
任务执行引擎的工程
任务执行引擎的工程实践
来某厂接近半年了,几乎没写过C++代码,说实话还真的有点手生。最近刚好有一个需求,然而我感觉我也没有办法用C++以外的语言去实现它。于是还是花了几天时间用C++完成编码,这是一个简单的任务执行引擎,它被我称作panguan。写这 篇文章主要记录一下开发过程中的一些思路和想法。不足之处,劳烦大家给予指出。
1.写在前面
1.1需求来源
最近有一个需求,假设我们有很多任务需要定时执行甚至定时重复运行,并且其中有一些任务之间可能存在控制依赖和/或数据依赖,甚至我们希望可以利用一些原子任务去组成一个更大的任务。
所以我们需要有一个工具可以将这些任务管理起来:1)解决依赖问题;2)将能并发的任务最大化地并发起来;3)使用简单,只需填写配置文件,不需要写任何代码即可让一堆任务有序地Run起来;4)能够正确调度这些任务的顺序;5)may be more。
我调研了一些已经存在的任务执行引擎,包括FStack。然而试用了一下FStack,发现我不会用,这就很尴尬了。
报着“杀鸡焉用牛刀”的想法,于是最终花了几天自己写了一个可实际运行的工具,即panguan(判官)。
1.2名字来源
这个任务执行引擎被命名为“判官”,就是想给人一种不明觉厉的装X气质。
据百度百科记载,传说中阴曹地府中的判官判处人的轮回生死,对坏人进行惩罚,对好人进行奖励。而该任务执行引擎的主要工作是调度任务的顺序和生存周期,对执行成功和失败的任务进行处理。感觉这个名字很有这个场景的气质,于是配置任务属性的配置文件就顺便被命名为shengsibu.xml了。
1.3目前支持的特性
- 通过XML文件配置任务。
- 支持多任务同时运行并且互不干扰。
- 支持单个任务由多个存在数据依赖和控制依赖或者可以并行执行的子任务组成。子任务可以继续由多个子任务组成,由此可以递归成为一棵复杂的任务树。
- 能够调度各种复杂的任务场景的任务的顺序,使其并发最大化。
- 支持立即执行、定时和重复执行。
- 对任务进行超时检测。
- 展示每一个任务的当前任务状态和运行时间。
- 支持原子任务去调用python和shell的脚本。另外可以不实际去执行脚本而让程序模拟结果,目前返回时间是二项分布~(100,0.05)的随机结果,是否执行成功是p为0.88的伯努利分布的随机结果。
2.工程实践
以下主要简要介绍一些实践中的想法和思路。
2.1异步引擎加线程池
为了支持定时机制、多任务、任务组装、任务依赖等特性,同步的做法显得力不从心,当然也可能是我水平差导致写不出来。于是在这里优先采用了异步引擎加线程池的方案。相对于同步的做法,异步的做法开销更小;然而会导致程序的实现难度增加,也更加难以理解。
我们需要:1)一个异步引擎;2)两个不同方向的线程安全的消息队列,一个是请求MQ,一个是返回MQ;3)一个工作线程池。
异步引擎将每个原子任务派发到请求MQ中,线程中的工作线程监听请求MQ,执行完后插入另一个返回MQ中。异步引擎从返回取回结果。
如下图所示。
我们可以继续推广至一个多机的版本,如下图。
Agent同时监听两个消息队列(一个自身的MQ和一个Center的请求MQ):Agent从Center拉取消息的模式,可以自动实现负载均衡;Center推给Agent的模式,可以指定将任务派发至某个机器,以此提高执行效率。原因可能是这个任务距离数据目标端的物理距离更近。
2.2对任务进行抽象
2.2.1使用DAG去定义任务
对任务依赖进行抽象,可能最常见的做法是DAG(有向无环图)。像Linkedin开源的Azkaban,定义一个任务做法像下图中这样。
2.2.2使用多叉树去定义任务
开发过程中,我感觉使用多叉树去表达任务会显得更加自然一些。
在一棵多叉树中,可以存在两种结点,即叶子结点和非叶结点。于是可以使用叶子结点去表示一个原子任务,非叶结点去表示由一些原子任务组合而成的任务,这时它更像一个控制结点。这样就可以实现任务的组装,并且去定义它们之间的依赖关系。
假设有这么一个任务,我们用多叉树来表示它,如下图所示。
L是这根树的叶子,它代表一个原子任务。非叶结点标P表示它的子任务们是并行的,非叶结点标S表示它的子任务们是串行的。事实上我们还可以将任务类型进行扩展。举个例子,比如子任务是串行的,我们可以不需要所有子任务都运行结束,只需要遇到第一个执行成功的子任务即可。在这种模型下,我们可以将任务的组合推广至无限复杂的组合,其中包括树的层级和子结点个数的扩增、结点类型的扩展(可以超越并行和串行这两个基本类型)。
这样的多叉树结构可以很容易用xml格式去定义它。我在实现中使用了tinyxml这个parser去解析文本,实现上它只有六个cpp/h文件,直接放在自己的代码目录下参与编译即可。
在tinyxml中,类TiXmlNode定义了多叉树的数据结构。而TiXmlElement继承了TiXmlNode,增加了一些关于属性的字段的解析代码。
我在这里定义一个TaskNode的类也去继承了TiXmlNode类,复用了它的多叉树的代码。tinyxml的parser解析文本并生成一棵结点类型为TiXmlElement多叉树之后,我们就递归遍历这棵多叉树,调用它的属性解析函数解析出每个属性的值,建立一棵对应的类型为TaskNode类的多叉树。这两棵树它的树型结构是一致,区别在于将XML中定义中每个格点的属性解析出来赋值给了对应的TaskNode类的成员变量。
TaskNode类可以同时用来定义任务的叶子结点和非叶结点,而它们之间使用一些指针去连接成一棵多叉树的结构。
2.2.3多叉树加有限状态机
有限状态机,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模型。
我们为每个任务结点再加上一个状态,表示它的当前运行状态,使用{未初始化、等待、错误和完成}四个状态。可以发现父结点的状态是依赖于子结点们的状态的。任务结点在不同的状态做不同的动作。
再进一步统一叶子结点和非叶结点的接口。通过查阅设计模式的书,会发现这种场景可以用“组合模式”去套用。它可以使得对叶子结点和非叶结点的访问具有一致性,具有一致的接口使得在使用时可以不去特别区别这两种不同的对象。
统一接口后的伪代码如下。所有结点的运行入口都是相同一个接口,从Run函数进入。根据自身不同的状态调用不同的函数。而相同一个函数如RunUninit(),经过重载,叶子结点和非叶结点也可以调用不一样的代码。这样的做法可以使上层接口完全一致,屏蔽掉异构结点在不同状态 下的细节差异。
void TaskNode::Run()
{
switch(结点状态)
{
case 未初始化:
RunUninit();
break;
case 等待:
RunWait();
break;
case 错误:
RunError();
break;
case 完成:
RunSuccess();
break;
default:
break;
}
}
2.2.4任务顺序调度算法
在任务树已经存在的情况下,我们需要有一个调度算法来决定这些任务的顺序。它能够正确调度这些任务的顺序,将能够并行的任务都并行起来。这里的并行起来的实际含义是同时(实际是在很短的时间间隔内)推至请求MQ中,由线程池来消费它。并且能够在运行完的任务乱序返回之后,在任务树中找到下一个可以运行的任务。我们将这个函数命名为TaskNode TaskNode::NextAvailTask(TaskNode cur);
作为一个异步的引擎,它应该要能做到在一次执行中,将一棵任务树从一个可运行状态一直推至一个不可运行状态才停止。这时的做法就是将所有的可执行的原子任务推至请求MQ中,并且在这过程中处理所有结点的状态。
执行过程伪代码如下,它表示的是任务树在一次执行过程所进行的所有操作。
void TaskNode::Execute(TaskNode *cur)
{
if(!cur) return;
TaskNode *task = cur;
do
{
task->Run();
if(状态是成功或者失败)
{
//立即执行多余任务
task->Run();
}
//找出下一个可以执行的任务
task = TaskNode::NextAvailTask(task);
}while(task);
}
所以我们需要NextAvailTask函数,它要满足:
目标:找出一棵复杂任务树中,下一个可以执行的任务结点
输入:任意一个任务结点
输出:下一个可以执行的任务结点
分析之后,可以发现输入虽然可以为任何一个TaskNode,但是可以分为两种情况:
- 在任务树新建时,从根结点往下执行,这时WAIT状态的传播是自上而下的,这时只需要自上而下地不断找到最左边的结点即可,直到第一次遍历到叶子结点。这时如果该叶子结点是父结点的属性是并行的,就可以继续向右遍历,直到最右。然而关键问题来了,如果其它可以同时并行的任务结点和该结点不在同一个父结点下时怎么处理?如上图中那棵任务树,遍历到最左的叶子之后,下一个结点应该是第二层第二个结点的最左的那个叶子结点。然而我们该怎么才到找到它?这时我们给每个结点一个关键的Flag,布尔量 isChildOfParallelChildTask,它的含义是,标记该结点的爷爷结点及以上的结点是不是一个并行任务结点。这时通过判断这个flag,可以将结点遍历回溯到更高的层次。然后再自上而下去遍历,就像一开始任务新建时做的那样。
- 乱序返回的结点,ERROR和SUCCESS的传播是自下而上的,其余的规则也和1中一致。
使用这样的遍历规则,就可以在这棵任务树中准确地遍历,不管实际上这棵树的组成有多么的复杂。
以下是伪代码,它是一个静态函数,它的逻辑有些小复杂,该代码中我们可以很容易地扩充新的任务类型,不局限于只有并行和串行。这个算法的时间度复杂度为O(1)~O(h),h为树的高度,而且大部份时候为O(1),只在需要向上回溯时才处在O(1)~O(h)之间。
TaskNode *TaskNode::NextAvailTask(TaskNode *cur)
{
TaskNode *task = cur;
while(task) //当task不为空,循环继续执行
{
if(该结点是非叶结点,且未初始化的子任务数量为0)
{
return 该结点第一个子结点;
}
if(父结点存在,且子任务们是串行的)
{
if(状态是错误)
{
return 父结点;
}
else if(状态是成功的)
{
return 下一个结点(优先)或者父结点;
}
}
if(父结点存在,且子任务们是并行的)
{
if(子任务全部完成或者状态是错误)
{
return 父结点;
}
if(存在未初始化的兄弟结点)
{
return 下一个兄弟结点;
}
}
if(爷爷结点及以上的结点是不是一个并行任务结点)
{
task = 该结点的父结点;
//这时会跳回while的开头
}
else
{
return NULL;
}
}
return NULL;
}
2.3异步引擎的主流程
这个异步的任务执行引擎的主流程要同时处理定时生成新任务进行执行、超时检测、从返回队列获取返回值并进行任务执行。实现的目标是要高效,不浪费不必要的CPU时间。
关键问题有:
- 不能被返回队列阻塞导致不返回而错过了开始定时任务和超时检测;
- 也不能过于频繁地唤醒自己,做没必要的轮询。
解决方法是在每次从之前从返回MQ读取数据前,计算最大可容忍阻塞时间,时间为下次最近一次定时任务开始时间和下一个即将超时的任务的时间二者取小,将该时间作为阻塞超时时间。
可以看到整个流程最多会陷入三层while循环:
- 主流程的while最外层的循环,它处理时处理定时生成新任务进行执行、超时检测、从返回队列获取返回值并进行任务执行。
- 第二层循环是void TaskNode::Execute(TaskNode *task)里的循环。它会将某个任务树从可执行的状态一直推至不可执行状态。
- 最里层的循环是TaskNode TaskNode::NextAvailTask(TaskNode cur)里的循环,它在任务树里不断遍历以找出下一个可执行的任务,帮助TaskNode::Execute(task)的状态不断向前。
主流程简化完的伪代码如下。
void TaskEngine::Start()
{
//初始化
//从配置文件读取的所有任务的参数,生成一堆任务模板以及它们的任务属性,比如何时启动,重复执行次数,执行间隔,超时时间等。
if(!_Init())
return;
while(1)
{
if(任务模板中有当前时刻可以运行的新任务)
{
for(所有可以生成的新任务的任务模板)
{
//从任务模板中fork一个新的任务task,加入执行任务队列。
//为避免没必要地析构多叉树,造成大量没必要的new和delete
//任务模板中有Buffer池,buffer池没有现成的多叉树时,
//才真正去构造,否则直接从池中去获得
TaskNode::Execute(task);
}
}
//计算最大可容忍阻塞时间
//如该值为一个无限大的值,表示已经没有任何任务,可以退出程序
if(最大可容忍阻塞时间为无限大)
{
do
{
//回收所有线程
}while(还有线程未结束);
break;
}
StRes res;//返回数据
if(在最大可容忍阻塞时间内从返回MQ获得数据)
{
//找出该返回数据对应的任务结点的ID号
if(如果该任务结点的ID号还处在执行任务队列)
{
TaskNode *thisTask = 该任务结点的指针;
//任务结点获取返回数据
thisTask->GetRes(res);
//从返回的位置进入执行代码
TaskNode::Execute(thisTask);
//处理整个任务的成功或者失败的情况
//对应的措施是将这棵多叉树放回对应的模板的buffer池
_ProcessSuccessOrError(该任务结点对应的ID号);
}
}
else if(最大可容忍阻塞时间 >= 下一个即将超时的任务的时间)
{
//去处理已经超时的任务
//一样是把多叉树放回对应的任务模板的buffer池中
_ProcessTimeout(当前时间);
}
}
}
3.举例
其实感觉还是有些抽象,也可能是我的文字水平有限不能将它讲明白。
下面的例子是形如上面那个任务树的一个执行过程演示。
我们将这棵树的逻辑结构用xml的格式写入shengsibu.xml,如下。
<Task Name="task0" StartTime="0" ExecuteTimes="1" TimeInterval="0" Timeout="3600" >
<TaskNode Name="root_task_parallel" Type="parallel">
<TaskNode Name="child_task_serial" Type="serial">
<TaskNode Name="py_success" Type="leaf" ScriptType="python" ScriptAddr="script/py_success.py" Arguments=""/>
<TaskNode Name="sh_success" Type="leaf" ScriptType="shell" ScriptAddr="script/sh_success.sh" Arguments=""/>
<TaskNode Name="random" Type="leaf" ScriptType="random" ScriptAddr="" Arguments=""/>
</TaskNode>
<TaskNode Name="child_task_parallel" Type="parallel">
<TaskNode Name="py_success" Type="leaf" ScriptType="python" ScriptAddr="script/py_success.py" Arguments=""/>
<TaskNode Name="sh_success" Type="leaf" ScriptType="shell" ScriptAddr="script/sh_success.sh" Arguments=""/>
<TaskNode Name="random" Type="leaf" ScriptType="random" ScriptAddr="" Arguments=""/>
</TaskNode>
<TaskNode Name="random" Type="leaf" ScriptType="random" ScriptAddr="" Arguments=""/>
</TaskNode>
</Task>
以下展示任务的执行过程:
图1,初始状态,所有任务都处理未运行状态。缩进代表树的层级和父子关系。
图2,第一次开始执行之后。可以并行的任务全部已经并行起来,处于WAIT等待返回结果的状态。
图3,child_task_parallel下的sh_success返回,不能推进整个任务前进。
图4,child_task_serial下的py_success返回,推进了chile_task_serial下sh_success执行。
图5,又两个任务返回后,其中child_task_serial下的sh_success返回推进了hild_task_serial下的random执行。
图6,所有任务纷纷返回之后,在最后一个叶子任务运行结束后,把整个任务组推至成功运行。
任务执行引擎的工程