Flink Practice (34): Custom Keyed State (4): ReducingState

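I. ReducingState methods

ReducingState is used together with a ReduceFunction.

get() returns the current value of the state, or null if nothing has been added yet.

add(IN value) adds an element; the ReduceFunction folds it into the value already stored.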
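The add/get behavior described above can be modeled in a few lines of plain Java. This is only a sketch for intuition, not Flink's implementation: the class SimpleReducingState and its demo class are hypothetical names, and a java.util.function.BinaryOperator stands in for Flink's ReduceFunction. It assumes the first added value is stored as-is and later values are reduced into it.

```java
import java.util.function.BinaryOperator;

/** Hypothetical plain-Java model of ReducingState semantics; not Flink's implementation. */
class SimpleReducingState<T> {
    private final BinaryOperator<T> reduceFunction; // stands in for Flink's ReduceFunction
    private T current;                              // null means "state is empty"

    SimpleReducingState(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /** Folds the new value into the stored one; the first value is stored as-is. */
    void add(T value) {
        current = (current == null) ? value : reduceFunction.apply(current, value);
    }

    /** Returns the reduced value, or null if nothing has been added. */
    T get() {
        return current;
    }

    /** Resets the state to empty. */
    void clear() {
        current = null;
    }
}

class SimpleReducingStateDemo {
    public static void main(String[] args) {
        SimpleReducingState<Long> sum = new SimpleReducingState<>(Long::sum);
        System.out.println(sum.get()); // prints null
        sum.add(3L);
        sum.add(7L);
        System.out.println(sum.get()); // prints 10
    }
}
```

Only one reduced value is kept, however many elements are added, which is what distinguishes this kind of state from a list that stores every element.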


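II. The ReducingState descriptor

Unlike the ValueState and ListState descriptors covered earlier, a ReducingStateDescriptor must be created with a ReduceFunction, in addition to the state name and the value type.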

III. Counting words (running sum per key)

1. Processing class (a RichFlatMapFunction)

package test;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
* @Description: running sum per key
* @Author: Mr.逗
* @Date: 2021/9/9
*/
public class CountSumWithReduceState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // Keyed state: holds one reduced value (the sum) for the current key
    private ReducingState<Long> reducingState;

    /** Initializes the state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                "ReducingDescriptor",
                new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long v1, Long v2) throws Exception {
                        return v1 + v2;
                    }
                },
                Long.class);
        reducingState = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Long>> collector) throws Exception {
        // Fold the incoming value into the state for this key
        reducingState.add(element.f1);
        // Emit the key together with the sum so far
        collector.collect(Tuple2.of(element.f0, reducingState.get()));
    }
}

2. Main class

package test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @program: bigdata_learn
 * @description: tests ReducingState (sum)
 * @author: Mr.逗
 * @create: 2021-09-08 17:43
 **/
public class TestKeyedReduceStateMain {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(16);
        // Build the data source: (key, value) pairs
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));

        // Emits the running sum per key:
        // key 1: (1,3), (1,10), (1,15)
        // key 2: (2,4), (2,6), (2,12)
        dataStreamSource
                // Key by the first tuple field (replaces the deprecated keyBy(0))
                .keyBy(value -> value.f0)
                .flatMap(new CountSumWithReduceState())
                .print();
        String name = TestKeyedReduceStateMain.class.getName();
        env.execute(name);
    }
}

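3. Results

Each input record emits the running sum for its key: key 1 yields (1,3), (1,10), (1,15), and key 2 yields (2,4), (2,6), (2,12). With a parallelism of 16, print() prefixes each line with the subtask index, and the interleaving between the two keys can vary from run to run.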
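The per-key sums can be checked without a cluster using a small stand-alone simulation. This is a sketch under stated assumptions, not the Flink job: the class KeyedReduceSimulation and its run method are hypothetical, a HashMap stands in for keyed state, and records are processed in source order on a single thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical stand-alone simulation of a per-key reducing state; it does not use Flink. */
class KeyedReduceSimulation {

    /** Applies the reduce function per key and records the value emitted after each record. */
    static List<String> run(long[][] records, BinaryOperator<Long> reduceFunction) {
        Map<Long, Long> stateByKey = new HashMap<>(); // one reduced value per key
        List<String> emitted = new ArrayList<>();
        for (long[] record : records) {
            long key = record[0];
            long value = record[1];
            // merge() stores the first value as-is and reduces later ones into it
            Long reduced = stateByKey.merge(key, value, reduceFunction);
            emitted.add("(" + key + "," + reduced + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 7}, {2, 4}, {1, 5}, {2, 2}, {2, 6}};
        System.out.println(run(input, Long::sum));
        // prints [(1,3), (1,10), (2,4), (1,15), (2,6), (2,12)]
    }
}
```

The simulation fixes the order in which records arrive; in the real job only the order within a single key is preserved.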



IV. Computing the maximum temperature

1. Processing class

package test;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
* @Description: running maximum per key
* @Author: Mr.逗
* @Date: 2021/9/9
*/
public class CountMaxWithReduceState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // Keyed state: holds one reduced value (the maximum) for the current key
    private ReducingState<Long> reducingState;

    /** Initializes the state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                "ReducingDescriptor",
                new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long v1, Long v2) throws Exception {
                        return Math.max(v1, v2);
                    }
                },
                Long.class);
        reducingState = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Long>> collector) throws Exception {
        // Fold the incoming value into the state for this key
        reducingState.add(element.f1);
        // Emit the key together with the maximum so far
        collector.collect(Tuple2.of(element.f0, reducingState.get()));
    }
}

2. Main class

package test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @program: bigdata_learn
 * @description: tests ReducingState (maximum)
 * @author: Mr.逗
 * @create: 2021-09-08 17:43
 **/
public class TestKeyedReduceStateMain {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(16);
        // Build the data source: (key, value) pairs
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));

        // Emits the running maximum per key:
        // key 1: (1,3), (1,7), (1,7)
        // key 2: (2,4), (2,4), (2,6)
        dataStreamSource
                // Key by the first tuple field (replaces the deprecated keyBy(0))
                .keyBy(value -> value.f0)
                .flatMap(new CountMaxWithReduceState())
                .print();
        String name = TestKeyedReduceStateMain.class.getName();
        env.execute(name);
    }
}

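3. Results

Each input record emits the maximum seen so far for its key: key 1 yields (1,3), (1,7), (1,7), and key 2 yields (2,4), (2,4), (2,6). As before, print() prefixes each line with the subtask index, and the interleaving between keys can vary.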
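The running maximum can likewise be checked in plain Java. A minimal sketch, assuming records arrive in source order; the class RunningMaxSimulation and its runningMax method are hypothetical and do not use Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone check of the running maximum per key; it does not use Flink. */
class RunningMaxSimulation {

    /** Returns the value emitted for each record: the maximum seen so far for its key. */
    static long[] runningMax(long[][] records) {
        Map<Long, Long> maxByKey = new HashMap<>(); // one maximum per key
        long[] emitted = new long[records.length];
        for (int i = 0; i < records.length; i++) {
            // records[i][0] is the key, records[i][1] is the value
            emitted[i] = maxByKey.merge(records[i][0], records[i][1], Long::max);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 7}, {2, 4}, {1, 5}, {2, 2}, {2, 6}};
        System.out.println(Arrays.toString(runningMax(input)));
        // prints [3, 7, 4, 7, 4, 6]
    }
}
```

Compared with the sum, only the reduce function changes; the state handling stays the same.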




Page updated: 2024-03-12
