首页 > 代码库 > api接口

api接口

  1. public class HBaseConn {
  2. private String rootDir;
  3. private String zkServer;
  4. private String port;
  5. private Configuration conf;
  6. private HConnection hConn = null;
  7. ...
  8. }

建立连接:

  /**
   * Builds the client configuration from the given settings and opens a connection.
   *
   * @param rootDir  value stored under {@code hbase.rootdir}
   * @param zkServer value stored under {@code hbase.zookeeper.quorum}
   * @param port     value stored under {@code hbase.zookeeper.property.clientPort}
   * @throws IOException if the connection cannot be created
   */
  public HBaseConn(String rootDir, String zkServer, String port) throws IOException {
    // Assemble the configuration first, then publish it and the connection to the fields.
    Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.rootdir", rootDir);
    configuration.set("hbase.zookeeper.quorum", zkServer);
    configuration.set("hbase.zookeeper.property.clientPort", port);

    this.rootDir = rootDir;
    this.zkServer = zkServer;
    this.port = port;
    this.conf = configuration;
    this.hConn = HConnectionManager.createConnection(configuration);
  }
  1. String rootDir = "hdfs://itcast04:9000/hbase";
  2. String zkServer = "itcast04";
  3. String port = "2181";
  4. HBaseConn conn = new HBaseConn(rootDir, zkServer, port);

建表:

  1. public void createTable(String tableName, List<String> cols) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
  2. HBaseAdmin admin = new HBaseAdmin(conf);
  3. if(admin.tableExists(tableName)){
  4. throw new IOException("table exists");
  5. }else{
  6. HTableDescriptor tableDesc = new HTableDescriptor(tableName);
  7. for(String col : cols){
  8. HColumnDescriptor colDesc = new HColumnDescriptor(col);
  9. //采用压缩;
  10. colDesc.setCompressionType(Algorithm.GZ);
  11. colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);
  12. tableDesc.addFamily(colDesc);
  13. }
  14. admin.createTable(tableDesc);
  15. }
  16. }
  1. /*List<String> cols = new LinkedList<String>();
  2. cols.add("basicInfo");
  3. cols.add("moreInfo");
  4. conn.createTable("student", cols);*/

增加记录(put)(RowKey+列族+列标识+Cell):

  1. public void saveData(String tableName, List<Put> puts) throws IOException{
  2. HTableInterface table = hConn.getTable(tableName);
  3. table.put(puts);
  4. table.setAutoFlush(false);
  5. //提高IO吞吐率
  6. table.flushCommits();
  7. }
  1. /*List<Put> puts = new LinkedList<Put>();
  2. //RowKey
  3. Put put1 = new Put(Bytes.toBytes("Tom"));
  4. put1.add(Bytes.toBytes("basicInfo"), Bytes.toBytes("age"), Bytes.toBytes("27"));
  5. put1.add(Bytes.toBytes("moreInfo"), Bytes.toBytes("tel"), Bytes.toBytes("13846677467"));
  6. puts.add(put1);
  7. conn.saveData("student", puts);*/

查表(按RowKey执行Get,返回Result):

  1. public Result getData(String tableName, String rowKey) throws IOException{
  2. HTableInterface table = hConn.getTable(tableName);
  3. Get get = new Get(Bytes.toBytes(rowKey));
  4. return table.get(get);
  5. }

将查询到的Result中的KV输出:

  1. public void format(Result result){
  2. String rowKey = Bytes.toString(result.getRow());
  3. KeyValue[] kvs = result.raw();
  4. for(KeyValue kv : kvs){
  5. System.out.println("Column Family: " + Bytes.toString(kv.getFamily()));
  6. System.out.println("Column Name: " + Bytes.toString(kv.getQualifier()));
  7. System.out.println("Cell Value: " + Bytes.toString(kv.getValue()));
  8. }
  9. }
  1. // conn.format(conn.getData("student", "Tom"));

Scan表,利用format输出:

  1. public void hbaseScan(String tableName) throws Exception{
  2. Scan scan = new Scan();
  3. scan.setCaching(1000);
  4. HTableInterface table = hConn.getTable(tableName);
  5. ResultScanner scanner = table.getScanner(scan);
  6. for(Result res : scanner){
  7. format(res);
  8. }
  9. }

简单过滤器:

  1. public void filterTest(String tableName) throws Exception{
  2. Scan scan = new Scan();
  3. scan.setCaching(1000);
  4. // RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("Tom")));
  5. RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("T\\w+"));
  6. scan.setFilter(filter);
  7. HTableInterface table = hConn.getTable(tableName);
  8. ResultScanner scanner = table.getScanner(scan);
  9. for(Result res : scanner){
  10. format(res);
  11. }
  12. }

分页过滤器:

  1. public void pageFilterTest(String tableName) throws IOException{
  2. PageFilter filter = new PageFilter(4);
  3. byte[] lastRow = null;
  4. int pageCount = 0;
  5. HTableInterface table = hConn.getTable(tableName);
  6. while(++pageCount > 0){
  7. System.out.println("Page Count:" + pageCount);
  8. Scan scan = new Scan();
  9. scan.setFilter(filter);
  10. if(lastRow != null){
  11. scan.setStartRow(lastRow);
  12. }
  13. ResultScanner scanner = table.getScanner(scan);
  14. int count = 0;
  15. for(Result res : scanner){
  16. lastRow = res.getRow();
  17. if(++count > 3)
  18. break;
  19. format(res);
  20. }
  21. if(count < 3){
  22. break;
  23. }
  24. }
  25. }


附件列表

     

    api接口