org.springframework.boot
spring-boot-starter-amqp
/**
* @Author: GZ
* @CreateTime: 2022-10-25 22:04
* @Description: RabbitMQ 队列和交换机机绑定关系实体对象
* @Version: 1.0
*/
@Data
public class RabbitModuleInfoDTO {
/**
* 路由Key
*/
private String routingKey;
/**
* 队列信息
*/
private Queue queue;
/**
* 交换机信息
*/
private Exchange exchange;
/**
* 交换机信息类
*/
@Data
public static class Exchange {
/**
* 交换机类型
*/
private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.DIRECT; // 默认直连交换机
/**
* 交换机名称
*/
private String name;
/**
* 是否持久化
*/
private boolean durable = true; // 默认true持久化,重启消息不会丢失
/**
* 当所有队绑定列均不在使用时,是否自动删除交换机
*/
private boolean autoDelete = false; // 默认false,不自动删除
/**
* 交换机其他参数
*/
private Map arguments;
}
/**
* 队列信息类
*/
@Data
public static class Queue {
/**
* 队列名称
*/
private String name;
/**
* 是否持久化
*/
private boolean durable = true; // 默认true持久化,重启消息不会丢失
/**
* 是否具有排他性
*/
private boolean exclusive = false; // 默认false,可多个消费者消费同一个队列
/**
* 当消费者均断开连接,是否自动删除队列
*/
private boolean autoDelete = false; // 默认false,不自动删除,避免消费者断开队列丢弃消息
/**
* 绑定死信队列的交换机名称
*/
private String deadLetterExchange;
/**
* 绑定死信队列的路由key
*/
private String deadLetterRoutingKey;
/**
* 其他属性设置
*/
private Map arguments;
}
}
/**
* @Author: GZ
* @CreateTime: 2022-10-25 22:09
* @Description: rabbitmq队列初始化器
* @Version: 1.0
*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {
private AmqpAdmin amqpAdmin;
private List modules;
public RabbitModuleInitializer(AmqpAdmin amqpAdmin, List modules) {
this.amqpAdmin = amqpAdmin;
this.modules = modules;
}
@Override
public void afterSingletonsInstantiated() {
log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");
declareRabbitModule();
}
/**
* RabbitMQ 根据配置动态创建和绑定队列、交换机
*/
private void declareRabbitModule() {
if (CollectionUtil.isEmpty(modules)) {
return;
}
for (RabbitModuleInfoDTO rabbitModuleInfo : modules) {
//配置参数校验
configParamValidate(rabbitModuleInfo);
// 队列
Queue queue = convertQueue(rabbitModuleInfo.getQueue());
// 交换机
Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
// 绑定关系
String routingKey = rabbitModuleInfo.getRoutingKey();
String queueName = rabbitModuleInfo.getQueue().getName();
String exchangeName = rabbitModuleInfo.getExchange().getName();
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
// 创建队列
amqpAdmin.declareQueue(queue);
// 创建交换机
amqpAdmin.declareExchange(exchange);
// 队列 绑定 交换机
amqpAdmin.declareBinding(binding);
}
}
/**
* RabbitMQ动态配置参数校验
*
* @param rabbitModuleInfo
*/
public void configParamValidate(RabbitModuleInfoDTO rabbitModuleInfo) {
String routingKey = rabbitModuleInfo.getRoutingKey();
Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");
Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
}
/**
* 转换生成RabbitMQ队列
*
* @param queue
* @return
*/
public Queue convertQueue(RabbitModuleInfoDTO.Queue queue) {
Map arguments = queue.getArguments();
// 转换ttl的类型为long
if (arguments != null && arguments.containsKey("x-message-ttl")) {
arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
}
//设置队列的优先级
if (arguments != null && arguments.containsKey("x-max-priority")) {
arguments.put("x-max-priority", Convert.toLong(arguments.get("x-max-priority")));
}
// 是否需要绑定死信队列
String deadLetterExchange = queue.getDeadLetterExchange();
String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
if (arguments == null) {
arguments = new HashMap<>(Constants.RABBIT_MQ_INITIAL_CAPACITY);
}
arguments.put("x-dead-letter-exchange", deadLetterExchange);
arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
}
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
}
/**
* 转换生成RabbitMQ交换机
*
* @param exchangeInfo
* @return
*/
public Exchange convertExchange(RabbitModuleInfoDTO.Exchange exchangeInfo) {
AbstractExchange exchange = null;
RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType();
String exchangeName = exchangeInfo.getName();
boolean isDurable = exchangeInfo.isDurable();
boolean isAutoDelete = exchangeInfo.isAutoDelete();
Map arguments = exchangeInfo.getArguments();
switch (exchangeType) {
case DIRECT:// 直连交换机
exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case TOPIC: // 主题交换机
exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case FANOUT: //扇形交换机
exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case HEADERS: // 头交换机
exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
}
return exchange;
}
}
@Configuration
@ConfigurationProperties(prefix = "common.tools.rabbitmq")
@Data
public class RabbitConfig {
/**
* 交换机、队列、路由Key
* -
*/
private List modules;
/**
* 使用json序列化机制,进行消息转换
*/
@Bean
public MessageConverter jackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 动态创建队列、交换机初始化器
*/
@Bean
@ConditionalOnMissingBean
public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin) {
return new RabbitModuleInitializer(amqpAdmin, modules);
}
}
common:
tools:
rabbitmq:
# 动态创建和绑定队列、交换机的配置
modules:
- routing-key: log.inbound.operation.queue.key
queue:
name: log.inbound.operation.queue
exchange:
name: log.exchange
type: direct
spring:
rabbitmq:
host: 192.168.16.128 # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
username: guest # rabbitmq的用户名
password: guest # rabbitmq的密码
@Component
@RequiredArgsConstructor
@Slf4j
public class TestPublisher {
private final RabbitTemplate rabbitTemplate;
public void send(TestMessageDTO testMessageDTO){
rabbitTemplate.convertAndSend("log.exchange", "log.inbound.operation.queue.key", testMessageDTO);
}
}
@Component
public class TestConsumer {
@RabbitListener(queues = "log.inbound.operation.queue")
public void handleInventoryOperation(TestMessageDTO testMessageDTO) {
System.out.println(testMessageDTO);
}
}
页面更新:2024-03-19
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号