SparkCore
RDD基础
定义
在 Spark 的编程接口中,每一个数据集都被表示为一个对象,称为 RDD。RDD 是 Resillient Distributed Dataset(弹性分布式数据集)的简称,是一个只读的(不可变的)、分区的(分布式的)、容错的、延迟计算的、类型推断的和可缓存的记录集合。
结构
RDD 由以下五部分组成:
-
一组 partition(分区),即组成整个数据集的块;
-
每个 partition(分区)的计算函数(用于计算数据集中所有行的函数);
-
所依赖的 RDD 列表(即父 RDD 列表);
-
(可选的)对于 key-value 类型的 RDD,则包含一个 Partitioner(默认是 HashPartitioner);
-
(可选的)每个 partition 数据驻留在集群中的位置(可选);如果数据存放在 HDFS 上,那么它就是块所在的位置。
特性
-
RDD 会优先使用内存;
-
一旦创建不可修改;
-
惰性执行;
-
可缓存,可复用;
-
可并行处理;
-
强类型,单一类型数据;
-
分区的;
-
可指定分区优先使用的节点。
SC与SS
SparkContext 从 Spark 1.x 引入的,在 2.0 中引入 SparkSession之前,用来作为 Spark 和 PySpark 的入口点。使用 RDD 编程和连接到 Spark Cluster 的第一步就是创建SparkContext。SparkContext 是在 org.apache.spark 包中定义的,它用于在集群中通过编程方式创建 SparkRDD、累加器和广播变量。
虽然 SparkContext 是 2.0 之前一个入口点,但它并没有被 SparkSession 完全取代,SparkContext 的许多特性仍然可用,并在 Spark 2.0 和以后的版本中使用。
SparkContext 构造函数在 2.0 中已经弃用,因此建议使用静态方法 getOrCreate()来创建 SparkContext。该函数用于获取或实例SparkContext,并将其注册为一个单例对象。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
//val sc = new SparkContext(sparkConf)
val sc = SparkContext.getOrCreate(sparkConf)
创建
- 从集合(内存)中创建RDD。Spark主要提供了两个方法:parallelize和makeRDD,从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法。
val rdd1 = sc.parallelize(
List(1,2,3,4)
)
val rdd2 = sc.makeRDD(
List(1,2,3,4)
)
- 从外部存储(文件)创建RDD。由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。
val fileRDD: RDD[String] = sc.textFile("input")
- 从已有的RD转换得到新的RDD。
// 字符转为大写,得到一个新的 RDD
val rdd5 = rdd4.map(line => line.toUpperCase)
并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。分区的数量等于任务的数量,不一定等于并行度。分区数量可以在构建 RDD 时指定。
val dataRDD: RDD[Int] =
sc.makeRDD(
List(1,2,3,4),
4)
读取内存数据时,数据分区规则的Spark 核心源码如下。i为第几个分区(从零开始),length为集合长度,numSlices为分区数量。start和end为某一个分区内要存储的集合元素的范围,左闭右开。
val start = ((i*length) / numSlices).toInt
val end = (((i + 1)*length) / numSlices).toInt
读取文件时,spark读取文件的方式和hadoop类似,一行一行读取。数据读取时以偏移量为单位,区分内存分区中以一个元素为单位,数据读取是全闭。偏移量不会重复读取。如果数据源为多个文件,那么计算分区时以文件为单位进行分区。具体Spark核心源码如下。totalSize表示读取文件的字节数的总和,goalSize表示每个分区应该存放多少个字节。如果除不尽的话,比较剩余字节数占goalSize的多少,如果大于%10,则需要产生新的分区,参考Hadoop。
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits)
操作RDD
RDD 操作分为两种类型:转换(Transformation)和动作(action)。转换(Transformation)是用来创建RDD的方法,而动作(action)是使用RDD的方法。
转换算子
map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
num => {
num * 2
}
)
val dataRDD2: RDD[String] = dataRDD1.map(
num => {
"" + num
}
)
map 和 foreach 的区别?
- 前者是 transformation 操作(不会立即执行),后者是 action 操作(会立即执行);
- 前者返回值是一个新 RDD,后者没有返回值。
mapPartitions
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(x => x!=2)
}
)
思考一个问题:map 和 mapPartitions 的区别?
- 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
- 功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
- 性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
mapPartitionsWithIndex
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
if ( index == 1 ) {
iter
} else {
Nil.iterator
}
}
)
flatMap
对集合中每个元素进行操作然后再扁平化。,所以算子也称之为扁平映射。
val dataRDD = sparkContext.makeRDD(List(
List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
list => list
)
glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 。
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
val glomRDD: RDD[Array[Int]] = rdd.glom()
val maxRDD: RDD[Int] = glomRDD.map(
array => {
array.max
}
)
groupBy
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中,一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
val groupRDD = rdd.groupBy(num => {num%2})
filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
val rdd = sc.makeRDD(List(1,2,3,4))
val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)
sample
根据指定的规则从数据集中抽取数据。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
distinct
将数据集中重复的数据去重。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
coalesce
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)
repartition
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。repartition常用于增加分区,coalesce常用于减小分区。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)
sortBy
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。
val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
val newRDD = rdd.sortBy(t=>t._1.toInt, false)
intersection
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD。
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)
union
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD。
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)
subtract
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集。
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
zip
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
partitionBy
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。
val rdd: RDD[(Int, String)] =
sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val rdd2: RDD[(Int, String)] =
rdd.partitionBy(new HashPartitioner(2))
reduceByKey
可以将数据按照相同的 Key 对 Value 进行聚合。
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))
val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => {
x + y
} )
groupByKey
将数据源的数据根据 key 对 value 进行分组。
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
//打印
//(a,CompactBuffer(1, 2, 3))
//(b,CompactBuffer(4))
- 思考一个问题:reduceByKey 和 groupByKey 的区别?
- 从 shuffle的角度:
reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
- 从功能的角度:
reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey。
-
思考一个问题:groupBy 和 groupByKey 的区别?
(1)后者只能针对KV类型的数据进行分组,而前者没有限制.
(2)后者的返回值是一个迭代器,且迭代器中的元素是value,而前者迭代器中的元素没有改变。
(3)后者只能针对Key进行分组,前者没有限制。
aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算。
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("a", 4)
),2)
// 第一个参数列表,需要传递一个参数,表示为初始值
// 主要用于当碰见第一个key的时候,和value进行分区内计算
// 第二个参数列表需要传递2个参数
// 第一个参数表示分区内计算规则
// 第二个参数表示分区间计算规则
rdd.aggregateByKey(10)(
(x, y) => math.max(x, y),
(x, y) => x + y
).collect.foreach(println)
foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey。
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),2)
rdd.foldByKey(0)((x, y) => x+y).collect.foreach(println)
combineByKey
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),2)
// combineByKey : 方法需要三个参数
// 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
// 第二个参数表示:分区内的计算规则
// 第三个参数表示:分区间的计算规则
val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
( t:(Int, Int), v ) => {
(t._1 + v, t._2 + 1)
},
(t1:(Int, Int), t2:(Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?
- reduceByKey
相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同。
- FoldByKey
相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同 。
- AggregateByKey
相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同。
- CombineByKey
当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
sortByKey
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的。
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
join
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的。
val rdd1 = sc.makeRDD(List(
("a", 1), ("a", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 5), ("c", 6),("a", 4)
))
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
// (a,(1,5))
// (a,(1,4))
// (a,(2,5))
// (a,(2,4))
// (c,(3,6))
leftOuterJoin
类似于 SQL 语句的左外连接 。
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("b", 3), ("c", 6)
))
val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
rightJoinRDD.collect().foreach(println)
// (a,(Some(1),4))
// (b,(Some(2),5))
// (b,(Some(2),3))
// (c,(None,6))
cogroup
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 7), ("c", 6)
))
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
// (a,(CompactBuffer(1),CompactBuffer(4)))
// (b,(CompactBuffer(2),CompactBuffer(5)))
// (c,(CompactBuffer(),CompactBuffer(6, 7)))
动作算子
reduce
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 聚合数据
val reduceResult: Int = rdd.reduce((x, y) => x+y)
collect
在驱动程序中,以数组 Array 的形式返回数据集的所有元素。
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
rdd.collect().foreach(println)
count
返回 RDD 中元素的个数。
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val countResult: Long = rdd.count()
first
返回 RDD 中的第一个元素。
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val firstResult: Int = rdd.first()
println(firstResult)
take
返回一个由 RDD 的前 n 个元素组成的数组。
vval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))
takeOrdered
返回该 RDD 排序后的前 n 个元素组成的数组。
import org.apache.spark.{SparkConf, SparkContext}
object RddTakeOrderedDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddTakeOrderedDemo")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to 10)
val ord = new MyOrdering()
rdd.takeOrdered(3)(ord).foreach(println)
sc.stop()
}
class MyOrdering extends Ordering[Int] {
override def compare(x: Int, y: Int): Int = {
if (x < y) 1 else if (x == y) 0 else -1
}
}
}
aggregate
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.aggregate(10)((x, y) => x+y, (x, y) => x+y)
// 40
fold
折叠操作,aggregate 的简化版操作。
val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.fold(10)((x, y) => x+y)
// 40
countByKey
统计每种 key 的个数。
val rdd = sc.makeRDD(List(
("a", 1),("a", 2),("a", 3)
))
val stringToLong: collection.Map[String, Long] =rdd.countByKey()
println(stringToLong)
countByValue
和countByKey类似。
def wordcount8(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordCount: collection.Map[String, Long] = words.countByValue()
}
mapValues
不管Key,直接计算value。
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
case (num, cnt) => {
num / cnt
}
}
RDD序列化
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List[Int](1,2,3))
val user = new User()
// SparkException: Task not serializable
// NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
// RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
// 闭包检测
rdd.foreach(
num => {
println("age = " + (user.age + num))
}
)
sc.stop()
}
//class User extends Serializable {
// 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
//case class User() {
class User {
var age : Int = 30
}
}
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
object serializable_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu",
"atguigu", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
RDD依赖关系
相邻的两个RDD的关系称之为依赖关系,多个连续的RDD的依赖关系,称之为血缘关系。
宽依赖:宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
窄依赖:窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
RDD阶段划分:DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。
任务划分:RDD 任务切分中间分为:Application、Job、Stage 和 Task
-
Application:初始化一个 SparkContext 即生成一个 Application;
-
Job:一个 Action 算子就会生成一个 Job;
-
Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
-
Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
RDD持久化
Cache、Persist缓存
RDD的对象可以重用,但是数据无法重用,如果一个RDD需要重复使用,那么根据血缘关系,需要从头再次执行来获取数据。
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(word=>{
(word,1)
})
mapRDD.persist(StorageLevel.DISK_ONLY)
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
reduceRDD.collect().foreach(println)
println("**************************************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
存储级别如下如所示。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
CheckPoint检查点
所谓的检查点其实就是通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)
区别
-
cache将数据临时存储在内存中进行数据重用。
-
persist将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,如果作业执行完毕,临时保存的数据文件就会丢失。
-
checkpoint将数据长久地保存在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,为了保证数据安全,所一般情况下会独立执行作业。为了能够提高效率,一般情况下需要和cache联合使用(先用cache,再紧跟着用checkpoint)。
-
Cache 缓存只是将数据保存起来,不切断血缘依赖(会在血缘关系中添加新的依赖)。Checkpoint 检查点切断血缘依赖(执行过程中会切断血缘关系,重新建立新的血缘关系,等同于改变了数据源)。
-
Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
-
建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
文章来源: 博客园
- 还没有人评论,欢迎说说您的想法!