首页 > 代码库 > scala操作hbase案例

scala操作hbase案例

案例取自streaming-app项目

package com.asiainfo.ocdc.streaming.toolsimport org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.hbase.client.{Put, Result, Get, HTable}import org.apache.hadoop.hbase.util.Bytesimport scala.collection.mutableobject HbaseTool {  val table = new mutable.HashMap[String,HTable]()  var conf = HBaseConfiguration.create()  def setConf(c:Configuration)={    conf = c  }  def getTable(tableName:String):HTable={    table.getOrElse(tableName,{      println("----new connection ----")      val tbl = new HTable(conf, tableName)      table(tableName)= tbl      tbl    })  }  def getValue(tableName:String,rowKey:String,family:String,qualifiers:Array[String]):Array[(String,String)]={    var result:AnyRef = null    val table_t =getTable(tableName)    val row1 =  new Get(Bytes.toBytes(rowKey))    val HBaseRow = table_t.get(row1)    if(HBaseRow != null && !HBaseRow.isEmpty){      result = qualifiers.map(c=>{        (tableName+"."+c, Bytes.toString(HBaseRow.getValue(Bytes.toBytes(family), Bytes.toBytes(c))))      })    }    else{      result=qualifiers.map(c=>{        (tableName+"."+c,"null")  })    }    result.asInstanceOf[Array[(String,String)]]  }  def putValue(tableName:String,rowKey:String, family:String,qualifierValue:Array[(String,String)]) {    val table =getTable(tableName)    val new_row  = new Put(Bytes.toBytes(rowKey))    qualifierValue.map(x=>{      var column = x._1      val value = x._2      val tt = column.split("\\.")      if (tt.length == 2) column=tt(1)      if(!(value.isEmpty))        new_row.add(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value))    })    table.put(new_row)  }  val family = "F"}