场景
当一个订单在删除时,状态修改为锁定,当30分钟没有解锁后,那么将订单的的状态变成删除,并且执行真正的删除逻辑,将订单相关的删除。
解决:订单被误删的问题,如果在删除的延迟时间中(30分钟内),将状态修改为正正常则不会删除

实现
常量定义
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class MQConst { public static final String EXCHANGE_DELAY_NAME_DELETE_ORDER ="topicExchange.delay.delete.order";
public static final String QUEUE_DELAY_NAME_DELETE_ORDER ="queue.delay.delete.order";
public static final String ROUTINGKEY_DELAY_NAME_DELETE_ORDER ="routingKey.delay.delete.order";
public static final String EXCHANGE_DEADLETTER_NAME_DELETE_ORDER ="topicExchange.deadLetter.delete.order";
public static final String QUEUE_DEADLETTER_NAME_DELETE_ORDER ="queue.deadLetter.delete.order";
public static final String ROUTINGKEY_DEADLETTER_NAME_DELETE_ORDER ="routingKey.deadLetter.delete.order"; }
|
绑定关系
springboot 启动是动态创建死信队列,交换机,路由键。
延迟队列,交换机,路由键,已经绑定
以及延迟队列消息过期根据
死信的路由键以及队列找到对应的死信队列,将消费放到里面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
@Component @Slf4j public class RabbitMQApplicationRunner implements ApplicationRunner {
@Resource private RabbitAdmin rabbitAdmin;
@Override public void run(ApplicationArguments args) throws Exception {
createExchangeQueueBinding(EXCHANGE_DEADLETTER_NAME_DELETE_ORDER, QUEUE_DEADLETTER_NAME_DELETE_ORDER, ROUTINGKEY_DEADLETTER_NAME_DELETE_ORDER;
createExchangeDelayedQueueBinding(EXCHANGE_DELAY_NAME_DELETE_ORDER, QUEUE_DELAY_NAME_DELETE_ORDER, ROUTINGKEY_DELAY_NAME_DELETE_ORDER, EXCHANGE_DEADLETTER_NAME_DELETE_ORDER,ROUTINGKEY_DEADLETTER_NAME_DELETE_ORDER);
}
public void createExchangeQueueBinding(String exchangeName, String queueName, String routingKey) { TopicExchange exchange = new TopicExchange(exchangeName); rabbitAdmin.declareExchange(exchange);
Queue queue = new Queue(queueName); rabbitAdmin.declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey); rabbitAdmin.declareBinding(binding);
log.info("Exchange, Queue, and Binding created successfully: Exchange [{}], Queue [{}], Routing Key [{}]", exchangeName, queueName, routingKey); }
public void createExchangeDelayedQueueBinding(String exchangeName, String queueName, String routingKey, String deadLetterExchangeName,String deadLetterRoutingKey) { TopicExchange exchange = new TopicExchange(exchangeName); rabbitAdmin.declareExchange(exchange);
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", deadLetterExchangeName); arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey); arguments.put("x-message-ttl", 6 * 60 * 60 * 1000);
Queue queue = new Queue(queueName, true, false, false, arguments); rabbitAdmin.declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey); rabbitAdmin.declareBinding(binding);
log.info("Delayed Queue created: [{}]", queueName); }
}
|
死信消费者
最好先注释(因为执行顺序问题,可能队列未有找到,然后报错)
用上面的代码先创建死信队列,延迟队列...到服务器的Rabbitmq上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Component @DependsOn("rabbitMQApplicationRunner") @Slf4j public class BusinessConsumer {
@Resource private OrderService orderService;
@RabbitListener(queues = QUEUE_DEADLETTER_NAME_DELETE_ORDER) public void processTenantDelete(Long orderId) { log.info("\n ==== 收到删除订单消息,orderId: {} =====", orderId); try { orderService.deleteRealOrder(orderId); log.info("订单删除成功,orderId: {}", orderId); } catch (Exception e) { log.error("删除订单失败: {} - orderId: {}", e, orderId); } }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Api(tags = "订单管理 - V2") @RestController @Slf4j @RequestMapping("/v2/order") public class TenantControllerV2 {
@Autowired OrderService orderService;
@DeleteMapping(value = "/{id}") @ApiOperation(value = "管理员删除订单", notes = "仅供管理员操作") @ApiImplicitParam(name = "id", value = "订单id", required = true, dataType = SwaggerDataType.LONG) public ResponseEntity<Void> delete(@PathVariable Long id) { orderService.deleteV2(id); return new ResponseEntity<>(HttpStatus.OK); }
}
|
服务
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void deleteV2(Long id) { Order tenant = this.orderRepo.findById(id) .orElseThrow(() -> new RestBadRequestException("order not found!")); order.setStatus(OrderStatusEnum.LOCKED); orderRepo.save(tenant); rabbitTemplate.convertAndSend( MQConst.EXCHANGE_DELAY_NAME_DELETE_ORDER, MQConst.ROUTINGKEY_DELAY_NAME_DELETE_ORDER, id ); }
|
真正删除的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Override @Transactional public void deleteRealOrder(Long id) { order order = this.orderRepo.findById(id) .orElseThrow(() -> new RestBadRequestException("order not found!")); log.info("开始删除订单:{}, 状态为:{}, 是否是锁定:{}",id,order.getUserStatus(),order.getUserStatus().equals(orderStatusEnum.LOCKED));
if(order.getUserStatus().equals(orderStatusEnum.DELETED)){ return; } if(order.getUserStatus().equals(orderStatusEnum.LOCKED)){ order.setUserStatus(orderStatusEnum.DELETED); orderRepo.save(order); Map<Long, SimpleorderVO> orderIdToSimpleorderMap = (Map<Long, SimpleorderVO>) RedisUtils.getMappingMap("orderIdToSimpleorderMap", redisTemplate); if (null != orderIdToSimpleorderMap) { orderIdToSimpleorderMap.remove(order.getId()); redisTemplate.opsForHash().delete("orderIdToSimpleorderMap", order.getId()); } deviceService.unbindByorderIdV2(order); orderRelationService.delete(id); } }
|
验证
打开rabbitmq的控制台,进入到队列里面,点击
“queue.delay.delete.order”
发起后就能看到里面有一条消息,等半小时后就会消失,在去数据库检查状态是否变成DELETE,是就删除成功。
如果测试的话,最好将延迟队列设置为2分钟,如果需要设置
就需要去rabbitmq控制台先手动删除之前的订单延迟,死信队列。注释监听死信队列的消费者代码,重启应用,去创建这些队列。
延迟队列创建成功,去控制台查看ttl真的是2分钟吗?如果是解开死信队列的消费者代码,重启应用。