Operating HBase with Spark
Spark is a computing framework. In a Spark environment you can work not only with local files and HDFS files; Spark can also operate on HBase.
In many companies the data source is HBase, so reading data out of HBase comes up naturally. To let you try HBase operations hands-on as quickly as possible, this article uses the Spark Shell throughout.
I. Environment
Hadoop 2.2.0
HBase 0.96.2-hadoop2, r1581096
Spark 1.0.0
This article assumes the environment is already in place; for building the Spark environment, see the Spark/Hadoop cluster setup guide.
Note that Hadoop 2.2.0 must be version-compatible with HBase; HBase 0.96.2 is used here.
II. How it works
Operating HBase from Spark works on the same principle as operating HBase from a Java client:
Scala and Java are both JVM languages, so as long as the HBase classes are loaded onto the classpath they can be called directly; the same holds for other JVM frameworks.
What is the same: both act as a client that connects to the HMaster and then uses the HBase API to operate on HBase.
What is different: Spark can treat the HBase data as an RDD and therefore use Spark for parallel computation.
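The spark-shell walkthrough below shows this interactively. For completeness, a minimal sketch of the same idea as a standalone Spark application might look like the following (the object name is illustrative and the ZooKeeper address is an assumption taken from the local setup used in this article):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone job: read one HBase table as an RDD and count its rows.
object HBaseRowCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseRowCount"))

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")    // assumption: local ZooKeeper
    conf.set(TableInputFormat.INPUT_TABLE, "scores")   // the table used later in this article

    // Each HBase region of the table becomes one partition of this RDD.
    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    println("row count: " + rdd.count())
    sc.stop()
  }
}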
III. Practice
1. First, check the dependency JARs. If the HBase JARs are not yet on the spark-shell classpath, they need to be added.
How to set it: add SPARK_CLASSPATH=/home/victor/software/hbase/lib/* to spark-env.sh.
Then restart bin/spark-shell; once it has started and the Workers have registered successfully, import the required classes.
2. Operating HBase
2.1 The data in HBase
HBase contains a table named scores with two column families, course and grade. The data looks as follows (only the Jim row, which is used in the examples below, is shown here):
hbase(main):001:0> scan 'scores'
ROW                          COLUMN+CELL
 Jim                         column=course:art, timestamp=1404142440676, value=67
 Jim                         column=course:math, timestamp=1404142434405, value=77
 Jim                         column=grade:, timestamp=1404142422653, value=3

2.2 Initializing the connection parameters
scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.rdd.NewHadoopRDD

scala> import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration

scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

scala> val configuration = HBaseConfiguration.create();  // initialize the configuration
configuration: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hbase-default.xml, hbase-site.xml

scala> configuration.set("hbase.zookeeper.property.clientPort", "2181");  // set the ZooKeeper client port

scala> configuration.set("hbase.zookeeper.quorum", "localhost");  // set the ZooKeeper quorum

scala> configuration.set("hbase.master", "localhost:60000");  // set the HBase master

scala> configuration.addResource("/home/victor/software/hbase/conf/hbase-site.xml")  // load the HBase configuration file

scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin

scala> val hadmin = new HBaseAdmin(configuration);  // instantiate the HBase admin
2014-07-01 00:39:24,649 INFO  [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0xc7eea5, quorum=localhost:2181, baseZNode=/hbase
2014-07-01 00:39:24,707 INFO  [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(120)) - Process identifier=hconnection-0xc7eea5 connecting to ZooKeeper ensemble=localhost:2181
2014-07-01 00:39:24,753 INFO  [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(966)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2014-07-01 00:39:24,755 INFO  [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(849)) - Socket connection established to localhost/127.0.0.1:2181, initiating session
2014-07-01 00:39:24,938 INFO  [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1207)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x146ed61c4ef0015, negotiated timeout = 40000
hadmin: org.apache.hadoop.hbase.client.HBaseAdmin = org.apache.hadoop.hbase.client.HBaseAdmin@1260466
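One thing the transcript above does not show: TableInputFormat also needs to be told which table to scan before the RDD is created. In the HBase MapReduce API this is normally done through the TableInputFormat.INPUT_TABLE key; a one-line sketch, using the scores table from above:

// Tell TableInputFormat which table to read; without this setting the
// newAPIHadoopRDD call in the next step has no table to scan.
configuration.set(TableInputFormat.INPUT_TABLE, "scores")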
Next, use the Hadoop API to create an RDD:

scala> val hrdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
     |   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
     |   classOf[org.apache.hadoop.hbase.client.Result])
2014-07-01 00:51:06,683 WARN  [main] util.SizeEstimator (Logging.scala:logWarning(70)) - Failed to check whether UseCompressedOops is set; assuming yes
2014-07-01 00:51:06,936 INFO  [main] storage.MemoryStore (Logging.scala:logInfo(58)) - ensureFreeSpace(85877) called with curMem=0, maxMem=308910489
2014-07-01 00:51:06,946 INFO  [main] storage.MemoryStore (Logging.scala:logInfo(58)) - Block broadcast_0 stored as values to memory (estimated size 83.9 KB, free 294.5 MB)
hrdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:22
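Before running any actions, it can be useful to check how much parallelism the scan will get. With TableInputFormat, each region of the HBase table becomes one input split and therefore one partition of hrdd. A quick check (not part of the original transcript; for the small single-region scores table used here it should print 1, which matches the single task visible in the job logs below):

// One RDD partition per HBase region of the scanned table.
println(hrdd.partitions.size)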
Reading a record:
Here we take one record. The shape matches the types we passed to newAPIHadoopRDD: the key is an ImmutableBytesWritable and the value is an HBase Result.

scala> hrdd take 1
2014-07-01 00:51:50,371 INFO  [main] spark.SparkContext (Logging.scala:logInfo(58)) - Starting job: take at <console>:25
2014-07-01 00:51:50,423 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Got job 0 (take at <console>:25) with 1 output partitions (allowLocal=true)
2014-07-01 00:51:50,425 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Final stage: Stage 0(take at <console>:25)
2014-07-01 00:51:50,426 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Parents of final stage: List()
2014-07-01 00:51:50,477 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Missing parents: List()
2014-07-01 00:51:50,478 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Computing the requested partition locally
2014-07-01 00:51:50,509 INFO  [Local computation of job 0] rdd.NewHadoopRDD (Logging.scala:logInfo(58)) - Input split: localhost:,
2014-07-01 00:51:50,894 INFO  [main] spark.SparkContext (Logging.scala:logInfo(58)) - Job finished: take at <console>:25, took 0.522612687 s
res5: Array[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = Array((4a 69 6d,keyvalues={Jim/course:art/1404142440676/Put/vlen=2/mvcc=0, Jim/course:math/1404142434405/Put/vlen=2/mvcc=0, Jim/grade:/1404142422653/Put/vlen=1/mvcc=0}))
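Since hrdd is an ordinary RDD of (ImmutableBytesWritable, Result) pairs, it can also be transformed like any other RDD before results are pulled back to the driver. A minimal sketch (not part of the original transcript; variable names are illustrative, and Bytes is HBase's standard byte[] utility class) that maps each pair to its plain-text row key:

import org.apache.hadoop.hbase.util.Bytes

// Turn each (key, Result) pair into its plain-text row key.
val rowKeys = hrdd.map { case (_, result) => Bytes.toString(result.getRow) }
rowKeys.take(1).foreach(println)   // prints "Jim" for the row shown above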
Getting at the Result object:

scala> val res = hrdd.take(1)
2014-07-01 01:09:13,486 INFO  [main] spark.SparkContext (Logging.scala:logInfo(58)) - Starting job: take at <console>:24
2014-07-01 01:09:13,487 INFO  [spark-akka.actor.default-dispatcher-15] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Got job 4 (take at <console>:24) with 1 output partitions (allowLocal=true)
2014-07-01 01:09:13,487 INFO  [spark-akka.actor.default-dispatcher-15] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Final stage: Stage 4(take at <console>:24)
2014-07-01 01:09:13,487 INFO  [spark-akka.actor.default-dispatcher-15] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Parents of final stage: List()
2014-07-01 01:09:13,488 INFO  [spark-akka.actor.default-dispatcher-15] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Missing parents: List()
2014-07-01 01:09:13,488 INFO  [spark-akka.actor.default-dispatcher-15] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Computing the requested partition locally
2014-07-01 01:09:13,488 INFO  [Local computation of job 4] rdd.NewHadoopRDD (Logging.scala:logInfo(58)) - Input split: localhost:,
2014-07-01 01:09:13,504 INFO  [main] spark.SparkContext (Logging.scala:logInfo(58)) - Job finished: take at <console>:24, took 0.018069267 s
res: Array[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = Array((4a 69 6d,keyvalues={Jim/course:art/1404142440676/Put/vlen=2/mvcc=0, Jim/course:math/1404142434405/Put/vlen=2/mvcc=0, Jim/grade:/1404142422653/Put/vlen=1/mvcc=0}))

scala> res(0)
res33: (org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result) = (4a 69 6d,keyvalues={Jim/course:art/1404142440676/Put/vlen=2/mvcc=0, Jim/course:math/1404142434405/Put/vlen=2/mvcc=0, Jim/grade:/1404142422653/Put/vlen=1/mvcc=0})

scala> res(0)._2
res34: org.apache.hadoop.hbase.client.Result = keyvalues={Jim/course:art/1404142440676/Put/vlen=2/mvcc=0, Jim/course:math/1404142434405/Put/vlen=2/mvcc=0, Jim/grade:/1404142422653/Put/vlen=1/mvcc=0}

scala> val rs = res(0)._2
rs: org.apache.hadoop.hbase.client.Result = keyvalues={Jim/course:art/1404142440676/Put/vlen=2/mvcc=0, Jim/course:math/1404142434405/Put/vlen=2/mvcc=0, Jim/grade:/1404142422653/Put/vlen=1/mvcc=0}

Tab completion on rs shows the methods available on a Result:

scala> rs.
asInstanceOf   cellScanner   containsColumn   containsEmptyColumn   containsNonEmptyColumn   copyFrom   getColumn   getColumnCells   getColumnLatest   getColumnLatestCell   getExists   getFamilyMap   getMap   getNoVersionMap   getRow   getValue   getValueAsByteBuffer   isEmpty   isInstanceOf   list   listCells   loadValue   raw   rawCells   setExists   size   toString   value

Walk through this record and pull out the value of each cell:

scala> val kv_array = rs.raw
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
kv_array: Array[org.apache.hadoop.hbase.KeyValue] = Array(Jim/course:art/1404142440676/Put/vlen=2/mvcc=0, Jim/course:math/1404142434405/Put/vlen=2/mvcc=0, Jim/grade:/1404142422653/Put/vlen=1/mvcc=0)

Iterate over the cells:
scala> for(keyvalue <- kv_array) println("rowkey:"+ new String(keyvalue.getRow)+ " cf:"+new String(keyvalue.getFamily()) + " column:" + new String(keyvalue.getQualifier) + " " + "value:"+new String(keyvalue.getValue()))
warning: there were 4 deprecation warning(s); re-run with -deprecation for details
rowkey:Jim cf:course column:art value:67
rowkey:Jim cf:course column:math value:77
rowkey:Jim cf:grade column: value:3

Counting the records:
scala> hrdd.count
2014-07-01 01:26:03,133 INFO  [main] spark.SparkContext (Logging.scala:logInfo(58)) - Starting job: count at <console>:25
2014-07-01 01:26:03,134 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Got job 5 (count at <console>:25) with 1 output partitions (allowLocal=false)
2014-07-01 01:26:03,134 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Final stage: Stage 5(count at <console>:25)
2014-07-01 01:26:03,134 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Parents of final stage: List()
2014-07-01 01:26:03,135 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Missing parents: List()
2014-07-01 01:26:03,166 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Submitting Stage 5 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:22), which has no missing parents
2014-07-01 01:26:03,397 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Submitting 1 missing tasks from Stage 5 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:22)
2014-07-01 01:26:03,401 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(58)) - Adding task set 5.0 with 1 tasks
2014-07-01 01:26:03,427 INFO  [spark-akka.actor.default-dispatcher-16] scheduler.FairSchedulableBuilder (Logging.scala:logInfo(58)) - Added task set TaskSet_5 tasks to pool default
2014-07-01 01:26:03,439 INFO  [spark-akka.actor.default-dispatcher-5] scheduler.TaskSetManager (Logging.scala:logInfo(58)) - Starting task 5.0:0 as TID 0 on executor 0: 192.168.2.105 (PROCESS_LOCAL)
2014-07-01 01:26:03,469 INFO  [spark-akka.actor.default-dispatcher-5] scheduler.TaskSetManager (Logging.scala:logInfo(58)) - Serialized task 5.0:0 as 1305 bytes in 7 ms
2014-07-01 01:26:11,015 INFO  [Result resolver thread-0] scheduler.TaskSetManager (Logging.scala:logInfo(58)) - Finished TID 0 in 7568 ms on 192.168.2.105 (progress: 1/1)
2014-07-01 01:26:11,017 INFO  [Result resolver thread-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(58)) - Removed TaskSet 5.0, whose tasks have all completed, from pool default
2014-07-01 01:26:11,036 INFO  [spark-akka.actor.default-dispatcher-4] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Completed ResultTask(5, 0)
2014-07-01 01:26:11,057 INFO  [spark-akka.actor.default-dispatcher-4] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Stage 5 (count at <console>:25) finished in 7.605 s
2014-07-01 01:26:11,067 INFO  [main] spark.SparkContext (Logging.scala:logInfo(58)) - Job finished: count at <console>:25, took 7.933270634 s
res71: Long = 3
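The cell-by-cell loop earlier ran on the driver, against the single Result fetched with take(1). To do the same extraction in parallel across the whole table, the per-Result logic can be pushed into a flatMap on the RDD. A hedged sketch, not from the original article, using the same (deprecated but still available in HBase 0.96) KeyValue accessors as the loop above:

import org.apache.hadoop.hbase.util.Bytes

// For every row, emit one (rowkey, family, qualifier, value) tuple per cell.
// The extraction runs on the executors; only plain String tuples come back.
val cells = hrdd.flatMap { case (_, result) =>
  result.raw.map { kv =>
    (Bytes.toString(kv.getRow),
     Bytes.toString(kv.getFamily),
     Bytes.toString(kv.getQualifier),
     Bytes.toString(kv.getValue))
  }
}
cells.collect().foreach(println)
// e.g. (Jim,course,art,67), (Jim,course,math,77), (Jim,grade,,3), plus the table's other rows

This is the "what is different" point from Section II in action: the HBase scan is expressed as ordinary RDD transformations, so Spark parallelizes it across the table's regions.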
IV. Summary
Operating HBase from Spark follows essentially the same flow as a Java client operating HBase: in both cases a client connects to the HMaster and ultimately uses the Java API to work with HBase.
The difference is that Spark layers the RDD abstraction on top and, helped by Scala's concise syntax, makes the programming more efficient.
——EOF——
Original article. When reposting, please cite the source: http://blog.csdn.net/oopsoom/article/details/36071323