首页 > 代码库 > MongoDB线程安全批量处理

MongoDB线程安全批量处理

Mongo批处理工具类:

package com.saike.solr.server.util;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.MongoOptions;
import com.mongodb.ServerAddress;

/**
 * Batch-processing helper for MongoDB.
 *
 * <p>Documents are buffered per thread via {@link #addBatch(String, String)} and then
 * flushed with {@link #executeInsertBatch()} or {@link #executeDeleteBatch()}. Each thread
 * sees only the documents it buffered itself.
 *
 * @author xieyong
 */
public class UtileMongDB {

    private static final Logger LOG = Logger.getLogger(UtileMongDB.class.getName());

    /** Per-thread buffer of documents waiting to be inserted or removed. */
    UtilThreadLocal<ArrayList<DBObject>> localBatch;

    /**
     * Single shared client. The original author notes that, according to the official
     * documentation, the Mongo Java client object is thread-safe.
     */
    private static Mongo mongo;

    /** Lazily resolved database handle; guarded by the class lock in {@link #getDB()}. */
    private static DB db;

    static {
        MongoOptions options = new MongoOptions();
        options.autoConnectRetry = true;
        options.connectionsPerHost = 1000;
        options.maxWaitTime = 5000;
        options.socketTimeout = 0;
        options.connectTimeout = 15000;
        options.threadsAllowedToBlockForConnectionMultiplier = 5000;
        try {
            // The options must be handed to the client explicitly; building them alone has no effect.
            mongo = new Mongo(
                    new ServerAddress(MongoDBConstant.MONGO_HOST, MongoDBConstant.MONGO_PORT),
                    options);
        } catch (UnknownHostException | MongoException e) {
            // Leave mongo null; getDB() reports the problem to callers with a clear message.
            LOG.log(Level.SEVERE, "Unable to create the Mongo client", e);
        }
        // Note: no authentication is performed here.
    }

    /**
     * Creates a helper with an empty per-thread batch buffer.
     *
     * @throws IllegalStateException if the thread-local buffer factory cannot be set up
     */
    public UtileMongDB() {
        try {
            localBatch = new UtilThreadLocal<ArrayList<DBObject>>(ArrayList.class);
        } catch (NoSuchMethodException | SecurityException e) {
            // Previously swallowed, which left localBatch null and failed later with an NPE.
            throw new IllegalStateException("Cannot set up the per-thread batch buffer", e);
        }
    }

    /**
     * Returns the database named by {@code MongoDBConstant.MONGO_DB}, resolving it on first use.
     *
     * @return the shared database handle
     * @throws IllegalStateException if the Mongo client could not be created at class load
     */
    public static synchronized DB getDB() {
        if (mongo == null) {
            throw new IllegalStateException(
                    "Mongo client is not available; its creation failed during class initialisation");
        }
        if (db == null) {
            db = mongo.getDB(MongoDBConstant.MONGO_DB);
        }
        return db;
    }

    /**
     * Returns the shared Mongo client.
     *
     * @return the client, or {@code null} if its creation failed during class initialisation
     */
    public static Mongo getMong() {
        return mongo;
    }

    /**
     * Looks up a collection by name in the database returned by {@link #getDB()}.
     *
     * @param collname collection name
     * @return the collection handle
     */
    public static DBCollection getColl(String collname) {
        return getDB().getCollection(collname);
    }

    /**
     * Looks up the collection named by {@code MongoDBConstant.MONGO_COLLECTION}.
     *
     * @return the collection handle
     */
    public static DBCollection getColl() {
        return getDB().getCollection(MongoDBConstant.MONGO_COLLECTION);
    }

    /**
     * Buffers a single-field document {@code {key: value}} for the calling thread.
     *
     * @param key   field name
     * @param value field value
     */
    public void addBatch(String key, String value) {
        BasicDBObject document = new BasicDBObject();
        document.put(key, value);
        // A thread-local buffer is used here; a shared list would be subject to a race condition.
        localBatch.newGet().add(document);
    }

    /**
     * Inserts every document buffered by the calling thread into the default collection,
     * then empties the buffer. Does nothing when the calling thread has buffered nothing.
     */
    public void executeInsertBatch() {
        List<DBObject> batch = localBatch.get();
        // get() yields null on a thread that never called addBatch.
        if (batch == null || batch.isEmpty()) {
            return;
        }
        getColl().insert(batch);
        batch.clear();
    }

    /**
     * Issues one remove per document buffered by the calling thread, using each buffered
     * document as the remove query, then empties the buffer. Does nothing when the calling
     * thread has buffered nothing.
     */
    public void executeDeleteBatch() {
        List<DBObject> batch = localBatch.get();
        if (batch == null || batch.isEmpty()) {
            return;
        }
        DBCollection collection = getColl();
        for (DBObject query : batch) {
            collection.remove(query);
        }
        batch.clear();
    }

