0%

spark三种join方法对比及应用场景

1 问题描述

前几天在跑一个程序的时候遇到了一个很诡异的事情,在dolphinSchedule上面上线了几天的一个程序突然挂掉了,查看问题原因总是报连接超时。无从下手,然后查看代码发现是同事写的代码有一个分区的bug,改掉了但是运行到程序的一个关联操作的地方还是连接超时,然后直接就死掉了。感觉很诡异,因为程序运行了几天没问题,说明代码可能是没问题的,然后计算的数据是一年的数据,猜想可能是数据量的变化出现的问题。

2 问题详情

2.1 报错信息

  • 具体信息
1
2
3
4
5
6
7
8
9
10
11
12
Traceback (most recent call last):
File "LineLossRate.py", line 237, in <module>
dfjoin.show(5)
File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o409.showString.
: org.apache.spark.SparkException: Could not execute broadcast in 1800 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)

在其他节点上显示有一些重复连接超时的警告:

1
2
3
4
5
6
7
8
9
10
11
21/07/02 19:13:42 WARN scheduler.TaskSetManager: Lost task 9.0 in stage 70.0 (TID 5771, cdh06.nari.com, executor 12): java.net.ConnectException: Connection timed out (Connection timed out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:244)
at org.apache.spark.api.python.PythonWorkerFact

2.2 报错代码

1
2
3
4
5
6
7
8
dfjoin=df365.join(resDf3,on=["xx"]).\
withColumn("xx",F.lit(dateMyes)). \
withColumn("xx", F.expr("cast(`xx` as string)")).join(tg_id,on=["xx"]).\
withColumnRenamed("xx","xx").\
select("xx","xx","xx""xx""xx""xx""xx").persist()

dfjoin.show(5)
dfjoin.coalesce(128).write.mode("append").partitionBy("XXX").format("parquet").saveAsTable("tableName")

3 问题刨析

从报错的信息中基本可以定位问题出现在数据join的位置,但是其中的两个dataframe,resDf3和df365数据量都在十几万也并不是很大。但是从这句话中可以看出也是join的问题,建议我们关闭BroadcastJoin。然后大致可以清楚了,BroadcastJoin是关联一个大表和一个小表的方法,但是这里关联的两个表数据量基本相同,如果使用BroadcastJoin的话就可能导致广播时间过长而连接超时,初步猜想问题的原因就在于此。

1
You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

于是添加了spark.sql.autoBroadcastJoinThreshold参数为-1,程序正常运行,问题解决。

4 扩展延伸:spark的三种join方式

文章参考链接:SparkSQL – 有必要坐下来聊聊Join – 有态度的HBase/Spark/BigData (hbasefly.com)

这里是对该文章内的信息转载并做一个简要的摘录:

当前SparkSQL支持三种Join算法-shuffle hash join、broadcast hash join以及sort merge join。其中前两者归根到底都属于hash join,只不过在hash join之前需要先shuffle还是先broadcast。

方法 所属类别 使用场景
shuffle hash join hash join 一张大表join一张小表
broadcast hash join hash join 一张大表join一张极小表
sort merge join merge join 两张大表join

4.1 Hash Join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,很简单一个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。现在假设这个Join采用的是hash join算法,整个过程会经历三步:

  1. 确定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。此事例中item为Build Table,order为Probe Table。

  2. 构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存。

  3. 探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起

这里可能存在的疑问:

  1. hash join性能如何?hash join基本都只扫描两表一次,可以认为o(a+b)

  2. 为什么Build Table选择小表?因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用

4.1.1 Broadcast Hash Join

Broadcast Hash Join一般分为两步:

  1. broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的p2p思路;

  2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探

SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M

4.1.2 Shuffle Hash Join

在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化。如下图所示,shuffle hash join也可以分为两步:

  1. shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle

  2. hash join阶段:每个分区节点上的数据单独执行单机hash join算法。

4.2 Sort-Merge Join

sort merge join 一般分为三个步骤:

  1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理

  2. sort阶段:对单个分区节点的两表数据,分别进行排序

  3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边

5. 再次回到问题

从上面介绍的三种join的原理可以基本确定各自所对应的应用场景,这次遇到的问题就是由于spark默认使用了broadcast join方法,然后两个join的表的数据量基本相似,所以遇到了broadcast join乱广播导致程序超时的现象。所以手动禁止该方法解决了问题。