首页 > 代码库 > Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime
Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime
转载请注明出处:Import博客园:http://www.cnblogs.com/thinkpad
注意:本文所说的"批处理"仅指从 Storm 向 HBase 批量写入(入库)的操作,并不是 Storm 自身 API 提供的批处理功能!
package com.storm.hbaseTest;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
 * Storm bolt that writes tuples to HBase in batches.
 *
 * <p>Incoming tuples are buffered and written with a single multi-put when
 * either of two things happens: the buffer reaches {@link #batchSize} tuples,
 * or a system tick tuple arrives. Tuples are acked only after the HBase write
 * succeeded; on any error every buffered tuple is failed so the spout can
 * replay it.
 *
 * <p>Each data tuple must carry the string fields {@code rowKey}, {@code name}
 * and {@code sex}; they are written to column family {@code cf}.
 *
 * @author Import (QQ: 99123550), blog: http://www.cnblogs.com/thinkpad
 * @date 2016-08-27
 * @version V1.0
 */
public class HbaseBout implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HbaseBout.class);

    /** Column family and qualifiers written for every tuple (UTF-8 encoded once). */
    private static final byte[] COLUMN_FAMILY = "cf".getBytes(StandardCharsets.UTF_8);
    private static final byte[] QUALIFIER_NAME = "name".getBytes(StandardCharsets.UTF_8);
    private static final byte[] QUALIFIER_SEX = "sex".getBytes(StandardCharsets.UTF_8);

    protected OutputCollector collector;
    protected HbaseClient hbaseClient;
    /** Target HBase table; set via {@link #withTableName}, a subclass, or the "tableName" config entry. */
    protected String tableName;
    /** Key in the topology configuration under which the HBase settings map is stored. */
    protected String configKey = "hbase.conf";

    /** Number of buffered tuples that triggers a flush. */
    protected int batchSize = 15000;
    /** Puts waiting to be written; parallel to {@link #tupleBatch}. Created in {@link #prepare}. */
    List<Put> batchMutations;
    /** Tuples whose puts are buffered and that still have to be acked or failed. */
    List<Tuple> tupleBatch;
    /** Tick tuple frequency in seconds, i.e. the maximum time a partial batch is held back. */
    int flushIntervalSecs = 1;

    /**
     * Sets the HBase table this bolt writes to.
     *
     * @param tableName name of the target table
     * @return this bolt, for call chaining
     */
    public HbaseBout withTableName(String tableName) {
        this.tableName = tableName;
        return this;
    }

    /**
     * Initialises the bolt: reads the settings, creates the batch buffers and
     * opens the HBase connection.
     *
     * <p>Recognised topology configuration entries: the map stored under
     * {@link #configKey} (copied verbatim into the HBase configuration),
     * {@code batchSize}, {@code flushIntervalSecs} and, if no table name was
     * set on the bolt itself, {@code tableName}.
     *
     * @param map       topology configuration
     * @param context   topology context (unused)
     * @param collector collector used to ack/fail tuples and report errors
     * @throws IllegalArgumentException if the HBase settings map or the table name is missing
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Map is imposed by the IRichBolt signature
    public void prepare(Map map, TopologyContext context,
            OutputCollector collector) {

        this.collector = collector;

        // The buffers must exist before the first execute() call.
        this.batchMutations = new ArrayList<Put>();
        this.tupleBatch = new ArrayList<Tuple>();

        Map<String, Object> conf = (Map<String, Object>) map.get(this.configKey);
        if (conf == null) {
            throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
        }

        Object batchSizeValue = map.get("batchSize");
        if (batchSizeValue != null) {
            this.batchSize = Integer.parseInt(batchSizeValue.toString());
        }

        // NOTE(review): getComponentConfiguration() appears to be evaluated when the
        // topology is built, i.e. before prepare() runs, so this override probably does
        // not change the actual tick frequency - confirm against the Storm version in use.
        Object flushIntervalValue = map.get("flushIntervalSecs");
        if (flushIntervalValue != null) {
            this.flushIntervalSecs = Integer.parseInt(flushIntervalValue.toString());
        }

        if (this.tableName == null && map.get("tableName") != null) {
            this.tableName = map.get("tableName").toString();
        }
        if (this.tableName == null) {
            throw new IllegalArgumentException(
                    "HBase table name not set; call withTableName(...) or put 'tableName' into the topology configuration");
        }

        Configuration hbConfig = HBaseConfiguration.create();
        for (Map.Entry<String, Object> entry : conf.entrySet()) {
            hbConfig.set(entry.getKey(), String.valueOf(entry.getValue()));
        }
        this.hbaseClient = new HbaseClient(hbConfig, this.tableName);
    }

    /**
     * Handles one tuple: tick tuples force a flush, data tuples are buffered
     * and flushed once the batch is full.
     *
     * @param tuple the incoming data or tick tuple
     */
    @Override
    public void execute(Tuple tuple) {

        // True once the current tuple needs no individual fail() in the error path:
        // either it is a tick tuple or it already sits in tupleBatch.
        boolean accountedFor = false;

        try {
            boolean flush;
            if (isTickTuple(tuple)) {
                accountedFor = true;
                LOG.debug("TICK received! current batch status [{}/{}]",
                        this.tupleBatch.size(), this.batchSize);
                flush = true;
            } else {
                this.batchMutations.add(toPut(tuple));
                this.tupleBatch.add(tuple);
                accountedFor = true;
                flush = this.tupleBatch.size() >= this.batchSize;
            }

            if (flush && !this.tupleBatch.isEmpty()) {
                flushBatch();
            }
        } catch (Exception e) {
            LOG.warn("inser batch fail", e);
            this.collector.reportError(e);
            if (!accountedFor) {
                // The tuple broke before it was buffered; without this it would
                // neither be acked nor failed and only time out.
                this.collector.fail(tuple);
            }
            for (Tuple buffered : this.tupleBatch) {
                this.collector.fail(buffered);
            }
            this.tupleBatch.clear();
            this.batchMutations.clear();
        }
    }

    /** Returns whether the tuple is a system tick tuple rather than data. */
    private static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }

    /** Converts a data tuple into the HBase put for its row. */
    private static Put toPut(Tuple tuple) {
        Put put = new Put(tuple.getStringByField("rowKey").getBytes(StandardCharsets.UTF_8));
        put.add(COLUMN_FAMILY, QUALIFIER_NAME,
                tuple.getStringByField("name").getBytes(StandardCharsets.UTF_8));
        put.add(COLUMN_FAMILY, QUALIFIER_SEX,
                tuple.getStringByField("sex").getBytes(StandardCharsets.UTF_8));
        return put;
    }

    /** Writes all buffered puts to HBase, then acks and clears the batch. */
    private void flushBatch() throws Exception {
        this.hbaseClient.batchMutate(this.batchMutations);
        LOG.debug("acknowledging tuples after batchMutate");
        for (Tuple written : this.tupleBatch) {
            this.collector.ack(written);
        }
        this.tupleBatch.clear();
        this.batchMutations.clear();
    }

    /**
     * Declares the output fields. This bolt emits nothing, so there are none.
     *
     * @param declarer output field declarer (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams
    }

    /**
     * Requests tick tuples every {@link #flushIntervalSecs} seconds so that a
     * partially filled batch is flushed on a timer.
     *
     * @return component configuration containing the tick tuple frequency
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
        return conf;
    }

    /**
     * Releases the HBase table and connection. Tuples still buffered at this
     * point are not written and have not been acked.
     */
    @Override
    public void cleanup() {
        if (this.hbaseClient == null) {
            return;
        }
        try {
            this.hbaseClient.close();
        } catch (IOException e) {
            LOG.warn("Error closing HBase client.", e);
        }
    }
}

