首页 > 代码库 > Activiti工作流--分布式实现方案

Activiti工作流--分布式实现方案

一、运行环境

以下所有的描述都是基于Activiti的5.20.0.1版本

 1 public interface ProcessEngine extends EngineServices { 2  3   /** the version of the activiti library */ 4   public static String VERSION = "5.20.0.1"; 5  6   /** The name as specified in ‘process-engine-name‘ in  7    * the activiti.cfg.xml configuration file. 8    * The default name for a process engine is ‘default */ 9   String getName();10 11   void close();12 }

 

二、Activiti不支持分布的原因分析

  • 在Activiti工作流的act_ge_property表中通常情况下有3条记录:
  1. next.dbid
  2. schema.history
  3. schema.version

其中next.dbid对应的值为数据库中当前最近一次增长后的最大记录id,每次增长的步长为2500,

 1 protected int idBlockSize = 2500; (在ProcessEngineConfiguration类中)

  • Activiti中所有的id(如:Task的id,Execution的id,ProcessInstance的id等)都是通过IdGenerator来生成的
 1 /** 2  * generates {@link IdBlock}s that are used to assign ids to new objects. 3  *  4  * The scope of an instance of this class is process engine, 5  * which means that there is only one instance in one process engine instance. 6  *  7  * @author Tom Baeyens 8  * @author Joram Barrez 9  */10 public interface IdGenerator {11 12   String getNextId();13 14 }
  • IdGenerator的默认实现是
 1 /** 2  * @author Tom Baeyens 3  */ 4 public class DbIdGenerator implements IdGenerator { 5  6   protected int idBlockSize; 7   protected long nextId = 0; 8   protected long lastId = -1; 9   10   protected CommandExecutor commandExecutor;11   protected CommandConfig commandConfig;12   13   public synchronized String getNextId() {14     if (lastId<nextId) {15       getNewBlock();16     }17     long _nextId = nextId++;18     return Long.toString(_nextId);19   }20 21   protected synchronized void getNewBlock() {22     IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));23     this.nextId = idBlock.getNextId();24     this.lastId = idBlock.getLastId();25   }

从上面的代码可以看出,获取下一个id的方法是加锁的,也就是在一台服务器上id的增长是没有问题的,但是如果将Activiti部署在多台服务器上就会有两个问题

  1. 从代码的第17,18行可以看出id是本地自增,如果有多台服务器就会出现id相同的情况;
  2. 获取lastId的方法是操作同一个数据库的,会有问题,代码22中通过执行GetNextIdBlockCmd来获取数据库中的next.dbid的值,如果在多台服务器上由于一台服务器修改后,其他服务器无法知道
 1 /** 2  * @author Tom Baeyens 3  */ 4 public class GetNextIdBlockCmd implements Command<IdBlock> { 5    6   private static final long serialVersionUID = 1L; 7   protected int idBlockSize; 8    9   public GetNextIdBlockCmd(int idBlockSize) {10     this.idBlockSize = idBlockSize;11   }12 13   public IdBlock execute(CommandContext commandContext) {14     PropertyEntity property = (PropertyEntity) commandContext15       .getPropertyEntityManager()16       .findPropertyById("next.dbid");17     long oldValue =http://www.mamicode.com/ Long.parseLong(property.getValue());18     long newValue = http://www.mamicode.com/oldValue+idBlockSize;19     property.setValue(Long.toString(newValue));20     return new IdBlock(oldValue, newValue-1);21   }22 }

 

三、解决方案

要想解决Activiti分布式的问题,就需要解决id生成的问题,也就是要自己实现IdGenerator接口,因此要有一个地方来生成一个全局唯一的id才行。

我在实际工作中是通过redis来实现的,redis也可以做集群,因此不需要考虑redis单点的问题,具体方案如下:

 1 /** 2  * 分布式id生成器 3  *  4  * @version 1.0 5  * @author Pin Xiong 6  * @date 创建时间:2016年8月12日 下午3:22:09 7  */ 8 public class DistributedIdGenerator implements IdGenerator { 9 10     public DistributedIdGenerator(RedisService redisService) {11         this.redisService = redisService;12     }13 14     private RedisService redisService;15 16     @Override17     public String getNextId() {18         return String.format("%sX%s", D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX),this.redisService.incrby(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L));20     }21 }

其中,D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX)是通过服务器时间来生成id的前缀,

重点是后面的this.redisService.incrby(MainRK.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L)

该方法是在redis中获取key (也就是代码中Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY)对应的值,并自增1

 

 在实际工作中通过该方案可以解决Activiti分布式问题。

如果其他同学有更好的方案,也希望可以一起分享,谢谢!

 

Activiti工作流--分布式实现方案