首页 > 代码库 > hbase 结合MapReduce 批量导入
hbase 结合MapReduce 批量导入
HBase 结合 MapReduce 的批量导入:
下面直接给出代码进行讲解(具体操作请结合代码中的注释):
package hbase;

import java.io.IOException;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Bulk-imports a tab-separated text file from HDFS into an HBase table using MapReduce.
 *
 * <p>Basic idea: create the target table first (if it does not exist), then run a
 * MapReduce job whose mapper builds a row key for every input line and whose reducer
 * turns every record into an HBase {@link Put}.
 *
 * @author XD
 */
public class hbaseApp {

    /** Job counters. {@code exNum} counts input lines that were skipped as malformed. */
    static enum Num {
        exNum
    }

    /** HDFS path of the file to import. */
    static final String INPUT_PATH = "hdfs://localhost:9000/input1/wlan";

    /** Name of the HBase table that receives the data. */
    private static final String TABLE_NAME = "wlan";

    /** Name of the single column family of {@link #TABLE_NAME}. */
    private static final String FAMILY_NAME = "content";

    /** Value written to {@code hbase.rootdir}. */
    private static final String HBASE_ROOT_DIR = "hdfs://localhost:9000/hbase";

    /** Value written to {@code hbase.zookeeper.quorum}. */
    private static final String ZOOKEEPER_QUORUM = "localhost";

    /**
     * Creates the target table with its one column family unless the table already exists.
     *
     * @throws MasterNotRunningException declared by the {@link HBaseAdmin} constructor
     * @throws ZooKeeperConnectionException declared by the {@link HBaseAdmin} constructor
     * @throws IOException if checking for, creating, or closing the admin/table fails
     */
    @SuppressWarnings("deprecation")
    public static void createTable()
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        // The HBase configuration is mandatory for the admin client.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", HBASE_ROOT_DIR);
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);

        final HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
        try {
            if (!hbaseAdmin.tableExists(TABLE_NAME)) {
                HTableDescriptor tableDescriptor = new HTableDescriptor(TABLE_NAME);
                HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
                tableDescriptor.addFamily(family);
                hbaseAdmin.createTable(tableDescriptor);
            }
        } finally {
            // Release the admin client; the original code never closed it.
            hbaseAdmin.close();
        }
    }

    /**
     * Ensures the table exists, then configures and runs the import job.
     * The JVM exits with status 0 when the job succeeds and 1 when it fails.
     *
     * @param args unused
     * @throws ClassNotFoundException declared by {@link Job#waitForCompletion(boolean)}
     * @throws IOException if table creation or job setup/submission fails
     * @throws InterruptedException if waiting for the job is interrupted
     * @throws URISyntaxException kept in the signature for backward compatibility
     */
    @SuppressWarnings("deprecation")
    public static void main(String[] args)
            throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        hbaseApp.createTable();

        final Configuration conf = new Configuration();
        conf.set("hbase.rootdir", HBASE_ROOT_DIR);
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        // Table that TableOutputFormat writes to.
        conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
        // NOTE(review): presumably milliseconds - confirm against the HDFS version in use.
        conf.set("dfs.socket.timeout", "180000");

        Job job = new Job(conf, hbaseApp.class.getSimpleName());
        job.setJarByClass(hbaseApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(Reduce.class);
        // Output goes straight into HBase, so neither an output path nor reducer
        // output key/value classes are configured.
        job.setOutputFormatClass(TableOutputFormat.class);

        // Propagate the job outcome instead of always exiting with status 0.
        final boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /**
     * Prefixes every input line with its row key.
     *
     * <p>Input lines are split on tabs. Field 0 is parsed as a {@code long} and passed to
     * {@link Date#Date(long)}; field 1 is used as the leading part of the row key. The
     * emitted value is {@code rowKey + "\t" + originalLine}, keyed by the input key.
     * Lines with fewer than two fields or a non-numeric field 0 are skipped and counted
     * in {@link Num#exNum}.
     */
    static class Map extends Mapper<LongWritable, Text, LongWritable, Text> {
        /** Formatter for the date part of the row key. */
        private final SimpleDateFormat format1 = new SimpleDateFormat("yy-MM-dd HH:mm:ss");

        /** Reused output value. */
        private final Text v2 = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String line = value.toString();
            final String[] splited = line.split("\t");

            // Guard against short lines: indexing splited[1] below would otherwise throw
            // ArrayIndexOutOfBoundsException and fail the whole task.
            if (splited.length < 2) {
                context.getCounter(Num.exNum).increment(1L);
                System.out.println("出错" + line + " expected at least 2 tab-separated fields");
                return;
            }

            try {
                final Date date = new Date(Long.parseLong(splited[0].trim()));
                final String dateFormat = format1.format(date);
                final String rowKey = splited[1] + ":" + dateFormat; // row key
                v2.set(rowKey + "\t" + line);
                context.write(key, v2);
            } catch (NumberFormatException e) {
                final Counter counter = context.getCounter(Num.exNum);
                counter.increment(1L);
                System.out.println("出错" + splited[0] + " " + e.getMessage());
            }
        }
    }

    /**
     * Converts every mapper record into an HBase {@link Put}.
     * Note that this must extend {@link TableReducer}, not the plain Hadoop reducer.
     *
     * <p>Each record has the layout {@code rowKey \t field0 \t field1 \t ...}, where
     * {@code field0}, {@code field1}, ... are the fields of the original input line.
     */
    static class Reduce extends TableReducer<LongWritable, Text, NullWritable> {
        /** Column family bytes, encoded once instead of once per record. */
        private static final byte[] FAMILY = Bytes.toBytes(FAMILY_NAME);

        /** Column qualifier bytes, encoded once instead of once per record. */
        private static final byte[] PHONE_QUALIFIER = Bytes.toBytes("phone");

        /** Index of the row key in a mapper record. */
        private static final int ROW_KEY_INDEX = 0;

        /**
         * Index of the phone field in a mapper record: the mapper prepends the row key,
         * so original field 1 (the one the mapper uses as the phone part of the row key)
         * sits at index 2. The original code read index 1, which is the raw timestamp.
         */
        private static final int PHONE_INDEX = 2;

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                final String[] splited = val.toString().split("\t");
                if (splited.length <= PHONE_INDEX) {
                    // Defensive: the mapper only emits records with at least two original
                    // fields, so this is not expected to happen.
                    context.getCounter(Num.exNum).increment(1L);
                    continue;
                }
                final Put put = new Put(Bytes.toBytes(splited[ROW_KEY_INDEX])); // row key
                // column family, column qualifier, cell value
                put.add(FAMILY, PHONE_QUALIFIER, Bytes.toBytes(splited[PHONE_INDEX]));
                context.write(NullWritable.get(), put);
            }
        }
    }
}
结果如下:
对应表中的行键、列族、列、列值
hbase 结合MapReduce 批量导入
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。