我的第一个Flink程序

写在最前面 Flink是支持Java和Scala两种语言,因为在日常的开发中 Java语言用的比较多,所以我的这些例子都是整理学习的Java版本,但是Scala语言有自己独特的优势,各位看客可以自己根据自己的情况选择不同语言。

接下来开始咱们的第一个Flink程序

我用的环境是 jdk 8 maven 3.8.3 flink 1.13(现在的Flink版本到 1.16 但是 需要JDK11)

直接上代码

pom.xml 文件


    8
    8
    1.13.0
    1.8
    2.12
    1.7.30



    
    
        org.apache.flink
        flink-java
        ${flink.version}
    
    
        org.apache.flink
        flink-streaming-java_${scala.binary.version}
        ${flink.version}
    
    
        org.apache.flink
        flink-clients_${scala.binary.version}
        ${flink.version}
    
    
    
        org.slf4j
        slf4j-api
        ${slf4j.version}
    
    
        org.slf4j
        slf4j-log4j12
        ${slf4j.version}
    
    
        org.apache.logging.log4j
        log4j-to-slf4j
        2.14.0
    

文件words.txt

hes java sss
hello flink ass
hello flink ass1
hello bigdata

文件 BatchWordCount.java

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/****
 * @author songshiming
 * @date 2022/11/10
 * @desc
 */
public class BatchWordCount {



    public static void main(String[] args) throws Exception {
        // 1. 创建一个执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件中读取数据
        DataSource lineDataSource = env.readTextFile("input/words.txt");
        // 3. 将每行的数据进行分词,转换成二元组类型
        FlatMapOperator> wordAndOneTuple = lineDataSource.flatMap((String line, Collector> out) -> {
            // 将一行文本进行分词
            String[] words = line.split(" ");
            // 将每个单词转换成二元组输出
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));//当 Lambda 表达式使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息
        // 4. 安装word进行分组
        UnsortedGrouping> wordAndOneGroup = wordAndOneTuple.groupBy(0);
        // 5. 分组内进行聚合统计
        AggregateOperator> sum = wordAndOneGroup.sum(1);
        // 6. 打印输出
        sum.print();
    }

程序运行结果

展开阅读全文

页面更新:2024-04-01

标签:程序   分词   表达式   看客   类型   语言   版本   环境   文件   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top