Kylin是在Hadoop上的SQL层,最近对Phoenix调研完成之后,对Kylin产生了兴趣。
Kylin
第一章 概述
1.1 Kylin定义
Apache Kylin是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay Inc开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。
1.2 Kylin特点
Kylin的主要特点包括支持SQL接口、支持超大规模数据集、亚秒级响应、可伸缩性、高吞吐率、BI工具集成等。
1)标准SQL接口:Kylin是以标准的SQL作为对外服务的接口。
2)支持超大数据集:Kylin对于大数据的支撑能力可能是目前所有技术中最为领先的。早在2015年eBay的生产环境中就能支持百亿记录的秒级查询,之后在移动的应用场景中又有了千亿记录秒级查询的案例。
3)亚秒级响应:Kylin拥有优异的查询相应速度,这点得益于预计算,很多复杂的计算,比如连接、聚合,在离线的预计算过程中就已经完成,这大大降低了查询时刻所需的计算量,提高了响应速度。
4)可伸缩性和高吞吐率:单节点Kylin可实现每秒70个查询,还可以搭建Kylin的集群。
5)BI工具集成
Kylin可以与现有的BI工具集成,具体包括如下内容。
ODBC:与Tableau、Excel、PowerBI等工具集成
JDBC:与Saiku、BIRT等Java工具集成
Restate:与JavaScript、Web网页集成
Kylin开发团队还贡献了Stippling的插件,也可以使用Stippling来访问Kylin服务。
1.3 Kylin架构
1)REST Server
REST Server是一套面向应用程序开发的入口点,旨在实现针对Kylin平台的应用开发工作。 此类应用程序可以提供查询、获取结果、触发cube构建任务、获取元数据以及获取用户权限等等。另外可以通过Restful接口实现SQL查询。
2)查询引擎(Query Engine)
当cube准备就绪后,查询引擎就能够获取并解析用户查询。它随后会与系统中的其它组件进行交互,从而向用户返回对应的结果。
3)Routing
负责将解析的SQL生成的执行计划转换成cube缓存的查询,cube是通过预计算缓存在hbase中,这部分查询可以在秒级设置毫秒级完成,而且还有一些操作使用过的查询原始数据(存储在Hadoop的hdfs中通过hive查询)。这部分查询延迟较高。
4)元数据管理工具(Metadata)
Kylin是一款元数据驱动型应用程序。元数据管理工具是一大关键性组件,用于对保存在Kylin当中的所有元数据进行管理,其中包括最为重要的cube元数据。其它全部组件的正常运作都需以元数据管理工具为基础。 Kylin的元数据存储在hbase中。
5)任务引擎(Cube Build Engine)
这套引擎的设计目的在于处理所有离线任务,其中包括shell脚本、Java API以及Map Reduce任务等等。任务引擎对Kylin当中的全部任务加以管理与协调,从而确保每一项任务都能得到切实执行并解决其间出现的故障。
1.4 Kylin工作原理
Apache Kylin的工作原理本质上是MOLAP(Multidimension On-Line Analysis Processing)Cube,也就是多维立方体分析。是数据分析中非常经典的理论,下面对其做简要介绍。
1.4.1 纬度和度量
维度:即观察数据的角度。比如员工数据,可以从性别角度来分析,也可以更加细化,从入职时间或者地区的维度来观察。维度是一组离散的值,比如说性别中的男和女,或者时间维度上的每一个独立的日期。因此在统计时可以将维度值相同的记录聚合在一起,然后应用聚合函数做累加、平均、最大和最小值等聚合计算。
度量:即被聚合(观察)的统计值,也就是聚合运算的结果。比如说员工数据中不同性别员工的人数,又或者说在同一年入职的员工有多少。
基数:某个维度的种类数。比如说性别维度,基数为2(男和女)。按照某个维度进行聚合,结果数据的大小主要取决于该维度的基数。
1.4.2 Cube和Cuboid
有了维度跟度量,一个数据或数据模型上的所有字段就可以分类了,他们要么是纬度要么是度量(可以被聚合)。于是就有了根据维度和度量做预计算的理论。
给定一个数据模型,我们可以对其上的所有维度进行聚合,对于N个维度来说,组合的所有可能性共有2^n种。对于每一种维度的组合,将度量值做聚合计算,然后将结果保存为一个物化视图,称为Cuboid。所有维度组合的Cuboid作为一个整体,称为Cube。
下面举一个简单的例子说明,假设有一个电商的销售数据集,其中维度包括时间[time]、商品[item]、地区[location]和供应商[supplier],度量为销售额。那么所有维度的组合就有2^4 = 16种,如下图所示:
一维度(1D)的组合有:[time]、[item]、[location]和[supplier]4种;
二维度(2D)的组合有:[time, item]、[time, location]、[time, supplier]、[item, location]、[item, supplier]、[location, supplier]3种;
三维度(3D)的组合也有4种;
最后还有零维度(0D)和四维度(4D)各有一种,总共16种。
注意:每一种维度组合就是一个Cuboid,16个Cuboid整体就是一个Cube。
1.4.3 核心算法
Kylin的工作原理就是对数据模型做Cube预计算,并利用计算的结果加速查询:
1)指定数据模型,定义维度和度量;
2)预计算Cube,计算所有Cuboid并保存为物化视图;
预计算过程是Kylin从Hive中读取原始数据,按照我们选定的维度进行计算,并将结果集保存到Hbase中,默认的计算引擎为MapReduce,可以选择Spark作为计算引擎。一次build的结果,我们称为一个Segment。构建过程中会涉及多个Cuboid的创建,具体创建过程由kylin.cube.algorithm参数决定,参数值可选 auto,layer 和 inmem, 默认值为 auto,即 Kylin 会通过采集数据动态地选择一个算法 (layer or inmem),如果用户很了解 Kylin 和自身的数据、集群,可以直接设置喜欢的算法。
3)执行查询,读取Cuboid,运行,产生查询结果。
1.4.3.1 逐层构建算法
我们知道,一个N维的Cube,是由1个N维子立方体、N个(N-1)维子立方体、N*(N-1)/2个(N-2)维子立方体、……、N个1维子立方体和1个0维子立方体构成,总共有2^N个子立方体组成,在逐层算法中,按维度数逐层减少来计算,每个层级的计算(除了第一层,它是从原始数据聚合而来),是基于它上一层级的结果来计算的。比如,[Group by A, B]的结果,可以基于[Group by A, B, C]的结果,通过去掉C后聚合得来的;这样可以减少重复计算;当 0维度Cuboid计算出来的时候,整个Cube的计算也就完成了。每一轮的计算都是一个MapReduce任务,且串行执行;一个N维的Cube,至少需要N+1次MapReduce Job。
算法优点:
每一轮的计算都是一个MapReduce任务,且串行执行;一个N维的Cube,至少需要N+1次MapReduce Job。2)受益于Hadoop的日趋成熟,此算法对集群要求低,运行稳定;在内部维护Kylin的过程中,很少遇到在这几步出错的情况;即便是在Hadoop集群比较繁忙的时候,任务也能完成。
算法缺点
2)受益于Hadoo p的日趋成熟,此算法对集群要求低,运行稳定;在内部维护Kylin的过程中,很少遇到在这几步出错的情况;即便是在Hadoop集群比较繁忙的时候,任务也能完成。
算法缺点:
1)当Cube有比较多维度的时候,所需要的MapReduce任务也相应增加;由于Hadoop的任务调度需要耗费额外资源,特别是集群较庞大的时候,反复递交任务造成的额外开销会相当可观;
2)此算法会对Hadoop MapReduce输出较多数据; 虽然已经使用了Combiner来减少从Mapper端到Reducer端的数据传输,所有数据依然需要通过Hadoop MapReduce来排序和组合才能被聚合,无形之中增加了集群的压力;
3)对HDFS的读写操作较多:由于每一层计算的输出会用做下一层计算的输入,这些Key-Value需要写到HDFS上;当所有计算都完成后,Kylin还需要额外的一轮任务将这些文件转成HBase的HFile格式,以导入到HBase中去;
总体而言,该算法的效率较低,尤其是当Cube维度数较大的时候。
1.4.3.2 快速构建算法(inmem)
也被称作“逐段”(By Segment) 或“逐块”(By Split) 算法,从1.5.x开始引入该算法,利用Mapper端计算先完成大部分聚合,再将聚合后的结果交给Reducer,从而降低对网络瓶颈的压力。该算法的主要思想是,对Mapper所分配的数据块,将它计算成一个完整的小Cube 段(包含所有Cuboid);每个Mapper将计算完的Cube段输出给Reducer做合并,生成大Cube,也就是最终结果;如图所示解释了此流程。
与旧算法相比,快速算法主要有两点不同:
1) Mapper会利用内存做预聚合,算出所有组合;Mapper输出的每个Key都是不同的,这样会减少输出到Hadoop MapReduce的数据量;
2)一轮MapReduce便会完成所有层次的计算,减少Hadoop任务的调配。
第二章:Kylin安装指南
2.1安装地址
1.官网地址:
2.官方文档
http://kylin.apache.org/cn/docs/
3.下载地址
http://kylin.apache.org/cn/download/
2.2 软件要求
Hadoop: 2.7+, 3.1+ (since v2.5)
Hive: 0.13 - 1.2.1+
HBase: 1.1+, 2.0 (since v2.5)
Spark (可选) 2.3.0+
Kafka (可选) 1.0.0+ (since v2.5)
JDK: 1.8+ (since v2.5)
OS: Linux only, CentOS 6.5+ or Ubuntu 16.0.4+
官网提示:在 Hortonworks HDP 2.2-2.6 and 3.0, Cloudera CDH 5.7-5.11 and 6.0, AWS EMR 5.7-5.10, Azure HDInsight 3.5-3.6 上测试通过。
2.3 硬件要求:
运行 Kylin 的服务器的最低配置为 4 core CPU,16 GB 内存和 100 GB 磁盘。 对于高负载的场景,建议使用 24 core CPU,64 GB 内存或更高的配置。
2.4 Hadoop环境
Kylin依赖于Hadoop集群处理大量的数据集。需要准备一个配置好HDFS,YARN,MapReduce,,Hive,HBase,Zookeeper和其他服务的Hadoop 集群供 Kylin 运行。
Kylin可以在 Hadoop 集群的任意节点上启动。方便起见,可以在master节点上运行Kylin。但为了更好的稳定性,官网建议将Kylin部署在一个干净的Hadoop client节点上,该节点上 Hive,HBase,HDFS 等命令行已安装好且client 配置(如 core-site.xml
,hive-site.xml
,hbase-site.xml
及其他)也已经合理的配置且其可以自动和其它节点同步。
运行Kylin的Linux账户要有访问 Hadoop 集群的权限,包括创建/写入 HDFS 文件夹,Hive表,HBase 表和提交MapReduce任务的权限。
2.5 Kylin安装
- 从 Apache Kylin下载网站 下载一个适用于您 Hadoop 版本的二进制文件。例如,适用于 HBase 1.x 的 Kylin 2.5.0 可通过如下命令行下载得到:
1 | cd /usr/local/ |
- 解压 tar 包,配置环境变量
$KYLIN_HOME
指向 Kylin 文件夹。
1 | tar -zxvf apache-kylin-2.5.0-bin-hbase1x.tar.gzcd apache-kylin-2.5.0-bin-hbase1xexport KYLIN_HOME=`pwd` |
从 v2.6.1 开始, Kylin 不再包含 Spark 二进制包; 您需要另外下载 Spark,然后设置 SPARK_HOME
系统变量到 Spark 安装目录:
1 | export SPARK_HOME=/path/to/spark |
或者使用脚本下载:
1 | $KYLIN_HOME/bin/download-spark.sh |
Kylin tarball 目录
bin
: shell 脚本,用于启动/停止 Kylin,备份/恢复 Kylin 元数据,以及一些检查端口、获取 Hive/HBase 依赖的方法等;conf
: Hadoop 任务的 XML 配置文件lib
: 供外面应用使用的 jar 文件,例如 Hadoop 任务 jar, JDBC 驱动, HBase coprocessor 等.meta_backups
: 执行bin/metastore.sh backup
后的默认的备份目录;sample_cube
用于创建样例 Cube 和表的文件。spark
: 自带的 spark。tomcat
: 自带的 tomcat,用于启动 Kylin 服务。tool
: 用于执行一些命令行的jar文件。
检查运行环境
Kylin 运行在 Hadoop 集群上,对各个组件的版本、访问权限及 CLASSPATH 等都有一定的要求,为了避免遇到各种环境问题,您可以运行 $KYLIN_HOME/bin/check-env.sh
脚本来进行环境检测,如果您的环境存在任何的问题,脚本将打印出详细报错信息。如果没有报错信息,代表您的环境适合 Kylin 运行。
启动 Kylin
运行 $KYLIN_HOME/bin/kylin.sh start
脚本来启动 Kylin,界面输出如下:
1 | Retrieving hadoop conf dir... |
使用 Kylin
Kylin 启动后您可以通过浏览器 http://:7070/kylin
进行访问。
其中 `` 为具体的机器名、IP 地址或域名,默认端口为 7070。
初始用户名和密码是 ADMIN/KYLIN
。
服务器启动后,您可以通过查看 $KYLIN_HOME/logs/kylin.log
获得运行时日志。
停止 Kylin
运行 $KYLIN_HOME/bin/kylin.sh stop
脚本来停止 Kylin,界面输出如下:
1 | Retrieving hadoop conf dir... |
您可以运行 ps -ef | grep kylin
来查看 Kylin 进程是否已停止。
HDFS 目录结构
Kylin 会在 HDFS 上生成文件,根目录是 “/kylin/”, 然后会使用 Kylin 集群的元数据表名作为第二层目录名,默认为 “kylin_metadata” (可以在conf/kylin.properties
中定制).
通常, /kylin/kylin_metadata
目录下会有这么几种子目录:cardinality
, coprocessor
, kylin-job_id
, resources
, jdbc-resources
.
\1. cardinality
: Kylin 加载 Hive 表时,会启动一个 MR 任务来计算各个列的基数,输出结果会暂存在此目录。此目录可以安全清除。
\2. coprocessor
: Kylin 用于存放 HBase coprocessor jar 的目录;请勿删除。
\3. kylin-job_id
: Cube 计算过程的数据存储目录,请勿删除。 如需要清理,请遵循 storage cleanup guide.
\4. resources
: Kylin 默认会将元数据存放在 HBase,但对于太大的文件(如字典或快照),会转存到 HDFS 的该目录下,请勿删除。
\5. jdbc-resources
:性质同上,只在使用 MySQL 做元数据存储时候出现。
2.6 集群模式部署
Kylin 实例是无状态的服务,运行时的状态信息存储在 HBase metastore 中。 出于负载均衡的考虑,可以启用多个共享一个 metastore 的 Kylin 实例,使得各个节点分担查询压力且互为备份,从而提高服务的可用性。
下图描绘了 Kylin 集群模式部署的一个典型场景:
将多个 Kylin 节点组成集群,先确保他们使用同一个 Hadoop 集群、HBase 集群。然后在每个节点的配置文件 $KYLIN_HOME/conf/kylin.properties
中执行下述操作:
- 配置相同的
kylin.metadata.url
值,即配置所有的 Kylin 节点使用同一个 HBase metastore。 - 配置 Kylin 节点列表
kylin.server.cluster-servers
,包括所有节点(包括当前节点),当事件变化时,接收变化的节点需要通知其他所有节点(包括当前节点)。 - 配置 Kylin 节点的运行模式
kylin.server.mode
,参数值可选all
,job
,query
中的一个,默认值为all
。`job` 模式代表该服务仅用于任务调度,不用于查询;`query` 模式代表该服务仅用于查询,不用于构建任务的调度;`all` 模式代表该服务同时用于任务调度和 SQL 查询。
注意:默认情况下只有一个实例用于构建任务的调度 (即 kylin.server.mode
设置为 all
或者 job
模式)。
任务引擎高可用
从 v2.0 开始, Kylin 支持多个任务引擎一起运行,相比于默认单任务引擎的配置,多引擎可以保证任务构建的高可用。
使用多任务引擎,可以在多个 Kylin 节点上配置它的角色为 job
或 all
。为了避免它们之间产生竞争,需要启用分布式任务锁,需在 kylin.properties
里配置:
1 | kylin.job.scheduler.default=2 |
并记得将所有任务和查询节点的地址注册到 kylin.server.cluster-servers
。
安装负载均衡器
为了将查询请求发送给集群而非单个节点,可以部署一个负载均衡器,如Nginx等,使得客户端和负载均衡器通信代替和特定的 Kylin 实例通信。
1.安装依赖包
yum -y install gcc zlib zlib-devel pcre-devel openssl openssl-devel
2.下载并解压安装包
mkdir -p /usr/local/nginx
//下载tar包
wget http://nginx.org/download/nginx-1.13.7.tar.gz
tar -zxvf nginx-1.13.7.tar.g
3.安装nginx
//进入nginx-1.13.7目录
//执行命令
./configure –prefix=/usr/local/nginx
//执行make命令
make
//执行make install命令
make install
4.配置nginx.conf
# 打开配置文件
vi /usr/local/nginx/conf/nginx.conf
1 | #gzip on; |
检查配置文件是否正确:/usr/local/nginx/sbin/nginx -t
1 | 启动:/usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf |
重启:/usr/local/nginx/sbin/nginx -s reload
查看进程:ps -ef | grep nginx
从容停止:kill -QUIT 进程号
快速停止:kill -TERM 进程号
强制停止:pkill -9 nginx
第三章 快速入门
第四章 Kylin实例调研
从业务层面来讲,OLAP一般分为两个种类。
即席查询: 即席查询(Ad Hoc)是用户根据自己的需求,灵活的选择查询条件,系统能够根据用户的选择生成相应的统计报表。即席查询与普通应用查询最大的不同是普通的应用查询是定制开发的,而即席查询是由用户自定义查询条件的。
就是用户通过手写SQL来完成一些临时的数据分析需求,这类需求的SQL形式多变,逻辑复杂,对相应时间没有严格的眼球。
固化查询:对一些固化下来的取数,看数的需求,通过数据产品的形式提供给用户,从而提高数据分析和运行效率。这类SQL有固定的模式,对相应时间有较高要求。
美团大规模使用Kylin的例子:
随着公司业务数据量和复杂度的不断提升 ,第二种方案会出现几个突出的问题:
随着维度的不断增加,在数仓中维护各种维度组合的聚合表的成本越来越高,数据开发效率明显下降;
数据量超过千万行后,MySQL的导入和查询变得非常慢,经常把MySQL搞崩,DBA的抱怨很大;
由于大数据平台缺乏更高效率的查询引擎,查询需求都跑在Hive/Presto上,导致集群的计算压力大,跟不上业务需求的增长。
目前OLAP引擎种类还有挺多,目前还没有一个西永能够满足各种场景的查询需求,本质是:没有一个能在数据量、性能、灵活性三个方面做到完美,每个西永在设计时都需要在这三者间做出取舍。
例如:
MPP架构的系统(Presto/Impala/SparkSQL/Drill等)有很好的数据量和灵活性支持,但是对响应时间是没有保证的。当数据量和计算复杂度增加后,响应时间会变慢,从秒级到分钟级,甚至小时级都有可能。
搜索引擎架构的系统(Elasticsearch等)相对比MPP系统,在入库时将数据转换为倒排索引,采用Scatter-Gather计算模型,牺牲了灵活性换取很好的性能,在搜索类查询上能做到亚秒级响应。但是对于扫描聚合为主的查询,随着处理数据量的增加,响应时间也会退化到分钟级。
预计算系统 (Druid/Kylin等)则在入库时对数据进行预聚合,进一步牺牲灵活性换取性能,以实现对超大数据集的秒级响应。
对这些特点有了了解,针对不用的场合才知道选择什么。
典型的使用场景:
- 数据以离线生产为主,数据量在千万到百亿之间级别。
- 需要多维度任意组合的。
- 指标中包含大量去重主表,要求结果精确的。
- 相应时间要求:3s内
- 可以提供SQL接口
Kylin在生产环境经过了考研,2016年初美团开始推广Kylin解决方案。一年后覆盖了所有业务线,成为OLAP首选。
截止16年底,一年时间产生了214个Cube,包含的数据总行数2853亿行,Cube在HBase中的存储有59TB。日查询次数超过了50w次,TP50查询延迟87ms,TP99延迟1266ms,很好满足性能需求。
美团的硬件环境是:30个节点的Kylin专属Hbase集群,2台用于Cube构建的物理机,和8台8核16GVM用作Kylin的查询机。Cube的构建是运行在主计算机群的MR作业,各业务线的构建任务拆分到了他们各自的资源队列上。
Kylin对外是REST接口,我们接入了公司统一的http服务治理框架来实现负载均衡和平滑重启。
调研的时候,看到Cube这个概念,一般都会担心“维度爆炸这个问题”,就是每增加一个维度,由于维度组合翻倍,可能会产生纬度爆炸,对预计算的算力和磁盘空间都产生很大考验,后来发现这个问题并没有想象的严重。这主要是因为:
Kylin支持Partial Cube,不需要对所有维度组合都进行预计算。
实际业务中,纬度之间往往存在衍生关系,而Kylin可以把衍生纬度的计算从预计算推迟到查询处理阶段。
以事实表上的衍生维度为例,业务中的很多维度都是(ID, NAME)成对出现的。查询时需要对ID列进行过滤,但显示时只需要取对应的NAME列。如果把这两列都作为维度,维度个数会翻倍。而在Kylin中,可以把NAME作为ID列的extendedcolumn指标,这样Cube中的维度个数就减半了。
美团在采用衍生维度后,90%的场景可以把Cube中的维度个数(Rowkey列数)控制在20个以内。指标个数呈现长尾分布,小于10个指标的Cube是最多的,不过也有近一半的Cube指标数超过20。总共有382个去重指标,占到了总指标数的10%,绝大多数都是精确去重指标。49%的Cube膨胀率小于100%,即Cube存储量不超过上游Hive表。68%的Cube能够在1小时内完成构建,92%在2小时内完成构建。
从美团的实践中能看出美团投入Kylin是看重的Kylin的海量数据超高查询性能的特点,虽然在磁盘空间上做了一部分牺牲,实践证明Kylin的空间换时间是可行的。
斗鱼大规模使用Kylin的例子
斗鱼的这个例子,更有代表性,斗鱼随着业务的增长,在2019年Q2,平均MAU,达到1.6亿MAU,每天,超大量的用户使用斗鱼各客户端参与线上互动,斗鱼需要对客户端采集到的的性能数据进行统计和分析,开发出具有多维度分析图标和数据监控的APM( Application Performance Monitoring,应用性能监控 )平台。 最初,斗鱼采用了市面上非常流行的 Elasticsearch (简称 ES)实时聚合实现。运行一段时间后,基于 ES 的方案面临用户查询时间长、数据精度丢失等问题,斗鱼采用 Apache Kylin 替换 Elasticsearch, 对 APM 平台中存在的问题进行优化。不试不知道,一试吓一跳。
一、背景
斗鱼是一家面向大众用户的在线直播平台,每天都有超大量的终端用户在使用斗鱼各客户端参与线上互动。伴随业务的迅猛发展,斗鱼需要对客户端采集到的性能数据进行统计和分析,开发出具有多维度分析图表和数据监控的 APM (Application Performance Monitoring,应用性能监控) 平台。
针对不同的客户端采集的不同数据,我们需要将各种维度之间相互组合并聚合,最终产出的数据变成指标在图表中展示。例如:对在时间、地域、网络环境、客户端以及 CDN 厂商等维度聚合下的各项指标情况进行多维度分析,包括客户端网络性能(包含完整请求耗时,请求耗时,响应耗时,DNS 耗时,TCP 耗时,TLS 耗时等等指标)各类错误时间段内的占比以及详细数量、状态码分布等等。图一和图二分别是两个示例:
最初使用ES实施聚合,配合字眼多数据源统一接口(REST多数据源统一接口平台)框架,能够实现纬度指标的自由组合查询。数据采用strom实时消费kafka写入ES,醉倒了数据的事实展示,告警采用定时查询ES的方式。
运行一段时间后,ES的方案存在问题: 采用 ES 实时聚合的方式,大多数时候对单个字段的聚合查询是非常快的,一旦遇到较为复杂的多维度组合查询并且聚合的数据量比较大(如数十亿),就可能会产生大量的分组,对 ES 的性能压力很大,查询时间很长(几十秒到数分钟)导致用户难以等待,还可能会遇到数据精度丢失的问题。 因此为了支撑业务, 考虑再三我们决定寻找替代方案,注意到 Apache Kylin 在大数据 OLAP 分析方面非常有优势,于是决定采用 Kylin 替换 Elasticsearch, 对斗鱼 APM 平台中存在的问题进行优化。不试不知道,一试吓一跳,效果还真的不错。
二、使用Kylin的挑战和解决方案
Kylin集群的搭建
斗鱼的这个需求是独立业务,所以搭建了独立集群,目前为止集群共17台机器,其中 CM 节点3台,角色包含 HDFS,YARN,Zookeeper,Hive,HBase,Kafka(主要是消费使用),Spark 2 等。其中 4 台机器上部署了 Kylin 服务,采用了 1 个 “all“ 节点,1 个 “job“ 节点,2个 “query“ 节点的模式,确保了查询节点和任务节点都互有备份,满足服务的高可用。
斗鱼客户端收集到的 APM 数据会先暂存于 Kafka 消息队列中,Kylin 支持直接从 Kafka topic 中摄入数据而不用先落 Hive,于是我们选择了这种直连 Kafka 的方式来构建实时 Cube。
构建实时Cube的问题
斗鱼客户端收集到的 APM 数据会先暂存于 Kafka 消息队列中,Kylin 支持直接从 Kafka topic 中摄入数据而不用先落 Hive,于是我们选择了这种直连 Kafka 的方式来构建实时 Cube。
Kafka数据格式要求:
Kylin 的实时 Cube 需要配置基于 Kafka topic 的 Streaming Table (将 Kafka topic 映射成一张普通表)。这一步不同于基于 Hive 的数据表(Kylin 可以直接从 Hive metastore 获取表的字段信息),需要管理员进行一定的手工配置,才能将 Kafka 中的 JSON 消息映射成表格中的行。这一步对 Kafka 中的数据格式和字段有一定的要求,起初因为不了解这些要求,配置的 Cube 在构建时经常失败,只有少数 Cube 构建成功。也有的 Cube 很多次都构建成功,但偶尔会有失败。针对这些问题我们进行了一系列的排查和改进。现总结如下:
a. 由于我们原始数据在 kafka 中的存放格式为数组格式(JSON 字符串),所以在创建 Streaming Table 的时候会遇到下面的问题:
Kylin 会将数组中识别的字段默认加上数组下标,例如图中的 0_a,0_b 等,与我们的预期不符,所以需要对数组数据进行拆分。也就是说,Kylin 期望一条消息就是一个 JSON 对象(而非数组)。
b. 我们原始数据中还有嵌套的对象类型的字段,这种类型在 Kylin Streaming Table 识别的时候也可能会有问题,同样需要规整。如 Kylin 会把嵌套格式如 “{A: {B: value}}” 识别为 A_B 的字段,如下图,所以使用起来同样也可以,这个根据业务的不同可以自由选择,可以采用将嵌套字段铺平来规避后面可能出现的问题。
c. 这个是比较难发现的一个问题,就是在设计好 Cube 之后,有时会有 Cube 构建失败的情况,经过排查之后发现,是由于公司业务数据来源的特殊性(来自于客户端上报),所以可能会出现 Kafka 中字段不一致的情况。一旦出现极少数字段不一样的数据混在 Kafka 中,便极有可能让这一次的 Cube 构建失败。
基于以上几点,我们总结,Kylin 在接入 Kafka 实时数据构建之前,一定要做好数据清洗和规整,这也是我们前期耗费大量时间踩坑的代价。数据的清洗和规整我们采用的是流处理(Storm/Flink)对 Kafka 中的数据进行对应的处理,再写入一个新的 Kafka topic 中供 Kylin 消费。
- 任务的定时调度
Cube 的构建任务需要调用 API,如何定时消费 kafka 的数据进行构建,以及消费 kafka 的机制究竟如何。由于对 Kylin 理解的不够,一开始建出来的 Cube 消耗性能十分严重,需要对所建的 Cube 配合业务进行剪枝和优化。
构建实时 Cube 和构建基于 Hive 的离线 Cube 有很多不一样的地方,我们在使用和摸索的过程中踩了很多坑,也有了一定的经验。
由于是近实时 Cube 构建,需要每隔一小段时间就要构建一次,采用服务器中 Kylin 主节点上部署 Crontab-job 的模式来实现。
调度的时间间隔也经过了多次试验,调度的时间短了,上一个任务还没有执行完,下一个就开始了,容易产生堆积,在高峰时期容易造成滚雪球式的崩塌;调度的时间长了,一次处理的数据量就会特别大,导致任务执行的时间和消耗的资源也随之增长(Kylin 取 Kafka的数据,是比较简单粗暴的从上一次调度记录的 offset 直接取到当前最新的 offset,所以间隔时间越长,数据量越多),这是一个需要平衡的事情。经过反复测试使用,以及官方相应的介绍下,我们发现任务执行时间在 10~20 分钟为最优解,当然根据数据量的不同会有不同的调整。
3)Kylin 的剪枝与优化
由于业务比较复杂,每个 Cube 的维度可能特别的多,随着维度数目的增加 Cuboid 的数量会爆炸式的增长。例如 WEB 端网络性能分析的 Cube 维度可能达到 47 个,如果采用全量构建,每一个可能情况都需要的话,最多可能构建 2 的 47 次方,也就是1.4 * 10^14 种组合,这肯定是不能接受的。所以在 Cube 设计的时候一定要结合业务进行优化和剪枝。
首先是筛选,将原始数据中根据不同的业务,选择不同的字段进行设计,以Ajax性能分析为例,选择出需要使用的 25 种维度。(2^47 -> 2^25)
接下来是分组,将 25 种维度按照不同的场景进行分组,例如,地域相关的可以放在一起,浏览器相关的也能分为一组。我们将场景分为了 4 组,将指数增长拆分为多个维度组之和。好的分组可以有效的减少计算复杂度,但是没有设计好的分组,很可能会由于设计问题没有覆盖好各种场景,导致查询的时候需要二次聚合,导致查询的性能很差,这里需要重点注意。(2^25 -> 2^12 + 2^13 + 2^14 + 2^13)
然后是层级维度(Hierarchy Dimensions)、联合维度(Joint Dimensions)和必要维度(Mandatory Dimensions)的设置。这三个官网和网上都有大量的说明,这里不加赘述。最终实现 Kylin 的剪枝,来减少计算的成本。
最后是 Kylin 本身一系列的配置上的优化,这些针对各自业务和集群可以参照官方文档进行调参优化。
Kylin集群优化
起初我们为 Kylin 集群申请的机器类型是计算密集型,没有足够的本地存储空间。Kylin 在运行的过程中磁盘经常满了,常常需要手动清理机器。同时在前期运行的过程中时不时会出现「Kylin 服务挂了(或者管理端登不上)」,「HBase 挂了」等等情况,针对遇到的这几个问题,我们有一些解决的措施。
1)磁盘不足。因为 Kylin 在构建 Cube 的时候,会产生大量的临时文件,而且其中有部分临时文件 Kylin 是不会主动删除的,所以机器经常会出现磁盘空间不足的问题(也跟我们计算型机器磁盘空间小有关)。
解决办法:采用定时自动清除,和手动调动 API 清除临时文件,扩容 2 台大容量机器调整 Reblance 比例(这才彻底解决这个问题)。
2)服务不稳定。刚开始的时候集群部分角色总是挂起(例如 HDFS、HBase 和 Kylin 等),排查发现是由于每台机器存在多个角色,角色分配的内存之和大于机器的可用内存,当构建任务多时,可能导致角色由于内存问题挂掉。
解决办法:对集群中各个角色重新分配,通过扩容可以解决一切资源问题。添加及时的监控,由于 Kylin 不在 CM 中管理,需要添加单独的监控来判断 Kylin 进程是否挂掉或者卡住,一旦发现需要重启 Kylin。要注意有 job 的节点重启时需要设置好 kafka 安装路径。
HBase超时优化
Kylin 在后期维护中,经常会有任务由于 operationTimeout 导致任务失败。如图:
这个报错让 Cube 构建常常失败,且一旦构建失败超过一定的次数,该 Cube 就不会继续构建了,影响到了业务的使用,针对此问题也进行了相应的排查,发现是构建的时候,可能会由于 HBase 连接超时或者是连接数不够造成任务失败。需要在 CM 中调整 HBase 相关参数。包括调整 hbase.rpc.timeout 和 hbase.client.operation.timeout 到 50000 等(之前是 30000,可以根据业务不不同自行调整,如果还有超时可以优化或者继续调整)。
已有 Cube 的修改
由于业务的迭代,新增了几个维度和指标需要增加在已存在的Cube上,又例如原先 Cube 设计上有一些不足需要修改。在这方面例如 DataSource 没有修改功能,新旧 Cube 如何切换,修改经常没有响应等等问题让我们十分为难。
已有 Cube 的修改是目前使用 Kylin 最为头疼的地方。虽然 Kylin 支持 Hybrid model 来支持一定程度的修改,但是在使用的过程中因为
各种各样的原因,例如 Streaming Table 无法修改来新增字段等,还是未能修改成功。
目前采用的修改模式为,重新设计一整套从 DataSource 到 Model 再到 Cube,停止之前 Cube 的构建任务,开始新 Cube 的构建调度。使用修改我们 Java 代码的方式动态的选择查询新 Cube 还是旧 Cube,等到一定的时间周期之后再废弃旧 Cube。目前这种方式的弊端在于查询时间段包含新旧时,需要在程序中拼接数据,十分麻烦且会造成统计数据不准。所以在设计之初就要多考虑一下后面的扩展,可以先预留几个扩展字段。
用SPA如今稳进死
三、效果对比
对比表格
条件 | Apache Kylin | ES实时聚合 | Hive 离线任务再入 MySQL |
---|---|---|---|
查询速度 | 较快,一般在亚秒级别。从HBase中选择适合的纬度,Cube设计的好的话不存在二次聚合,也不会有速度方面的问题。 | 慢,可能有几分钟。实时聚合,在复杂的情况下有严重的性能问题,查询的时间可能到几分钟。 | 快,一般在毫秒级。计算好的数据基于 MySQL 查询,一般不会有性能问题。 |
时效性 | 近实时,一般在30分钟以内。延迟主要取决于任务调度的时间,但是一般都会在10~30分钟左右。 | 实时,一般延迟在秒级。ES的延迟是取决于上游数据的写入延迟和数据刷新的时间,一般可以控制在秒级。 | 离线,一般是T+1延迟。离线数据由于同步和计算的关系,一般都是+1小时延迟或者是+1天延迟。 |
开发难度 | 较简单。实时数据需要先进行一系列的清洗和规整,后面只需要配置即可,不过 Cube 设计有一定的难度。 | 简单。只需要写入数据即可,配合已有的EST框架可以任意组合满足业务需要。 | 工作量极大。针对每一种维度组合,都需要手动开发任务来进行计算和存储。 |
资源消耗 | 一般。单独搭建的集群,不会对其他业务造成影响,但是集群资源需求还是比较大。 | 一般。查询和写入一旦量大复杂后对集群上其他的查询会带来影响。 | 一般。在大集群上跑 YARN 任务,对集群整体影响不大。 |
可拓展性 | 不太好扩展。针对已经建好的 DataSource、Model 和 Cube 的修改比较不友好,但是有解决的办法。 | 可扩展性较强。所有的修改只需修改Index模板,下一周期生效即可。 | 基本无法扩展。每次有新的业务需求需要重新开发任务。 |
容错性 | 较差。对数据的格式和类型要求较为严格,容易导致构建失败。 | 较差。字段不一致会带来冲突,导致字段无法聚合,且冲突一旦在索引中生成,该索引将无法解决,只有等待下一周期或删除索引。 | 较好。对字段类型和数据字段有一定的容错性。 |
数据查询复杂度 | 十分简单。Kylin 会根据条件自动识别就是在哪一个 Cuboid 中查询数据,只需要使用 SQL 即可,跨 Cuboid 的查询也可以自动二次聚合,SQL 也可以直接配合 EST 框架。 | 较为容易。配合 EST 框架查询十分容易,但是由于索引有小时和天后缀,需要在程序中进行判断,才能有效降低查询量。 | 十分困难。由于每个维度存储组合存储的表都不一样,导致存储结构十分复杂,查询的时候需要自己判断在那张表里面,难度很大。 |
Tips: Impala速度快的原因
impala采用了MPP查询引擎(Massively Parallel Processor), 在数据库非共享集群中,每个节点都有独立的磁盘存储系统和内存系统,业务数据根据数据库模型和应用特点划分到各个节点上,每台数据节点通过专用网络或者商业通用网络互相连接,彼此协同计算,作为整体提供数据 库服务。非共享数据库集群有完全的可伸缩性、高可用、高性能、优秀的性价比、资源共享等优势。
Elasticsearch也是一种MPP架构的数据库,Presto、Impala等都是MPP engine,各节点不共享资源,每个executor可以独自完成数据的读取和计算,缺点在于怕stragglers,遇到后整个engine的性能下降到该straggler的能力,所谓木桶的短板,这也是为什么MPP架构不适合异构的机器,要求各节点配置一样。
四、总结
之前ES需要90s完成的查询,使用Kylin只需要2-3s,原来需要加载几分钟的仪表盘,现在只需要几秒钟就能加载完成,速度提升30多倍。
在开发效率上,切换至 Kylin 在前期不熟悉的情况下的确走了一些弯路,踩了不少坑(跟数据质量、对 Kylin 原理的掌握等都有关)。但是后面在熟悉之后便可以有不逊色于 ES 的开发效率,用起来非常不错。
目前版本的 Kylin 也有一些不足,例如数据的时效性,因为 Kylin 2.x 的流数据源只能达到准实时(Near Real-time),准实时延迟通常在十几到几十分钟的,对 APM 系统中的实时告警模块还不能满足业务要求。所以目前实时告警这一块走的还是ES,由于告警只需要对上个短暂周期(1~5 分钟)内的数据做聚合,数据量较小,ES 对此没有性能问题倒能承受。对于海量历史数据,通过 Kylin 来查询的效果更好。
新系统于 2018 年 11 月正式上线,目前已经稳定运行近一年。我们也注意到 Kylin 3.0 已经在实时统计上开始发力,能够做到 ES 这样的秒级延迟,我们会持续关注,希望 Kylin 可以发展的越来越好。
第五章 Kylin在腾讯的平台化及Flink引擎实践
Kylin 现有的用户管理、资源隔离机制并不能满足我们需求,基于此,腾讯对 Kylin 进行了平台化改造。希望平台化改造完成后,在下面这些层面,能够有一些改进:
- 用户管理
- 资源隔离
- 易用性提升
- 方便运维
用户管理:
为了便于系统的管理及安全,公司内部有一套自己的认证系统,而且需要用个人账号去验证,所以 Kylin 作为一个平台对外提供服务的话,也需要接入到该系统。所以,我们新增了一个用户管理界面,该界面展示了 Kylin 平台内的所有用户。管理员可以新增任一用户到 Kylin 平台,新增用户时会填写企业微信名、用户角色以及是否激活用户。当用户登录系统时,会自动检测用户账号以及该账号是否在平台内注册,如果没有注册则无权限,反之自动登录系统。
内部Hive兼容
由于历史原因,我们部门内的 Hive 版本(THive)与 Kylin 不兼容,这就导致 Kylin 无法正常访问 Kylin 集群,所以我们采用了上图所示的兼容方案。首先,我们使用社区 Hive 版本搭建一个全新的 Hive,并作为 Kylin 的默认 Hive;其次,当 kylin 加载源表时,我们是通过内部的 UPS 系统读取 THive 的元数据信息;最后,在 Load 源表到 Kylin 时,我们根据表的元数据信息在 Kylin 的 Hive 上创建一张相同的表,但该表的存储路径依旧指向 THive 的路径,而用户在构建 cube 时,则访问新创建的表,至此就解决了 Kylin 访问 THive 的问题。
计算资源可配置化
目前,Kylin 配置计算资源信息有两种方式:一是在 Kylin 配置文件中配置一个全局的计算集群及队列;二是在创建工程或者 Cube 时,在扩展参数中指定集群配置。这两种配置方式在灵活性及便捷性方面都比较差,而在我们内部是有接口可以获取到某一个用户有计算资源的计算集群及计算队列的,所以,在创建工程或者 Cube 时,我们使用了下拉框选择式的方式,让用户选择提交任务的计算资源及队列,从而大大简化了用户的使用流程。
通知机制
Kylin 只提供了发邮件通知的功能,而作为目前使用最广泛的工具,微信、企业微信在实时性及便捷性方面都远远胜于邮件,所以,我们提供了邮件、微信、企业微信三种方式,供用户选择。
定时调度
Kylin 系统自身并没有提供定时调度功能,但基本上每家公司都有自己的统一调度平台,我们也不例外。我们通过 Kylin 提供的API接口,将 Cube 定时构建的功能作为一个插件集成到了公司内部的统一调度平台上。
业务接入
做完以上平台化改造后,Kylin 平台基本具备了接入不同类型业务的能力,用户申请接入流程如上图所示。
业务使用情况:
我们团队是在今年初才开始引入 Kylin,目前已经在使用的业务主要有 QQ 音乐、腾讯视频、广点通、财付通等,Cube 的数量有 10 个,单份数据存储总量是 5 T,数据规模在 30 亿条左右。
Flink Cube Engion:
1 | 目前,Kylin 已经支持使用 MapReduce 和 Spark 作为构建引擎,而作为目前比较火的流批一体的大数据计算引擎怎能缺席?所以我使用 Flink 开发了一个高性能的构建引擎:Flink Cube Engine。 |
上面
第一个链接可以看到flink cube的完成情况
第二个链接可以看到flink cube在github的代码
使用:
Kylin 的一次 Cube 构建任务,包含了很多个子任务,而最重要的莫过于 Cube 构建这一步骤,所以,我们在 build 和 merge Cube 这两种任务中,优先实现了Cube 构建这一步骤,其他计算步骤依旧通过使用 MapReduce 来实现。
选择使用 Flink Cube Engine 的方式也和选择 Map Reduce 和 Spark 任务类似,我们提供了前台可视化的界面,供用户选择。
上图是我们内部业务上线 Flink Cube Engine 之后的性能对比,从图中可见,该步骤的构建耗时从 49 分钟降到了 13 分钟,优化效果比较明显。两种情况的资源配置如下:
Flink 配置为:
-ytm 4G -yjm 2G -ys 1 -p 100 -yn 100
Spark 采用的动态分配资源如下:
kylin.engine.spark-conf.spark.dynamicAllocation.enabled=true
kylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=2
kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1000
kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300
kylin.engine.spark-conf.spark.shuffle.service.enabled=true
kylin.engine.spark-conf.spark.shuffle.service.port=7337
虽然,Spark 采用的是动态分配资源,但在任务执行过程中,我们观察到 Spark实际分配的资源远比 Flink 要多的多。
那为什么性能提升会那么明显呢?
- Flink Cube Engine 的优化
性能的提升,无非有两方面的原因,一是参数的优化,二是代码的优化。
- 调参
影响 Flink 任务性能主要有几个核心参数:并行度、单个 TM slot 数目、TM container 数目,其中单个 TM container 数目=并行度/单个 TM slot 数目。
我们调优的过程采用了控制变量法,即:固定并行度不变、固定 Job 总内存数不变。通过不断的调整单个 TM 的 slot 数目,我们发现如果单个 TM 的 slot 数目减少,拉起更多的 TM container 性能会更好。
此外,我们还使用了对象复用、内存预分配等方法,发现没有对性能提升起到太大的效果。
- 代码优化(合并计算)
在实现 Flink Cube Engine 的时候,一开始我们使用了 Map/Reduce 两个算子,发现性能很差,比 Spark 的性能还要差很多,后来我们通过调整使用了 Flink 的 mapPartition/reduceGroup 两个算子,性能就有了明显的提升。
Flink Cube Engine 下一步的计划:
- 全链路 Flink
如上所述,目前 Cube 构建过程中,只有最关键的 cube 构建这一子任务使用了 Flink,而其他子任务仍然使用的是 MapReduce,我们下一步会继续完善 Flink Cube Engine,将所有的子任务都使用 Flink 来构建。
- Flink 升级到 1.9
Flink 最近发布了 1.9.0,该版本包含了很多重要特性且性能也有了一定提升,所以,我们会把 Flink Cube Engine 使用的 Flink 版本升级到1.9.0。
第六章 Kylin实操
安装
Kylin的安装相对简单,我们使用的Kylin是2.6.3版本的,安装并没有多少复杂的步骤,按照官网的步骤安装即可:
安装完成后,在bin目录下面执行check-env.sh,这个脚本会执行检查,如果仅仅显示KYLIN_HOME is set to… 说明一切配置正常,就可以启动了。
使用
回到Kylin的bin目录下执行sample.sh,成功执行后会有如下提示:
1 | Retrieving hadoop conf dir... |
出现这个提示,就说明已经成功了。
根据日志提示信息,重新加载元数据,使kylin能读取到创建好的project “learn_kylin”.跳转至system ,点击reload metadata
然后buil cube
此时cube正在创建,可以在Monitor中监控到整个cube构造的过程以及可以查看到每一步构建的过程和日志。
然后就可以在SQL输入框输入相关的SQL,就可以做简单的可视化。
第七章 Kylin的优化
查询时间优化
Kylin 的查询过程主要包含四个步骤:解析 SQL,从 HBase 获取数据,二次聚合运算,返回结果。显然优化的重点就落在如何加快 HBase 获取数据的速度和减少二次聚合预算。
- 提高 HBase 响应时间:修改配置,修改 Cache 的策略,增加 Block Cache 的容量
- 减少二次聚合运算:合理设计纬度,使查询时尽量能精确命中 Cuboid。去重值使用有损算法。
预计算优化
预计算的优化,主要考虑有何缩短构建花费的时间,以及中间结果和最终结果占用的空间。每个业务单独一个 Cube,避免每个 Cube 大而全,减少不必要的计算。
Cube优化
随着维度数目的增加,Cuboid 的数量成指数级增长。为了缓解 Cube 的构建压力,Kylin 提供了 Cube 的高级设置。这些高级设置包括聚合组(Aggregation Group)、联合维度(Joint Dimension)、层级维度(Hierarchy Dimension)和必要维度(Mandatory Dimension)等。
合理调整纬度配置,对需构建的 Cuboid 进行剪枝,刷选出真正需要的 Cuboid,优化构建性能,降低构建时间,大大提高了集群资源的利用效率。
如果Cube优化的好,效果可以非常明显。
优化方法:
1.必须维度
查询时,经常使用的维度,以及低基数纬度。如该维度基数<10,可以考虑作为必须维度。
2.层级维度
维度关系有一定层级性、基数有小到大情况可以使用层级维度。
3.Joint维度
维度之间是同时出现的关系,及查询时,绝大部分情况都是同时出现的。可以使用 joint 维。
4.维度组合组
将为维度进行分组,查询时组与组之间的维度不会同时出现。
配置优化:
配置优化,包括 Kylin 资源的配置以及 Hadoop 集群配置相关修改。
- 构建资源
每个 Cube 构建时,所需的资源不太一样,需要进行相应的资源调整。
- 调整副本
集群默认的文件副本数为 3,Cube 构建时,将副本数调为 2,个别中间任务还可以调整为 1,这样可以降低构建任务时集群 IO。为了保证查询的稳定性,HBase 副本数依然为 3。
- 压缩格式
在实验中发现,如果Hive和Hbase都设置了snappy格式,那么集群IO交互会小很多。
总结:
Kylin在复杂业务的查询中可以用预计算的方式提前计算好预期结果存在Hive临时表中,然后写入Hbase,优点显而易见:
1.可以合理配置集群资源,预计算的时间可以提前设定。
2.对海量数据的提前预计算意味着用户查询时不用重复计算,直接从Hbase中获取结果即可。
缺点:
Kylin仅支持星型模型的数据集,对原数据提出了要求。
并且作为空间换时间的OLAP应用,需要占用HBase集群大量的空间。
维度的膨胀需要结合业务控制,不同的业务还需要研究不同Cube的优化,使用成本会比IMPALA 这种使用SQL语言的高不少。