<?xml version="1.0" encoding="UTF-8"?>
@Service
@Slf4j
public class OrderCancelListener implements MessageListener {
@Autowired
private MqOrderService mqOrderService;
@Override
public void onMessage(Message message) {
if (null == message.getBody()) {
log.error("rabbitmq订单取消msg.getBody()为空");
return;
}
try {
String content = new String(message.getBody());
log.info("rabbitmq订单取消消息beg:{}", content);
JSONObject jsonObject = JSONObject.parseObject(content);
Object object = jsonObject.get("Properties");
log.info("rabbitmq订单取消end,content="+new String(message.getBody()));
} catch (Exception e) {
log.error("rabbitmq订单取消error,msg=[{}]",message.toString(),e);
return;
}
return;
}
}
#配置mq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#虚拟host可以不进行设置
spring.rabbitmq.virtual-host=/
#设置回调确认
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
@Configuration
public class TopicRabbitConfig {
//绑定键
public final static String man = "topic.order";//0
public final static String woman = "topic.cancel.order";//1
public final static String order = "buriedPoint.ordered";//2
public final static String pay = "PaySuccessWebOrder";//3
public final static String enterClass = "enterClass";//4
public final static String exitClass = "exitClass";//5
public final static String buriedPoint = "buriedPoint.orderCanceld";//6
public final static String testPoint = "test.Point";//6
/*消息确认队列*/
public final static String CONFIRM="topic.messages";
@Bean
public Queue confirmQueue() {
return new Queue(TopicRabbitConfig.CONFIRM);
}
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
public Queue Queue3() {
return new Queue(TopicRabbitConfig.order);
}
@Bean
public Queue QueueEnterClass() {
return new Queue(TopicRabbitConfig.enterClass);
}
@Bean
public Queue QueueExitClass() {
return new Queue(TopicRabbitConfig.exitClass);
}
@Bean
public Queue QueuePay() {
return new Queue(TopicRabbitConfig.pay);
}
@Bean
public Queue QueueBuried() {
return new Queue(TopicRabbitConfig.buriedPoint);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
@Bean
TopicExchange exchange2() {
return new TopicExchange("topicExchange2");
}
//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage3() {
return BindingBuilder.bind(Queue3()).to(exchange()).with(order);
}
//新增
@Bean
Binding bindingExchangeMessage4() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(pay);
}
@Bean
Binding bindingExchangeMessage5() {
return BindingBuilder.bind(QueueEnterClass()).to(exchange()).with(enterClass);
}
@Bean
Binding bindingExchangeMessage6() {
return BindingBuilder.bind(QueueExitClass()).to(exchange()).with(exitClass);
}
@Bean
Binding bindingExchangeMessage7() {
return BindingBuilder.bind(QueuePay()).to(exchange()).with(pay);
}
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(QueueBuried()).to(exchange()).with(buriedPoint);
}
@Bean
Binding bindingExchangeMessage8() {
return BindingBuilder.bind(QueueBuried()).to(exchange2()).with(testPoint);
}
//进行绑定确认queue
@Bean
Binding bindingConfirmExchange() {
return BindingBuilder.bind(confirmQueue()).to(exchange()).with(CONFIRM);
}
}
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("passport_exchange", "topic.order", manMap);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.cancel.order", womanMap);
return "ok";
}
@GetMapping("/sendTopicMessage3")
public String sendTopicMessage3() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "regServiceTest.buriedPoint", womanMap);
return "ok";
}
/**
* @Author: 郭佳
* @Params:
* @Description:接收topic消息
* @Return:
* @Date: 2020-12-22 10:42
*/
@GetMapping("/sendTopic")
public String sendTopic(String topic) {
String messageId = String.valueOf(UUID.randomUUID());
// String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", topic);
womanMap.put("createTime", createTime);
womanMap.put("msg", "这是个测试");
//TestDirectExchange topicExchange
rabbitTemplate.convertAndSend("TestFanoutExchange", topic, womanMap);
// rabbitTemplate.convertAndSend("topicExchange2", topic, womanMap);
return "ok";
}
}
@Component
public class DirectReceiver {
//消息确认消费
@RabbitListener(queues = "topic.messages")
// @RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
// @RabbitHandler
@RabbitListener(queues = "TestFanoutQueue")//监听的队列名称 TestDirectQueue
public void processFanout(Map testMessage) {
System.out.println("FanoutReceiver消费者收到消息 : " + testMessage.toString());
}
// @RabbitHandler
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
}
@RabbitListener(queues = "topic.man")
public void receiveMessage2(Map testMessage) {
System.out.println("我是监听topic.man 的,满足 topic.# 的都过来 , " + testMessage.toString());
}
@RabbitListener(queues = "topic.woman")
public void receiveMessage3(Map testMessage) {
System.out.println("我是监听topic.woman 的,满足 topic.# 的都过来 , " + testMessage.toString());
}
}
页面更新:2024-04-29
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号