springboot 整合rabbitmq 动态生成交换机、队列

1.引入依赖

    
            org.springframework.boot
            spring-boot-starter-amqp
        

2.创建实体类

/**
 * @Author: GZ
 * @CreateTime: 2022-10-25  22:04
 * @Description: RabbitMQ 队列和交换机机绑定关系实体对象
 * @Version: 1.0
 */
@Data
public class RabbitModuleInfoDTO {
    /**
     * 路由Key
     */
    private String routingKey;
    /**
     * 队列信息
     */
    private Queue queue;
    /**
     * 交换机信息
     */
    private Exchange exchange;
    /**
     * 交换机信息类
     */
    @Data
    public static class Exchange {
        /**
         * 交换机类型
         */
        private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.DIRECT; // 默认直连交换机
        /**
         * 交换机名称
         */
        private String name;
        /**
         * 是否持久化
         */
        private boolean durable = true; // 默认true持久化,重启消息不会丢失
        /**
         * 当所有队绑定列均不在使用时,是否自动删除交换机
         */
        private boolean autoDelete = false; // 默认false,不自动删除
        /**
         * 交换机其他参数
         */
        private Map arguments;
    }
    /**
     * 队列信息类
     */
    @Data
    public static class Queue {
        /**
         * 队列名称
         */
        private String name;
        /**
         * 是否持久化
         */
        private boolean durable = true; // 默认true持久化,重启消息不会丢失
        /**
         * 是否具有排他性
         */
        private boolean exclusive = false; // 默认false,可多个消费者消费同一个队列
        /**
         * 当消费者均断开连接,是否自动删除队列
         */
        private boolean autoDelete = false; // 默认false,不自动删除,避免消费者断开队列丢弃消息
        /**
         * 绑定死信队列的交换机名称
         */
        private String deadLetterExchange;
        /**
         * 绑定死信队列的路由key
         */
        private String deadLetterRoutingKey;
        /**
         * 其他属性设置
         */
        private Map arguments;
    }

}

3.rabbitmq队列初始化

/**
 * @Author: GZ
 * @CreateTime: 2022-10-25  22:09
 * @Description: rabbitmq队列初始化器
 * @Version: 1.0
 */
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {
    private AmqpAdmin amqpAdmin;

    private List modules;



    public RabbitModuleInitializer(AmqpAdmin amqpAdmin, List modules) {
        this.amqpAdmin = amqpAdmin;
        this.modules = modules;
    }

    @Override
    public void afterSingletonsInstantiated() {
        log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");
        declareRabbitModule();
    }

    /**
     * RabbitMQ 根据配置动态创建和绑定队列、交换机
     */
    private void declareRabbitModule() {
        if (CollectionUtil.isEmpty(modules)) {
            return;
        }
        for (RabbitModuleInfoDTO rabbitModuleInfo : modules) {
            //配置参数校验
            configParamValidate(rabbitModuleInfo);
            // 队列
            Queue queue = convertQueue(rabbitModuleInfo.getQueue());
            // 交换机
            Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
            // 绑定关系
            String routingKey = rabbitModuleInfo.getRoutingKey();
            String queueName = rabbitModuleInfo.getQueue().getName();
            String exchangeName = rabbitModuleInfo.getExchange().getName();
            Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);

            // 创建队列
            amqpAdmin.declareQueue(queue);
            // 创建交换机
            amqpAdmin.declareExchange(exchange);
            // 队列 绑定 交换机
            amqpAdmin.declareBinding(binding);
        }
    }

    /**
     * RabbitMQ动态配置参数校验
     *
     * @param rabbitModuleInfo
     */
    public void configParamValidate(RabbitModuleInfoDTO rabbitModuleInfo) {

        String routingKey = rabbitModuleInfo.getRoutingKey();

        Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");

        Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);

        Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);

    }

    /**
     * 转换生成RabbitMQ队列
     *
     * @param queue
     * @return
     */
    public Queue convertQueue(RabbitModuleInfoDTO.Queue queue) {
        Map arguments = queue.getArguments();

        // 转换ttl的类型为long
        if (arguments != null && arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }
        //设置队列的优先级
        if (arguments != null && arguments.containsKey("x-max-priority")) {
            arguments.put("x-max-priority", Convert.toLong(arguments.get("x-max-priority")));
        }

        // 是否需要绑定死信队列
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {

            if (arguments == null) {
                arguments = new HashMap<>(Constants.RABBIT_MQ_INITIAL_CAPACITY);
            }
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);

        }

        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }


    /**
     * 转换生成RabbitMQ交换机
     *
     * @param exchangeInfo
     * @return
     */
    public Exchange convertExchange(RabbitModuleInfoDTO.Exchange exchangeInfo) {

        AbstractExchange exchange = null;

        RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType();

        String exchangeName = exchangeInfo.getName();
        boolean isDurable = exchangeInfo.isDurable();
        boolean isAutoDelete = exchangeInfo.isAutoDelete();

        Map arguments = exchangeInfo.getArguments();

        switch (exchangeType) {
            case DIRECT:// 直连交换机
                exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case TOPIC: // 主题交换机
                exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case FANOUT: //扇形交换机
                exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case HEADERS: // 头交换机
                exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
        }
        return exchange;
    }

}

4.rabbitmq配置

@Configuration
@ConfigurationProperties(prefix = "common.tools.rabbitmq")
@Data
public class RabbitConfig {
    /**
     * 交换机、队列、路由Key
     * -
     */
    private List modules;
    /**
     * 使用json序列化机制,进行消息转换
     */
    @Bean
    public MessageConverter jackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 动态创建队列、交换机初始化器
     */
    @Bean
    @ConditionalOnMissingBean
    public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin) {
        return new RabbitModuleInitializer(amqpAdmin, modules);
    }


}

5.yml配置

common:
  tools:
    rabbitmq:
      # 动态创建和绑定队列、交换机的配置
      modules:
        - routing-key: log.inbound.operation.queue.key
          queue:
            name: log.inbound.operation.queue
          exchange:
            name: log.exchange
            type: direct
spring:
  rabbitmq:
    host: 192.168.16.128   # rabbitmq的连接地址
    port: 5672     # rabbitmq的连接端口号
    username: guest # rabbitmq的用户名
    password: guest # rabbitmq的密码

6.生产者

@Component
@RequiredArgsConstructor
@Slf4j
public class TestPublisher {
    private final RabbitTemplate rabbitTemplate;

    public void send(TestMessageDTO testMessageDTO){
        rabbitTemplate.convertAndSend("log.exchange", "log.inbound.operation.queue.key", testMessageDTO);

    }
}

7.消费者

@Component
public class TestConsumer {

    @RabbitListener(queues = "log.inbound.operation.queue")
    public void handleInventoryOperation(TestMessageDTO testMessageDTO) {

        System.out.println(testMessageDTO);
    }

}

8.消息日志显示

展开阅读全文

页面更新:2024-03-19

标签:队列   交换机   死信   动态   路由   初始化   绑定   持久   消费者   消息

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top