首页 > 代码库 > 读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor
读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor
Coprocessor是HBase 0.92.0引入的特性。使用Coprocessor,可以将一些计算逻辑下推到HBase节点,HBase由一个单纯的存储系统升级为分布式数据处理平台。
Coprocessor分为两种:Observer和Endpoint。Observer能修改扩展已有的客户端操作功能,而Endpoint能引入新的客户端操作。
Observer
Observer的作用类似于数据库的触发器或者AOP中的advice。下图为Put操作增加Observer,其中1-2-4-6是一次正常的Put操作RPC调用过程,而3和5属于Observer,可以在Put操作之前和之后加入自定义处理逻辑。
Observer包括三种,RegionObserver(针对数据访问和更新操作,运行在Region上)/WALObserver(针对WAL日志事件,运行在RegionServer上下文)/MasterObserver(针对DDL操作,运行在Master节点)。
Endpoint
Endpoint的作用则类似于数据库存储过程。实现机制是通过扩展HBase RPC协议,给客户端暴露新的操作接口。如下图,客户端负责发起调用和收集结果,服务端各节点负责并行计算。
实战
以上一章的follows表为例,通过Observer实现followedBy被关注表数据一致性维护,Endpoint实现关注人数量统计。
因为要实现在插入follows表时自动插入followedBy表,需要用到关注人/被关注人用户名信息,所以首先升级schema。
实现Observer
代码中有三处注释值得注意:
- postPut方法在put操作之后被调用。
- 如果通过hbase-site.xml安装Observer,会应用到全局所有表,所以这里判断put操作的是否follows表。
- 这里有点bad smell。Observer运行在服务器端,为了共用代码,又调用客户端代码,仅为演示作用。
packageHBaseIA.TwitBase.coprocessors; //… publicclass FollowsObserver extends BaseRegionObserver { private HTablePool pool = null; @Override public void start(CoprocessorEnvironment env)throws IOException { pool = newHTablePool(env.getConfiguration(), Integer.MAX_VALUE); } @Override public void stop(CoprocessorEnvironment env)throws IOException { pool.close(); } @Override public void postPut(//1,在Put操作之后调用 finalObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { byte[] table=e.getEnvironment().getRegion().getRegionInfo().getTableName(); if (!Bytes.equals(table,FOLLOWS_TABLE_NAME)) return; //2,判断表名 KeyValue kv =put.get(RELATION_FAM, FROM).get(0); String from =Bytes.toString(kv.getValue()); kv = put.get(RELATION_FAM,TO).get(0); String to =Bytes.toString(kv.getValue()); RelationsDAO relations = newRelationsDAO(pool); relations.addFollowedBy(to,from);//3,插入followedBy表 } }Observer的安装可以通过修改hbase-site.xml或者使用tableschema修改语句完成,前者需要重启HBase服务,后者只需要重新上下线对应表。
$ hbase shell HBaseShell; enter 'help<RETURN>' for list of supported commands. Type"exit<RETURN>" to leave the HBase Shell Version0.92.0, r1231986, Mon Jan 16 13:16:35 UTC 2012 hbase(main):001:0>disable 'follows' 0 row(s) in 7.0560 seconds hbase(main):002:0>alter 'follows', METHOD => 'table_att', 'coprocessor'=>'file:///Users/ndimiduk/repos/hbaseiatwitbase/ target/twitbase-1.0.0.jar |HBaseIA.TwitBase.coprocessors.FollowsObserver|1001|' Updatingall regions with the new schema... 1/1regions updated. Done. 0 row(s) in 1.0770 seconds hbase(main):003:0>enable 'follows' 0 row(s) in 2.0760 seconds
其中1001为优先级,当加载多个Observer时,按照优先级次序运行。
实现Endpoint
关注人数量统计可以通过客户端Scan实现,相比Endpoint方案,有两个待改进点:
- 传输所有关注人到客户端,不必要的网络I/O。
- 拿到所有关注人Result结果后,遍历实现计数是单线程的。
实现Endpoint包括三部分
定义PRC接口
publicinterface RelationCountProtocol extends CoprocessorProtocol { public long followedByCount(String userId) throwsIOException; }
服务端实现
和客户端不同,InternalScanner运行在特定Region上,返回的是原始的KeyValue对象。
packageHBaseIA.TwitBase.coprocessors; //… publicclass RelationCountImpl extends BaseEndpointCoprocessor implementsRelationCountProtocol { @Override public longfollowedByCount(String userId) throws IOException { byte[]startkey = Md5Utils.md5sum(userId); Scan scan = newScan(startkey); scan.setFilter(newPrefixFilter(startkey)); scan.addColumn(RELATION_FAM,FROM); scan.setMaxVersions(1); RegionCoprocessorEnvironmentenv= (RegionCoprocessorEnvironment)getEnvironment(); InternalScanner scanner =env.getRegion().getScanner(scan);//1,服务器端 long sum = 0; List<KeyValue> results= new ArrayList<KeyValue>(); boolean hasMore = false; do { hasMore =scanner.next(results); sum += results.size(); results.clear(); } while (hasMore); scanner.close(); return sum; } }
客户端代码
参考注释:
- 定义Call实例
- 调用服务端Endpoint。
- 聚合所有RegionServer得到的结果
public long followedByCount (final String userId) throws Throwable { HTableInterface followed =pool.getTable(FOLLOWED_TABLE_NAME); final byte[] startKey = Md5Utils.md5sum(userId); final byte[] endKey =Arrays.copyOf(startKey, startKey.length); endKey[endKey.length-1]++; Batch.Call<RelationCountProtocol,Long> callable = newBatch.Call<RelationCountProtocol, Long>() { @Override public Longcall(RelationCountProtocol instance) throws IOException { returninstance.followedByCount(userId); } };//1 call instance Map<byte[], Long>results = followed.coprocessorExec( RelationCountProtocol.class, startKey, endKey, callable);//2 invoke endpoint long sum = 0; for(Map.Entry<byte[],Long> e : results.entrySet()) { sum +=e.getValue().longValue(); }//3 aggreagte results return sum; }Endpoint只能通过配置文件部署,还需要将相关jar包加入到HBase classpath。
<property> <name>hbase.coprocessor.region.classes</name> <value>HBaseIA.TwitBase.coprocessors.RelationCountImpl</value> </property>