![深入理解Flink:实时大数据处理实践](https://wfqqreader-1252317822.image.myqcloud.com/cover/828/25449828/b_25449828.jpg)
2.3 编程模型
2.3.1 分层组件栈
Flink的组件分为4层,各个模块之间的层次关系如图2-5所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/68_1.jpg?sign=1738852517-TTYhwqofPDigm8iNCkwkybovFoFfNaoT-0-13d23b6aeef366443f8f44c431c7a922)
图2-5 Flink各个模块之间的层次关系
(1)Deploy层:Flink支持多种部署模式,如本地(Local)单机版、Standalone集群、YARN集群及云(Cloud)部署模式。
(2)Core 层:本层是 Flink 分布式数据处理引擎的核心实现层,包括计算图的所有底层实现,例如时间与窗口机制、一致性语义、任务管理与调度、物理执行计划。应用程序通常不需要调用本层 API,而是调用流处理 API、批处理API或构建在这两层API基础之上的Library API。
(3)API层:该层包括流处理API和批处理API,Flink的批处理是建立在流式架构上的,而不是用批处理模拟流处理,这种技术基因决定了 Flink 更适用于流处理的场合。
(4)Library层:该层是Flink的应用框架层,构建在流处理API和批处理API之上,因此同一应用框架库有两种版本选择,如流式关系型 API(Table/SQL)。此外,本层还包括CEP、FlinkML和Gelly。
2.3.2 流式计算模型
一个典型的流处理应用程序(命名为Programm 2.1)如下:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/69_1.jpg?sign=1738852517-pFZagJndSWZGlgPEUO5sKuu26d3Hzoyv-0-19ddc034bf892ca8416e09b61c1cd817)
这段程序的逻辑计算图形式如图2-6所示。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/69_2.jpg?sign=1738852517-9UHlwpgaD9ueSBLrCyxiYXRFCoaylzkp-0-503d1d591f19e2cdcfca920a3f795c69)
图2-6 Programm 2.1的逻辑计算图形式
图 2-6 中 Stream 为传输通道中的数据,Operator 为计算图的节点
,Streaming Dataflow为计算图
。
计算图的物理形式由计算节点的多个并行实例组成,其中并行实例的含义是:在分布式环境中,同一计算节点有多个功能相同的物理部署实例,如图2-7中逻辑形式中的map()节点会有两个部署实例map()[1]和map()[2]。
在并行模式时:
(1)每个Operator的实例数为并行度,任意两个Operator的并行度之间是独立的。例如,图2-7中Source Operator的并行度为2,而Sink Operator的并行度为1;每个Operator称为一个任务,Operator的每个实例称为子任务(subtask),子任务这个概念来自其和JVM线程之间的关系。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/70_1.jpg?sign=1738852517-P6FkM5Qp4wLwyHWQ2dkpNiD4YTyY8biH-0-d367f65d145fb68b9bd5ca3f0bbbdd27)
图2-7 Programm 2.1的物理计算图形式
(2)Stream有一个或多个分区(partition)。Stream有两种模式:
● 直连(One-to-One)模式,即一个实例的输出是另一个实例的输入。在Programm 2.1 的物理计算图形式中,Source 的 subtask[1](即 Source[1])和map的subtask[1](即map [1])直接相连,Source[1]的输出全部传输给map [1],没有被拆分成多个分区。
● 分区(Redistribution)模式,即一个实例的输出被拆分成多个部分传输给多个下级实例。在Programm 2.1的物理计算图形式中,map [1]被拆分成两部分,分别输入给不同的下级实例。
2.3.3 流处理编程
1.DataStream与DataSet
Flink用DataStream表示无界数据集,用DataSet表示有界数据集,前者用于流处理应用程序,后者用于批处理程序。根据所处理事件数据结构类型的不同,应用程序可以定义不同类型的 DataStream对象和 DataSet对象。以下程序定义事件类型为String的DataStream对象和事件类型为LabeledVector(带标签的训练样本,每个样本用向量表示)的DataSet对象:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/71_1.jpg?sign=1738852517-QXboDtmS12GGJClPcIpCJF6UDWGsmz4j-0-1e04864e128e525708ff8d4bd9690781)
从操作形式上看,DataStream 和 DataSet 与集合(Collection)有些相似,但是两者有着本质不同:
(1)DataStream 和 DataSet 是不可变的数据集合,因此不可以像操纵集合那样增加或删除 DataStream和 DataSet中的元素,也不可以通过诸如下标等方式访问某个元素。这里重申之前定义的概念,事件、元素、数据等都是用于指代流处理或批处理所处理的数据对象的,具体使用哪个称呼依赖语境。
(2)Flink应用程序通过Source创建DataStream对象和DataSet对象,通过转换操作产生新的DataStream对象和DataSet对象。
2.程序结构
Flink按照数据处理流程编写应用程序,共分为5个步骤。
1)获取运行时
运行时分为两类:StreamingExecutionEnvironment和ExecutionEnvironment,分别对应流处理和批处理程序:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/72_1.jpg?sign=1738852517-gvZg9kgeGABeKcUg2gLFzAIwzF2FIJX2-0-7c7c77e7249b798b45047ba5303e6c80)
运行时是应用程序被调度执行时的上下文环境,上述方法根据当前环境自动选择本地或集群运行时环境。以流处理为例,创建方法如下:
(1)通过createLocalEnvironment方法创建运行时,基于这种运行时的应用程序会运行在同一个 JVM 进程中,本地调试时通常采用这种运行时。createLocalEnvironment有三种接口形式:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/72_2.jpg?sign=1738852517-DNl1wwCRlWMMeJdSVmROhBg6TagVyOIi-0-d13a0486bef347cbd2a07cf671124266)
从上面的接口可以看出,通过 createLocalEnvironment 方法创建的运行仍是StreamingExecutionEnvironment。
(2)通过 createRemoteEnvironment 创建运行时,基于这种运行时的应用程序会被提交到集群中运行,连接集群调试通常用这种运行时。createRemoteEnvironment有两种接口形式:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/72_3.jpg?sign=1738852517-O0dYBcTwbTnZZlNiDjjVKKc6GbX21Ewi-0-dabfb86028b2624a9a3ef1160c13ca71)
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/73_1.jpg?sign=1738852517-PHh8AeL6sRplCIOZeCfgwkgrdUQ0dwpj-0-8f303e9c9979272d49620c7f87ebcddb)
2)添加外部数据源
可以添加外部数据源,如 Kafka和文件,也可以由应用创建 DataStream或DataSet,后一种方法常用于测试环境。
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/73_2.jpg?sign=1738852517-j96iQBXxXQ9pylvfmzW7z12Lve99FeBr-0-8e756e24a7e9b6fdd21d74beacfe1931)
3)定义算子转换函数
下面的代码将input元素值转换成整型,转换后得到DataStream[Int]:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_1.jpg?sign=1738852517-WAS1eZKxL8C2aZiPVxZfF46fkoy48Nwb-0-c669829466ec46af484020bf212242d2)
4)定义Sink
Sink的功能是将数据处理结果写入外部系统:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_2.jpg?sign=1738852517-VQl0gq98iLQ32y2u6kOKefcvNGIsSbZN-0-33a71ebffdb44609cbdb51daf2388613)
除了上述两种常用的Sink,应用程序还可以将处理结果写入Kafka:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_3.jpg?sign=1738852517-MpxSkA9luzd4xfKVl8NyLyATVt2kvruA-0-73ddfd3bd1fe525dbf7a4aae84462f11)
5)启动程序
调用运行时的execute()方法:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_4.jpg?sign=1738852517-TnIPEBQhbCM8qtZAbrJCALwxEu9LaFpQ-0-29749157faf888e252933b6bd3814938)
3.指定键(key)
可以通过Scala Case类(或Java元组)的位置索引、对象属性名称、key的选择器(selector)三种方式指定key,定义如下:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/74_5.jpg?sign=1738852517-F8l1WjQlsE7z5754pUbfx6PUwT6M7iVJ-0-4887ea1ab4b585e38a83fd40dd0267ff)
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_1.jpg?sign=1738852517-lCo1T3ofA3KWFKxr6cB8V97D3iL21Oj2-0-26d399c20d53c4dde8cf8a992a527ca5)
4.并行度设置
有4种设置Flink并行度的方式。
(1)通过紧跟在Operator之后的setParallelism方法设置并行度,这种并行度只影响对应的Operator:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_2.jpg?sign=1738852517-5DC6AKyLt6mLGxHZGco1oDdWKBm42KGw-0-c221978fb641bd8fad97ee308ff1455c)
(2)通过运行时设置作业级并行度:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_3.jpg?sign=1738852517-cAsF0Ly4qSqvFqeAby5cTgocSZ1tI0RO-0-de4a4bdbb93228bb644b0a72a1c8852c)
(3)通过客户端设置并行度,这种并行度也是作业级的:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/75_4.jpg?sign=1738852517-WiZoHUJplUVC475kSFEmtfHNCh6FhG2N-0-b8d879124c832a6c86f468376104f73d)
(4)通过 Flink 的配置文件设置系统级并行度,这种并行度对集群上的所有作业都起作用:
![](https://epubservercos.yuewen.com/BCB923/13898202603283606/epubprivate/OEBPS/Images/76_1.jpg?sign=1738852517-7ssqdS4UC0BEb7bFucTZM3Nlxhrr35qr-0-867f3962eb6ceaaeaf5ea8bf84bb9e55)