0%

PySpark读取HBase数据训练预测模型实践

0. 需求说明

需求是对变压器的负荷进行预测,每一个变压器当作一个独立的个体。需要训练的个体数过多,所以尝试使用pyspark进行训练。原始数据保存在hbase中,程序的整体设计流程为:从hbase中读取数据 –> 数据预处理 –> 使用pyspark训练模型并进行预测

1. 读取hbase

因为是在自己的本地windows进行测试,电脑内存较小就没上虚拟机了。

1.1 pyspark读取hbase需要配置相关包

将HBase的lib目录下的如下包复制到spark的目录下(因为放在其他目录中需要添加该目录的环境变量,所以为了省事就全部放在spark的默认jar目录中了,在spark启动的时候会全部加载进去,在http://127.0.0.1:4040/environment/最下面的**Resource**中可以查看到)

  • 所有hbase开头的jar文件

  • guava-12.0.1.jar

  • htrace-core-3.1.0-incubating.jar

  • protobuf-java-2.5.0.jar

  • spark-example-1.6.0.jar(把hbase的数据转换python可读取的jar包)

  • metrics-core-2.2.0.jar

1.2 读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 在pyspark中打开
host = '172.16.221.102'
table = 'PSSC:HT_E_MP_CURVE_RELA'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print (k, v)

# 显示部分结果如下
{"qualifier" : "IC85", "timestamp" : "1614130918336", "columnFamily" : "C", "row" : "20201115", "type" : "Put", "value" : "0.0000"}
{"qualifier" : "IC86", "timestamp" : "1614130918336", "columnFamily" : "C", "row" : "20201115", "type" : "Put", "value" : "0.0000"}
{"qualifier" : "IC87", "timestamp" : "1614130918336", "columnFamily" : "C", "row" : "20201115", "type" : "Put", "value" : "0.0000"}
{"qualifier" : "IC88", "timestamp" : "1614130918336", "columnFamily" : "C", "row" : "20201115", "type" : "Put", "value" : "0.0000"}
{"qualifier" : "IC89", "timestamp" : "1614130918336", "columnFamily" : "C", "row" : "20201115", "type" : "Put", "value" : "0.0000"}

1.3 遇到问题

1.3.1 java.io.IOException: com.google.protobuf.ServiceException: java.lang.NoClassDefFoundError:

遇到问题要自己思考出现问题的原因是什么,以及找到问题的解决方法之后要思考底层是如何运行的,为什么这个方法能够解决这个问题。

  • 问题复现:
1
2
3
4
5
6
7
# 包放好后启动报如下错误(第一部分)
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=32, exceptions:
Mon Mar 01 14:28:51 CST 2021, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68422: row 'PSSC:HT_E_MP_CURVE_RELA,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=worker01,60020,1613617191977, seqNum=0
......
# 并且下面会有这样一句:(第二部分)
Caused by: java.io.IOException: com.google.protobuf.ServiceException: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
  • 解决:

在google了很多之后以上第一部分的问题之后,都是在说一些hosts映射的解决方法。但是我的hosts是没有问题的,启动之前就已经添加过主机名的映射了,未解决我的问题。

然后仔细将所有的报错提示从头到尾看了一遍,发现了上面第二部分的问题,然后在想是不是因为第二部分的问题导致了第一部分的错误,然后去网上搜该方法,显示”com/yammer/metrics/core/Gauge”是metrics-core-2.2.0.jar包里面的方法,然后在hbase的lib文件夹下面找到了该包,于是导入后遂解决了问题。

1.3.2 pyspark默认版本的问题

1
Exception: Python in worker has different version 3.5 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
  • 问题解析:

    因为自己的的电脑上有三个版本的python(3.5/3.6/3.8),出现这个问题是因为环境变量冲突,默认的环境变量是anaconda的3.5,然后在pycharm中使用的python版本为3.6,所以产生了冲突。只需要在spark的配置文件中指定某一环境变量即可

  • 解决方法:

    1
    2
    3
    4
    5
    # 在spark-3.0.2-bin-hadoop3.2\conf\spark-env.sh中添加环境变量
    # 如果不存在的话将spark-env.sh.template修改为spark-env.sh并添加
    export PYSPARK_PYTHON=D:\Anaconda3\envs\python36
    export PYSPARK_DRIVER_PYTHON=D:\Anaconda3\envs\python36
    export SPARK_HOME=D:\spark-3.0.2-bin-hadoop3.2

##2. 大批量跑模型