org.apache.bahir
flink-connector-redis_2.12
1.1.0
1、打开终端,使用以下命令安装Redis:
sudo apt install redis-server
sudo systemctl status redis-server
4、用RESP.app链接redis
package test.chapt02;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import test.chapt01.Event;
import java.util.Properties;
public class SinkToRedis {
public static void main(String[] args) throws Exception {
//1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、设置并行度
env.setParallelism(1);
//3、设置订阅的kafka的信息
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.122.112:9092");
//4、订阅clicks主题消息
DataStreamSource clicks = env.addSource(new FlinkKafkaConsumer("clicks", new SimpleStringSchema(), properties));
//5、类型转换
SingleOutputStreamOperator map = clicks.map(new MapFunction() {
@Override
public Event map(String s) throws Exception {
String[] split = s.split(",");
return new Event(split[0], split[1], Long.valueOf(split[2]));
}
});
//6、配置redis参数
FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("10.20.1.17")
.setPort(6379).build();
//7、设置redis命令与key value
RedisSink stringRedisSink = new RedisSink(jedisPoolConfig, new redisMapperClass());
//8、输出到sink中
map.addSink(stringRedisSink);
//9、执行环境
env.execute();
}
/**
* 选择输出到redis里面的类型以及key和value
*/
public static class redisMapperClass implements RedisMapper {
@Override
public RedisCommandDescription getCommandDescription() {
//这里指定一个key 就是kafka,其他的都是里面的表信息
return new RedisCommandDescription(RedisCommand.HSET, "kafka");
}
@Override
public String getKeyFromData(Event event) {
//第一列表名称
return event.name;
}
@Override
public String getValueFromData(Event event) {
//第二列表名称
return event.url;
}
}
}
4.1、kafka生产数据
4.2查看redis
页面更新:2024-03-25
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号