首页 > 代码库 > Retrofit+Rxjava+SqlBrite+SqlDelight实现多文件断点续传下载

Retrofit+Rxjava+SqlBrite+SqlDelight实现多文件断点续传下载

介绍

实现功能:

  • 多文件下载
  • 暂停,开始断点续传
  • 进度回调
  • sqlite保存下载信息
  • 程序被杀了,重新打开继续断点下载
  • 。。。

github: https://github.com/tpnet/RetrofitDownloadDemo , 有用可以点个start谢谢

本文需要SqlDelight、SqlBrite、AutoValue的知识。使用的是Rxjava1.2.9版本。

技术分享

技术分享

如何实现

看看流程图:

技术分享

  • 断点续传的原理为: 在请求头添加Range参数,值为bytes=从什么位置开始-

  • 多文件下载的原理为: 创建多个Retrofit实例,利用拦截器进行进度的获取

  • DownManager是下载管理类,里面可以对下载任务进行操作。

  • Downinfo是下载信息bean,Sqlite保存对应的bean。

  • 每创建一个下载任务就创建一个Retrofit对象,每个Retrofit对象对应一个下载任务,利用okhttp的拦截器进行进度的拦截回调,进度回调使用RxBus,当然你也可以使用接口回调,源码里面有保留对应IDownloadProgressListener接口。

  • DownSubscriber<T>里面进行下载状态和进度的回调,并且保存了Retrofit对象、DowInfo对象、回调view的状态接口IOnDownloadListener

  • item是view显示的列表,在DownSubscriber<T>里面通过IOnDownloadListener接口回调状态。

原理不难,难的是如何很好地管理。

下载信息bean

使用SqlDelight+AutoValue的方式管理下载实体类DownInfo。但其实这样子有一个弊端,就是定义的字段变量都为final,不能用set去修改,解决方法是重新new。

这里定义下载信息的bean字段有:

-- 创建下载信息表
create table downinfo(

    _id INTEGER NOT NULL  PRIMARY KEY AUTOINCREMENT,

    -- 文件url,唯一约束
    downUrl TEXT NOT NULL UNIQUE, 

    -- 文件类型
    downType TEXT NOT NULL, 

    -- 保存在手机的路径
    savePath TEXT NOT NULL,

    -- 文件总长度
    totalLength INTEGER NOT NULL,

    -- 当前下载的长度
    downLength INTEGER NOT NULL,

    -- 下载状态
    downState INTEGER AS @DownState Integer NOT NULL,


    -- 开始下载的时间
    startTime INTEGER NOT NULL,

    -- 下载完成时间
    finishTime INTEGER NOT NULL


);

下载状态为注解定义的常量(代替枚举),下载状态有6种:

@AutoValue
public abstract class DownInfo implements Parcelable,DownInfoModel{

    //下面6种下载状态
    public final static int DOWN_START = 0x0;    //开始
    public final static int DOWN_PAUSE = 0x1;    //暂停
    public final static int DOWN_STOP = 0x2;     //停止
    public final static int DOWN_ERROR = 0x3;     //错误
    public final static int DOWN_ING = 0x4;         //下载中
    public final static int DOWN_FINISH = 0x5;      //完成


    public static final Factory<DownInfo> FACTORY = new Factory<>(new DownInfoModel.Creator<DownInfo>() {
        @Override
        public DownInfo create(@NonNull long _id, @NonNull String downUrl, String downType, @NonNull String savePath, long totalLength, long downLength, int downState, long startTime, long finishTime) {
            return new AutoValue_DownInfo(_id, downUrl, downType, savePath, totalLength, downLength, downState, startTime, finishTime);
        }
    });


    public static final RowMapper<DownInfo> LIST_ROW_MAPPER = FACTORY.selectAllMapper();


    public static final RowMapper<String> DOWN_EXIST_MAPPER = FACTORY.selectDowninfoSavePathMapper();


    public static final RowMapper<Long> TOTALLENGTH_MAPPER = FACTORY.selectTotalLengthMapper();


    public static Builder create(DownInfo downInfo) {
        return builder()
                ._id(0)
                .downUrl(downInfo.downUrl())
                .downType(downInfo.downType())
                .savePath(downInfo.savePath())
                .totalLength(downInfo.totalLength())
                .downLength(downInfo.downLength())
                .downState(downInfo.downState())
                .startTime(downInfo.startTime())
                .finishTime(downInfo.finishTime());
    }


