1 问题描述
前几天在跑一个程序的时候遇到了一个很诡异的事情,在dolphinSchedule上面上线了几天的一个程序突然挂掉了,查看问题原因总是报连接超时。无从下手,然后查看代码发现是同事写的代码有一个分区的bug,改掉了但是运行到程序的一个关联操作的地方还是连接超时,然后直接就死掉了。感觉很诡异,因为程序运行了几天没问题,说明代码可能是没问题的,然后计算的数据是一年的数据,猜想可能是数据量的变化出现的问题。
2 问题详情
2.1 报错信息
- 具体信息
1 | Traceback (most recent call last): |
在其他节点上显示有一些重复连接超时的警告:
1 | 21/07/02 19:13:42 WARN scheduler.TaskSetManager: Lost task 9.0 in stage 70.0 (TID 5771, cdh06.nari.com, executor 12): java.net.ConnectException: Connection timed out (Connection timed out) |
2.2 报错代码
1 | dfjoin=df365.join(resDf3,on=["xx"]).\ |
3 问题刨析
从报错的信息中基本可以定位问题出现在数据join的位置,但是其中的两个dataframe,resDf3和df365数据量都在十几万也并不是很大。但是从这句话中可以看出也是join的问题,建议我们关闭BroadcastJoin。然后大致可以清楚了,BroadcastJoin是关联一个大表和一个小表的方法,但是这里关联的两个表数据量基本相同,如果使用BroadcastJoin的话就可能导致广播时间过长而连接超时,初步猜想问题的原因就在于此。
1 | You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1 |
于是添加了spark.sql.autoBroadcastJoinThreshold参数为-1,程序正常运行,问题解决。
4 扩展延伸:spark的三种join方式
文章参考链接:SparkSQL – 有必要坐下来聊聊Join – 有态度的HBase/Spark/BigData (hbasefly.com)
这里是对该文章内的信息转载并做一个简要的摘录:
当前SparkSQL支持三种Join算法-shuffle hash join、broadcast hash join以及sort merge join。其中前两者归根到底都属于hash join,只不过在hash join之前需要先shuffle还是先broadcast。
| 方法 | 所属类别 | 使用场景 |
|---|---|---|
| shuffle hash join | hash join | 一张大表join一张小表 |
| broadcast hash join | hash join | 一张大表join一张极小表 |
| sort merge join | merge join | 两张大表join |
4.1 Hash Join
先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,很简单一个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。现在假设这个Join采用的是hash join算法,整个过程会经历三步:
确定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。此事例中item为Build Table,order为Probe Table。
构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存。
探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起
这里可能存在的疑问:
hash join性能如何?hash join基本都只扫描两表一次,可以认为o(a+b)
为什么Build Table选择小表?因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用;
4.1.1 Broadcast Hash Join
Broadcast Hash Join一般分为两步:
broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的p2p思路;
hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探
SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M
4.1.2 Shuffle Hash Join
在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化。如下图所示,shuffle hash join也可以分为两步:
shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle
hash join阶段:每个分区节点上的数据单独执行单机hash join算法。
4.2 Sort-Merge Join
sort merge join 一般分为三个步骤:
shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
sort阶段:对单个分区节点的两表数据,分别进行排序
merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边
5. 再次回到问题
从上面介绍的三种join的原理可以基本确定各自所对应的应用场景,这次遇到的问题就是由于spark默认使用了broadcast join方法,然后两个join的表的数据量基本相似,所以遇到了broadcast join乱广播导致程序超时的现象。所以手动禁止该方法解决了问题。