Flink SQL 设置 TTL

创建基本环境

 /**
  * 创建环境
 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
...
 /**
* 创建配置 信息
*/
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTable = StreamTableEnvironment.create(env,settings);
//对时间有要求的可以设置时区
//streamTable.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

编写DDL


//根据官网创建 简单的DDL (参考官网)
//表1 
String userSql="create table user_info(" +
        "u_id BIGINT," +
        "money BIGINT" +
        "" +
        ") WITH (" +
        "'connector' = 'datagen'," +
        "'rows-per-second' = '1'," +
        "'fields.u_id.min' = '12'," +
        "'fields.u_id.max' = '15'," +
        "'fields.money.min' = '1'," +
        "'fields.money.max' = '10'" +
        ")";
//表2
String orderSql="create table order_info(" +
        "u_id BIGINT," +
        "o_id BIGINT," +
        "code BIGINT" +
        "" +
        ") WITH (" +
        "'connector' = 'datagen'," +
        "'rows-per-second' = '20'," +
        "'fields.u_id.min' = '12'," +
        "'fields.u_id.max' = '15'," +
        "" +
        "'fields.o_id.min' = '1'," +
        "'fields.o_id.max' = '10'," +
        "" +
        "'fields.code.kind'='sequence'," +
        "'fields.code.start'='1'," +
        "'fields.code.end'='1000'" +
        ")";

添加状态配置信息


        //左表状态
        streamTable.getConfig().setIdleLeftStateRetention(Duration.ofMillis(10000));
        //右表状态
        streamTable.getConfig().setIdleRightStateRetention(Duration.ofMillis(5000));

执行

 streamTable.executeSql(userSql);
 streamTable.executeSql(orderSql);
 Table table = streamTable.sqlQuery("select u.u_id,o.u_id,o.code,CURRENT_TIMESTAMP from user_info u left join order_info o on u.u_id=o.u_id");

//转换Retract 输出
  DataStream> tuple2DataStream = streamTable.toRetractStream(table, Row.class);
  
 tuple2DataStream.writeAsText("e://log.txt", FileSystem.WriteMode.OVERWRITE);
 env.execute();

验证查看

定位到 参数值

已经获取到left ,right 对应的ttl 状态时间

展开阅读全文

页面更新:2024-03-14

标签:时区   状态   参数   简单   环境   时间   信息

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top