    public static Builder builder() {
        return new AutoValue_DownInfo.Builder();
    }


    @AutoValue.Builder
    public abstract static class Builder {

        public abstract Builder _id(long id);

        public abstract Builder downUrl(String downUrl);

        public abstract Builder downType(String downType);

        public abstract Builder savePath(String savePath);

        public abstract Builder totalLength(long totalLength);

        public abstract Builder downLength(long downLength);

        public abstract Builder downState(@DownState int downState);


        public abstract Builder startTime(long startTime);

        public abstract Builder finishTime(long finishTime);


        public abstract DownInfo build();

        //创建任务初始化其他参数
        public DownInfo create(){

            _id(0);

            downType("");

            startTime(0);

            finishTime(0);

            totalLength(0);

            downLength(0);

            downState(DOWN_START);

            return build();
        }

    }
}

下载状态注解标识,替代枚举:


    /**
     * 下载状态
     * Created by litp on 2017/4/10.
     */

    @IntDef({DOWN_START,DOWN_PAUSE,DOWN_STOP,DOWN_ERROR,DOWN_ING,DOWN_FINISH})
    @Retention(RetentionPolicy.SOURCE)
    public @interface DownState {}

数据库操作类

DBUtil, 单例实现,使用全局的Application的Context。使用SqlBrite进行数据库的RX操作,对下载时候的进度、下载状态、开始完成时间进行保存。搭配SqlDelight杠杆的。


/**
 * 数据库操作类,单例
 * Created by litp on 2017/4/10.
 */

public class DBUtil {

    private static DBUtil databaseUtil;

    //数据库操作类
    private BriteDatabase db;


    public DBUtil() {


        db = new SqlBrite.Builder().build().wrapDatabaseHelper(getHelper(DownManager.getInstance().getContext()), Schedulers.io());

    }

    /**
     * 获取单例
     *
     * @return
     */
    public static DBUtil getInstance() {
        if (databaseUtil == null) {
            synchronized (DBUtil.class) {
                if (databaseUtil == null) {
                    databaseUtil = new DBUtil();
                }
            }
        }
        return databaseUtil;
    }


    public DBHelper getHelper(Context context) {
        int v;
        try {
            v = context.getPackageManager().getPackageInfo(context.getPackageName(), PackageManager.GET_ACTIVITIES).versionCode;
        } catch (PackageManager.NameNotFoundException e) {
            e.printStackTrace();
            v = 1;
        }
        return new DBHelper(context, v);
    }


    /**
     * 查询所有的下载
     *
     * @return
     */
    public Observable<List<DownInfo>> getAllDown() {
        SqlDelightStatement sqlDelightStatement = DownInfo.FACTORY.selectAll();
        return db.createQuery(DownInfo.TABLE_NAME, sqlDelightStatement.statement, sqlDelightStatement.args)
                .mapToList(new Func1<Cursor, DownInfo>() {
                    @Override
                    public DownInfo call(Cursor cursor) {
                        return DownInfo.LIST_ROW_MAPPER.map(cursor);
                    }
                });
    }


    /**
     * 插入下载信息
     *
     * @param downInfo
     */
    public void insertDownInfo(DownInfo downInfo) {

        //插入下载信息
        DownInfo.InsertDowninfo insert = new DownInfoModel.InsertDowninfo(db.getWritableDatabase());
        insert.bind(downInfo.downUrl(), downInfo.downType(), downInfo.savePath(), downInfo.totalLength(), downInfo.downLength(), downInfo.downState(),
                downInfo.startTime(), downInfo.finishTime());

        insert.program.executeInsert();

    }


