0%

Flink集群搭建

Flink集群搭建

环境:

hadoop 2.6.0

centos 7

java 1.8.144

scala 2.11.8

机器: datanode 126 127 128(ssh)

版本选择

参考:Flink 官方文档

Flink 1.8

选择原因: Flink 还处在频繁更新的状态,较新的版本,特性和老版本特别较大,

Flink 1.8 有如下主要改变:

1.将会增量清除旧的State
2.编程方面TableEnvironment弃用
3.Flink1.8将不发布带有Hadoop的二进制安装包

其中编程方面的改变比较重要,会延续到以后的版本,综合考虑,不使用最新的1.9,使用1.8是较为稳妥的选择.

Flink安装模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
local模式:适用于本地开发和测试环境,占用的资源较少,部署简单 ,只需要部署JDK和flink即可达到功能开发和测试的目的。只需要一台主机即可。

standalone cluster:可以在测试环境功能验证完毕到版本发布的时候使用,进行性能验证。搭建需要ssh
jdk和flink。至少需要3台主机,一个master两个worker节点。

YARN:flink使用YARN进行调度。

Hadoop Integration:和hadoop生态进行整合,可以借用HDFS、YARN的功能,是用于整个大数据环境都用Hadoop全家桶的环境。

Docker: 在开发测试使用,docker方式很容易搭建。推荐的方式。

kubernetes:由于FLink使用的无状态模式,只需要kubernetes提供计算资源即可。会是Flink以后运行的主流方式,可以起到节约硬件资源和便于管理的效果。

HA模式:
现在主流的方式有standalone cluster HA 和YARN cluster HA方式,适用于在生产上部署。
standalone cluster HA:
需要JDK、ssh、zookeeper HA、flink构建,至少需要三个物理机。

YARN cluster HA:
需要JDK、ssh、zookeeper HA、Hadoop HA、flink,需要更多的资源。

若flink运行于k8s则可以借助于kubernetes的集群提供高可用,充分的利用资源。

当前大部分公司还是将Flink运行在物理机上。

多种安装模式尝试:

下载

Flink官方链接

官方链接中可以选择使用scala 2.11 还是 2.12版本.这边选择2.11版本即可.

下载完成之后, tar zvxf flink-1.8.1-bin-scala_2.11.tgz -C /usr解压

Yarn配置模式的选择:

如果选择Flink on Yarn的话,就比较简单,因为测试集群上已经存在了CDH,所以直接尝试Flink on yarn.

把 Flink 运行在 YARN 上有两种方式,第一种方式是建立一个长期运行的 Flink YARN Session,然后向这个 Session 提交 Flink Job,多个任务同时运行时会共享资源。第二种方式是为单个任务启动一个 Flink 集群,这个任务会独占 Flink 集群的所有资源,任务结束即代表集群被回收。

配置

因为之前安装使用的CDH 所以系统中没有hadoop的环境变量,这边需要配置hadoop的环境变量才可以继续使用.

配置如下:

1
2
3
4
5
# hadoop
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop

# hadoop conf
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

配置完成之后 source 立即生效.

注意

因为之前说过1.8版本有个新特性就是官方不再发布关联hadoop的二进制包,所以hadoop的依赖我们自己下载.

下载地址

我们的hadoop是2.6的,下载这个2.6版本的就好.

下载完成后有添加到lib

1
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar

