首页 > 代码库 > Activiti工作流--分布式实现方案
Activiti工作流--分布式实现方案
一、运行环境
以下所有的描述都是基于Activiti的5.20.0.1版本
1 public interface ProcessEngine extends EngineServices { 2 3 /** the version of the activiti library */ 4 public static String VERSION = "5.20.0.1"; 5 6 /** The name as specified in ‘process-engine-name‘ in 7 * the activiti.cfg.xml configuration file. 8 * The default name for a process engine is ‘default */ 9 String getName();10 11 void close();12 }
二、Activiti不支持分布的原因分析
- 在Activiti工作流的act_ge_property表中通常情况下有3条记录:
- next.dbid
- schema.history
- schema.version
其中next.dbid对应的值为数据库中当前最近一次增长后的最大记录id,每次增长的步长为2500,
1 protected int idBlockSize = 2500; (在ProcessEngineConfiguration类中)
- Activiti中所有的id(如:Task的id,Execution的id,ProcessInstance的id等)都是通过IdGenerator来生成的
1 /** 2 * generates {@link IdBlock}s that are used to assign ids to new objects. 3 * 4 * The scope of an instance of this class is process engine, 5 * which means that there is only one instance in one process engine instance. 6 * 7 * @author Tom Baeyens 8 * @author Joram Barrez 9 */10 public interface IdGenerator {11 12 String getNextId();13 14 }
- IdGenerator的默认实现是
1 /** 2 * @author Tom Baeyens 3 */ 4 public class DbIdGenerator implements IdGenerator { 5 6 protected int idBlockSize; 7 protected long nextId = 0; 8 protected long lastId = -1; 9 10 protected CommandExecutor commandExecutor;11 protected CommandConfig commandConfig;12 13 public synchronized String getNextId() {14 if (lastId<nextId) {15 getNewBlock();16 }17 long _nextId = nextId++;18 return Long.toString(_nextId);19 }20 21 protected synchronized void getNewBlock() {22 IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));23 this.nextId = idBlock.getNextId();24 this.lastId = idBlock.getLastId();25 }
从上面的代码可以看出,获取下一个id的方法是加锁的,也就是在一台服务器上id的增长是没有问题的,但是如果将Activiti部署在多台服务器上就会有两个问题
- 从代码的第17,18行可以看出id是本地自增,如果有多台服务器就会出现id相同的情况;
- 获取lastId的方法是操作同一个数据库的,会有问题,代码22中通过执行GetNextIdBlockCmd来获取数据库中的next.dbid的值,如果在多台服务器上由于一台服务器修改后,其他服务器无法知道
1 /** 2 * @author Tom Baeyens 3 */ 4 public class GetNextIdBlockCmd implements Command<IdBlock> { 5 6 private static final long serialVersionUID = 1L; 7 protected int idBlockSize; 8 9 public GetNextIdBlockCmd(int idBlockSize) {10 this.idBlockSize = idBlockSize;11 }12 13 public IdBlock execute(CommandContext commandContext) {14 PropertyEntity property = (PropertyEntity) commandContext15 .getPropertyEntityManager()16 .findPropertyById("next.dbid");17 long oldValue =http://www.mamicode.com/ Long.parseLong(property.getValue());18 long newValue = http://www.mamicode.com/oldValue+idBlockSize;19 property.setValue(Long.toString(newValue));20 return new IdBlock(oldValue, newValue-1);21 }22 }
三、解决方案
要想解决Activiti分布式的问题,就需要解决id生成的问题,也就是要自己实现IdGenerator接口,因此要有一个地方来生成一个全局唯一的id才行。
我在实际工作中是通过redis来实现的,redis也可以做集群,因此不需要考虑redis单点的问题,具体方案如下:
1 /** 2 * 分布式id生成器 3 * 4 * @version 1.0 5 * @author Pin Xiong 6 * @date 创建时间:2016年8月12日 下午3:22:09 7 */ 8 public class DistributedIdGenerator implements IdGenerator { 9 10 public DistributedIdGenerator(RedisService redisService) {11 this.redisService = redisService;12 }13 14 private RedisService redisService;15 16 @Override17 public String getNextId() {18 return String.format("%sX%s", D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX),this.redisService.incrby(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L));20 }21 }
其中,D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX)是通过服务器时间来生成id的前缀,
重点是后面的this.redisService.incrby(MainRK.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L)
该方法是在redis中获取key (也就是代码中Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY)对应的值,并自增1
在实际工作中通过该方案可以解决Activiti分布式问题。
如果其他同学有更好的方案,也希望可以一起分享,谢谢!
Activiti工作流--分布式实现方案