Flink操练(三十二)之自定义键控状态(二)ListState

0 简介

ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:

  ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。

1.实例

1.1 实例一

首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法:

    private var itemState : ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {

      //命名状态变量的名字和类型
      val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount])
      itemState = getRuntimeContext.getListState(itemStateDescription)
    }

ListStateDescriptor提供了几种不同的定义方式:

Flink操练(三十二)之自定义键控状态(二)ListState

两个参数分别是ListStateDescriptor的名字和typeClass

1.2 实例二

package qiuhua;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;

/**
 * @program: bigdata_learn
 * @description: 通过ListState求key 出现了 3 次,则需要计算平均值
 * @author: Mr.逗
 * @create: 2021-09-08 16:18
 **/
public class CountAverageWithListState extends RichFlatMapFunction, Tuple2> {
    /**
     * ValueState : 里面只能存一条元素
     * ListState : 里面可以存很多数据
     */
    private ListState> elementsByKey;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //注册状态
        ListStateDescriptor> descriptor = new ListStateDescriptor<>
                ("list_state"//状态名字
                        , Types.TUPLE(Types.LONG, Types.LONG)//状态存储的数据类型
                );
        elementsByKey=getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2 value, Collector> out) throws Exception {
        Iterable> currentState  = elementsByKey.get();//拿到当前key的状态值
        //如果状态值没有初始化,则初始化
        if(currentState==null)
        {
            elementsByKey.addAll(Collections.emptyList());
        }
        //更新状态
        elementsByKey.update((List>) value);
        //判断,如果当前key出现了3次,则需要计算平均值,并且输出
        List> allElements = Lists.newArrayList(currentState);
        if (allElements.size()==3)
        {
            long count=0;
            long sum=0;
            for(Tuple2 ele:allElements)
            {
                count++;
                sum+=ele.f1;
            }
            double avg=(double)sum/count;
            out.collect(Tuple2.of(value.f0,avg));
            //清除状态
            elementsByKey.clear();
        }
    }
}

总结

Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:

实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:

val fromTransactionDataStream = watermarkTransaction
      .keyBy(_.code)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      
val transaction = fromTransactionDataStream
      .apply(new StockTransactionApply)
      .keyBy(_._3)
      .flatMap(new TransactionStateFlatMapFunction)
展开阅读全文

页面更新:2024-06-16

标签:状态   表里   平均值   重写   初始化   缓存   数据类型   实例   元素   接口   定义   名字   两个   数据   列表   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top