使用yarn session.sh启动yarn的Session模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bin/yarn-session.sh run -n 2 -tm 2048 -s 4
启动Session参数说明:
-n(--container) taskmanager的数量  
-s(--slots) 用启动应用所需的slot数量/ -s 的值向上取整,有时可以多一些taskmanager,做冗余 每个taskmanager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1 6~10
-jm jobmanager的内存(单位MB) 3072
-tm 每个taskmanager的内存(单位MB)根据core 与内存的比例来设置,-s的值* (core与内存的比)来算
-nm yarn 的appName(现在yarn的ui上的名字)|  
-d 后台执行
启动任务参数:
-j 运行flink 应用的jar所在的目录
-a 运行flink 应用的主方法的参数
-p 运行flink应用的并行度
-c 运行flink应用的主类, 可以通过在打包设置主类
-nm flink 应用名字,在flink-ui 上面展示
-d 后台执行
--fromsavepoint flink 应用启动的状态恢复点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
[root@datanode127 flink-1.8.1]# bin/yarn-session.sh run -n 2 -tm 2048 -s 4
2019-07-31 11:46:46,986 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-07-31 11:46:46,987 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-07-31 11:46:46,987 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-07-31 11:46:46,987 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-07-31 11:46:46,987 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-07-31 11:46:46,988 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-07-31 11:46:47,489 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to root (auth:SIMPLE)
2019-07-31 11:46:47,548 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at datanode127/172.16.0.127:8032
2019-07-31 11:46:47,766 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-07-31 11:46:48,018 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-07-31 11:46:48,033 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/data2/flink/flink-1.8.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-07-31 11:46:49,540 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1564366526843_1818
2019-07-31 11:46:49,560 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1564366526843_1818
2019-07-31 11:46:49,560 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-07-31 11:46:49,561 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-07-31 11:46:52,573 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-07-31 11:46:52,928 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on datanode127:37898 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://datanode127:37898
^C2019-07-31 11:54:04,390 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
2019-07-31 11:54:04,392 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
2019-07-31 11:54:04,393 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Deleted Yarn properties file at /tmp/.yarn-properties-root
[root@datanode127 flink-1.8.1]# ^C
[root@datanode127 flink-1.8.1]# bin/yarn-session.sh run -n 2 -tm 2048 -s 4
2019-07-31 11:58:44,326 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-07-31 11:58:44,327 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-07-31 11:58:44,327 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-07-31 11:58:44,327 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-07-31 11:58:44,327 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-07-31 11:58:44,327 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-07-31 11:58:44,779 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to root (auth:SIMPLE)
2019-07-31 11:58:44,837 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at datanode127/172.16.0.127:8032
2019-07-31 11:58:45,046 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-07-31 11:58:45,286 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-07-31 11:58:45,300 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/data2/flink/flink-1.8.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-07-31 11:58:46,808 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1564366526843_1821
2019-07-31 11:58:46,828 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1564366526843_1821
2019-07-31 11:58:46,828 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-07-31 11:58:46,829 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-07-31 11:58:50,594 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-07-31 11:58:50,961 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on datanode127:35550 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://datanode127:35550

开启之后 最后会给一个WebUI的地址,经过多次发现on yarn模式下,这个端口是随机分配的.

wordcount

使用IDEA创建新的Maven项目,写一个简单的wordcount

在Flink Web UI中创建一个简单的任务:

选中Submit new Job,把打好的Jar包上传进去,可以在这个界面选择需要传入的args

传完参数之后就可以运行了,任务的日志和记录也都可以在上面找到.

点进去就可以看到任务的记录.

这就是Flink on yarn的部署方式之Flink YARN Session.

Standalone

目前大多数企业使用应该还是使用Standalone模式的,从官方发行版本不再包含yarn的jar包就可以看出,Flink团队应该也不是特别喜欢yarn对资源的调度,在Standalone模式里面,我们可以自己配置Flink的资源使用.

安装 解压都和上面一样,主要区别在与Standalone模式要修改flink配置文件.

1
2
3
4
5
6
7
8
9
jobmanager.rpc.address: localhost1   --jobManager 的IP地址
jobmanager.rpc.port: 6123  --jobManager 的端口,默认为6123
jobmanager.heap.mb --jobManager 的JVM heap大小 
taskmanager.heap.mb  --taskManager的jvm heap大小设置
taskmanager.numberOfTaskSlots  --taskManager中taskSlots个数,最好设置成work节点的CPU个数相等
parallelism.default  --并行计算数
fs.default-scheme --文件系统来源
fs.hdfs.hadoopconf:  --hdfs置文件路径
jobmanager.web.port    -- jobmanager的页面监控端口

配置完成后,就可以直接启动

1
sudo ./start-cluster.sh

停止脚本

1
sudo ./stop-cluster.sh

直接浏览器访问页面+管理web 端口

localhost:8081

这里涉及的配置文件比较多,上面只标出了几个比较重要的,需要使用的时候(如配置HA)还是要看最新的官方文档.


下图展示了 Flink 中目前支持的主要几种流的类型,以及它们之间的转换关系。

Session:

在yarn中启动一个长久运行的flink集群,提交任务时都提交到这个flink-session集群中运行,适合小任务。

做HA:保证YarnSessionClusterEntrypoint的高可用,需要在yarn里配置yarn.resourcemanager.am.max-
attempts,在flink-conf.yaml中添加high-availiability的配置

提交任务命令:
先启动yarn-session,
然后直接flink run …

per-job:

每次提交任务都单独启动一个flink集群,适合长久运行的大任务。

HA:per-job的高可用是复用的 standalone HA 的,所以需要配置flink的standalone HA和yarn.application-attempts: 3

提交任务命令:
flink run -m yarn-cluster -yqu root.myjob1 …

问题:per-job方式在yarn上一直是accept状态,flink报错:deployment took more than 60 secoeds

我这里是,由于yarn调度器实际上是一个队列,在yarn-cluster选项中有**-yqu这个参数,表示指定一个队列,如果你已经提交了一个per-job任务,再次提交必须指定一个新的队列名称**,否则他会一直等待之前的任务结束后,新的job才能running,所以一直是accept。

解决:flink run -m yarn-cluster -yqu root.myjob1