Spark读写HBase示例

1、HBase shell查看表结构

hbase(main):002:0> desc 'SDAS_Person'
Table SDAS_Person is ENABLED                                                                               
SDAS_Person                                                                                                
COLUMN FAMILIES DESCRIPTION                                                                                
{NAME => 'cf0', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
 DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
{NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
 DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
{NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
 DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
3 row(s) in 0.0810 seconds
hbase(main):006:0> desc 'RESULT'
Table RESULT is ENABLED 
RESULT 
COLUMN FAMILIES DESCRIPTION 
{NAME => 'cf0', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 
1 row(s) in 0.0250 seconds

2、HBase shell插入数据

hbase(main):007:0> scan 'SDAS_Person'
ROW                         COLUMN+CELL                                                                    
 SDAS_1#1                   column=cf0:Age, timestamp=1505209254407, value=33                              
 SDAS_1#1                   column=cf0:CompanyID, timestamp=1505209254407, value=1                         
 SDAS_1#1                   column=cf0:InDate, timestamp=1505209254407, value=2017-08-02 16:02:08.49       
 SDAS_1#1                   column=cf0:Money, timestamp=1505209254407, value=5.20                          
 SDAS_1#1                   column=cf0:Name, timestamp=1505209254407, value=zhangsan                       
 SDAS_1#1                   column=cf0:PersonID, timestamp=1505209254407, value=1                          

 

3、pom.xml:

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

 

4、源码:

package com.zxth.sdas.spark.apps
import org.apache.spark._  
import org.apache.spark.rdd.NewHadoopRDD  
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}  
import org.apache.hadoop.hbase.client.HBaseAdmin  
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes  
import org.apache.hadoop.hbase.client.Put  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.hbase.client.Result  
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

object HBaseOp {
  var total:Int = 0
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseOp").setMaster("local")  
    val sc = new SparkContext(sparkConf)

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum","master,slave1,slave2")  
    conf.set("hbase.zookeeper.property.clientPort", "2181")  
    conf.set(TableInputFormat.INPUT_TABLE, "SDAS_Person")

    //读取数据并转化成rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])  
  
    val count = hBaseRDD.count()  
    println("nnn:" + count)
    hBaseRDD.foreach{case (_,result) =>{  
      //获取行键  
      val key = Bytes.toString(result.getRow)  
      //通过列族和列名获取列
      var obj = result.getValue("cf0".getBytes,"Name".getBytes)
      val name = if(obj==null) "" else Bytes.toString(obj)
      
      obj = result.getValue("cf0".getBytes,"Age".getBytes);
      val age:Int = if(obj == null) 0 else Bytes.toString(obj).toInt
      
      total = total + age
      println("Row key:"+key+" Name:"+name+" Age:"+age+" total:"+total)
    }} 
    var average:Double = total.toDouble/count.toDouble
    println("" + total + "/" + count + " average age:" + average.toString())
    
    //write hbase
    conf.set(TableOutputFormat.OUTPUT_TABLE, "RESULT")
    val job = new Job(conf)  
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
    job.setOutputValueClass(classOf[Result])    
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
    var arrResult:Array[String] = new Array[String](1)
    arrResult(0) = "1," + total + "," + average;
    //arrResult(0) = "1,100,11"

    val resultRDD = sc.makeRDD(arrResult)
    val saveRDD = resultRDD.map(_.split(',')).map{arr=>{  
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("cf0"),Bytes.toBytes("total"),Bytes.toBytes(arr(1)))  
      put.add(Bytes.toBytes("cf0"),Bytes.toBytes("average"),Bytes.toBytes(arr(2)))  
      (new ImmutableBytesWritable, put)   
    }}
    println("getConfiguration")
    var c = job.getConfiguration()
    println("save")
    saveRDD.saveAsNewAPIHadoopDataset(c)  
    
    sc.stop()
  }  
}

 

5、maven打包

mvn clean scala:compile compile package

 

6、提交运算

bin/spark-submit 
--jars $(echo /opt/hbase-1.2.6/lib/*.jar | tr ' ' ',') 
--class com.zxth.sdas.spark.apps.HBaseOp 
--master local 
sdas-spark-1.0.0.jar

 

内容来源于网络如有侵权请私信删除
你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!