首页 > 代码库 > 第八章、线程池的使用

第八章、线程池的使用

 

线程工厂的使用:

  在创建线程时,应该要初始化它的线程名称,以便以后更好的查找错误,下面的示例展示了线程工厂的使用,创建线程是并发的,因此count使用原子类。

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class MyThreadFactory implements ThreadFactory{
    private final String poolName;
    private AtomicInteger count=new AtomicInteger(0);
    public MyThreadFactory(String poolName) {
        this.poolName=poolName;
        System.out.println("Thread name is "+poolName);
    }
    @Override
    public Thread newThread(Runnable r) {
        Thread t=new Thread(r);
        t.setName(poolName+"-"+count.incrementAndGet());
        return t;
    }
}

 

   测试类:

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService exec=Executors.newCachedThreadPool(new MyThreadFactory("thread"));
        exec.submit(new Thread1());
        exec.submit(new Thread1());

        exec.shutdown();

    }
}
class Thread1 implements Runnable{

    public void run(){
        System.out.println("the thread is running!");
        System.out.println("my name is "+Thread.currentThread().getName());
    }
}

 

 

扩展ThreadPoolExecutor

  下面演示使用给线程增加日志和计时功能,startTime使用ThreadLocal是因为全局变量,可以发现,在这里的全局变量要保证线程安全:

package com.company;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime=new ThreadLocal<Long>();
    private final AtomicLong numTasks=new AtomicLong();
    private final AtomicLong totalTime=new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    }

    @Override
    protected void beforeExecute(Thread t, Runnable r){
        startTime.set(System.nanoTime());
        super.beforeExecute(t, r);
        System.out.println("在线程执行之前打印!");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long endTime=System.nanoTime();
        long taskTime=endTime-startTime.get();
        numTasks.incrementAndGet();
        totalTime.addAndGet(taskTime);
        System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread().getName(),endTime,taskTime));

        System.out.println("在线程执行之后打印!");
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("在线程池关闭后打印!");
    }
}

 

递归算法的并行化

  这里要比较多线程和单线程递归地处理某个文件夹下全部文件的时间快慢。唯一不方便的是使用多线程处理时的时间统计,由于不知道提交了多少个处理线程,因此不能及时地关闭线程池(如果有大神,请指点一二),使用等待固定时间的方式,让线程池最终关闭。这里使用TimingThreadPool这个类来统计各个线程的执行时间,并每个线程执行完后将时间统计到totalTime中,在线程池关闭后再输出总时间。

package com.company;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime=new ThreadLocal<Long>();
    private final AtomicLong numTasks=new AtomicLong();
    private final AtomicLong totalTime=new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    }

    @Override
    protected void beforeExecute(Thread t, Runnable r){
        startTime.set(System.nanoTime());
        super.beforeExecute(t, r);
        //System.out.println("在线程执行之前打印!");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long endTime=System.nanoTime();
        long taskTime=endTime-startTime.get();
        numTasks.incrementAndGet();
        totalTime.addAndGet(taskTime);

        //System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread().getName(),endTime,taskTime));
        //System.out.println("在线程执行之后打印!");
        //System.out.println("线程总运行时间为:"+totalTime.get()/1000000000+"秒");
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("在线程池关闭后打印!总共用时:"+totalTime.get()/1000000000+"秒");
    }
}

 

  主方法,其中Thread1是使用多线程来执行,可以修改TimingThreadPool的初始状态可以调整线程数量。Thread2是使用单线程处理。在代码中,我假设处理文件的时间都是1毫秒(单单遍历文件时,多线程要比单线程的慢,但加入一些处理之后,多线程就占优势了)。可以看到我的测试数据有580259个文件,使用单线程时一共用时818秒,而使用3个线程时用时798秒,4线程416秒,2线程之类的846秒。因此最合适的是3线程也就快那么20秒,但是如果处理文件不仅仅是1毫秒,而是更多,那么这两者的差距会更大。

package com.company;

import java.io.File;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    public final AtomicLong atom=new AtomicLong(0);
    public static void main(String[] args) {
        ExecutorService exec=Executors.newFixedThreadPool(1);
        //ThreadPoolExecutor pool=new TimingThreadPool(1,1,2,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(),new MyThreadFactory("findThread"));
        exec.submit(new Thread1());
        exec.shutdown();
    }

    //单线程遍历某个文件夹
    public void getAllFiles(File file){
        if(file.isDirectory()) {
            for (File f:file.listFiles()){
                getAllFiles(f);
            }
        }else {
            try{
                Thread.sleep(1);
            }catch(Exception e){
                e.printStackTrace();
            }
            atom.incrementAndGet();
        }
    }

    //多线程遍历某个文件夹
    public void parallelRecursive(Executor exec,File file){
        if(file.isDirectory()) {
            for (File f:file.listFiles()){
                exec.execute(new Thread(){
                    public void run(){
                        parallelRecursive(exec,f);
                    }
                });
            }
        }else {
            try{
                Thread.sleep(1);
            }catch(Exception e){
                e.printStackTrace();
            }
            atom.incrementAndGet();
        }
    }

}
class Thread1 implements Runnable{
    public void run(){
        Main m=new Main();
        File file=new File("/Users/chenkaiwei");

        ThreadPoolExecutor pool=new TimingThreadPool(2,2,5,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(),new MyThreadFactory("findThread"));


        m.parallelRecursive(pool,file);
        try{
            Thread.sleep(818000);
            pool.shutdown();
            System.out.println("共访问【"+m.atom.get()+"】个文件");
            //System.out.println("线程池退出");
            //exec.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //798秒 3线程
    //816秒 4线程
}

class Thread2 implements Runnable{
    public void run(){
        Main m=new Main();
        File file=new File("/Users/chenkaiwei");
        long startTime=System.currentTimeMillis();
        m.getAllFiles(file);
        long endTime=System.currentTimeMillis();
        System.out.println("总共用时:"+(endTime-startTime)/1000+"秒");
        System.out.println("共访问【"+m.atom.get()+"】个文件");
    }
    //818秒
}
//580259

 

第八章、线程池的使用