首页 > 代码库 > Hbase创建表插入查询数据案例
Hbase创建表插入查询数据案例
package org.test;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
/*
* tab_global param:userid
*
* tab_user2id info:id
*
* tab_id2user info:username, info:password
*
* tab_users user:follow user:followd user:inbox user:sent
*
* tab_post post:content
*
* */
//hbase接口类
public class HbaseIf {
Configuration conf;
public static HbaseIf ghbase = null;
public static HbaseIf getInstance(){
if(ghbase == null)
ghbase = new HbaseIf();
return ghbase;
}
HbaseIf() {
conf = HBaseConfiguration.create();
}
//创建表的方法
public void create_table(String name, String col, int version)
throws Exception {
HBaseAdmin admin = new HBaseAdmin(conf);
//先检查表是否存在
if (admin.tableExists(name)) {
admin.disableTable(name);
admin.deleteTable(name);
}
HTableDescriptor tableDesc = new HTableDescriptor(name);
HColumnDescriptor hd = new HColumnDescriptor(col);
hd.setMaxVersions(version);
tableDesc.addFamily(hd);
admin.createTable(tableDesc);
admin.close();
}
public List<Post> getPost(String username) throws Exception{
List<Post> list = new ArrayList<Post>();
long id = this.getIdByUsername(username);
//byte[] begin = Bytes.add(Bytes.toBytes(id), Bytes.toBytes(Long.MAX_VALUE-Long.MAX_VALUE));
byte[] begin = Bytes.toBytes(id);
//byte[] end = Bytes.add(Bytes.toBytes(id), Bytes.toBytes(Long.MAX_VALUE));
byte[] end = Bytes.toBytes(id+1);
Scan s = new Scan();
s.setStartRow(begin);
s.setStopRow(end);
HTable tab_post = new HTable(conf, "tab_post");
HTable tab_inbox = new HTable(conf, "tab_inbox");
ResultScanner ss = tab_inbox.getScanner(s);
Get get = null;
Post p = null;
for (Result r : ss) {
byte[] postid = r.getValue(Bytes.toBytes("postid"), null);
get = new Get(postid);
Result rs = tab_post.get(get);
String post_username = Bytes.toString(rs.getValue(Bytes.toBytes("post"), Bytes.toBytes("username")));
String post_content = Bytes.toString(rs.getValue(Bytes.toBytes("post"), Bytes.toBytes("content")));
String post_ts = Bytes.toString(rs.getValue(Bytes.toBytes("post"), Bytes.toBytes("ts")));
p = new Post(post_username, post_content, post_ts);
list.add(0,p);
}
return list;
}
public boolean post(String username, String content)
throws Exception {
HTable tab_global = new HTable(conf, "tab_global");
HTable tab_post = new HTable(conf, "tab_post");
long id = tab_global.incrementColumnValue(Bytes.toBytes("row_postid"),
Bytes.toBytes("param"), Bytes.toBytes("postid"), 1);
byte[] postid = Bytes.toBytes(id);
// insert record in tab_post
Put put = new Put(postid);
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String ts = dateFormat.format(new Date());
put.add(Bytes.toBytes("post"), Bytes.toBytes("username"), username.getBytes());
put.add(Bytes.toBytes("post"), Bytes.toBytes("content"), content.getBytes());
put.add(Bytes.toBytes("post"), Bytes.toBytes("ts"), ts.getBytes());
tab_post.put(put);
tab_global.close();
tab_post.close();
// send the post
long senderid = this.getIdByUsername(username);
System.out.println("sender id:" + senderid);
byte[] begin = Bytes.add(Bytes.toBytes(senderid), Bytes.toBytes(Long.MAX_VALUE-Long.MAX_VALUE));
byte[] end = Bytes.add(Bytes.toBytes(senderid), Bytes.toBytes(Long.MAX_VALUE));
Scan s = new Scan();
s.setStartRow(begin);
s.setStopRow(end);
HTable tab_followed = new HTable(conf, "tab_followed");
HTable tab_inbox = new HTable(conf, "tab_inbox");
ResultScanner ss = tab_followed.getScanner(s);
put = new Put(Bytes.add(Bytes.toBytes(senderid), postid));
put.add(Bytes.toBytes("postid"), null, postid);
tab_inbox.put(put);
for (Result r : ss) {
byte[] did = r.getValue(Bytes.toBytes("userid"), null);
put = new Put(Bytes.add(did, postid));
put.add(Bytes.toBytes("postid"), null, postid);
tab_inbox.put(put);
}
tab_followed.close();
tab_inbox.close();
return true;
}
//执行创建表方法
public void createTables() throws Exception {
// create tag_global and initialization
create_table("tab_global", "param", 1);
HTable ht = new HTable(conf, "tab_global");
Put put = new Put(Bytes.toBytes("row_userid"));
long id = 0;
put.add(Bytes.toBytes("param"), Bytes.toBytes("userid"),
Bytes.toBytes(id));
ht.put(put);
put = new Put(Bytes.toBytes("row_postid"));
put.add(Bytes.toBytes("param"), Bytes.toBytes("postid"),
Bytes.toBytes(id));
ht.put(put);
// create tab_user2id
create_table("tab_user2id", "info", 1);
// create tab_id2user
create_table("tab_id2user", "info", 1);
/*
* tab_follow rowkey:userid CF:name:userid => username version => 1
*/
create_table("tab_follow", "name", 1);
/*
* tab_followed rowkey:userid_{userid} CF:userid => userid
*/
create_table("tab_followed", "userid", 1);
/*
* tab_post
* rowkey:postid
* CF:content
* */
create_table("tab_post", "post", 1);
/*
* tab_inbox
* rowkey:userid+postid
* CF:postid
*/
create_table("tab_inbox", "postid", 1);
}
//获取所有用户
public Set<String> getAllUser() throws Exception {
Set<String> set = new HashSet<String>();
HTable tab_user2id = new HTable(conf, "tab_user2id");
Scan s = new Scan();
ResultScanner ss = tab_user2id.getScanner(s);
for (Result r : ss) {
String name = new String(r.getRow());
set.add(name);
System.out.print(name);
}
return set;
}
public Set<String> getFollow(String username) throws Exception {
long id = this.getIdByUsername(username);
Set<String> set = new HashSet<String>();
HTable tab_follow = new HTable(conf, "tab_follow");
Get get = new Get(Bytes.toBytes(id));
Result rs = tab_follow.get(get);
for (KeyValue kv : rs.raw()) {
String s = new String(kv.getValue());
set.add(s);
System.out.println(s);
}
tab_follow.close();
return set;
}
public boolean alreadyFollow(long oid, long did) throws Exception {
HTable tab_users = new HTable(conf, "tab_users");
Get get = new Get(Bytes.toBytes(oid));
get.setMaxVersions(500);
Result rs = tab_users.get(get);
List<KeyValue> list = rs.getColumn(Bytes.toBytes("user"),
Bytes.toBytes("follow"));
tab_users.close();
for (KeyValue kv : list) {
if (did == Bytes.toLong(kv.getValue()))
return true;
}
return false;
}
public boolean follow(String oname, String dname) throws Exception {
long oid = this.getIdByUsername(oname);
long did = this.getIdByUsername(dname);
if (oid == 0 || did == 0 || oid == did)
return false;
/*
* tab_follow rowkey:userid CF:name:userid => username version => 1
*/
HTable tab_follow = new HTable(conf, "tab_follow");
Put put = new Put(Bytes.toBytes(oid));
put.add(Bytes.toBytes("name"), Bytes.toBytes(did), dname.getBytes());
tab_follow.put(put);
tab_follow.close();
/*
* tab_followed rowkey:userid_{userid} CF:userid => userid
*/
HTable tab_followed = new HTable(conf, "tab_followed");
put = new Put(Bytes.add(Bytes.toBytes(did), Bytes.toBytes(oid)));
put.add(Bytes.toBytes("userid"), null, Bytes.toBytes(oid));
tab_followed.put(put);
tab_followed.close();
return true;
}
public boolean unfollow(String oname, String dname) throws Exception {
long oid = this.getIdByUsername(oname);
long did = this.getIdByUsername(dname);
if (oid == 0 || did == 0 || oid == did)
return false;
/*
* tab_follow rowkey:userid CF:name:userid => username version => 1
*/
HTable tab_follow = new HTable(conf, "tab_follow");
Delete del = new Delete(Bytes.toBytes(oid));
del.deleteColumns(Bytes.toBytes("name"), Bytes.toBytes(did));
tab_follow.delete(del);
tab_follow.close();
/*
* tab_followed rowkey:userid_{userid} CF:userid => userid
*/
HTable tab_followed = new HTable(conf, "tab_followed");
del = new Delete(Bytes.add(Bytes.toBytes(did), Bytes.toBytes(oid)));
tab_followed.delete(del);
tab_followed.close();
return true;
}
public boolean deleteUser(long id) throws Exception {
String username = getNameById(id);
if (username.equals(""))
return false;
HTable tab_user2id = new HTable(conf, "tab_user2id");
HTable tab_id2user = new HTable(conf, "tab_id2user");
Delete del = new Delete(username.getBytes());
tab_user2id.delete(del);
del = new Delete(Bytes.toBytes(id));
tab_id2user.delete(del);
tab_user2id.close();
tab_id2user.close();
return true;
}
//添加用户
public boolean createNewUser(String name, String password)
throws IOException {
HTable tab_global = new HTable(conf, "tab_global");
HTable tab_user2id = new HTable(conf, "tab_user2id");
HTable tab_id2user = new HTable(conf, "tab_id2user");
if (tab_user2id.exists(new Get(name.getBytes())))
return false;
long id = tab_global.incrementColumnValue(Bytes.toBytes("row_userid"),
Bytes.toBytes("param"), Bytes.toBytes("userid"), 1);
// insert record in tab_user2id
Put put = new Put(name.getBytes());
put.add(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes(id));
tab_user2id.put(put);
// insert record in tab_id2user
put = new Put(Bytes.toBytes(id));
put.add(Bytes.toBytes("info"), Bytes.toBytes("username"),
Bytes.toBytes(name));
put.add(Bytes.toBytes("info"), Bytes.toBytes("password"),
Bytes.toBytes(password));
tab_id2user.put(put);
tab_global.close();
tab_user2id.close();
tab_id2user.close();
return true;
}
//通过id获取用户用户名
public String getNameById(long id) {
try {
HTable tab_id2user = new HTable(conf, "tab_id2user");
Result rs = tab_id2user.get(new Get(Bytes.toBytes(id)));
//获取最新一列
KeyValue kv = rs.getColumnLatest(Bytes.toBytes("info"),
Bytes.toBytes("username"));
return Bytes.toString(kv.getValue());
} catch (Exception e) {
return "";
}
}
public long getIdByUsername(String username) {
try {
HTable tab_user2id = new HTable(conf, "tab_user2id");
Result rs = searchByRowKey(tab_user2id, username);
KeyValue kv = rs.getColumnLatest(Bytes.toBytes("info"),
Bytes.toBytes("id"));
byte[] bid = kv.getValue();
return Bytes.toLong(bid);
} catch (Exception e) {
return 0;
}
}
// return 0:not matched >0:match
public long checkPassword(String name, String password) throws Exception {
HTable tab_user2id = new HTable(conf, "tab_user2id");
HTable tab_id2user = new HTable(conf, "tab_id2user");
if (!tab_user2id.exists(new Get(name.getBytes())))
return 0;
Result rs = searchByRowKey(tab_user2id, name);
KeyValue kv = rs.getColumnLatest(Bytes.toBytes("info"),
Bytes.toBytes("id"));
byte[] bid = kv.getValue();
Get get = new Get(bid);
rs = tab_id2user.get(get);
kv = rs.getColumnLatest(Bytes.toBytes("info"),
Bytes.toBytes("password"));
String passwordInDb = Bytes.toString(kv.getValue());
// System.out.println(passwordInDb);
if (!password.equals(passwordInDb))
return 0;
long id = Bytes.toLong(bid);
return id;
}
public Result searchByRowKey(HTable ht, String rk) throws Exception {
Get get = new Get(rk.getBytes());
Result rs = ht.get(get);
return rs;
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
HbaseIf hbase = new HbaseIf();
// hbase.createTables();
/*
* h.createTables(); if(h.createNewUser("robby1", "robby"))
* System.out.println("add user success"); else
* System.out.println("add user failed");
*/
hbase.createTables();
hbase.createNewUser("user1", "pwd1");
hbase.createNewUser("user2", "pwd1");
hbase.createNewUser("user3", "pwd1");
hbase.createNewUser("user4", "pwd1");
hbase.createNewUser("user5", "pwd1");
hbase.follow("user1", "user2");
hbase.follow("user3", "user2");
hbase.follow("user4", "user2");
}
}
Hbase创建表插入查询数据案例
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。