ValueState[T]保存单个的值,值的类型为T。
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
* @program: bigdata_learn
* @description: 当key出现了三次进行求平均值
* @author: Mr.逗
* @create: 2021-09-07 17:40
**/
public class CountAverageWithValueState extends RichFlatMapFunction,Tuple2> {
/**
* 定义一个state:keyed state
*
* 1. ValueState里面只能存一条数据,如果来了第二条,就会覆盖第一条。
* 2. Tuple2
* Long:
* 当前的key出现多少次 count 3
* Long:
* 当前的value的总和 sum
*
* sum/count = avg
*
*
* 如果我们想要使用这个state,首先要对state进行注册(初始化),固定的套路
*
*
*/
private ValueState> countAndSum;
/**
* 这个方法其实是一个初始化的方法,只会执行一次
* 我们可以用来注册我们的状态
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//注册状态
ValueStateDescriptor> descriptor = new ValueStateDescriptor<>(
"avg_age",//状态名字
Types.TUPLE(Types.LONG, Types.LONG));//状态存储的数据类型
countAndSum=getRuntimeContext().getState(descriptor);
}
/**
* 每来一条数据,都会调用这个方法
* key相同
* @param element
* @param out
* @throws Exception
*/
@Override
public void flatMap(Tuple2 element, Collector> out) throws Exception {
//拿到当前key的状态值
Tuple2 currentState = countAndSum.value();
//如果状态值没有初始化,则进行初始化
if (currentState==null)
{
currentState=Tuple2.of(0L,0L);
}
//更新状态值中元素的个数
currentState.f0+=1;
//更新状态值中的总值
currentState.f1+=element.f1;
//更新状态
countAndSum.update(currentState);
// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
if (currentState.f0==3)
{
double avg= (double)currentState.f1 / currentState.f0;
//输出key及对应的平均值
out.collect(Tuple2.of(element.f0,avg));
//清空状态值
countAndSum.clear();
}
}
}
页面更新:2024-04-14
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号