    /**
     * 更新下载进度
     *
     * @param downUrl
     */
    public void updateDownLength(final long downLength, final String downUrl) {

        Observable.just(downUrl)
                .observeOn(Schedulers.computation())
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e("@@", "更新下载中长度:" + downLength);
                        DownInfo.UpdateDownLength update = new DownInfoModel.UpdateDownLength(db.getWritableDatabase());
                        update.bind(downLength, s);
                        return update.program.executeUpdateDelete();
                    }
                })
                .subscribe();

    }


    //更新下载状态
    public Integer updateState(@DownState int state, String url) {

        DownInfo.UpdateDownState update = new DownInfoModel.UpdateDownState(db.getWritableDatabase());
        update.bind(state, url);
        int row = update.program.executeUpdateDelete();
        Log.e("@@", "更新数据库下载状态" + state);
        return row;


    }

    /**
     * 更新开始下载的时间
     */
    public void updateStartTime(String downUrl) {

        Observable.just(downUrl)
                .observeOn(Schedulers.computation())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        DownInfoModel.UpdateDownStartTime update = new DownInfoModel.UpdateDownStartTime(db.getWritableDatabase());
                        update.bind(System.currentTimeMillis(), s);
                        update.program.executeUpdateDelete();
                    }
                });

    }


    /**
     * 更新完成下载的时间
     */
    public void updateFinishTime(String downUrl) {
        Observable.just(downUrl)
                .observeOn(Schedulers.computation())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        DownInfoModel.UpdateDownFinishTime update = new DownInfoModel.UpdateDownFinishTime(db.getWritableDatabase());
                        update.bind(System.currentTimeMillis(), s);
                        update.program.executeUpdateDelete();
                    }
                });

    }


    /**
     * 更新总长度,如果本来的总长度大于新的,就不更新了
     *
     * @param downUrl
     */
    public void updateTotalLength(final long totalLength, final String downUrl) {

        SqlDelightStatement sqlDelightStatement = DownInfo.FACTORY.selectTotalLength(downUrl);
        db.createQuery(DownInfo.TABLE_NAME, sqlDelightStatement.statement, sqlDelightStatement.args)
                .mapToOneOrDefault(new Func1<Cursor, Long>() {
                    @Override
                    public Long call(Cursor cursor) {
                        return TOTALLENGTH_MAPPER.map(cursor);
                    }
                }, 0L)
                .observeOn(Schedulers.computation())
                .filter(new Func1<Long, Boolean>() {
                    @Override
                    public Boolean call(Long aLong) {

                        return aLong < totalLength;
                    }
                })
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        DownInfo.UpdateTotalLength update = new DownInfoModel.UpdateTotalLength(db.getWritableDatabase());
                        update.bind(totalLength, downUrl);
                        update.program.executeUpdateDelete();
                    }
                });

    }


    /**
     * 更新下载类型
     *
     * @param type
     * @param downUrl
     */
    public void updateDownType(final String type, final String downUrl) {
        Observable.just(downUrl)
                .observeOn(Schedulers.computation())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        DownInfoModel.UpdateDownType update = new DownInfoModel.UpdateDownType(db.getWritableDatabase());
                        update.bind(type, s);
                        update.program.executeUpdateDelete();
                    }
                });

    }


    /**
     * 更新下载信息
     *
     * @param downInfo
     * @return
     */
    public Integer updateDowninfo(DownInfo downInfo) {
        DownInfoModel.UpdateDowninfo updateDowninfo = new DownInfoModel.UpdateDowninfo(db.getWritableDatabase());
        updateDowninfo.bind(downInfo.savePath(), downInfo.totalLength(), downInfo.downLength(), downInfo.downState(), downInfo.startTime(), downInfo.finishTime(), downInfo.downUrl());
        return updateDowninfo.program.executeUpdateDelete();
    }

    /**
     * 获取保存的手机路径
     *
     * @param downUrl
     * @return
     */
    public Observable<String> getDownSavePath(String downUrl) {
        SqlDelightStatement sqlDelightStatement = DownInfo.FACTORY.selectDowninfoSavePath(downUrl);
        return db.createQuery(DownInfo.TABLE_NAME, sqlDelightStatement.statement, sqlDelightStatement.args)
                .mapToOneOrDefault(new Func1<Cursor, String>() {
                    @Override
                    public String call(Cursor cursor) {
                        return DOWN_EXIST_MAPPER.map(cursor);
                    }
                }, "");

    }


}

DownManager下载管理类

下载任务管理类,处理下载状态和数据库的管理。

/**
 * 统一下载管理器
 * Created by litp on 2017/4/10.
 */

public class DownManager {


    //回调观察者队列,downUrl标识,暂停和错误都会取消订阅对应的观察者
    private Map<String, DownSubscriber<DownInfo>> downSubs;


    //单例
    private volatile static DownManager INSTANCE;

    private Context context;


    public DownManager() {
        downSubs = new HashMap<>();

    }


