首页 > 代码库 > 线程池的简单实现

线程池的简单实现

主要是两个队列,一个任务队列,一个工作者队列,都是线程

线程池初始化时根据参数构造一定量的工作者线程,并启动,这是工作者线程检查任务队列为空则wait等待

一旦客户端提交任务到线程池,会加入到任务队列并notify工作线程执行对应线程

根据队列的不同线程排队方式不同

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
 * Created by itworker365 on 5/10/2017.
 */
public class SimpleThreadPool<Job extends Runnable>{
    // 最大worker数
    private static final int MAX_WORKERS = 10;
    // 默认worker数
    private static final int DEFAULT_WORKERS = 5;
    // 需要执行的任务队列
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    // 工作者线程的列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    // 工作者线程的数量
    private int workerNum;

    //初始化一定数目的worker thread,任务队列为空时wait()等待唤醒
    public SimpleThreadPool() {
        this.workerNum = DEFAULT_WORKERS;
        initializeWorkers(this.workerNum);
    }

    public SimpleThreadPool(int num) {
        if (num > MAX_WORKERS) {
            this.workerNum = DEFAULT_WORKERS;
        } else {
            this.workerNum = num;
        }
        initializeWorkers(this.workerNum);
    }
    //初始化工作者线程
    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            //添加到工作者线程的列表
            workers.add(worker);
            //启动工作者线程
            Thread thread = new Thread(worker);
            thread.start();
        }
    }
    //有任务进来添加到任务队列并Notify()worker线程
    public void execute(Job job) {
        if (job != null) {
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }
    //关闭线程池即关闭每个工作者线程
    public void shutdown() {
        for (Worker w : workers) {
            w.shutdown();
        }
    }
    //增加工作者线程
    public void addWorkers(int num) {
        //加锁,防止该线程还么增加完成而下个线程继续增加导致工作者线程超过最大值
        synchronized (jobs) {
            if (num + this.workerNum > MAX_WORKERS) {
                num = MAX_WORKERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }

    //减少工作者线程
    public void removeWorker(int num) {
        synchronized (jobs) {
            if(num>=this.workerNum){
                throw new IllegalArgumentException("超过了已有的线程数量");
            }
            for (int i = 0; i < num; i++) {
                Worker worker = workers.get(i);
                if (worker != null) {
                    worker.shutdown();
                    workers.remove(i);
                }
            }
            this.workerNum -= num;
        }
    }
    //工作者线程,没有任务执行时wait()
    class Worker implements Runnable {
        private volatile boolean running = true;

        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    if (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对该线程的中断操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 取出一个job
                    job = jobs.removeFirst();
                }
                //执行job
                if (job != null) {
                    job.run();
                }
            }
        }
        public void shutdown() {
            running = false;
        }
    }

    public static void main (String[] args) {
        SimpleThreadPool pool = new SimpleThreadPool(3);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("This1 is started");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("This1 is finished");
            }
        });
        pool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("This2 is started");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("This2 is finished");
            }
        });
        pool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("This3 is started");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("This3 is finished");
            }
        });
    }
}

 

线程池的简单实现