在线试读

get_product_contenthtml
第1章  举例感受Hive性能调优的多样性
谈及一项技术的优化,必然是一项综合性的工作,它是多门技术的结合。在这一章中将会用代码来演示各类优化技巧,目的在于演示Hive调优的多样性。
本章将从多个完全不同的角度来介绍Hive优化的多样性,如改写SQL、调整数据存储的文件块、改变数据的存储格式、Hive表的设计等方面。
如果对本章里面提到的技术和技巧不懂的话也没关系,先知道有这样一个技术即可。这些技术将会在本书后续章节陆续讲到,并会让读者知道为什么要用这些技巧,终能在知其然的情况下,知其所以然,不管在应对多大的数据量下都能够在开发、生产和运维等阶段感知和监控性能问题,并快速定位问题点将其快速解决。
1.1  感受改写SQL对性能的影响
通过改写SQL来优化程序性能是编程人员进行调优的常见手段。本节将围绕对union改写案例来让大家感受改写SQL对性能的提升。在1.1.2节中要思考一个问题:这样的写法性能瓶颈点在哪里?在1.1.3节中要思考两个问题:为什么选用这种改写方式?还有提升的空间吗?
1.1.1  数据准备
在做这个演示前需要准备一些数据,如果读者对准备数据代码不感兴趣,则可以略过本节的代码,直接学习后面章节的内容。案例1.1是准备学生信息表(student_tb_txt)数据的代码示例,案例1.2是准备学生选课信息表(student_sc_tb_txt)数据的代码示例。
【案例1.1】 生成Hive表student_tb_txt的数据。
(1)创建student数据的本地目录,并确保该目录有写权限,代码如下:
 
mkdir init_student
chmod -R 777 ./init_student
 
(2)在init_student目录下,创建生成student数据的python代码init_student.py,如下:
 
# coding: utf-8
import random
import datetime
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# lastname和first都是为了来随机构造名称
lastname = u"赵李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻柏
水窦章云苏潘葛奚范彭郎鲁韦昌马苗"
firstname = u"红尘冷暖岁月清浅仓促间遗落一地如诗的句点不甘愿不决绝掬一份刻骨的思念
系一根心的挂牵在你回眸抹兰轩的底色悄然"
#创建一个函数,参数start表示循环的批次
def create_student_dict(start):
    firstlen = len(firstname)
    lastlen = len(lastname)
    # 创建一个符合正太分布的分数队列
    scoreList = [int(random.normalvariate(100, 50)) for _ in xrange(1, 5000)]
    # 创建1万条记录,如果执行程序内存够大,这个可以适当调大
    filename = str(start) '.txt'
    print filename
    #每次循环都创建一个文件,文件名为:循环次数 '.txt',例如 1.txt
    with open('./' filename, mode='wr ') as fp:
        for i in xrange(start * 40000, (start 1) * 40000):
            firstind = random.randint(1, firstlen - 4)
            model = {"s_no": u"xuehao_no_" str(i),
                "s_name": u"{0}{1}".format(lastname[random.randint(1, lastlen - 1)],
                                           firstname[firstind: firstind 1]),
                "s_birth": u"{0}-{1}-{2}".format(random.randint(1991, 2000),
                                                 '0' str(random.randint(1, 9)),
                                                 random.randint(10, 28)),
                "s_age": random.sample([20, 20, 20, 20, 21, 22, 23, 24, 25, 26], 1)[0],
                "s_sex": str(random.sample(['男', '女'], 1)[0]),
                "s_score": abs(scoreList[random.randint(1000, 4990)]),
                's_desc': u"为程序猿攻城狮队伍补充新鲜血液,"
                          u"为祖国未来科技产业贡献一份自己的力量" * random.randint 
(1, 20)}
            #写入数据到本地文件
            fp.write("{0}\t{1}\t{2}\t{3}\t{4}\t{5}\t{6}".
                     format(model['s_no'], model['s_name'],
                            model['s_birth'], model['s_age'],
                            model['s_sex'], model['s_score'],
                            model['s_desc']))
# 循环创建记录,一共是40000*500=2千万的数据
for i in xrange(1, 501):
    starttime = datetime.datetime.now()
    create_student_dict(i)
 