    /**
     * 获取单例
     */
    public static DownManager getInstance() {
        if (INSTANCE == null) {
            synchronized (DownManager.class) {
                if (INSTANCE == null) {
                    INSTANCE = new DownManager();

                }
            }
        }
        return INSTANCE;
    }


    /**
     * 初始化
     *
     * @param context
     */
    public static void init(Context context) {
        DownManager.getInstance().setContext(context);
    }

    public void setContext(Context context) {
        this.context = context;
    }

    public Context getContext() {
        if (context == null) {
            throw new IllegalArgumentException("下载管理器还没有进行初始化");
        } else {
            return context;
        }
    }


    public void startDown(final DownInfo info) {
        //正在下载不处理
        if (info == null) {
            return;
        }

        //添加回调处理类
        DownSubscriber<DownInfo> subscriber;

        if (downSubs.get(info.downUrl()) != null) {  //切换界面下载 情况

            if (downSubs.get(info.downUrl()).getDownInfo().downState() == DownInfo.DOWN_ING) {
                //切换界面下载中,返回,继续下载
                return;
            }

            if(downSubs.get(info.downUrl()).isUnsubscribed()){  //重试下载
                Log.e("@@","重试下载");
                subscriber = new DownSubscriber<DownInfo>(
                        downSubs.get(info.downUrl()).getListener()
                ,downSubs.get(info.downUrl()).getDownInfo()
                ,downSubs.get(info.downUrl()).getService()
                ,downSubs.get(info.downUrl()).getPrePercent());

                //downSubs.remove(info.downUrl());
                //覆盖订阅者
                downSubs.put(info.downUrl(), subscriber);
            }else{
                subscriber = downSubs.get(info.downUrl());
            }

        } else {  //第一次下载

            subscriber = new DownSubscriber<DownInfo>(info);
            //更新订阅者
            downSubs.put(info.downUrl(), subscriber);
        }


        DownInterface service;

        if (downSubs.get(info.downUrl()).getService() != null) {
            //获取service
            service = downSubs.get(info.downUrl()).getService();
        } else {

            service = createService(new DownloadInterceptor(info.downUrl()));

            downSubs.get(info.downUrl()).setService(service);

            //插入下载信息到数据库
            DBUtil.getInstance().insertDownInfo(downSubs.get(info.downUrl()).getDownInfo());

        }


        Log.e("@@", "断点续传长度为:" + downSubs.get(info.downUrl()).getDownInfo().downLength());
        // 断点下载,每次返回的总长度是减去断点续传的长度
        service.download("bytes=" + downSubs.get(info.downUrl()).getDownInfo().downLength() + "-", downSubs.get(info.downUrl()).getDownInfo().downUrl())
                //在线程中下载
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .map(new Func1<ResponseBody, DownInfo>() {    //写入文件
                    @Override
                    public DownInfo call(ResponseBody responseBody) {

                        Log.e("@@", "数据回调map call保存到文件: contentLength=" + responseBody.contentLength() + " 类型:" + responseBody.contentType().toString());

                        //更新总长度
                        DBUtil.getInstance().updateTotalLength(responseBody.contentLength(), info.downUrl());

                        //更新类型
                        DBUtil.getInstance().updateDownType(responseBody.contentType().toString(), info.downUrl());

                        //更新下载中状态
                        DBUtil.getInstance()
                                .updateState(DownInfo.DOWN_ING, info.downUrl());

                        try {
                            FileUtil.writeFile(responseBody, new File(downSubs.get(info.downUrl()).getDownInfo().savePath()), downSubs.get(info.downUrl()).getDownInfo());
                        } catch (IOException e) {
                            //throw e;
                            Log.e("@@", "写入文件错误" + e.getMessage());
                        }

                        return downSubs.get(info.downUrl()).getDownInfo();
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) //回调在主线程
                .subscribe(subscriber);    //数据回调

        Log.e("@@", "开始下载");


    }


    private DownInterface createService(Interceptor interceptor) {

        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(Constant.TIME_OUT, TimeUnit.SECONDS)
                .addInterceptor(interceptor)
                .build();


        //创建Retrofit
        return new Retrofit.Builder()
                .client(client)
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl("https://www.baidu.com/")  //下载文件,基地址可以不用正确,下载文件的url是全路径即可
                .build()
                .create(DownInterface.class);
    }


    //添加view回调监听器
    public void addListener(String downUrl, IOnDownloadListener<DownInfo> listener) {
        if (downSubs.get(downUrl) != null) {
            Log.e("@@", "添加监听器" + downUrl);
            downSubs.get(downUrl).setListener(listener);

        }
    }


    /**
     * 开始下载调用,更新下载时间
     *
     */
    public void onStartDown(String downUrl) {

        //更新开始的下载状态到数据库
        DBUtil.getInstance()
                .updateState(DownInfo.DOWN_START, downUrl);

        //更新开始下载时间到数据库
        DBUtil.getInstance()
                .updateStartTime(downUrl);
    }


    /**
     * 完成下载调用
     *
     * @param downUrl
     */
    public void onFinishDown(String downUrl) {


        //更新下载完成状态
        DBUtil.getInstance()
                .updateState(DownInfo.DOWN_FINISH, downUrl);

        //更新完成下载时间到数据库
        DBUtil.getInstance()
                .updateFinishTime(downUrl);

        remove(downUrl);
    }


    /**
     * 停止下载,进度设置为0,状态未开始
     *
     * @param info
     */
    public void stopDown(DownInfo info) {


        if (handleDown(info, DownInfo.DOWN_STOP) > 0) {
            if (downSubs.get(info.downUrl()).getListener() != null) {
                downSubs.get(info.downUrl()).getListener().onStop();
            }
        }

    }


    /**
     * 下载错误
     *
     * @param info
     */
    public void errorDown(DownInfo info, final Throwable e) {

        if (handleDown(info, DownInfo.DOWN_ERROR) > 0) {
            if (downSubs.get(info.downUrl()).getListener() != null) {
                downSubs.get(info.downUrl()).getListener().onError(e);
            }
        }

    }

    /**
     * 暂停下载
     *
     * @param info
     */
    public void pauseDown(DownInfo info) {

        if (handleDown(info, DownInfo.DOWN_PAUSE) > 0) {
            if (downSubs.get(info.downUrl()).getListener() != null) {
                downSubs.get(info.downUrl()).getListener().onPuase();
            }
        }

    }

    //处理下载状态
    private Integer handleDown(DownInfo info, @DownState int state) {
        if (info == null) return null;

        if (downSubs.get(info.downUrl()) != null) {

            //解除订阅就不会下载了
            downSubs.get(info.downUrl()).unsubscribe();

            downSubs.get(info.downUrl()).setDownloadState(state);

            //防止下载速度太快导致继续下载回调
            //downSubs.get(info.downUrl()).setListener(null);
            //downSubs.remove(info.downUrl());

        }

        return DBUtil.getInstance()
                .updateState(state, info.downUrl());
    }


    /**
     * 回调更新下载长度到数据库,在DownManager统一管理数据库。
     *
     * @param downLength
     * @param downUrl
     */
    public void onSetDownLength(long downLength, String downUrl) {

        //更新下载长度到数据库
        DBUtil.getInstance()
                .updateDownLength(downLength, downUrl);

    }


    /**
     * 停止全部下载
     */
    public void stopAllDown() {


        for (String key : downSubs.keySet()) {
            stopDown(downSubs.get(key).getDownInfo());
        }

        downSubs.clear();
    }


    /**
     * 暂停全部下载
     */
    public void pauseAllDown() {

        for (String key : downSubs.keySet()) {
            pauseDown(downSubs.get(key).getDownInfo());
        }

        downSubs.clear();
    }


    /**
     * 移除下载数据
     *
     * @param downUrl
     */
    public void remove(String downUrl) {
        downSubs.remove(downUrl);
    }


}

进度状态类

DownSubscriber为进度的回调和各种状态的回调,是Rxjava的订阅者,所以是继承Subscriber。里面维护了四个变量:

  • listener是回调view的监听器
  • downInfo 是当前订阅者对应的下载信息
  • service是Retrofit的下载接口
  • prePercent是前一个下载的进度百分比,为了防止频繁更新数据库和view。

在创建的时候注册了Rxbus,进行下载过程中的进度回调。

/**
 * 下载的Rxjava观察者
 * Created by litp on 2017/4/10.
 */

public class DownSubscriber<T> extends Subscriber<T> {


    //弱引用结果回调,挂了就回收。  回调view监听器
    private WeakReference<IOnDownloadListener> listener;

    private DownInfo downInfo;   //下载bean


    private DownInterface service;     // Retrofit的服务端


    private int prePercent = 0;  //上一次的进度,防止频繁更新View


    public DownSubscriber(DownInfo downInfo) {
        this(null, downInfo, null, 0);

    }


    public DownSubscriber(IOnDownloadListener listener, DownInfo downInfo, DownInterface service, int prePercent) {
        setListener(listener);
        setDownInfo(downInfo);
        setService(service);
        setPrePercent(prePercent);

        //接收下载过程中的进度回调
        RxBus.with().setEvent(downInfo.downUrl())
                .onNext(new Action1<Events<?>>() {
                    @Override
                    public void call(Events<?> events) {
                        ProgressEvent event = events.getContent();
                        update(event.getDownLength(), event.getTotalLength(), event.isFinish());
                    }
                })
                .create();

    }


    public int getPrePercent() {
        return prePercent;
    }

    public void setPrePercent(int prePercent) {
        this.prePercent = prePercent;
    }

    //到了下载的列表界面就设置监听器
    public void setListener(IOnDownloadListener listener) {
        this.listener = new WeakReference<IOnDownloadListener>(listener);
    }


    public IOnDownloadListener getListener() {
        return listener != null ? listener.get() : null;
    }

    public void setDownInfo(DownInfo data) {

        this.downInfo = data;

    }

    public DownInfo getDownInfo() {
        return downInfo;
    }


    public DownInterface getService() {
        return service;
    }

    public void setService(DownInterface service) {
        this.service = service;
    }

    /**
     * 开始下载
     */
    @Override
    public void onStart() {
        super.onStart();
        if (listener != null && listener.get() != null) {
            //回调开始
            listener.get().onStart();
        }

        Log.e("@@", "Subscriber onStart开始下载");
        ToastUtil.show("开始下载");

        setDownloadState(DownInfo.DOWN_START);

        //更新bean的下载完成时间,可更新可以不更新

        DownManager.getInstance().onStartDown(downInfo.downUrl());


    }


    /**
     * 完成回调
     */
    @Override
    public void onCompleted() {
        if (listener != null && listener.get() != null) {
            //回调开始
            listener.get().onComplete();
        }

        Log.e("@@", "onCompleted完成下载");


    }

    /**
     * 下载错误
     *
     * @param e
     */
    @Override
    public void onError(Throwable e) {
        Log.e("@@", "onErro下载失败: " + e.toString());
        ToastUtil.show("下载错误");

        DownManager.getInstance().errorDown(downInfo, e);

    }


    /**
     * 下载完成
     * @param t
     */
    @Override
    public void onNext(T t) {
        Log.e("@@", "onNext下载完毕");
        ToastUtil.show("下载完毕");

        if (listener != null && listener.get() != null) {
            listener.get().onNext(t);
        }


        //AutoValue标注的bean不能setter,需要重新new一个
        setDownloadState(DownInfo.DOWN_FINISH);

        //更新bean的下载完成时间,可更新可以不更新

        DownManager.getInstance().onFinishDown(downInfo.downUrl());


    }


    //下载进度回调
    public void update(long down, final long total, final boolean finish) {

        //Log.e("@@","下载进度: downBytes="+down+" responseBody.contentLength()="+total);

        long downLength = down;


        //设置当前下载状态
        DownInfo.Builder builder = DownInfo.create(downInfo);
        if (downInfo.totalLength() > total) {
            //断点续传下载的情况
            downLength = downInfo.totalLength() - total + down;
        } else if (downInfo.totalLength() < total) {
            builder.totalLength(total);
        }

        //Log.e("@@", "下载长度" + downLength);

        //如果已经解除订阅了,代表暂停停止出错了,不更新状态了
        if (!isUnsubscribed()) {
            //Log.e("@@","设置下载中状态");
            builder.downState(DownInfo.DOWN_ING);
        }

        downInfo = builder
                .downLength(downLength)
                .build();

        Observable.just(downInfo.downLength())
                .map(new Func1<Long, Integer>() {
                    @Override
                    public Integer call(Long aLong) {
                        return (int) (100 * downInfo.downLength() / downInfo.totalLength());

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer percent) {

                        if (percent > prePercent) {

                            prePercent = percent;

                            //更新下载长度到数据库
                            DownManager.getInstance()
                                    .onSetDownLength(downInfo.downLength(), downInfo.downUrl());

                            if(listener != null && listener.get() != null && !isUnsubscribed()){
                                //回调进度
                                listener.get().updatePercent(percent);

                            }

                        }else{
                            if(listener != null && listener.get() != null && !isUnsubscribed()){
                                //回调长度
                                listener.get().updateLength(downInfo.downLength(), downInfo.totalLength(), percent);

                            }
                        }

                    }
                });
    }


    /**
     * 设置当前的下载状态
     *
     * @param state
     */
    public void setDownloadState(@DownState int state) {

        Log.e("@@", "sub更新状态" + state);

        this.downInfo = DownInfo.create(downInfo)
                .downState(state)
                .build();


        //开始,暂停,错误时候,更新下载长度到数据库
        DownManager.getInstance()
                .onSetDownLength(downInfo.downLength(), downInfo.downUrl());
    }


}

使用

1. 以module形式添加到项目

2. 在Application进行初始化下载器

DownManager.init(this):

3. 添加下载

        DownInfo downInfo = DownInfo.builder()
                .savePath(getPath(url))   //文件保存的路径
                .downUrl(url)               //下载的url,要全路径
                .create();
        //开始下载
        DownManager.getInstance().startDown(downInfo);

4. 查询当前的下载

        //查询当前所有的下载
        DBUtil.getInstance().getAllDown()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(adapter);

5. 设置view的回调监听器

注意要在startDown之后设置监听器,例如demo里面的ListViewHolder:

/**
 * 
 * Created by litp on 2017/4/18.
 */

class ListViewHolder extends RecyclerView.ViewHolder implements View.OnClickListener {

    private Button mBtHandle;
    private TextView mTvName;
    private TextView mTvDownLength;
    private ProgressBar mPrbDown;

    private TextView mTvDownStartTime;
    private TextView mTvDownFinishTime;


    private DownInfo downInfo;

    public ListViewHolder(View itemView) {
        super(itemView);
        mBtHandle = (Button) itemView.findViewById(R.id.bt_handle);
        mTvName = (TextView) itemView.findViewById(R.id.tv_name);
        mTvDownLength = (TextView) itemView.findViewById(R.id.tv_down_length);
        mPrbDown = (ProgressBar) itemView.findViewById(R.id.prb_down);
        mTvDownStartTime = (TextView) itemView.findViewById(R.id.tv_down_start_time);
        mTvDownFinishTime = (TextView) itemView.findViewById(R.id.tv_down_finish_time);


        mBtHandle.setOnClickListener(this);

    }


    public void setData(DownInfo data, int position) {

        this.downInfo = data;


        switch (downInfo.downState()) {
            case DownInfo.DOWN_ING:
            case DownInfo.DOWN_START:
                mBtHandle.setText("暂停");
                DownManager.getInstance().startDown(downInfo);
                break;
            case DownInfo.DOWN_STOP:
            case DownInfo.DOWN_PAUSE:
                mBtHandle.setText("开始");

                break;
            case DownInfo.DOWN_ERROR:
                mBtHandle.setText("重试");
                break;
            case DownInfo.DOWN_FINISH:
                mBtHandle.setText("打开");

                mTvDownFinishTime.setVisibility(View.VISIBLE);
                //设置下载完成时间
                mTvDownFinishTime.setText(
                        String.format("完成时间: %s",
                                new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(downInfo.finishTime())
                        )
                );

                break;

        }

        //添加view回调监听器,放在startDown后面,防止程序终止时候打开列表重新下载的问题
        DownManager.getInstance().addListener(data.downUrl(), listener);


        //查询名字
        DatabaseUtil.getInstance().getName(downInfo.downUrl())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        mTvName.setText(s);
                    }
                });


