首页 > 代码库 > Hbase调用JavaAPI实现批量导入操作

Hbase调用JavaAPI实现批量导入操作

将手机上网日志文件批量导入到Hbase中,操作步骤:

1、将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  /


 

2、创建Hbase表,通过Java操作

 

Java代码  收藏代码
  1. package com.jiewen.hbase;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.hbase.HBaseConfiguration;  
  7. import org.apache.hadoop.hbase.HColumnDescriptor;  
  8. import org.apache.hadoop.hbase.HTableDescriptor;  
  9. import org.apache.hadoop.hbase.client.Get;  
  10. import org.apache.hadoop.hbase.client.HBaseAdmin;  
  11. import org.apache.hadoop.hbase.client.HTable;  
  12. import org.apache.hadoop.hbase.client.Put;  
  13. import org.apache.hadoop.hbase.client.Result;  
  14. import org.apache.hadoop.hbase.client.ResultScanner;  
  15. import org.apache.hadoop.hbase.client.Scan;  
  16. import org.apache.hadoop.hbase.util.Bytes;  
  17.   
  18. public class HbaseDemo {  
  19.   
  20.     public static void main(String[] args) throws IOException {  
  21.         String tableName = "wlan_log";  
  22.         String columnFamily = "cf";  
  23.   
  24.         HbaseDemo.create(tableName, columnFamily);  
  25.   
  26.         // HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data");  
  27.         // HbaseDemo.get(tableName, "row1");  
  28.         // HbaseDemo.scan(tableName);  
  29.         // HbaseDemo.delete(tableName);  
  30.     }  
  31.   
  32.     // hbase操作必备  
  33.     private static Configuration getConfiguration() {  
  34.         Configuration conf = HBaseConfiguration.create();  
  35.         conf.set("hbase.rootdir""hdfs://hadoop1:9000/hbase");  
  36.         // 使用eclipse时必须添加这个,否则无法定位  
  37.         conf.set("hbase.zookeeper.quorum""hadoop1");  
  38.         return conf;  
  39.     }  
  40.   
  41.     // 创建一张表  
  42.     public static void create(String tableName, String columnFamily)  
  43.             throws IOException {  
  44.         HBaseAdmin admin = new HBaseAdmin(getConfiguration());  
  45.         if (admin.tableExists(tableName)) {  
  46.             System.out.println("table exists!");  
  47.         } else {  
  48.             HTableDescriptor tableDesc = new HTableDescriptor(tableName);  
  49.             tableDesc.addFamily(new HColumnDescriptor(columnFamily));  
  50.             admin.createTable(tableDesc);  
  51.             System.out.println("create table success!");  
  52.         }  
  53.     }  
  54.   
  55.     // 添加一条记录  
  56.     public static void put(String tableName, String row, String columnFamily,  
  57.             String column, String data) throws IOException {  
  58.         HTable table = new HTable(getConfiguration(), tableName);  
  59.         Put p1 = new Put(Bytes.toBytes(row));  
  60.         p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes  
  61.                 .toBytes(data));  
  62.         table.put(p1);  
  63.         System.out.println("put‘" + row + "‘," + columnFamily + ":" + column  
  64.                 + "‘,‘" + data + "‘");  
  65.     }  
  66.   
  67.     // 读取一条记录  
  68.     public static void get(String tableName, String row) throws IOException {  
  69.         HTable table = new HTable(getConfiguration(), tableName);  
  70.         Get get = new Get(Bytes.toBytes(row));  
  71.         Result result = table.get(get);  
  72.         System.out.println("Get: " + result);  
  73.     }  
  74.   
  75.     // 显示所有数据  
  76.     public static void scan(String tableName) throws IOException {  
  77.         HTable table = new HTable(getConfiguration(), tableName);  
  78.         Scan scan = new Scan();  
  79.         ResultScanner scanner = table.getScanner(scan);  
  80.         for (Result result : scanner) {  
  81.             System.out.println("Scan: " + result);  
  82.         }  
  83.     }  
  84.   
  85.     // 删除表  
  86.     public static void delete(String tableName) throws IOException {  
  87.         HBaseAdmin admin = new HBaseAdmin(getConfiguration());  
  88.         if (admin.tableExists(tableName)) {  
  89.             try {  
  90.                 admin.disableTable(tableName);  
  91.                 admin.deleteTable(tableName);  
  92.             } catch (IOException e) {  
  93.                 e.printStackTrace();  
  94.                 System.out.println("Delete " + tableName + " 失败");  
  95.             }  
  96.         }  
  97.         System.out.println("Delete " + tableName + " 成功");  
  98.     }  
  99.   
  100. }  

 