(3)确保该文件有被执行的权限,代码如下:
 
chmod 777 init_student.py
 
(4)生成数据,执行下面的代码后将在init_student目录下生成500个txt文件。
python init_student.py
 
(5)创建hdfs目录:
 
hdfs dfs -mkdir /mnt/data/bigdata/hive/warehouse/student_tb_txt/
 
(6)在init_student目录下执行下面的命令,将所有的txt文件上传到步骤5创建的目录下。
 
hdfs dfs -put ./*.txt  /mnt/data/bigdata/hive/warehouse/student_tb_txt/
 
(7)在Hive上创建student内部表,并将数据目录映射到步骤5的目录,代码如下:
 
create table if not exists default.student_tb_txt(
  s_no string comment '学号',
  s_name string comment '姓名',
  s_birth string comment '生日',
  s_age bigint  comment '年龄',
  s_sex string comment '性别',
  s_score bigint comment '综合能力得分',
  s_desc string comment '自我介绍'
)
row format delimited
fields terminated by '\t'
location '/mnt/data/bigdata/hive/warehouse/student_tb_txt/';
 
至此,student_tb_txt表的数据已经生成并加载完成。
【案例1.2】 生成Hive表student_sc_tb_txt的数据。
步骤与案例1.1类似,创建本地目录init_course,并编写student_sc_tb_txt表数据的生成程序init_course.py。代码如下:
 
# coding: utf-8
import random, datetime
import sys
reload(sys)
sys.setdefaultencoding('utf8')
#创建一个函数,参数start表示循环的批次
def create_student_sc_dict(start):
    filename = str(start) '.txt'
    print start
    with open('./' filename , mode='wr ') as fp:
        for i in xrange(start * 40000, (start 1) * 40000):
            #课程出现越多表示喜欢的人越多
            course = random.sample([u'数学', u'数学', u'数学', u'数学', u'数学',
                                    u'语文', u'英语', u'化学', u'物理', u'生物'], 1)[0]
            model = {"s_no": u"xuehao_no_" str(i),
                     "course": u"{0}".format(course),
                     "op_datetime": datetime.datetime.now().strftime("%Y-%m-%d"),
                     "reason": u"我非常非常非常非常非常非常非常"
                               u"非常非常非常非常非常非常非常喜爱{0}".format(course)}
            line = "{0}\t{1}\t{2}\t{3}"\
                .format(model['s_no'], 
                        model['course'], 
                        model['op_datetime'],
                        model['reason'])
            fp.write(line)
# 循环创建记录,一共是40000*500=2千万记录
for i in xrange(1, 501):
    starttime = datetime.datetime.now() # create_student_dict 转换成dataframe
     格式,并注册临时表temp_student
    create_student_sc_dict(i)
 
执行init_course.py代码并生成本地数据,之后创建hdfs目录,即/mnt/data/bigdata/hive/ warehouse/student_sc_tb_txt/,并将本地数据上传到对应的hdfs目录下,后创建对应的hive表,代码如下:
 
create table if not exists default.student_sc_tb_txt(
  s_no string comment '学号',
  course string comment '课程名',
  op_datetime string comment '操作时间',
  reason  string comment '选课原因'
)
row format delimited
fields terminated by '\t'
location '/mnt/data/bigdata/hive/warehouse/student_sc_tb_txt';
 
至此已经完成了student_sc_tb_txt表数据的准备。准备好数据后,接着看下面的案例。
1.1.2  union案例
本节将编写一个带有union关键字的案例。在该案例中查询student_tb_txt表,每个年龄段晚出生和早出生的人的出生日期,并将其存入表student_stat中。
【案例1.3】 编写一个带有union关键字的案例。
 
--创建一张新的统计表
create table student_stat(a bigint, b bigint) partitioned by (tp  string)
STORED AS TEXTFILE;
--开启动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
--找出各个年龄段早和晚出生的信息,并将这两部分信息使用union进行合并写入
student_stat中
insert into table student_stat partition(tp)
select s_age,max(s_birth) stat,'max' tp
from student_tb_txt
group by s_age
union all
select s_age,min(s_birth) stat, 'min' tp
from student_tb_txt
group by s_age;
--------------执行后计算结果如下-----------------------
Query ID = hdfs_20180928153333_6e96651d-e0d3-41a3-a3d9-121210094aca
Total jobs = 5
Launching Job 1 out of 5
…省略大部分的打印信息
MapReduce Jobs Launched:
Stage-Stage-1: Map: 84  Reduce: 328   Cumulative CPU: 1456.95 sec   HDFS 
Read: 21956521358 HDFS Write: 31719 SUCCESS
Stage-Stage-9: Map: 84  Reduce: 328   Cumulative CPU: 1444.21 sec   HDFS 
Read: 21956522014 HDFS Write: 31719 SUCCESS
Stage-Stage-2: Map: 6   Cumulative CPU: 34.47 sec   HDFS Read: 258198 HDFS 
Write: 582 SUCCESS
Stage-Stage-4: Map: 2   Cumulative CPU: 3.83 sec   HDFS Read: 6098 HDFS 
Write: 84 SUCCESS
Total MapReduce CPU Time Spent: 48 minutes 59 seconds 460 msec
OK
Time taken: 336.306 seconds
 
从上面打印的返回结果可以看到一个共有5个Job对应4个MapReduce(MR)的任务,即Stage-Stage-1、Stage-Stage-9、Stage-Stage-2和Stage-Stage-4对应的任务。
我们重点看黑体部分,从Total MapReduce CPU Time中可以看到所占用系统的实际耗时是48分59秒,用户等待的时间是336秒,记住这些数字,并对比接下来1.1.3节的结果。
 扩展:Partition default.student_stat{tp=max} stats: […]这类信息只有在开启统计信息收集的时候才会打印出来,对应的配置是hive.stats.autogather,默认值是true。
?注意:Total MapReduce CPU Time Spent表示运行程序所占用服务器CPU资源的时间。而Time taken记录的是用户从提交作业到返回结果期间用户等待的所有时间。
接下来我们来看一个不使用union all的对比案例。
1.1.3  改写SQL实现union的优化
在上一节的案例中,我们只是完成了一个简单的求值/小值的统计,但却用了4个MR任务,如图1.1所示。
 
图1.1  union数据流(图中数字仅代表执行顺序)
(1)任务1,取student_tb_txt数据,计算birth的max值,并写入临时区。
(2)任务2,取student_tb_txt数据,计算birth的min值,并写入临时区。
(3)任务3,求任务1和任务2结果的并集。
(4)任务4,把任务3得到的结果写入student_stat中。
 扩展:如何知道是上面4个任务,而不是其他任务呢?有两种方式可以判断:,通过查看执行计划,但是一定要记住一点,Hive的执行计划都是预测的,这点不像Oracle和SQL Server有真实的计划,在后面我们会详细谈谈执行计划;第二,按照SQL语法,结合MapReduce的实现机制去推测MR应该怎么实现,这种方式需要建立在有一定的MapReduce编写经验上,理解基本的分布式计算基本原理。
HiveSQL在执行时会转化为各种计算引擎能够运行的算子。作为HiveSQL的使用者,想要写出更加有效率的HiveSQL代码和MR代码,就需要去理解HiveSQL是如何转化为各种计算引擎所能运行的算子的。怎么做到?分为两步:步,理解基本的MR过程和原理,第二步,在日常编写SQL的过程中,尝试将SQL拆解成计算引擎对应的算子,拆解完和执行计划进行比对,还要和实际执行过程的算子比对,并思考自己拆解完后的算子与通过explain方式得到的算子的执行计划的异同。
在大数据领域,分布式计算和分布式存储会消耗大量的磁盘I/O和网络I/O资源,这部分资源往往成为了大数据作业的瓶颈。在运行案例1.3时观察集群资源的运行情况,将会发现CPU使用率很少,磁盘和网络读/写会变得很高,所以优化的焦点就集中在如何降低作业对I/O资源的消耗上。MR任务有一个缺点,即启动一次作业需要多次读/写磁盘,因为MR会将中间结果写入磁盘,而且为了保障磁盘的读写效率和网络传输效率,会进行多次排序。
如果一个SQL包含多个作业,作业和作业之间的中间结果也会先写到磁盘上。减少中间结果的产生,也就能够达到降低I/O资源消耗,提升程序效率。针对案例1.3,我们调优的关键点就是减少或者避免中间结果集的产生,基于这样的想法,改造后的数据流如图1.2所示。
 
图1.2  去掉union 数据流
观察图1.2,会发现有一处冗余的地方:step1和step2都是对student_tb_txt进行计算,但在计算时要查询两次表,这一步其实是冗余的。如果student_tb_txt是一个基数特别大的表,从表中取数(读取磁盘中的数据)的时间将变得很长,也浪费了集群宝贵的I/O资源。可以将其进行优化,变成只需读取一次表,如图1.3所示。
 
图1.3  优化从源表取数的操作
图1.3中优化了从源表student_tb_txt取数的次数,但是计算max值和min值却要分成两个MR作业,并将计算结果分别插入到student_stat中。如果能够在一个任务当中完成max和min值计算,那就可以减少启动一个作业的时间,以及MR任务对磁盘I/O和网络I/O的消耗。改造后的数据流如图1.4所示。
 
图1.4  简化max/min的计算过程
基于图1.4的方案,在Hive中采用了案例1.4的实现方式。
【案例1.4】 去掉union计算max和min。
 
DROP TABLE if EXISTS student_stat;
--创建student_stat
create table student_stat(a bigint, b bigint) partitioned by (tp  string) 
STORED AS TEXTFILE;
--开启动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
from student_tb_txt
INSERT into table student_stat partition(tp)
select s_age,min(s_birth) stat,'min' tp
group by s_age
insert into table  student_stat partition(tp)
select s_age,max(s_birth) stat,'max’ tp
group by s_age;
-------------------执行结果-----------------------------------
Query ID = hdfs_20190225095959_25418167-9bbe-4c3d-aae0-0511f52ca683
Total jobs = 1
...//省略部分的打印信息
Hadoop job information for Stage-2: number of mappers: 84; number of 
reducers: 328
2019-02-25 10:00:02,395 Stage-2 map = 0%,  reduce = 0%
2019-02-25 10:00:19,973 Stage-2 map = 3%,  reduce = 0%, Cumulative CPU 18.06 sec
...//省略部分的打印信息
2019-02-25 10:02:44,432 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU
2133.75 sec
MapReduce Total cumulative CPU time: 35 minutes 33 seconds 750 msec
Ended Job = job_1550390190029_0218
...
MapReduce Jobs Launched:
Stage-Stage-2: Map: 84  Reduce: 328   Cumulative CPU: 2133.75 sec   HDFS
Read: 21957309270 HDFS Write: 2530 SUCCESS
Total MapReduce CPU Time Spent: 35 minutes 33 seconds 750 msec
OK
Time taken: 171.342 second
 
从以上结果中可以看到案例1.4的执行结果:
? Total MapReduce CPU Time:35分33秒;
? 用户等待时间:171秒。
相比于案例1.3,在案例1.4中通过改写SQL,即实现MULTI-TABLE-INSERT写法,Total MapReduce CPU Time Spent的总耗时从48分59秒减少到了35分33秒,减少了13分钟26秒,用户实际等待耗时从336秒减少到了171秒,节省了近1/2的时间。案例1.4中,整个优化的过程都集中在对磁盘I/O和网络I/O的优化上,在硬件资源保持不变的情况下,随着数据量的增加,整个集群的磁盘和网络压力会更大,相比于案例1.3的写法,其节省的时间会更加明显。
 扩展:细心的读者会发现,案例1.4只启动了1个job,而案例1.3却启动了5个,每启动一个job,就说明集群多执行了一次MapReduce作业,MapReduce作业越多则代表数据就要经历更多次的磁盘读写和网络通信。随着数据量增多,磁盘和网络的负载会越来越大,耗在每个MapReduce过程的时间延迟也会越来越长。
1.1.4  失败的union调优
如果是从学PL-SQL或T-SQL刚转到学习Hive的读者,可能不知道MULTI-TABLE- INSERT的写法,在分析图1.2的调优方式时,将会采用如下案例来对其优化,即把一个union语句拆解成多个简单SQL,比如案例1.5的写法。
【案例1.5】 将一个含有union的SQL改写为两个简单的SQL。
 
drop table if exists student_stat;
create table student_stat(a bigint, b bigint) partitioned by (tp  string)
STORED AS TEXTFILE;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
--计算max值
insert into table student_stat partition(tp)
select s_age,max(s_birth) stat, 'max' tp
from student_tb_txt
group by s_age;
--计算min值
insert into table student_stat partition(tp)
select s_age,min(s_birth) stat,'min' tp
from student_tb_txt
group by s_age;
 
案例1.5将案例1.3的代码拆分成了两段代码,这样可以省略union的MR作业,计算max和min值的两个作业可直接将数据放到student_stat目录下,少了一次MR作业,并节省了一个MR所额外消耗的资源。看似很合理的方案,我们来看下两个程序的执行结果。
计算max值的执行结果:
 
Query ID = hdfs_20190219142020_2462a9ff-98d0-4fe0-afd8-24f78eebf46b
Total jobs = 1
Launching Job 1 out of 1
...//省略非必要信息
Hadoop job information for Stage-1: number of mappers: 84; number of
reducers: 328
2019-02-19 14:20:16,031 Stage-1 map = 0%,  reduce = 0%
2019-02-19 14:20:26,350 Stage-1 map = 6%,  reduce = 0%, Cumulative CPU 28.57 sec
...//省略部分信息
2019-02-19 14:22:35,773 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU
2028.24 sec
MapReduce Total cumulative CPU time: 33 minutes 48 seconds 240 msec
Ended Job = job_1550390190029_0057
Loading data to table default.student_stat partition (tp=null)
  Time taken for load dynamic partitions : 153
 Loading partition {tp=max}
  Time taken for adding to write entity : 0
Partition default.student_stat{tp=max} stats: [numFiles=7, numRows=7,
totalSize=42, rawDataSize=35]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 84  Reduce: 328   Cumulative CPU: 2028.24 sec   HDFS
Read: 21956949222 HDFS Write: 1265 SUCCESS
Total MapReduce CPU Time Spent: 33 minutes 48 seconds 240 msec
OK
Time taken: 147.503 seconds
 
计算min值的执行结果:
Query ID = hdfs_20190219142222_7babcc53-6ceb-4ca6-a1c1-970d7caa43ab
Total jobs = 1
Launching Job 1 out of 1
...//省略部分信息
Hadoop job information for Stage-1: number of mappers: 84; number of
reducers: 328
2019-02-19 14:22:48,191 Stage-1 map = 0%,  reduce = 0%
2019-02-19 14:22:56,453 Stage-1 map = 6%,  reduce = 0%, Cumulative CPU 25.13 sec
...//省略部分信息
2019-02-19 14:25:11,062 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU
2014.8 sec
MapReduce Total cumulative CPU time: 33 minutes 34 seconds 800 msec
Ended Job = job_1550390190029_0058
Loading data to table default.student_stat partition (tp=null)
  Time taken for load dynamic partitions : 104
 Loading partition {tp=min}
  Time taken for adding to write entity : 0
Partition default.student_stat{tp=min} stats: [numFiles=7, numRows=7,
totalSize=42, rawDataSize=35]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 84  Reduce: 328   Cumulative CPU: 2014.8 sec   HDFS
Read: 21956949222 HDFS Write: 1265 SUCCESS
Total MapReduce CPU Time Spent: 33 minutes 34 seconds 800 msec
OK
Time taken: 152.628 seconds
 
从案例1.5的执行结果中可以得到计算值和小值的Total MapReduce CPU Time总和为66分钟,用户等待时间为303秒。
案例1.5相比于union的写法,即案例1.3,系统CPU耗时多了近16分钟,这样的结果让人感到意外。难道分析过程有问题?其实对于Hive的早期版本,用案例1.5的代码确实是对案例1.3的代码进行了调优,但在Hive版本迭代中对union命令进行了优化,导致拆分后的代码令整个程序跑得慢了。从这里我们可以知道,某些调优方式随着版本的迭代,也逐渐变得不再适用。
 扩展:Hive同其他数据库一样,提供了SQL并行执行的功能。但我们知道,SQL并行执行并不会节省作业耗用的CPU和磁盘资源,只是节省了用户等待的时间。另外,当作业足够大或者集群资源不够的情况下,SQL并不会并行运行。