12.Spring和Spring Boot中的RabbitMQ配置

1.Spring中rabbitmq配置

<?xml version="1.0" encoding="UTF-8"?>


    
    
    
    
    
    
    
    
    
        
   
            
            
           
        
    

    
    
    
 
    
    
   
    
   
    
    
  
    

    
        
        
        
     
        
        
       
    
    


    
    



2.spring中代码调用

@Service
@Slf4j
public class OrderCancelListener implements MessageListener {
    @Autowired
    private MqOrderService mqOrderService;
    @Override
    public void onMessage(Message message) {
        if (null == message.getBody()) {
            log.error("rabbitmq订单取消msg.getBody()为空");
            return;
        }
  
        try {
            String content = new String(message.getBody());
            log.info("rabbitmq订单取消消息beg:{}", content);
            JSONObject jsonObject = JSONObject.parseObject(content);
            Object object = jsonObject.get("Properties");
           
            log.info("rabbitmq订单取消end,content="+new String(message.getBody()));
        } catch (Exception e) {
            log.error("rabbitmq订单取消error,msg=[{}]",message.toString(),e);
         
            return;
        }
        return;

    }
}

3.SpringBoot中rabbitmq配置

# RabbitMQ connection settings
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Virtual host; setting it is optional
spring.rabbitmq.virtual-host=/
# Enable publisher confirm callbacks.
# NOTE(review): the commented-out boolean property below is presumably the older form of the
# publisher-confirm-type setting — confirm against the Spring Boot version in use.
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

4.Springboot中声明交换机和队列

@Configuration
public class TopicRabbitConfig {
    //绑定键
    public final static String man = "topic.order";//0
    public final static String woman = "topic.cancel.order";//1
    public final static String order = "buriedPoint.ordered";//2
    public final static String pay = "PaySuccessWebOrder";//3
    public final static String enterClass = "enterClass";//4
    public final static String exitClass = "exitClass";//5
    public final static String buriedPoint = "buriedPoint.orderCanceld";//6
    public final static String testPoint = "test.Point";//6
    /*消息确认队列*/
   public final static  String CONFIRM="topic.messages";

    @Bean
    public Queue confirmQueue() {
        return new Queue(TopicRabbitConfig.CONFIRM);
    }
    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.man);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.woman);
    }
    @Bean
    public Queue Queue3() {
        return new Queue(TopicRabbitConfig.order);
    }
    @Bean
    public Queue QueueEnterClass() {
        return new Queue(TopicRabbitConfig.enterClass);
    }

    @Bean
    public Queue QueueExitClass() {
        return new Queue(TopicRabbitConfig.exitClass);
    }
    @Bean
    public Queue QueuePay() {
        return new Queue(TopicRabbitConfig.pay);
    }
    @Bean
    public Queue QueueBuried() {
        return new Queue(TopicRabbitConfig.buriedPoint);
    }
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }
    @Bean
    TopicExchange exchange2() {
        return new TopicExchange("topicExchange2");
    }

    //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
    // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }
    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
    //这样只要是消息携带的路由键是topic.man,才会分发到该队列
    @Bean
    Binding bindingExchangeMessage3() {
        return BindingBuilder.bind(Queue3()).to(exchange()).with(order);
    }
   //新增
   @Bean
   Binding bindingExchangeMessage4() {
       return BindingBuilder.bind(firstQueue()).to(exchange()).with(pay);
   }
    @Bean
    Binding bindingExchangeMessage5() {
        return BindingBuilder.bind(QueueEnterClass()).to(exchange()).with(enterClass);
    }
    @Bean
    Binding bindingExchangeMessage6() {
        return BindingBuilder.bind(QueueExitClass()).to(exchange()).with(exitClass);
    }
    @Bean
    Binding bindingExchangeMessage7() {
        return BindingBuilder.bind(QueuePay()).to(exchange()).with(pay);
    }
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(QueueBuried()).to(exchange()).with(buriedPoint);
    }

    @Bean
    Binding bindingExchangeMessage8() {
        return BindingBuilder.bind(QueueBuried()).to(exchange2()).with(testPoint);
    }

    //进行绑定确认queue
    @Bean
    Binding bindingConfirmExchange() {
        return BindingBuilder.bind(confirmQueue()).to(exchange()).with(CONFIRM);
    }


}

5.写消息

@RestController
public class SendMessageController {
    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
    @GetMapping("/sendTopicMessage1")
    public String sendTopicMessage1() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: M A N ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map manMap = new HashMap<>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("passport_exchange", "topic.order", manMap);
        return "ok";
    }

    @GetMapping("/sendTopicMessage2")
    public String sendTopicMessage2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.cancel.order", womanMap);
        return "ok";
    }
    @GetMapping("/sendTopicMessage3")
    public String sendTopicMessage3() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "regServiceTest.buriedPoint", womanMap);
        return "ok";
    }
    /**
     * @Author: 郭佳
     * @Params:
     * @Description:接收topic消息
     * @Return:
     * @Date:  2020-12-22 10:42
     */
    @GetMapping("/sendTopic")
    public String sendTopic(String topic) {
        String messageId = String.valueOf(UUID.randomUUID());
//        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", topic);
        womanMap.put("createTime", createTime);
        womanMap.put("msg", "这是个测试");
        //TestDirectExchange  topicExchange
        rabbitTemplate.convertAndSend("TestFanoutExchange", topic, womanMap);
//        rabbitTemplate.convertAndSend("topicExchange2", topic, womanMap);

        return "ok";
    }

}

6.读消息

@Component
public class DirectReceiver {

   //消息确认消费
    @RabbitListener(queues = "topic.messages")
//    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : " + message);
    }


    //    @RabbitHandler
    @RabbitListener(queues = "TestFanoutQueue")//监听的队列名称 TestDirectQueue
    public void processFanout(Map testMessage) {
        System.out.println("FanoutReceiver消费者收到消息  : " + testMessage.toString());
    }

    //    @RabbitHandler
    @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = "topic.man")
    public void receiveMessage2(Map testMessage) {
        System.out.println("我是监听topic.man 的,满足 topic.# 的都过来 , " + testMessage.toString());
    }

    @RabbitListener(queues = "topic.woman")
    public void receiveMessage3(Map testMessage) {
        System.out.println("我是监听topic.woman 的,满足 topic.# 的都过来 , " + testMessage.toString());
    }
}
展开阅读全文

页面更新:2024-04-29

标签:都会   队列   路由   绑定   交换机   开头   订单   声明   消费者   规则   名称   消息   代码   测试   方法   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top