    /**
     * Finds documents in the default collection whose field {@code key} equals {@code value}.
     *
     * @param key   field name
     * @param value field value to match
     * @return a cursor over the matching documents
     */
    public DBCursor query(String key, String value) {
        BasicDBObject criteria = new BasicDBObject();
        criteria.put(key, value);
        return getColl().find(criteria);
    }
}

 

 

ThreadLocal的封装:

package com.saike.solr.server.util;

import java.lang.reflect.Constructor;

/**
 * A {@link ThreadLocal} that can build its per-thread value on demand by invoking a
 * public constructor reflectively.
 *
 * <p>{@link #get()} keeps the plain {@code ThreadLocal} behaviour (it returns {@code null}
 * until a value exists for the calling thread); {@link #newGet()} creates the value first
 * when it is missing.
 *
 * @author xieyong
 *
 * @param <T> type of the per-thread value
 */
public class UtilThreadLocal<T> extends ThreadLocal<T> {

    /** Arguments passed to the constructor every time a per-thread value is built. */
    Object[] obj;

    /** Constructor used to build per-thread values. */
    Constructor<T> construct;

    /**
     * Selects the constructor that later builds the per-thread values.
     *
     * <p>An exact match on the runtime classes of {@code args} is tried first. If none
     * exists (or an argument is {@code null}), the first public constructor whose parameter
     * types accept the arguments is used; primitive parameters accept their wrapper types.
     *
     * @param clazz class of the per-thread value
     * @param args  constructor arguments; may be omitted for a no-arg constructor
     * @throws NoSuchMethodException if no public constructor accepts {@code args}
     * @throws SecurityException     if reflective access is denied
     */
    @SuppressWarnings("unchecked") // the caller pairs clazz with T; erasure prevents a runtime check
    public UtilThreadLocal(Class<?> clazz, Object... args) throws NoSuchMethodException, SecurityException {
        // Copy so later changes to the caller's array cannot alter what newGet() passes on.
        this.obj = (args == null) ? new Object[0] : args.clone();
        this.construct = (Constructor<T>) resolveConstructor(clazz, this.obj);
    }

    /**
     * Returns the calling thread's value, creating and storing it first if the thread has none.
     *
     * @return the calling thread's value, never {@code null}
     * @throws IllegalStateException if the value cannot be constructed
     */
    public T newGet() {
        T current = super.get();
        if (current == null) {
            try {
                current = construct.newInstance(obj);
            } catch (ReflectiveOperationException e) {
                // Returning null here would only move the failure to the caller as an NPE.
                throw new IllegalStateException(
                        "Cannot create thread-local " + construct.getDeclaringClass().getName(), e);
            }
            super.set(current);
        }
        return current;
    }

    /** Finds a public constructor of {@code clazz} that can be invoked with {@code args}. */
    private static Constructor<?> resolveConstructor(Class<?> clazz, Object[] args)
            throws NoSuchMethodException {
        Class<?>[] runtimeTypes = new Class<?>[args.length];
        boolean allTyped = true;
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                allTyped = false;
            } else {
                runtimeTypes[i] = args[i].getClass();
            }
        }

        NoSuchMethodException exactMiss = null;
        if (allTyped) {
            try {
                return clazz.getConstructor(runtimeTypes);
            } catch (NoSuchMethodException e) {
                exactMiss = e; // fall through to the compatibility scan
            }
        }

        for (Constructor<?> candidate : clazz.getConstructors()) {
            if (accepts(candidate.getParameterTypes(), args)) {
                return candidate;
            }
        }

        if (exactMiss != null) {
            throw exactMiss;
        }
        throw new NoSuchMethodException(clazz.getName()
                + " has no public constructor accepting the " + args.length + " supplied argument(s)");
    }

    /** Tells whether {@code args} can be passed to a constructor declaring {@code params}. */
    private static boolean accepts(Class<?>[] params, Object[] args) {
        if (params.length != args.length) {
            return false;
        }
        for (int i = 0; i < params.length; i++) {
            Object arg = args[i];
            if (arg == null) {
                if (params[i].isPrimitive()) {
                    return false;
                }
            } else if (!boxed(params[i]).isInstance(arg)) {
                return false;
            }
        }
        return true;
    }

    /** Maps a primitive type to its wrapper class; returns any other type unchanged. */
    private static Class<?> boxed(Class<?> type) {
        if (!type.isPrimitive()) {
            return type;
        }
        if (type == int.class) {
            return Integer.class;
        }
        if (type == long.class) {
            return Long.class;
        }
        if (type == boolean.class) {
            return Boolean.class;
        }
        if (type == double.class) {
            return Double.class;
        }
        if (type == float.class) {
            return Float.class;
        }
        if (type == char.class) {
            return Character.class;
        }
        if (type == byte.class) {
            return Byte.class;
        }
        if (type == short.class) {
            return Short.class;
        }
        return Void.class;
    }
}