首页 > 代码库 > 使用Java多线程实现任务分发

使用Java多线程实现任务分发

多线程下载由来已久,如 FlashGet、NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干区块,把区块分段分发给每个线程去下载,线程从本段起始处下载数据及至段尾,多个线程下载的内容最终会写入到同一个文件中。

    只研究有用的,工作中的需求:要把多个任务分派给Java的多个线程去执行,这其中就会有一个任务列表指派到线程的策略思考:已知:1. 一个待执行的任务列表,2. 指定要启动的线程数;问题是:每个线程实际要执行哪些任务。

    使用Java多线程实现这种任务分发的策略是:任务列表连续按线程数分段,先保证每线程平均能分配到的任务数,余下的任务从前至后依次附加到线程中——只是数量上,实际每个线程执行的任务都还是连续的。如果出现那种僧多(线程) 粥(任务) 少的情况,实际启动的线程数就等于任务数,一挑一。这里只实现了每个线程各扫自家门前雪,动作快的完成后眼见别的线程再累都是爱莫能助。

    实现及演示代码如下:由三个类实现,写在了一个 Java 文件中:TaskDistributor 为任务分发器,Task 为待执行的任务,WorkThread 为自定的工作线程。代码中运用了命令模式,如若能配以监听器,用上观察者模式来控制 UI 显示就更绝妙不过了,就能实现像下载中的区块着色跳跃的动感了,在此定义下一步的着眼点了。

    代码中有较为详细的注释,看这些注释和执行结果就很容易理解的。main() 是测试方法

package com.alpha.thread;  

  

import java.util.ArrayList;  

import java.util.List;  

  

/** 

 * 指派任务列表给线程的分发器 

 */  

public class TaskDistributor {  

      

    /** 

     * 测试方法 

     * @param args 

     */  

    @SuppressWarnings("unchecked")  

    public static void main(String[] args) {  

        // 初始化要执行的任务列表  

        List taskList = new ArrayList();  

        for (int i = 0; i < 100; i++) {  

            taskList.add(new Task(i));  

        }  

        // 设定要启动的工作线程数为 4 个  

        int threadCount = 4;  

        List[] taskListPerThread = distributeTasks(taskList, threadCount);  

        System.out.println("实际要启动的工作线程数:" + taskListPerThread.length);  

        for (int i = 0; i < taskListPerThread.length; i++) {  

            Thread workThread = new WorkThread(taskListPerThread[i], i);  

            workThread.start();  

        }  

    }  

  

    /** 

     * 把 List 中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程 返回的数组有多少个元素 (List) 就表明将启动多少个工作线程 

     *  

     * @param taskList 

     *            待分派的任务列表 

     * @param threadCount 

     *            线程数 

     * @return 列表的数组,每个元素中存有该线程要执行的任务列表 

     */  

    @SuppressWarnings("unchecked")  

    public static List[] distributeTasks(List taskList, int threadCount) {  

        // 每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务  

        int minTaskCount = taskList.size() / threadCount;  

        // 平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中  

        int remainTaskCount = taskList.size() % threadCount;  

        // 实际要启动的线程数,如果工作线程比任务还多  

        // 自然只需要启动与任务相同个数的工作线程,一对一的执行  

        // 毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程  

        int actualThreadCount = minTaskCount > 0 ? threadCount  

                : remainTaskCount;  

        // 要启动的线程数组,以及每个线程要执行的任务列表  

        List[] taskListPerThread = new List[actualThreadCount];  

        int taskIndex = 0;  

        // 平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与 remainTaskCount  

        // 相同的变量,不然会在执行中改变 remainTaskCount 原有值,产生麻烦  

        int remainIndces = remainTaskCount;  

        for (int i = 0; i < taskListPerThread.length; i++) {  

            taskListPerThread[i] = new ArrayList();  

            // 如果大于零,线程要分配到基本的任务  

            if (minTaskCount > 0) {  

                for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {  

                    taskListPerThread[i].add(taskList.get(j));  

                }  

                taskIndex += minTaskCount;  

            }  

            // 假如还有剩下的,则补一个到这个线程中  

            if (remainIndces > 0) {  

                taskListPerThread[i].add(taskList.get(taskIndex++));  

                remainIndces--;  

            }  

        }  

        // 打印任务的分配情况  

        for (int i = 0; i < taskListPerThread.length; i++) {  

            System.out.println("线程 "  

                    + i  

                    + " 的任务数:"  

                    + taskListPerThread[i].size()  

                    + " 区间["  

                    + ((Task) taskListPerThread[i].get(0)).getTaskId()  

                    + ","  

                    + ((Task) taskListPerThread[i].get(taskListPerThread[i].size() - 1))  

                            .getTaskId() + "]");  

        }  

        return taskListPerThread;  

    }  

}  




