Flink操练(三十一)之自定义键控状态(一)之ValueState

1、介绍

ValueState[T]保存单个的值,值的类型为T。

2、求当key出现了三次进行平均值计算

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * @program: bigdata_learn
 * @description: 当key出现了三次进行求平均值
 * @author: Mr.逗
 * @create: 2021-09-07 17:40
 **/
public class CountAverageWithValueState extends RichFlatMapFunction,Tuple2> {
    /**
     * 定义一个state:keyed state
     *
     * 1. ValueState里面只能存一条数据,如果来了第二条,就会覆盖第一条。
     * 2. Tuple2
     *      Long:
     *          当前的key出现多少次  count 3
     *      Long:
     *          当前的value的总和   sum
     *
     *          sum/count = avg
     *
     *
     *  如果我们想要使用这个state,首先要对state进行注册(初始化),固定的套路
     *
     *
     */
    private ValueState> countAndSum;
    /**
     * 这个方法其实是一个初始化的方法,只会执行一次
     * 我们可以用来注册我们的状态
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //注册状态
        ValueStateDescriptor> descriptor = new ValueStateDescriptor<>(
                "avg_age",//状态名字
                Types.TUPLE(Types.LONG, Types.LONG));//状态存储的数据类型
        countAndSum=getRuntimeContext().getState(descriptor);
    }

    /**
     * 每来一条数据,都会调用这个方法
     * key相同
     * @param element
     * @param out
     * @throws Exception
     */
    @Override
    public void flatMap(Tuple2 element, Collector> out) throws Exception {
        //拿到当前key的状态值
        Tuple2 currentState  = countAndSum.value();
        //如果状态值没有初始化,则进行初始化
        if (currentState==null)
        {
            currentState=Tuple2.of(0L,0L);
        }
        //更新状态值中元素的个数
        currentState.f0+=1;
        //更新状态值中的总值
        currentState.f1+=element.f1;
        //更新状态
        countAndSum.update(currentState);
        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        if (currentState.f0==3)
        {
           double avg= (double)currentState.f1 / currentState.f0;
           //输出key及对应的平均值
            out.collect(Tuple2.of(element.f0,avg));
            //清空状态值
            countAndSum.clear();
        }
    }
}
展开阅读全文

页面更新:2024-04-14

标签:状态   都会   平均值   套路   总和   总值   初始化   多少次   数据类型   个数   元素   定义   操作   方法   数据   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top