首页 > 代码库 > sqoop 补充
sqoop 补充
1、用 sqoop 将MySQL中的数据导入hbase中
sqoop import \
--connect jdbc:mysql://***.***.*.***:3306/mysql \
--hbase-table Nbigdata \
--column-family gps \
--hbase-row-key id \
--username root \
--password 123456 \
--table ceshi
--compression-codec lzo -z ( -z 表示采用压缩方式)
mysql :数据库中的数据库名 Nbigdata:导入数据到hbase 中的表名 gps:列簇名 id:mysql中作为rowkey的字段名 ceshi:mysql表名 username ,password :连接MySQL的用户名和密码
如果 没有 指定--hbase-row-key id ,会自动用MySQL的第一列作为rowkey
2、用 sqoop 将MySQL中的数据导入hbase时,用MySQL中多个字段连接起来作为rowkey
第一种方法:
在MySQL表中插入一列 (列名:rowkey): update mysqltable set rowkey=姓名_出生年月_地点
再用sqoop语句导入
第二种方法:
扩展PutTransformer类,自定义导入HBase的put格式。并在执行导入命令时指定该类
操作步骤:
package com.gamewave.sqoop.extensions.tansformat_row_key; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.sqoop.hbase.PutTransformer; public class DdtMapInfoTransFormat extends PutTransformer { public static final Log LOG = LogFactory.getLog(DdtMapInfoTransFormat.class.getName()); private Map<String, byte[]> serializedFieldNames; public DdtMapInfoTransFormat() { serializedFieldNames = new TreeMap<String, byte[]>(); }<br> /** * Return the serialized bytes for a field name, using the cache if it‘s * already in there. */ private byte[] getFieldNameBytes(String fieldName) { byte[] cachedName = serializedFieldNames.get(fieldName); if (null != cachedName) { // Cache hit. We‘re done. return cachedName; } // Do the serialization and memoize the result. byte[] nameBytes = Bytes.toBytes(fieldName); serializedFieldNames.put(fieldName, nameBytes); return nameBytes; } @Override /** {@inheritDoc} */ public List<Put> getPutCommand(Map<String, Object> fields)throws IOException { String rowKeyCol = getRowKeyColumn(); String colFamily = getColumnFamily(); byte[] colFamilyBytes = Bytes.toBytes(colFamily); Object rowKey = fields.get(rowKeyCol); if (null == rowKey) { // If the row-key column is null, we don‘t insert this row. LOG.warn("Could not insert row with null value for row-key column: "+ rowKeyCol); return null; } Put put = new Put(Bytes.toBytes(rowKey.toString() + ":custom")); for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) { String colName = fieldEntry.getKey(); if (!colName.equals(rowKeyCol)) { // This is a regular field, not the row key. // Add it if it‘s not null. Object val = fieldEntry.getValue(); if (null != val) { put.add(colFamilyBytes, getFieldNameBytes(colName),Bytes.toBytes(val.toString())); } } } return Collections.singletonList(put); } }
(1)扩展PutTransformer类
(2)包并放进sqoop-lib目录下,执行命令
./sqoop-import -D sqoop.hbase.insert.put.transformer.class=com.gamewave.sqoop.extensions.tansformat_row_key.DdtMapInfoTransFormat –connect jdbc:mysql://xxx.xxx.xxx.xxx/service --table ddt_map_info_copy --hbase-create-table --hbase-table ddt_map_info_copy --column-family info --split-by mapId --username root --P --compression-codec lzo
命令中-D为指定属性名称 参数值为键值对
sqoop.hbase.insert.put.transformer.class属性值为指定格式化类名称
(3)验证
进入hbase shell 可以查看得到rowkey后缀均为:custom,修改成功
sqoop 补充