技术点
RabbitMQ: 死信队列,脑瓜子空空就点 传送门
进行回忆呗!
实现原理
1、用户下单之后,投递一个订单消息存放在订单队列里,该消息过期时间为x分钟,一直未被订单消费者消费,消息会转移到死信交换机路由到死信队列中,被我们的死信消费者x分钟后消费
2、死信消费者在根据订单号码查询支付订单状态,如果是未支付情况下,则将该订单设置未超时。
对筛选出来的订单号码进行核对校验
- 订单中是否存在
- 携带订单号码调用支付宝查询订单支付状态是否为待支付
- 更新该订单号码状态
超时消费流程图

代码
定义配置类
1、定义订单交换机,订单队列,设置订单交换机绑定订单队列
2、定义死信订单交换机,死信订单队列,设置死信订单交换机绑定死信订单队列
3、设置订单队列的死信订单交换机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @Configuration public class QueueConfig {
public static final String ORDER_EXCHANGE="ORDER_EXCHANGE"; public static final String DEAD_ORDER_EXCHANGE="DEAD_ORDER_EXCHANGE"; public static final String ORDER_QUEUE="ORDER_QUEUE"; public static final String DEAD_ORDER_QUEUE="DEAD_ORDER_QUEUE"; public static final String ORDER_EXCHANGE_ROUTING_KEY="ORDER_EXCHANGE_ROUTING_KEY"; public static final String DEAD_ORDER_EXCHANGE_ROUTING_KEY="DEAD_ORDER_EXCHANGE_ROUTING_KEY"; @Bean("orderExchange") public DirectExchange orderExchange(){ return new DirectExchange(ORDER_EXCHANGE); }
@Bean("orderDeadExchange") public DirectExchange orderDdadExchange(){ return new DirectExchange(DEAD_ORDER_EXCHANGE); }
@Bean("orderQueue") public Queue orderQueue(){ Map<String,Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange",DEAD_ORDER_EXCHANGE); arguments.put("x-dead-letter-routing-key",DEAD_ORDER_EXCHANGE_ROUTING_KEY); return QueueBuilder.durable(ORDER_QUEUE).withArguments(arguments).build(); }
@Bean("deadOrderQueue") public Queue deadOrderQueue(){ return QueueBuilder.durable(DEAD_ORDER_QUEUE).build(); }
@Bean public Binding queueABindingX(@Qualifier("orderQueue") Queue orderQueue, @Qualifier("orderExchange") DirectExchange orderExchange){ return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_EXCHANGE_ROUTING_KEY); }
@Bean public Binding queueDBindingY(@Qualifier("deadOrderQueue") Queue deadOrderQueue, @Qualifier("deadorderExchange") DirectExchange deadorderExchange){ return BindingBuilder.bind(deadOrderQueue).to(deadorderExchange).with(DEAD_ORDER_EXCHANGE_ROUTING_KEY); }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Resource private OrderTimeoutManager orderTimeoutManager;
@Override public BaseResponse<String> toPayResultToken(PayOrderTokenDto payOrderTokenDto) { PaymentInfoEntity paymentChannelEntity = dtoToDo(payOrderTokenDto, PaymentInfoEntity.class); int result = paymentInfoMapper.insert(paymentChannelEntity); if (result <= 0) { return setResultError("插入支付记录失败!"); } Long id = paymentChannelEntity.getId();
orderTimeoutManager.sendOrderTimeoutMsg(id + ""); return setResultSuccess(payToken); }
|
指定消息的过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public OrderTimeoutManager { @Resource private RabbitTemplate rabbitTemplate; public void sendOrderTimeoutMsg(String msg) { MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setExpiration("30000"); return message; }; rabbitTemplate.convertAndSend(ConfirmConfig.ORDER_EXCHANGE, ConfirmConfig.CONFIRM_ROUTING_KEY,msg,messagePostProcessor); log.info("发送消息内容:{}",msg); } }
|
也可以定义多个消费者,进行轮训消费消息
消费者
1 2 3 4 5 6 7 8 9 10
| @Slf4j @Component public class OrderConsumer {
@RabbitListener(queues = ConfirmConfig.ORDER_QUEUE) public void receiveConfirmMessage(Message message){ String msg = new String(message.getBody()); log.info("接受到的订单队列orderqueue消息:{}",msg); } }
|
死信队列消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Slf4j @Component public class DeadOrderConsumer {
@Autowired private PayOrderTimeoutService payOrderTimeoutService;
@RabbitListener(queues = ConfirmConfig.DEAD_ORDER_QUEUE) public void receiveConfirmMessage(Message message){ String msg = new String(message.getBody()); log.info("接受到的队列deadOrderqueue消息:{}",msg); payOrderTimeoutService.orderTimeout(payId);
}
}
|
校验封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Service public class PayOrderTimeoutService { @Autowired private PaymentInfoMapper paymentInfoMapper;
public boolean orderTimeout(Long payId) { PaymentInfoEntity paymentInfoEntity = paymentInfoMapper.selectById(Integer.paseInt(payId)); if (paymentInfoEntity == null) { return false; }
if (!PaymentConstant.PAYMENT_STATUS_NOT.equals(paymentInfoEntity.getPaymentStatus())) { return false; } paymentInfoEntity.setPaymentStatus(PaymentConstant.PAYMENT_STATUS_TIMEOUT); paymentInfoMapper.updateById(paymentInfoEntity); return true; } }
|
后续更新要解决消息队列消息丢失,持久化设置