ReducingState是和ReduceFunction配合使用
get() 获取状态的值
add(IN value)方法添加一个元素,触发reduceFunction计算一次
ReducingState的描述器(ReducingStateDescriptor)与之前ValueState、ListState的描述器不同:创建时除了状态名称和类型之外,还必须传入一个ReduceFunction,用来定义新元素与已有状态值的合并方式。
1、RichFlatMapFunction处理类(求和)
package test;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
* @Description: 求和
* @Param:
* @return:
* @Author: Mr.逗
* @Date: 2021/9/9
*/
public class CountSumWithReduceState
extends RichFlatMapFunction, Tuple2> {
private ReducingState reducingState;
/***状态初始化*/
@Override
public void open(Configuration parameters) throws Exception {
ReducingStateDescriptor descriptor = new ReducingStateDescriptor("ReducingDescriptor", new ReduceFunction() {
@Override
public Long reduce(Long v1, Long v2) throws Exception {
return v1 + v2;
}
},Long.class);
reducingState = getRuntimeContext().getReducingState(descriptor);
}
@Override
public void flatMap(Tuple2 element, Collector> collector) throws Exception {
//将状态放入
reducingState.add(element.f1);
collector.collect(Tuple2.of(element.f0,reducingState.get()));
}
}
2、主体类
package test;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Driver that runs {@link CountSumWithReduceState} against a small in-memory data set.
 *
 * <p>Project: bigdata_learn. Created 2021-09-08 17:43.
 *
 * @author Mr.逗
 */
public class TestKeyedReduceStateMain {

    /**
     * Builds and executes the job: source -> keyBy(f0) -> CountSumWithReduceState -> print.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the job parallelism.
        env.setParallelism(16);
        // Build the data source from fixed (key, value) pairs.
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));
        // Expected records (running sum per key; each printed line may carry a subtask
        // prefix, and records of different keys may interleave):
        //   key 1: (1,3)  (1,10) (1,15)
        //   key 2: (2,4)  (2,6)  (2,12)
        dataStreamSource
                // Key by the first tuple field using a key selector instead of a field index.
                .keyBy(value -> value.f0)
                .flatMap(new CountSumWithReduceState())
                .print();
        String name = TestKeyedReduceStateMain.class.getName();
        env.execute(name);
    }
}
3、结果展示
1、处理类(求最大值)
package test;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
* @Description: 求最大值
* @Param:
* @return:
* @Author: Mr.逗
* @Date: 2021/9/9
*/
public class CountMaxWithReduceState
extends RichFlatMapFunction, Tuple2> {
private ReducingState reducingState;
/***状态初始化*/
@Override
public void open(Configuration parameters) throws Exception {
ReducingStateDescriptor descriptor = new ReducingStateDescriptor("ReducingDescriptor", new ReduceFunction() {
@Override
public Long reduce(Long v1, Long v2) throws Exception {
return v1>=v2?v1:v2;
}
},Long.class);
reducingState = getRuntimeContext().getReducingState(descriptor);
}
@Override
public void flatMap(Tuple2 element, Collector> collector) throws Exception {
//将状态放入
reducingState.add(element.f1);
collector.collect(Tuple2.of(element.f0,reducingState.get()));
}
}
2、主体类
package test;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Driver that runs {@link CountMaxWithReduceState} against a small in-memory data set.
 *
 * <p>Project: bigdata_learn. Created 2021-09-08 17:43.
 *
 * @author Mr.逗
 */
public class TestKeyedReduceStateMain {

    /**
     * Builds and executes the job: source -> keyBy(f0) -> CountMaxWithReduceState -> print.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the job parallelism.
        env.setParallelism(16);
        // Build the data source from fixed (key, value) pairs.
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));
        // Expected records (running maximum per key; each printed line may carry a subtask
        // prefix, and records of different keys may interleave):
        //   key 1: (1,3) (1,7) (1,7)
        //   key 2: (2,4) (2,4) (2,6)
        dataStreamSource
                // Key by the first tuple field using a key selector instead of a field index.
                .keyBy(value -> value.f0)
                .flatMap(new CountMaxWithReduceState())
                .print();
        String name = TestKeyedReduceStateMain.class.getName();
        env.execute(name);
    }
}
3、结果展示
页面更新:2024-03-12
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号