3、将日志文件导入Hbase表wlan_log中:

 

Java代码  收藏代码
  1. import java.text.SimpleDateFormat;  
  2. import java.util.Date;  
  3.   
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.hbase.client.Put;  
  6. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  
  7. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
  8. import org.apache.hadoop.hbase.util.Bytes;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.NullWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.mapreduce.Counter;  
  13. import org.apache.hadoop.mapreduce.Job;  
  14. import org.apache.hadoop.mapreduce.Mapper;  
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  17.   
  18. public class HbaseBatchImport {  
  19.   
  20.     public static void main(String[] args) throws Exception {  
  21.         final Configuration configuration = new Configuration();  
  22.         // 设置zookeeper  
  23.         configuration.set("hbase.zookeeper.quorum""hadoop1");  
  24.   
  25.         // 设置hbase表名称  
  26.         configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");  
  27.   
  28.         // 将该值改大,防止hbase超时退出  
  29.         configuration.set("dfs.socket.timeout""180000");  
  30.   
  31.         final Job job = new Job(configuration, "HBaseBatchImport");  
  32.   
  33.         job.setMapperClass(BatchImportMapper.class);  
  34.         job.setReducerClass(BatchImportReducer.class);  
  35.         // 设置map的输出,不设置reduce的输出类型  
  36.         job.setMapOutputKeyClass(LongWritable.class);  
  37.         job.setMapOutputValueClass(Text.class);  
  38.   
  39.         job.setInputFormatClass(TextInputFormat.class);  
  40.         // 不再设置输出路径,而是设置输出格式类型  
  41.         job.setOutputFormatClass(TableOutputFormat.class);  
  42.   
  43.         FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/input");  
  44.   
  45.         job.waitForCompletion(true);  
  46.     }  
  47.   
  48.     static class BatchImportMapper extends  
  49.             Mapper<LongWritable, Text, LongWritable, Text> {  
  50.         SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");  
  51.         Text v2 = new Text();  
  52.   
  53.         protected void map(LongWritable key, Text value, Context context)  
  54.                 throws java.io.IOException, InterruptedException {  
  55.             final String[] splited = value.toString().split("\t");  
  56.             try {  
  57.                 final Date date = new Date(Long.parseLong(splited[0].trim()));  
  58.                 final String dateFormat = dateformat1.format(date);  
  59.                 String rowKey = splited[1] + ":" + dateFormat;  
  60.                 v2.set(rowKey + "\t" + value.toString());  
  61.                 context.write(key, v2);  
  62.             } catch (NumberFormatException e) {  
  63.                 final Counter counter = context.getCounter("BatchImport",  
  64.                         "ErrorFormat");  
  65.                 counter.increment(1L);  
  66.                 System.out.println("出错了" + splited[0] + " " + e.getMessage());  
  67.             }  
  68.         };  
  69.     }  
  70.   
  71.     static class BatchImportReducer extends  
  72.             TableReducer<LongWritable, Text, NullWritable> {  
  73.         protected void reduce(LongWritable key,  
  74.                 java.lang.Iterable<Text> values, Context context)  
  75.                 throws java.io.IOException, InterruptedException {  
  76.             for (Text text : values) {  
  77.                 final String[] splited = text.toString().split("\t");  
  78.   
  79.                 final Put put = new Put(Bytes.toBytes(splited[0]));  
  80.                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes  
  81.                         .toBytes(splited[1]));  
  82.                 // 省略其他字段,调用put.add(....)即可  
  83.                 context.write(NullWritable.get(), put);  
  84.             }  
  85.         };  
  86.     }  
  87.   
  88. }  

 4、查看导入结果:



 

Hbase调用JavaAPI实现批量导入操作