首页 > 代码库 > Java 并发编程之任务取消(六)

Java 并发编程之任务取消(六)

关闭ExecutorService

ExecutorService提供了两种关闭方法,使用Shutdown正常关闭,以及使用ShutdownNow强行关闭。在进行强行关闭时,shutdownNow首先关闭当前正在执行的任务。然后返回所有尚未启动的任务清单 。

返回未启动任务清单这句没明白返回的方式,于是去查看了一下源码

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();

是用List的形式返回submit的Runnable


还是像上一篇一样使用日志服务做为栗子

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class LogService {
	private final ExecutorService exec = Executors.newSingleThreadExecutor();
	private final int TIMEOUT = 100;
	...	
	public void start() {
	}

	public void stop() throws InterruptedException {
		try {
			exec.shutdown();
			exec.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);
		} finally {
			writer.close();
		}
	}

	public void log(String msg){
		try{
			exec.execute(new writerTask(msg)));
		}catch(RejectedExecutionException ignored){
			
		}
	}
}
省略了部分代码。因为和上一篇中的代码都一样,主要展现的是利用ExecutorService后Stop方法修改后的样子

毒丸对象

这是另一种消费者生产者的栗子,毒丸是指一个放在队列上的对象 ,其作用是当得到这个对象的时候,立即停止。在FIFO队列中,毒丸对象 将确保消费者在关闭之前首先完成队列中的所有工作。

举个栗子。。。哦。。花了好长时间才调试好。。

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InderXingService {
	private static final File POISON = new File("");
	private final IndexerThread consumer = new IndexerThread();
	private final CrawlerThread producer = new CrawlerThread();
	private final BlockingQueue<File> queue = new LinkedBlockingQueue<File>();
	private final FileFilter fileFilter;
	private final File root = new File("F://Desktop/Open");

	public static void main(String[] args) {
		InderXingService index = new InderXingService(null, null);
		index.start();
	}

	public InderXingService(FileFilter fileFilter, File root) {
		this.fileFilter = fileFilter;
	}

	public void start() {
		producer.start();
		consumer.start();
	}

	public void stop() {
		producer.interrupt();
	}

	public void awaitTermination() throws InterruptedException {
		consumer.join();
	}

	class CrawlerThread extends Thread {

		@Override
		public void run() {
			// TODO Auto-generated method stub
			try {
				crawl(root);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				System.out.println("putpoison");
				while (true) {
					try {
						queue.put(POISON);
						break;
					} catch (InterruptedException e1) {
					}
				}
			}
		}

		private void crawl(File root) throws InterruptedException {
			// 为文件添加内容

			File[] entries = root.listFiles();
			if (entries != null) {
				for (File entry : entries) {
					if (entry.isDirectory()) {
						crawl(entry.getAbsoluteFile());
					} else if (!alreadindex(entry)) {
						queue.put(entry);
					}
				}
			}
		}

		private boolean alreadindex(File entry) {
			// TODO Auto-generated method stub
			if (queue.contains(entry)) {
				return true;
			}
			return false;
		}
	}

	class IndexerThread extends Thread {
		@Override
		public void run() {
			while (true) {
				File file;
				try {
					file = queue.take();
					if (file == POISON) {
						break;
					} else {
						indexFile(file);
					}

				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}

			}
		}

		private void indexFile(File root) throws InterruptedException {
			System.out.println(root.getName());
		}
	}

}
这个是遍历一个目录的文件的栗子-0-

刚才试着遍历了一下整个F盘。。貌似消费者跟的上。而且没啥压力看来都可以用了


	public static void main(String[] args) {
		InderXingService index = new InderXingService(null, null);
		index.start();
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		index.stop();
	}
试了一下中断方法


UpgradeReport.xslt
UpgradeReport_Minus.gif
UpgradeReport_Plus.gif
java.lang.InterruptedException
putpoison
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(Unknown Source)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.put(Unknown Source)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:73)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.run(InderXingService.java:49)

结果也对。


使用毒丸君的注意事顶:

只有在生产者和消费者的数量都已知的情况下,才可以使用毒丸对象。当生产者多的时候 ,可以加一个计数器,当所有生产者的丸子都放在队列里边的时候再进行打断。多消费者的时候 ,一个生产者可以放入与消费者数量相同的丸子。因为每个消费者都只能接收一个丸子。当两者数量都比较大时就不太好用了。只有在无界队列中。毒丸对象才能可靠的工作




Java 并发编程之任务取消(六)