0%

Spark核心架构与RDD编程入门

1 基础概念

1.1 核心组件

  • Driver

将用户程序转化成任务(job)

在Executor之间执行任务调度

跟踪Executor的执行情况

通过UI站视查询运行情况

  • Executor:

执行任务并返回给Driver

通过自身的Block Manager为用户程序中需要缓存的RDD提供内存式存储。RDD是直接存储在Executor中的进程内的,因此任务在运行时可以充分利用缓存数据加速运算。

  • Master

资源调度和分配,集群的监控

  • Worker

由Master分配任务并进行计算

  • ApplicantsMaster

对Driver与Master的直接通信进行解耦合,Driver通过ApplicantsMaster与Master进行通信,监控任务的执行等任务

1.2 核心概念

  • Executor和Core

Executor为提供计算的节点个数,Core为核数

  • 并行度

整个集群并行执行任务的数量

1.3 其他相关

  • 序列化(serialization)在计算机科学的资料处理中,是指将数据结构或对象状态转换成可取用格式(例如存成文件,存于缓冲,或经由网络中发送),以留待后续在相同或另一台计算机环境中,能恢复原先状态的过程。

2 RDD

RDD数据处理方式:类似IO流,包含装饰器设计模式。

通过每一个操作方法的输入与输出来确定数据的格式。

2.1 RDD的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//TODO 通过内存中创建
val ints: Seq[Int] = Seq(1, 2, 3, 4)
//val value: RDD[Int] = sc.parallelize(ints)
//makeRdd方法就是parallelize方法的包装
val value: RDD[Int] = sc.makeRDD(ints)

//TODO 通过文件创建rdd
//1. 通过文件路径创建
val value: RDD[String] = sc.textFile("datas/1.txt")
//2. 通过文件夹创建
//val value: RDD[String] = sc.textFile("datas.txt")
//3. 通过文件路径通配符创建
//val value: RDD[String] = sc.textFile("datas/1*.txt")
//4. 通过hdfs文件系统路径创建
//val value: RDD[String] = sc.textFile("hdfs://linux1:8080/tmp/file.txt")

2.1 RDD方法=>RDD算子

  • 转换方法

功能的转换和封装,将旧的RDD包装成新的RDD(flatMap,map)

  • 行动方法(操作方法)

触发任务的调度和作业的执行(collect)

惰性执行方法

2.1.1 RDD转换算子

2.1.1.1 Map

  • Map方法
1
2
3
4
5
6
7
8
9
10
scala> val rdd = sc.makeRDD(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd.map{(v)=>(v*2)}.collect()
scala> rdd.map((num:Int)=>{num*2}).collect()
scala> rdd.map((num:Int)=>(num*2)).collect()
scala> rdd.map((num:Int)=>num*2).collect()
# 最简单的方法
scala> rdd.map{_*2}.collect()
scala> rdd.map(_*2).collect()
res: Array[Int] = Array(2, 4, 6)
  • Map的并行计算顺序
    • 单个分区内的数据是一个一个执行的,只有前面一个数据的所有逻辑都执行完才能进行下一个数据的执行逻辑,分区内数据的执行是有序的
    • 而多个分区的数据计算是无序的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 单个分区
val value: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),**1**)
val mapRDD: RDD[Int] = value.map(
num => {
println(">>>>>>>" + num)
num
}
)
val mapRDD1: RDD[Int] = mapRDD.map(
num => {
println("@@@@@@@@" + num)
num
}
)
mapRDD1.collect()
# 输出结果
>>>>>>>1
@@@@@@@@1
>>>>>>>2
@@@@@@@@2
>>>>>>>3
@@@@@@@@3
>>>>>>>4
@@@@@@@@4
# 多个分区
val value: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)
。。。。。。。
>>>>>>>3
@@@@@@@@3
>>>>>>>4
@@@@@@@@4
>>>>>>>1
@@@@@@@@1
>>>>>>>2
@@@@@@@@2

2.1.1.2 MapPratitions

MapPratitions是对分区的数据进行操作

1
2
3
4
5
6
7
8
9
10
# 求分区的最大值
val value: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)
# mapPartitions传入的是迭代器,传出的也是迭代器
val value1: RDD[Int] = value.mapPartitions (
iter => List(iter.max).iterator
)
value1.collect().foreach(println)
# 输出
2
4

2.1.1.3 Map和Mappartitions的对比

map是对所有的数据串行进行操作,输入与输出个数相同

而mappartitions是分块对分区内的数据进行操作,输入与输出数据个书可以不同。

map速度慢,占用内存较少

mappartitions速度快,会长时间占用内存。

2.1.1.4 glom()方法

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

2.1.1.5 groupby()

groupby方法会将数据打乱并重新组合,也称之为shuffle

分区数不变

2.1.1.6 coalsce(numPartitions=num;shuffle=boolen)

当大批量数据经过处理之后,需要缩小分区数可以使用coalsce()方法修改分区数。分区数修改后,分区内的数据不会被coalsce方法打散重新组合(并且同一个分区的数据也不会被分开)。

随意使用coalsce方法可能会产生数据倾斜,如果希望使数据均衡,可以使用shuffle(使shuffle=True)。

如果需要扩大分区的话使用repartition,它是coalsce的上层实现(coalsce(numPartitions=bignum;shuffle=True))

2.1.1.7 groupByKey和reduceByKey的区别

  • groupByKey只分组,如果需要聚合需使用map
  • reduceByKey包含分组和聚合两个功能
  • 如果只需要分组则只能使用groupByKey

groupByKey导致数据打乱重组,数据处理的时候存在shuffle操作,数据量大的话可能会导致数据倾斜然后内存溢出。

所以在spark中shuffle操作必须落盘处理(保存在硬盘中),不能在内存中进行等待。所以shuffle操作的性能十分低。

reduceByKey可以预先在分区内将相同key的数据进行聚合,然后进行落盘再shuffle。提升了很大的性能

而groupByKey只能先打乱所有的数据再落盘聚合。

2.1.1.8 foreach

foreach是在每个exector上执行的,而collect()算子是将所有的分区数据集合在一起在执行的

1
2
3
4
5
6
7
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
// 收集后打印
rdd.map(num=>num).collect().foreach(println)
> 1 2 3 4
// 分布式打印
rdd.foreach(println)
> 3 4 1 2

2.1.1.9 宽依赖和窄依赖

宽依赖:父rdd对一个子rdd(一对一/独生子女)

窄依赖:父rdd对多个子rdd(一对多/多生子女)

2.1.2 闭包检查

闭包的理解:使用到了外部(超出其作用域)的变量的函数称为闭包。

从计算的角度算子以外的代码都是在driver端运行,算子里的代码都是在excutor里面运行,在scala编程中经常会遇到算子内会用到算子外的数据,这样就形成了闭包的效果。如果内部需要用到外部的数据,则需要检查内部的对象能否序列化,这样的过程称为闭包检测。(如果算子内部的对象可以进行序列化,则可以对外部引用数据进行序列化传给excutor端进行计算)

3 SparkSql

3.1 DataSet和DataFrame

强类型:DataSet(每一行的类型都可能不相同,当相同时就是DateFrame)

弱类型:DataFrame(每一行的类型都相同,由第一行标识出)