/**
 * Thin wrapper around one HBase connection and one table handle, used by
 * {@link HbaseBout} for batched puts.
 *
 * @author Import (QQ: 99123550), blog: http://www.cnblogs.com/thinkpad
 * @date 2016-08-27
 * @version V1.0
 */
class HbaseClient {

    private static final Logger LOG = LoggerFactory.getLogger(HbaseClient.class);
    HConnection hTablePool = null;
    HTableInterface table = null;

    /**
     * Opens a connection and obtains a handle for the given table.
     *
     * @param configuration HBase client configuration
     * @param tableName     name of the table to write to
     * @throws RuntimeException if the connection or the table handle cannot be created
     */
    public HbaseClient(final Configuration configuration, final String tableName) {
        try {
            this.hTablePool = HConnectionManager.createConnection(configuration);
            this.table = this.hTablePool.getTable(tableName);
        } catch (Exception e) {
            // Do not leak a connection that was opened before getTable() failed.
            closeQuietly();
            throw new RuntimeException("HBase create failed: " + e.getMessage(), e);
        }
    }

    /**
     * Writes all given puts to the table in one call.
     *
     * @param puts the puts to write; {@code null} or empty is a no-op
     * @throws Exception whatever the HBase client throws; it is logged and rethrown
     */
    public void batchMutate(List<Put> puts) throws Exception {
        if (puts == null || puts.isEmpty()) {
            return;
        }
        try {
            this.table.put(puts);
        } catch (Exception e) {
            LOG.warn("Error insert batch to HBase.", e);
            throw e;
        }
    }

    /**
     * Closes the table handle and the connection. Safe to call more than once.
     *
     * @throws IOException if closing the table or the connection fails
     */
    public void close() throws IOException {
        HTableInterface openTable = this.table;
        HConnection openConnection = this.hTablePool;
        this.table = null;
        this.hTablePool = null;
        try {
            if (openTable != null) {
                openTable.close();
            }
        } finally {
            if (openConnection != null) {
                openConnection.close();
            }
        }
    }

    /** Best-effort close for the constructor's failure path; the original error is what matters there. */
    private void closeQuietly() {
        try {
            close();
        } catch (IOException ignored) {
            LOG.debug("Ignoring error while closing HBase resources after failed setup", ignored);
        }
    }
}
Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime