首页 > 代码库 > hadoop-HBase-observer的一个例子
hadoop-HBase-observer的一个例子
DESCRIPTION ENABLED
{NAME => ‘users‘, coprocessor$1 => ‘file:///home/u/myjar/UsersObserver.jar|test.hbase.inaction.example5_2.Users true
Observer|1001|‘, FAMILIES => [{NAME => ‘info‘, BLOOMFILTER => ‘NONE‘, REPLICATION_SCOPE => ‘0‘, COMPRESSION => ‘N
ONE‘, VERSIONS => ‘3‘, TTL => ‘2147483647‘, MIN_VERSIONS => ‘0‘, BLOCKSIZE => ‘65536‘, IN_MEMORY => ‘false‘, BLOCK
CACHE => ‘true‘}]}
1 row(s) in 0.0610 seconds
先disable表,然后alter一下,那个1001是优先级,别忘记enable
disable 'users'
alter 'users', METHOD => 'table_att', 'coprocessor'=>'file:///home/u/mylib/UsersObserver.jar|test.hbase.inaction.example5_2.UsersObserver|1001|'
alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$2'  # 删除coprocessor$2
enable 'users'
hbase(main):022:0> scan 'users'
ROW COLUMN+CELL
id01 column=info:email, timestamp=1413963413002, value=wyj@gmail.com
id01 column=info:name, timestamp=1413963413002, value=wyj
id01 column=info:password, timestamp=1413963413002, value=000000
id01 column=info:user, timestamp=1413963413002, value=id01
id09 column=info:email, timestamp=1414566775616, value=test9@gmail.com
id09 column=info:name, timestamp=1414566775616, value=test9
id09 column=info:password, timestamp=1414566775616, value=9
id09 column=info:user, timestamp=1414566775616, value=id09
id99 column=info:email, timestamp=1414565339530, value=test99@gmail.com
id99 column=info:name, timestamp=1414565339530, value=test99
id99 column=info:password, timestamp=1414565339530, value======01=====
id99 column=info:user, timestamp=1414565339530, value=id99
invalID column=info:invalid_pass, timestamp=1414566775657, value=invalid_pass:9
11 row(s) in 0.1290 seconds
start() 和stop()这两个方法,在表enable和disable时分别也会对应执行
这start()和stop()在连接池里获得连接与释放连接实在是多余
observer类似触发器,操作一张表的时候要注意递归调用,拦截到一次put后再put这个表就会递归
这个例子就是这样的,实际上没任何意义,就是个例子而已
注意锁,HBase是行级的锁,如果想put同一行是不会成功的
observer很不好调试,只能整一堆的log......
package test.hbase.inaction.example5_2;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import test.hbase.inaction.example2_4.User;
import test.hbase.inaction.example2_4.UsersDAO;

/**
 * Example region observer that reacts to every {@code Put} on the users table: when the
 * written password is shorter than {@link #MIN_PASSWORD_LENGTH} characters it writes a
 * fixed demo user back into the same table.
 *
 * <p>This is a demonstration only. Writing to the observed table from {@code postPut}
 * re-triggers {@code postPut}; the chain ends here because the demo user's password is
 * long enough to fail the "too short" check.
 *
 * <p>Deployment from the HBase shell. Disable the table first, alter it, and do not
 * forget to enable it again; {@code 1001} is the coprocessor priority. The
 * {@code table_att_unset} line removes the coprocessor attribute {@code coprocessor$2}.
 * <pre>{@code
 * disable 'users'
 * alter 'users', METHOD => 'table_att', 'coprocessor'=>'file:///home/u/mylib/UsersObserver.jar|test.hbase.inaction.example5_2.UsersObserver|1001|'
 * alter 'users', METHOD => 'table_att_unset',NAME => 'coprocessor$2'
 * enable 'users'
 * describe 'users'
 * }</pre>
 */
public class UsersObserver extends BaseRegionObserver {

    private static final Log LOG = LogFactory.getLog(UsersObserver.class);

    /** Passwords shorter than this many characters trigger the demo write. */
    private static final int MIN_PASSWORD_LENGTH = 6;

