首页 > 代码库 > hbase 结合MapReduce 批量导入

hbase 结合MapReduce 批量导入

hbase结合Mapreduce的批量导入:

直接给出代码讲述:(具体操作结合代码中的注释)

package hbase;

import java.io.IOException;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class hbaseApp {
	/**
	 * @user XD 基本思路:先创建表 --> 书写MapReduce批量导入
	 */
	static enum Num{
		exNum
	}
	//创建表
	@SuppressWarnings("deprecation")
	public static void createTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
		//配置 必须书写
		Configuration conf = HBaseConfiguration.create();
		String tableName = "wlan";		//表名
		String family_name = "content";			//列族
		conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
		conf.set("hbase.zookeeper.quorum","localhost");
		final HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
		if(!hbaseAdmin.tableExists(tableName)){
			HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
			HColumnDescriptor family = new HColumnDescriptor(family_name);
			tableDescriptor.addFamily(family);
			hbaseAdmin.createTable(tableDescriptor);
		}
	}
	//导入的文件
	static final String INPUT_PATH = "hdfs://localhost:9000/input1/wlan";
	
	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
		hbaseApp.createTable();
		final	Configuration conf = new Configuration();
		conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
		conf.set("hbase.zookeeper.quorum","localhost");
		//表名
		conf.set(TableOutputFormat.OUTPUT_TABLE,"wlan");
		conf.set("dfs.socket.timeout", "180000");
		
		Job job = new Job(conf,hbaseApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setMapperClass(Map.class);
		
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setJarByClass(hbaseApp.class);
		job.setReducerClass(Reduce.class);
		
		//直接创建表 和 导入数据 到hbase里面 所以不需要指定 输出文件路径 输出reducer类型
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass( TableOutputFormat.class);
		
		job.waitForCompletion(true);
	}
	static class Map extends Mapper <LongWritable , Text , LongWritable , Text >{
		//时间格式
		SimpleDateFormat format1 = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
		private Text v2 = new Text();
		
		protected void map(LongWritable key , Text value , Context context) throws IOException, InterruptedException{
			final String[] splited = value.toString().split("\t");
			try{
				final Date date = new Date(Long.parseLong(splited[0].trim()));
				final String dateFormat = format1.format(date);
				String rowKey = splited[1]+":"+dateFormat; 		//行键
				v2.set(rowKey + "\t" + value.toString());
				context.write(key, v2);
			}catch(NumberFormatException e){
				final Counter counter = context.getCounter(Num.exNum);
				counter.increment(1L);
				System.out.println("出错"+splited[0]+" "+e.getMessage());
			}		
		}
	}
	//注意是TableReducer
	static class Reduce extends TableReducer <LongWritable , Text , NullWritable>{
		protected void reduce(LongWritable key , Iterable<Text>values , Context context) throws IOException, InterruptedException{
			for(Text val : values){
				final String[] splited = val.toString().split("\t");
				final Put put = new Put(Bytes.toBytes(splited[0])); 		//行键
				put.add(Bytes.toBytes("content"),Bytes.toBytes("phone"),Bytes.toBytes(splited[1]));	//列族, 列, 列值
				context.write(NullWritable.get(), put);
			}
		}
	}
}

结果如下:


对应表中的行键 列族 列 列值

hbase 结合MapReduce 批量导入