首页 > 代码库 > 优先级线程池实现

优先级线程池实现

运维在升级,无聊写博客

  最近在实现消息通知平台上面,对于针对不同的通知需要设置优先级,实现当通知队列堵塞的时候可以有限推送高优先级的消息。为了保证通知队列的高效并发,通知队列的消费端是采用多线程并发处理的,因此需要实现一个可以实现优先级的多线程处理逻辑:

对于ThreadPollExecutor来说,

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

如果实现优先级线程池需要注意一下三点

1.线程池中新加入的线程会放到workQueue中,如果是优先级队列,那么该参数必须要是PriorityBlockingQueue。

2.PriorityBlockingQueue容器中最终存储的是FutureTask对象,改对象是newTaskFor实例化的,因此需要实现继承自Comparable的FutureTask实现【例如:ComparableFutureTask】

3.ComparableFutureTask中实现比较线程的优先级,需要将实例化具有优先级的线程对象【例如:PriorityTask】

 

如上根据上面的点,可参考的代码如下

【PriorityTask】

技术分享
public abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {    private Integer prority;    public PriorityTask(Integer prority) {        this.prority = prority;    }    @Override    public abstract void run();        @Override    public int compareTo(PriorityTask o) {        return prority.compareTo(o.prority);    }}
View Code

 

【带有ComparableFutureTask的PriorityThreadPoolExecutor】

技术分享
import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.FutureTask;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.RunnableFuture;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,                    long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);            }            public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,                    long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,                    RejectedExecutionHandler handler) {                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);            }            public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,                    long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,                    ThreadFactory threadFactory) {                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);            }            public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,                    long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,                    ThreadFactory threadFactory, RejectedExecutionHandler handler) {                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);            }            @Override            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {                return new ComparableFutureTask<T>(runnable, value);            }            @Override            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {                return new ComparableFutureTask<T>(callable);            }            protected class ComparableFutureTask<V>                    extends FutureTask<V> implements Comparable<ComparableFutureTask<V>> {                private Object object;                public ComparableFutureTask(Callable<V> callable) {                    super(callable);                    object = callable;                }                public ComparableFutureTask(Runnable runnable, V result) {                    super(runnable, result);                    object = runnable;                }                @Override                @SuppressWarnings("unchecked")                public int compareTo(ComparableFutureTask<V> o) {                    if (this == o) {                        return 0;                    }                    if (o == null) {                        return -1; // high priority                    }                    if (object != null && o.object != null) {                        if (object.getClass().equals(o.object.getClass())) {                            if (object instanceof Comparable) {                                return ((Comparable) object).compareTo(o.object);                            }                        }                    }                    return 0;                }            }}
View Code

【使用代码如下】

技术分享
import java.util.Queue;import java.util.concurrent.ExecutorService;import java.util.concurrent.PriorityBlockingQueue;import java.util.concurrent.TimeUnit;import javax.annotation.PostConstruct;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import com.wjs.common.config.ConfigUtil;import com.wjs.message.bo.NotifyMessage;import com.wjs.message.service.notify.NotifyService;import com.wjs.message.service.queue.QueueService;/** * 消息发送队列,优先级队列实现 *  * @author Silver * @date 2016年12月20日 下午8:20:45 *  * */@Service("queueService")public class QueueServicePriorityImpl implements QueueService {    private static final Logger LOGGER = LoggerFactory.getLogger(QueueServicePriorityImpl.class);    private volatile static Queue<NotifyMessage> queue = new PriorityBlockingQueue<NotifyMessage>(100000);    @Autowired    NotifyService notifyService;    /**     * 服务初始化,启动队列消费     *      * @author Silver     * @date 2016年12月21日 上午9:07:20     */    @PostConstruct    public void init() {        new Thread(new Runnable() {            @Override            public void run() {                Integer execSize = ConfigUtil.getInteger("message.queue.poll.execsize");                if (null == execSize || execSize == 0) {                    // 由于任务后续是发送邮件/短信/调用APP推送/调用dubbo的,是非CPU密集型的计算,因此线程数控制在核数  * 3的值                    execSize = Double.valueOf(Runtime.getRuntime().availableProcessors() * 3).intValue();                }                LOGGER.info("Queue_Consume_thread_size:{}",execSize);                ExecutorService es = new PriorityThreadPoolExecutor(execSize, execSize,                                0L, TimeUnit.MILLISECONDS,                                new PriorityBlockingQueue<Runnable>());                while (true) {                    //**    poll         移除并返问队列头部的元素    如果队列为空,则返回null                    final NotifyMessage message = queue.poll();                    if (null != message) {                        es.submit(new PriorityTask(message.getPriority()) {                            @Override                            public void run() {                                try {                                    System.out.println(message);                                } catch (Exception e) {                                    LOGGER.error("MessageQueue-ERROR->:{},", message, e);                                }                            }                        });                    }                }            }        }).start();    }    @Override    public boolean push(NotifyMessage message) {        //        offer  添加一个元素并返回true 如果队列已满,或者异常情况,则返回false        return queue.offer(message);    }    @Override    public Integer size() {        return queue.size();    }}
View Code

【单测代码如下】

技术分享
import org.junit.Test;import org.springframework.beans.factory.annotation.Autowired;import com.wjs.message.bo.NotifyMessage;import com.wjs.message.bo.NotifyMessageEmail;import com.wjs.message.bo.NotifyMessagePush;import com.wjs.message.bo.NotifyMessageSms;import com.wjs.message.bo.NotifyMessageSys;import com.wjs.message.service.BaseServiceTest;public class QueueTestService extends BaseServiceTest{        @Autowired    QueueService queueService;        @Test    public void testPull(){        for (int i = 10000; i > 1; i--) {            NotifyMessage message = new NotifyMessage();            switch (i % 3) {            case 0:                message = new NotifyMessageEmail();                message.setContent("Email"+i);                break;            case 1:                message = new NotifyMessageSys();                message.setContent("Sys"+i);                break;            case 2:                message = new NotifyMessageSms();                message.setContent("Sms"+i);                break;            case 3:                message = new NotifyMessagePush();                message.setContent("Push"+i);                break;            default:                break;            }            message.setContent(i+"");            message.setPriority(i);            queueService.push(message);        }                try {            Thread.sleep(1000000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}
View Code

 

优先级线程池实现