flink学习教程之输出数据到redis中

一、导入相关的redis包


			org.apache.bahir
			flink-connector-redis_2.12
			1.1.0
		

二、安装redis

1、打开终端,使用以下命令安装Redis:

sudo apt install redis-server

  1. 安装完成后,Redis将自动启动并运行在默认端口上(6379)
  2. 要检查Redis是否正在运行,可以使用以下命令:
 sudo systemctl status redis-server

4、用RESP.app链接redis


三、尝试读取kafka数据,将数据放入redis中

package test.chapt02;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import test.chapt01.Event;

import java.util.Properties;

public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        //1、创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、设置并行度
        env.setParallelism(1);
        //3、设置订阅的kafka的信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.122.112:9092");
        //4、订阅clicks主题消息
        DataStreamSource clicks = env.addSource(new FlinkKafkaConsumer("clicks", new SimpleStringSchema(), properties));
        //5、类型转换
        SingleOutputStreamOperator map = clicks.map(new MapFunction() {
            @Override
            public Event map(String s) throws Exception {
                String[] split = s.split(",");
                return new Event(split[0], split[1], Long.valueOf(split[2]));
            }
        });
       //6、配置redis参数
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("10.20.1.17")
                .setPort(6379).build();
        //7、设置redis命令与key value
        RedisSink stringRedisSink = new RedisSink(jedisPoolConfig, new redisMapperClass());
        //8、输出到sink中
        map.addSink(stringRedisSink);
        //9、执行环境
        env.execute();
    }

    /**
     * 选择输出到redis里面的类型以及key和value
     */
    public static class redisMapperClass implements RedisMapper {
        @Override
        public RedisCommandDescription getCommandDescription() {
            //这里指定一个key 就是kafka,其他的都是里面的表信息
            return new RedisCommandDescription(RedisCommand.HSET, "kafka");
        }

        @Override
        public String getKeyFromData(Event event) {
            //第一列表名称
            return event.name;
        }

        @Override
        public String getValueFromData(Event event) {
            //第二列表名称
            return event.url;
        }
    }
}


四、测试是否成功

4.1、kafka生产数据

4.2查看redis


展开阅读全文

页面更新:2024-03-25

标签:数据   终端   端口   放入   命令   名称   类型   环境   教程   列表   信息

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top