        //设置进度文本
        mTvDownLength.setText(
                String.format("%s/%s"
                        , FileUtil.getFormatSize(data.downLength()), FileUtil.getFormatSize(data.totalLength())));


        //计算进度
        if (downInfo.totalLength() == 0) {
            mPrbDown.setProgress(100);
        } else {
            mPrbDown.setProgress((int) (downInfo.downLength() * 100 / downInfo.totalLength()));
        }

        //设置开始下载时间
        if (downInfo.startTime() > 0) {
            mTvDownStartTime.setText(
                    String.format("开始时间: %s",
                            new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(downInfo.startTime())
                    )
            );

        }


    }


    /*下载回调*/
    IOnDownloadListener<DownInfo> listener = new IOnDownloadListener<DownInfo>() {
        @Override
        public void onNext(DownInfo baseDownEntity) {
            Log.e("@@", "listsner onNext下载完成");
            mBtHandle.setText("打开");

            downInfo = DownInfo.create(downInfo)
                    .downState(DownInfo.DOWN_FINISH)
                    .build();

            mTvDownFinishTime.setVisibility(View.VISIBLE);
            //设置下载完成时间
            mTvDownFinishTime.setText(
                    String.format("完成时间: %s",
                            SimpleDateFormat.getInstance().format(new Date(downInfo.finishTime()))
                    )
            );

        }

        @Override
        public void onStart() {
            Log.e("@@", "listsner onStart开始下载");
            mBtHandle.setText("暂停");
            downInfo = DownInfo.create(downInfo)
                    .downState(DownInfo.DOWN_START)
                    .build();
            mTvDownStartTime.setText(
                    String.format("开始时间: %s",
                            SimpleDateFormat.getInstance().format(new Date(downInfo.startTime()))
                    )
            );

        }

        @Override
        public void onComplete() {
            Log.e("@@", "listsner onComplete完成");
            //mBtHandle.setText("完成");


        }

        @Override
        public void one rror(Throwable e) {
            Log.e("@@", "listsner one rror下载错误");
            super.onError(e);
            mBtHandle.setText("重试");

            downInfo = DownInfo.create(downInfo)
                    .downState(DownInfo.DOWN_ERROR)
                    .build();
        }


        @Override
        public void onPuase() {
            Log.e("@@", "listsner onPause下载暂停:" + downInfo.downState());
            super.onPuase();
            mBtHandle.setText("开始");

            downInfo = DownInfo.create(downInfo)
                    .downState(DownInfo.DOWN_PAUSE)
                    .build();
        }

        @Override
        public void onStop() {
            Log.e("@@", "listsner onPause下载停止");
            super.onStop();
            mBtHandle.setText("开始");


            downInfo = DownInfo.create(downInfo)
                    .downState(DownInfo.DOWN_STOP)
                    .build();

        }

        @Override
        public void updateLength(long readLength, long totalLength, int percent) {

            //Log.e("@@", "listsner updateLength下载中:" + percent + " " + readLength + " " + totalLength);

            //设置文本
            mTvDownLength.setText(
                    String.format("%s/%s"
                            , FileUtil.getFormatSize(readLength), FileUtil.getFormatSize(totalLength)));

        }

        @Override
        public void updatePercent(int percent) {

            Log.e("@@", "listsner updatePercent更新进度:" + percent);


            mBtHandle.setText("暂停");

            //计算进度
            mPrbDown.setProgress(percent);
        }
    };


    @Override
    public void onClick(View v) {

        if (v.getId() == R.id.bt_handle) {
            switch (downInfo.downState()) {
                case DownInfo.DOWN_ING:
                case DownInfo.DOWN_START:
                    //需要暂停
                    Log.e("@@", "点击了暂停");
                    DownManager.getInstance().pauseDown(downInfo);
                    break;
                case DownInfo.DOWN_STOP:
                case DownInfo.DOWN_PAUSE:
                case DownInfo.DOWN_ERROR:
                    //需要开始
                    Log.e("@@", "点击了 开始下载");
                    //需要设置监听器,
                    //downInfo.setListener(listener);
                    DownManager.getInstance().startDown(downInfo);
                    break;
                case DownInfo.DOWN_FINISH:
                    //需要打开
                    Log.e("@@", "点击了 完成");
                    if (FileUtil.getExtensionName(downInfo.savePath()).equals("apk")) {
                        //如果是安装包、
                        Intent intent = new Intent(Intent.ACTION_VIEW);
                        intent.setDataAndType(Uri.fromFile(new File(downInfo.savePath())),
                                "application/vnd.android.package-archive");
                        mBtHandle.getContext().startActivity(intent);
                    } else if (downInfo.downType().equals("application/octet-stream")) {

                        ToastUtil.show("文件类型: 二进制流,不知道文件类型。" + downInfo.downType());
                    } else {
                        ToastUtil.show("文件类型: " + downInfo.downType());
                    }

                    break;

            }
        }

    }
}
<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    Retrofit+Rxjava+SqlBrite+SqlDelight实现多文件断点续传下载