首页 > 代码库 > Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime

Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime

转载请注明出处:Import博客园:http://www.cnblogs.com/thinkpad

注意:本文批处理只是Storm到Hbase批处理入库操作,并非Storm的API的批处理!

  1 package com.storm.hbaseTest;  2   3 import java.io.IOException;  4 import java.util.HashMap;  5 import java.util.Iterator;  6 import java.util.List;  7 import java.util.Map;  8   9 import org.apache.commons.lang.StringUtils; 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.hbase.HBaseConfiguration; 12 import org.apache.hadoop.hbase.client.HConnection; 13 import org.apache.hadoop.hbase.client.HConnectionManager; 14 import org.apache.hadoop.hbase.client.HTableInterface; 15 import org.apache.hadoop.hbase.client.Put; 16  17 import backtype.storm.Config; 18 import backtype.storm.Constants; 19 import backtype.storm.task.OutputCollector; 20 import backtype.storm.task.TopologyContext; 21 import backtype.storm.topology.BasicOutputCollector; 22 import backtype.storm.topology.IRichBolt; 23 import backtype.storm.topology.OutputFieldsDeclarer; 24 import backtype.storm.topology.base.BaseBasicBolt; 25 import backtype.storm.tuple.Fields; 26 import backtype.storm.tuple.Tuple; 27  28 import org.slf4j.Logger; 29 import org.slf4j.LoggerFactory; 30  31 import com.google.common.collect.Lists; 32  33 /** 34  * @ClassName: HbaseBout.java 35  * @Description:自定义批入库,定时批量和大小批量 36  * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ] 37  * @date 2016年8月27日 下午4:43:19  38  * @version V1.0 39  */ 40 @SuppressWarnings("all") 41 public class HbaseBout implements IRichBolt{ 42      43       private static final long serialVersionUID = 1L; 44       private static final Logger LOG = LoggerFactory.getLogger(HbaseBout.class); 45        46       protected OutputCollector collector; 47       protected HbaseClient hbaseClient; 48       protected String tableName; 49       protected String configKey ="hbase.conf"; 50        51       //批处理大小 52       protected int batchSize = 15000; 53       List<Put> batchMutations; 54       List<Tuple> tupleBatch; 55       //tick Time 56       int flushIntervalSecs = 1; 57        58   /** 59    * @Description:Storm初始化 60    * 
@author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ] 61    */ 62     public void prepare(Map map, TopologyContext context, 63             OutputCollector collector) { 64          65         this.collector = collector; 66         Configuration hbConfig = HBaseConfiguration.create(); 67         Map<String,String> conf = (Map)map.get(this.configKey); 68          69         //获取Hbase配置 70         if (conf == null) { 71           throw new IllegalArgumentException("HBase configuration not found using key ‘" + this.configKey + "‘"); 72         } 73          74         //批大小 75         if(map.get("batchSize") != null){ 76             this.batchSize = new Integer(map.get("batchSize").toString()); 77         } 78         //Tick Time 79         if(map.get("flushIntervalSecs") != null){ 80             this.flushIntervalSecs = Integer.valueOf(map.get("flushIntervalSecs").toString()); 81         } 82          83         //Hbase 配置 84         for (String key : conf.keySet()) { 85           hbConfig.set(key, String.valueOf(conf.get(key))); 86         } 87         this.hbaseClient = new HbaseClient(hbConfig, this.tableName); 88     } 89  90   /** 91    * @Description:每次调用 92    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ] 93    */ 94     @Override 95     public void execute(Tuple tuple) { 96          97          boolean flush = false; 98           99          try {100              //Tick101              if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){102                 LOG.debug("TICK received! 
current batch status ", Integer.valueOf(this.tupleBatch.size()), Integer.valueOf(this.batchSize)); 103                 flush = true;104             }else{105                 106                 Put put = new Put(tuple.getStringByField("rowKey").getBytes());107                 put.add("cf".getBytes(), "name".getBytes(), tuple.getStringByField("name").getBytes());108                 put.add("cf".getBytes(), "sex".getBytes(), tuple.getStringByField("sex").getBytes());109                 this.batchMutations.add(put);110                 this.tupleBatch.add(tuple);111                 112                 //当前tuple批大小113                 if (this.tupleBatch.size() >= this.batchSize) {114                   flush = true;115                 }116             }117              //持久化操作118              if ((flush) && (!this.tupleBatch.isEmpty())) {119                   this.hbaseClient.batchMutate(this.batchMutations);120                   LOG.debug("acknowledging tuples after batchMutate");121                   for (Iterator<Tuple> tuples = this.tupleBatch.iterator(); tuples.hasNext(); ) { 122                       Tuple t = (Tuple)tuples.next();123                         this.collector.ack(t);124                   }125                   this.tupleBatch.clear();126                   this.batchMutations.clear();127                }128         } catch (Exception e) {129               LOG.debug("inser batch fail");130               this.collector.reportError(e);131               for (Tuple t : this.tupleBatch) {132                   this.collector.fail(t);133                 }134               this.tupleBatch.clear();135               this.batchMutations.clear();136         }137     }138 139   /**140    * @Description:字段声明141    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]142    */143     public void declareOutputFields(OutputFieldsDeclarer declarer) {144         145     }146     147     /**148        * @Description:配置149        * @author Import QQ:99123550  
博客 [ http://www.cnblogs.com/thinkpad ]150      */151     @Override152     public Map<String, Object> getComponentConfiguration() {153         Config conf = new Config();154         conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,flushIntervalSecs);155         return conf;156     }157 158     159     /**160        * @Description:清理方法161        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]162       */163     @Override164     public void cleanup() {165         166     }167 }168 169 /**170  * 171  * @ClassName: HbaseBout.java172  * @Description:Hbase操作相关类173  * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]174  * @date 2016年8月27日 下午6:06:34 175  * @version V1.0176  */177 class HbaseClient{178      179      private static final Logger LOG = LoggerFactory.getLogger(HbaseClient.class);180       HConnection hTablePool = null;181       HTableInterface table = null;182       183       /**184        * @Description:获取连接185        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]186        * @param configuration:Hbase表配置187        * @param tableName:表名188        */189       public HbaseClient(final Configuration configuration, final String tableName)190       {191         try192         {193             hTablePool =  HConnectionManager.createConnection(configuration) ;194             this.table =  hTablePool.getTable(tableName);195         }196         catch (Exception e) {197           throw new RuntimeException("HBase create failed: " + e.getMessage(), e);198         }199       }200 201       /**202        * 203        * @ClassName: HbaseBout.java204        * @Description:批入库205        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]206        * @date 2016年8月27日 下午6:05:39 207        * @version V1.0208        */209       public void batchMutate(List<Put> puts) throws Exception {210           211           try {212             this.table.put(puts);213         } catch (Exception e) {214               
LOG.warn("Error insert batch to HBase.", e);215               throw e;216         }217       }218 }

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         

Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime