大家好,我是杰哥
今天,继续来探索 Flink ,今天我们主要来看看 Flink 为我们所提供的分层 API
Flink 不仅具有高吞吐、低延迟、高可用等优秀特性,而且还提供了易于使用的分层 API,所以它也是一个易于开发的框架,它的 API 分层如图所示
Flink 包含三层 API:低级 APIs - 有状态处理、核心 APIs - DataStream API 以及 高级 APIs - Table API & SQL
最底层级的抽象仅仅提供了有状态流,它将处理函数(Process Function)嵌入到了DataStream API 中。通过实现 ProcessFunction 接口来进行操作。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数
DataStream?是不是感觉还有点儿熟悉呢?
是的,上一节的例子里面,它就出现过,我们使用 DataStream 类来存放 Flink 程序中的数据集合,供 Flink 进行处理
// 2- 读取文本流 (nc -lk 7777)
DataStreamSource lineDataStream = env.socketTextStream("localhost", 7777);
其中 DataStreamSource 其实就是集成自 DataStream而这里所提到的 DataStream API 的名字就来自于 DataStream 类。这个类中,可以存放包含重复项的不可变数据集合。集合中的数据可以是有限的,也可以是无界的
DataStream 在使用方面类似于常规的 Java 中的集合,但在一些关键方面有很大的不同,比如它们是不可变的,这意味着一旦创建了它们,就不能添加或删除元素。此外,我们也不能查看里面的元素,只能使用 DataStream API 操作(也称为转换)对它们进行一些变换操作
可以通过在 Flink 程序中添加源来创建初始 DataStream。然后,再可以从中派生新的流,并使用映射、过滤器等 API 方法将它们组合在一起
Table API 是一个针对 Java、Scala 和 Python 的语言集成查询 API,它允许以非常直观的方式组合来自关系操作符(如选择、筛选和连接)的查询。Flink 的 SQL API 基于 Apache Calcite,它实现了 SQL 标准的语法
Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似, 但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询 可以直接在 Table API 定义的表上执行
Flink 的 分层 API 中,最主要、最常用的是 DataStream API 和 Table API & SQL 。我们可以将两者单独使用,也可以混合使用,取决于我们具体的应用场景
我想,从它的操作步骤来认识它,应该是比较直观的
类似于我们曾经了解到的 JDBC 会有固定的 7 个步骤,使用 Flink 的 DataStream API 进行数据的处理,实际上也包含以下 5 个固定步骤,而 DataStream API 也可以随之被分为 5 类:
1、设置执行环境
2、读取输入流
3、转换操作
4、输出到一个或多个数据汇中
5、执行程序
我们来分别看一下
在 Flink 中,可以使用 StreamExecutionEnvironment 的下列三种方式进行执行环境的创建
getExecutionEnvironment();
createLocalEnvironment();
createRemoteEnvironment(String host, int port, String... jarFiles);
其中前两个表示创建的是本地环境,即表示 Flink 程序运行在本地机器。还可以通过指定远程机器的主机名、端口号以及程序本身所生成的 jar 包最终拷贝到的路径,创建一个远程执行环境,使得 Flink 程序运行在所指定的主机上
StreamExecutionEnvironment 为我们提供了一系列创建流式数据源的方法,使得我们可以将数据流读取到应用中。这些数据流可以来自于文件、数据库、消息队列等 读取的数据,就可以统一放入 DataStream 对象中
1)从集合中读取数据
ArrayList user = new ArrayList<>();
user.add(new User("Mary","./home",1000L));
user.add(new User("Bob","./cart",2000L));
DataStream stream = env.fromCollection(user);
2)从文件中读取数据
DataStream stream = env.readTextFile("words.txt");
3)从 Socket 读取数据
DataStream stream = env.socketTextStream("localhost", 7777);
4)从消息中心读取数据
DataStreamSource stream = env.addSource(new
FlinkKafkaConsumer(
"clicks",
new SimpleStringSchema(),
properties
));
5)自定义 Source如果我们需要读取数据的数据源,Flink 没有为其提供读取数据源的方法,我们就可以通过 Flink 提供的自定义 Source 的方式进行了 那就只好自定义实现 SourceFunction 了。接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:run()和 cancel()
当数据被存入 DataStream 对象中后,我们就可以对它进行转换操作了 转换的类型有多种多样。有些会重新组织一下 DataStream 中的数据流,比如对这些数据进行一次分组或者分区;有些呢,就会生成一个新的 DataStream (类型可能会发生变化)
比如,使用 map()可以将一个输入流中的所有正方形全部转换为圆形
今天在这里就不细说了,下次文章会专门为大家介绍 DataStream API 的转换操作
一般场景下,我们将数据处理的结果总是会发送到某些外部系统,比如文件、数据库,以及消息中心中等,当然测试时,可以直接输出至标准输出中
比如,输出到文件的方式如下:
StreamingFileSink fileSink = StreamingFileSink
.forRowFormat(new Path("./output"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15)
)
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5
))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
// 将 Event 转换成 String 写入文件
stream.map(Event::toString).addSink(fileSink);
此外,还可以输出到 Kafka,mysql,pulsar 等
最新版本的 Flink 所支持的所有输入输出如下:
在应用定义完成之后,我们就可以通过 StreamExecutionEnvironment 类中的 execute() 方法进行执行
Flink 被设计为了延迟计算的方式执行,在 执行 execute() 方法之前,前面的所有程序,只是在执行环境中构建了一个执行计划。也就是说,只有执行了 execute() 方法之后,前面定义的 Flink 应用程序才会被真正执行
这 5 个步骤看完了,我们再将那个简单的实例拿过来看看,确认一下是否是这样
public class StreamWordCount {
public static void main(String[] args) throws Exception{
// 1-创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2-读取文本流 (nc -lk 7777)
DataStreamSource lineDataStream = env.socketTextStream("localhost", 7777);
// 3-转换操作
// 3.1 收集各个单词,定义为二元组
SingleOutputStreamOperator> streamOperator = lineDataStream.flatMap((String line, Collector> out) -> {
String words[] = line.split(" ");
for (String word :words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 3.2 分组
KeyedStream, String> keyedStream = streamOperator.keyBy(data -> data.f0);
// 3.3-统计
SingleOutputStreamOperator> sum = keyedStream.sum(1);
// 4-打印
sum.print();
// 5-启动执行
env.execute();
}
}
这个其实就是严格按照我们上述所提到的步骤来走的
其中第 2 步骤,是通过一行一行地读取 socket 文本流内容,进行数据源的输入的,将所输入的数据流存入 lineDataStream 中
而第 3 步骤,则是对 lineDataStream 中的数据进行转换处理操作。首先将各个数据通过 flatMap()方法转换为二元组,然后使用 keyBy() 方法,对其元素进行分组,接着再使用 sum()方法,对分组之后的元素进行求和操作
随着数据流的不断输入,所输入的每个单词出现的次数便会通过程序被源源不断地输出
Flink 提供了分层 API,便于我们进行开发。在实际开发中,我们使用最多的就是 DataStream API ,而怎样认识 DataStream API 呢?从它的操作步骤来看,可以分为 5 个步骤,也就是说 DataStream API 可以分为 5 大类:
1、设置执行环境
2、读取输入流
3、转换操作
4、输出到一个或多个数据汇中
5、执行程序
嗯,就这样。每天学习一点,时间会见证你的强大
欢迎大家关注我们的公众号【青梅主码】,一起持续性学习吧~
往期精彩回顾
总结复盘
架构设计读书笔记与感悟总结
带领新人团队的沉淀总结
复盘篇:问题解决经验总结复盘
网络篇
网络篇(四):《图解 TCP/IP》读书笔记
网络篇(一):《趣谈网络协议》读书笔记(一)
事务篇章
事务篇(四):Spring事务并发问题解决
事务篇(三):分享一个隐性事务失效场景
事务篇(一):毕业三年,你真的学会事务了吗?
Docker篇章
Docker篇(六):Docker Compose如何管理多个容器?
Docker篇(二):Docker实战,命令解析
Docker篇(一):为什么要用Docker?
..........
SpringCloud篇章
Spring Cloud(十三):Feign居然这么强大?
Spring Cloud(十):消息中心篇-Kafka经典面试题,你都会吗?
Spring Cloud(九):注册中心选型篇-四种注册中心特点超全总结
Spring Cloud(四):公司内部,关于Eureka和zookeeper的一场辩论赛
..........
Spring Boot篇章
Spring Boot(十二):陌生又熟悉的 OAuth2.0 协议,实际上每个人都在用
Spring Boot(七):你不能不知道的Mybatis缓存机制!
Spring Boot(六):那些好用的数据库连接池们
Spring Boot(四):让人又爱又恨的JPA
SpringBoot(一):特性概览
..........
翻译
[译]用 Mint 这门强大的语言来创建一个 Web 应用
【译】基于 50 万个浏览器指纹的新发现
使用 CSS 提升页面渲染速度
WebTransport 会在不久的将来取代 WebRTC 吗?
.........
职业、生活感悟
你有没有想过,旅行的意义是什么?
程序员的职业规划
灵魂拷问:人生最重要的是什么?
如何高效学习一个新技术?
如何让自己更坦然地度过一天?
..........
页面更新:2024-03-21
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号