    // Values of the demo user written when a short password is observed.
    private static final String DEMO_USER = "id99";
    private static final String DEMO_NAME = "test99";
    private static final String DEMO_EMAIL = "test99@gmail.com";
    private static final String DEMO_PASSWORD = "=====01=====";

    /** Table pool created in {@link #start} and closed in {@link #stop}; null outside that window. */
    private HTablePool pool = null;

    /**
     * Creates the table pool from the coprocessor environment's configuration.
     *
     * @param env coprocessor environment supplying the HBase configuration
     * @throws IOException declared by the overridden method
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        LOG.info("WYJ ---------------------------- start() begin");
        pool = new HTablePool(env.getConfiguration(), Integer.MAX_VALUE);
        LOG.info("pool=" + pool);
        LOG.info("WYJ ---------------------------- start() end");
    }

    /**
     * Closes the table pool if one was created.
     *
     * @param env coprocessor environment (unused)
     * @throws IOException if closing the pool fails
     */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("WYJ ---------------------------- stop() begin");
        // start() may have failed before the pool was assigned; avoid an NPE on shutdown.
        if (pool != null) {
            pool.close();
            pool = null;
        }
        LOG.info("WYJ ---------------------------- stop() end");
    }

    /**
     * Inspects a completed {@code Put} and writes the demo user when the password is too short.
     * Failures are logged and never propagated to the caller.
     *
     * @param e          observer context giving access to the region being written
     * @param put        the mutation that was just applied
     * @param edit       WAL edit of the mutation (unused)
     * @param writeToWAL whether the mutation was written to the WAL (unused)
     */
    @Override
    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
            final WALEdit edit, final boolean writeToWAL) {
        LOG.info("WYJ: ================================================ postPut() begin");
        try {
            handlePut(e, put);
        } catch (Exception ex) {
            // Coprocessor boundary: log and swallow so a failure of the demo write
            // does not surface from this hook.
            LOG.error("postPut() failed", ex);
        }
        LOG.info("WYJ: ================================================ postPut() end");
    }

    /**
     * Applies the short-password rule to one {@code Put}.
     *
     * @throws Exception whatever {@code UsersDAO.addUser} raises; handled by {@link #postPut}
     */
    private void handlePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put)
            throws Exception {
        byte[] table = e.getEnvironment().getRegion().getRegionInfo().getTableName();
        LOG.info("table=" + Bytes.toString(table));
        if (!Bytes.equals(table, UsersDAO.TABLE_NAME)) {
            return;
        }

        byte[] passBytes = firstValue(put, UsersDAO.INFO_FAM, UsersDAO.PASS_COL);
        if (passBytes == null) {
            // The Put does not touch the password column, so there is nothing to validate.
            LOG.info("no password column in this put; skipping");
            return;
        }
        String pass = Bytes.toString(passBytes);
        int passLength = (pass == null) ? 0 : pass.length();
        // Log only the length: the password itself must not end up in the server log.
        LOG.info("password.length=" + passLength);

        // The user id is only used for logging, so its absence must not abort the hook.
        byte[] user = firstValue(put, UsersDAO.INFO_FAM, UsersDAO.USER_COL);
        LOG.info("user=" + (user == null ? "<absent>" : Bytes.toString(user)));

        if (passLength < MIN_PASSWORD_LENGTH) {
            UsersDAO dao = new UsersDAO(pool);
            LOG.info("pool=" + pool);
            dao.addUser(DEMO_USER, DEMO_NAME, DEMO_EMAIL, DEMO_PASSWORD);
            LOG.info("dao.addUser(\"" + DEMO_USER + "\", \"" + DEMO_NAME + "\", \"" + DEMO_EMAIL
                    + "\", \"" + DEMO_PASSWORD + "\"); FINAL");
        }
    }

    /**
     * Returns the value of the first cell the Put holds for the given column,
     * or {@code null} when the Put has no cell for it.
     */
    private static byte[] firstValue(final Put put, final byte[] family, final byte[] qualifier) {
        if (put.get(family, qualifier).isEmpty()) {
            return null;
        }
        return put.get(family, qualifier).get(0).getValue();
    }
}
hadoop-HBase-observer的一个例子