Flink sql 窗口函数功能验证
- 数据准备:向测试环境kafka TESTtopic依次添加如下6条数据
- { "bidtime":"2020-04-15 08:05:00", "price":"4.00", "item":"C" }, { "bidtime":"2020-04-15 08:07:00", "price":"2.00", "item":"A" }, { "bidtime":"2020-04-15 08:09:00", "price":"5.00", "item":"D" }, { "bidtime":"2020-04-15 08:11:00", "price":"3.00", "item":"B" }, { "bidtime":"2020-04-15 08:13:00", "price":"1.00", "item":"E" }, { "bidtime":"2020-04-15 08:17:00", "price":"6.00", "item":"F" }
- 打开flink sql client 建表:
- create table Bid( bidtime TIMESTAMP(3), price DECIMAL(10,2), item STRING, watermark for bidtime as bidtime - interval '1' second ) WITH ('connector' = 'kafka', 'topic' ='TEST.LIQIANG', 'properties.bootstrap.servers' ='172.22.17.26:9092,172.22.17.27:9092,172.22.17.28:9092', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id'='flink-sql-client-local', 'properties.fetch.max.bytes'='5242880', 'format' = 'json');
- create table Bid( bidtime TIMESTAMP(3), price DECIMAL(10,2), item STRING, pt as PROCTIME() ) WITH ('connector' = 'kafka', 'topic' ='TEST.LIQIANG', 'properties.bootstrap.servers' ='172.22.17.26:9092,172.22.17.27:9092,172.22.17.28:9092', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id'='flink-sql-client-local', 'properties.fetch.max.bytes'='5242880', 'format' = 'json');
- 做窗口查询:
- select tumble_rowtime(bidtime,interval '20' SECOND) as window_time,sum(price) from Bid group by tumble(bidtime,interval '20' SECOND);
- select TUMBLE_PROCTIME(pt,interval '20' SECOND) as window_time, TUMBLE_START(pt,interval '20' SECOND) as window_start, TUMBLE_START(pt,interval '20' SECOND) as window_end, sum(price) as sum_price from Bid group by tumble(pt,interval '20' SECOND);
- 结果展示:
页面更新:2024-03-01
标签:窗口 函数 事件 功能 环境 时间 测试 数据
1
2
3
4
5
上滑加载更多 ↓
所有内容加载完毕