package com.alpha.thread;  

  

/** 

 * 要执行的任务,可在执行时改变它的某个状态或调用它的某个操作 例如任务有三个状态,就绪,运行,完成,默认为就绪态 要进一步完善,可为 Task 

 * 加上状态变迁的监听器,因之决定UI的显示 

 */  

class Task {  

    public static final int READY = 0;  

    public static final int RUNNING = 1;  

    public static final int FINISHED = 2;  

    @SuppressWarnings("unused")  

    private int status;  

    // 声明一个任务的自有业务含义的变量,用于标识任务  

    private int taskId;  

  

    // 任务的初始化方法  

    public Task(int taskId) {  

        this.status = READY;  

        this.taskId = taskId;  

    }  

  

    /** 

     * 执行任务 

     */  

    public void execute() {  

        // 设置状态为运行中  

        setStatus(Task.RUNNING);  

        System.out.println("当前线程 ID 是:" + Thread.currentThread().getName()  

                + " | 任务 ID 是:" + this.taskId);  

        // 附加一个延时  

        try {  

            Thread.sleep(1000);  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

        // 执行完成,改状态为完成  

        setStatus(FINISHED);  

    }  

  

    public void setStatus(int status) {  

        this.status = status;  

    }  

  

    public int getTaskId() {  

        return taskId;  

    }  

}  



package com.alpha.thread;  

  

import java.util.List;  

  

/** 

 * 自定义的工作线程,持有分派给它执行的任务列表 

 */  

class WorkThread extends Thread {  

    // 本线程待执行的任务列表,你也可以指为任务索引的起始值  

    private List<Task> taskList = null;  

    @SuppressWarnings("unused")  

    private int threadId;  

  

    /** 

     * 构造工作线程,为其指派任务列表,及命名线程 ID 

     *  

     * @param taskList 

     *            欲执行的任务列表 

     * @param threadId 

     *            线程 ID 

     */  

    @SuppressWarnings("unchecked")  

    public WorkThread(List taskList, int threadId) {  

        this.taskList = taskList;  

        this.threadId = threadId;  

    }  

  

    /** 

     * 执行被指派的所有任务 

     */  

    public void run() {  

        for (Task task : taskList) {  

            task.execute();  

        }  

    }  

}  


执行结果如下,注意观察每个Java多线程分配到的任务数量及区间。直到所有的线程完成了所分配到的任务后程序结束:

线程 0 的任务数:25 区间[0,24]  

线程 1 的任务数:25 区间[25,49]  

线程 2 的任务数:25 区间[50,74]  

线程 3 的任务数:25 区间[75,99]  

实际要启动的工作线程数:4  

当前线程 ID 是:Thread-0 | 任务 ID 是:0  

当前线程 ID 是:Thread-3 | 任务 ID 是:75  

当前线程 ID 是:Thread-1 | 任务 ID 是:25  

当前线程 ID 是:Thread-2 | 任务 ID 是:50  

当前线程 ID 是:Thread-1 | 任务 ID 是:26  

当前线程 ID 是:Thread-3 | 任务 ID 是:76  

当前线程 ID 是:Thread-0 | 任务 ID 是:1  

当前线程 ID 是:Thread-2 | 任务 ID 是:51  


本文出自 “小西天” 博客,请务必保留此出处http://gggcgba.blog.51cto.com/3889163/1437919