RabbitMQ死信队列实战——解决订单超时未支付

365bet体育开户官网 📅 2025-07-03 07:25:36 👤 admin 👁️ 6612 ❤️ 700
RabbitMQ死信队列实战——解决订单超时未支付

目录

一、死信队列

二、产生原因

三、解决订单超时

代码实现

一、死信队列

DLX(dead-letter-exchange),死信队列也是一般的队列,当消息变成死信时,消息会投递到死信队列中,经过死信队列进行消费的一种形式,对应的交换机叫死信交换机DLX。

二、产生原因

1、当消息投递到mq后,没有消费者去消费,而消息过期后会进入死信队列。

package com.xiaojie.springboot.config;

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.Map;

/**

* @author xiaojie

* @version 1.0

* @description:死信队列配置

* @date 2021/10/8 21:07

*/

@Component

public class DLXConfig {

//定义队列

private static final String MY_DIRECT_QUEUE = "snail_direct_queue";

//定义队列

private static final String MY_DIRECT_DLX_QUEUE = "xiaojie_direct_dlx_queue";

//定义死信交换机

private static final String MY_DIRECT_DLX_EXCHANGE = "xiaojie_direct_dlx_exchange";

//定义交换机

private static final String MY_DIRECT_EXCHANGE = "snail_direct_exchange";

//死信路由键

private static final String DIRECT_DLX_ROUTING_KEY = "msg.dlx";

//绑定死信队列

@Bean

public Queue dlxQueue() {

return new Queue(MY_DIRECT_DLX_QUEUE);

}

//绑定死信交换机

@Bean

public DirectExchange dlxExchange() {

return new DirectExchange(MY_DIRECT_DLX_EXCHANGE);

}

@Bean

public Queue snailQueue() {

Map args = new HashMap<>(2);

// 绑定我们的死信交换机

args.put("x-dead-letter-exchange", MY_DIRECT_DLX_EXCHANGE);

// 绑定我们的路由key

args.put("x-dead-letter-routing-key", DIRECT_DLX_ROUTING_KEY);

return new Queue(MY_DIRECT_QUEUE, true, false, false, args);

}

@Bean

public DirectExchange snailExchange() {

return new DirectExchange(MY_DIRECT_EXCHANGE);

}

//绑定队列到交换机

@Bean

public Binding snailBindingExchange(Queue snailQueue, DirectExchange snailExchange) {

return BindingBuilder.bind(snailQueue).to(snailExchange).with("msg.send");

}

//绑定死信队列到死信交换机

@Bean

public Binding dlxBindingExchange(Queue dlxQueue, DirectExchange dlxExchange) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DIRECT_DLX_ROUTING_KEY);

}

}

生产者产生消息后,并没与消费者去消费,等待消息过期后,自动进入死信队列

public class DLXProvider {

//定义交换机

private static final String MY_DIRECT_EXCHANGE = "snail_direct_exchange";

//普通队列路由键

private static final String DIRECT_ROUTING_KEY = "msg.send";

@Autowired

private RabbitTemplate rabbitTemplate;

public String sendDlxMsg(){

String msg="我是模拟死信队列的消息。。。。。";

rabbitTemplate.convertAndSend(MY_DIRECT_EXCHANGE, DIRECT_ROUTING_KEY, msg, (message) -> {

//设置有效时间,如果消息不被消费,进入死信队列

message.getMessageProperties().setExpiration("10000");

return message;

});

return "success";

}

}

2、当队列满了之后

@Bean

public Queue snailQueue() {

Map args = new HashMap<>(2);

// 绑定我们的死信交换机

args.put("x-dead-letter-exchange", MY_DIRECT_DLX_EXCHANGE);

// 绑定我们的路由key

args.put("x-dead-letter-routing-key", DIRECT_DLX_ROUTING_KEY);

// args.put("x-message-ttl", 5000); //为队列设置过期时间

// x-max-length:队列最大容纳消息条数,大于该值,mq拒绝接受消息,消息进入死信队列

args.put("x-max-length", 5);

return new Queue(MY_DIRECT_QUEUE, true, false, false, args);

}

注意:如果在添加了这一条(队列长度)发生异常时,请删除掉交换机和队列后,重新启动程序,重新进行绑定。

3、消费者拒绝消费消息(消费端发生异常,mq无法收到消费端的ack)

package com.xiaojie.springboot.consumer;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.messaging.handler.annotation.Headers;

import org.springframework.stereotype.Component;

import java.util.Map;

/**

* @Description: 消费snail消息的消费者

* @author: xiaojie

* @date: 2021.10.09

*/

@Component

@Slf4j

public class SnailConsumer {

@RabbitListener(queues = "snail_direct_queue")

public void process(Message message, @Headers Map headers, Channel channel) throws Exception {

// 获取消息Id

String messageId = message.getMessageProperties().getMessageId();

String msg = new String(message.getBody(), "UTF-8");

log.info("获取到的消息>>>>>>>{},消息id>>>>>>{}", msg, messageId);

try {

int result = 1 / 0;

System.out.println("result" + result);

// // 手动ack

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

// 手动签收

channel.basicAck(deliveryTag, false);

} catch (Exception e) {

//拒绝消费消息(丢失消息) 给死信队列

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

}

}

}

三、解决订单超时

代码实现

绑定订单死信队列

package com.xiaojie.springboot.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.Map;

/**

* @author xiaojie

* @version 1.0

* @description:解决订单超时未支付问题,绑定订单死信队列

* @date 2021/10/8 23:12

*/

@Component

public class OrderDlxConfig {

@Value(value="${xiaojie.order.queue}")

private String orderQueue; //订单队列

@Value(value="${xiaojie.order.exchange}")

private String orderExchange;//订单队列

@Value(value="${xiaojie.dlx.queue}")

private String orderDeadQueue;//订单死信队列

@Value(value="${xiaojie.dlx.exchange}")

private String orderDeadExChange;//订单死信交换机

@Value(value="${xiaojie.order.routingKey}")

private String orderRoutingKey;//订单路由键

@Value(value="${xiaojie.dlx.routingKey}")

private String orderDeadRoutingKey;//死信队列路由键

@Bean

public Queue orderQueue(){

Map args = new HashMap<>(2);

// 绑定我们的死信交换机

args.put("x-dead-letter-exchange", orderDeadExChange);

// 绑定我们的路由key

args.put("x-dead-letter-routing-key", orderDeadRoutingKey);

return new Queue(orderQueue, true, false, false, args);

}

@Bean

public Queue orderDeadQueue(){

return new Queue(orderDeadQueue);

}

//绑定交换机

@Bean

public DirectExchange orderExchange(){

return new DirectExchange(orderExchange);

}

@Bean

public DirectExchange orderDeadExchange(){

return new DirectExchange(orderDeadExChange);

}

//绑定路由键

@Bean

public Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {

return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);

}

//绑定死信队列到死信交换机

@Bean

public Binding deadBindingExchange(Queue orderDeadQueue, DirectExchange orderDeadExchange) {

return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with(orderDeadRoutingKey);

}

}

创建订单完成之后,发送消息

package com.xiaojie.springboot.service.impl;

import com.alibaba.fastjson.JSONObject;

import com.xiaojie.springboot.entity.Order;

import com.xiaojie.springboot.mapper.OrderMapper;

import com.xiaojie.springboot.service.OrderService;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.scheduling.annotation.Async;

import org.springframework.stereotype.Service;

import java.util.UUID;

/**

* @author xiaojie

* @version 1.0

* @description: 订单实现类

* @date 2021/10/8 22:16

*/

@Service

public class OrderServiceImpl implements OrderService {

@Autowired

private OrderMapper orderMapper;

@Autowired

private RabbitTemplate rabbitTemplate;

@Value(value = "${xiaojie.order.exchange}")

private String orderExchange;

@Value(value = "${xiaojie.order.routingKey}")

private String orderRoutingKey;

@Override

public String saveOrder(Order order) {

String orderId = UUID.randomUUID().toString();

order.setOrderId(orderId);

order.setOrderName("test");

order.setPayMoney(3000D);

Integer result = orderMapper.addOrder(order);

if (result > 0) {

String msg = JSONObject.toJSONString(order);

//发送mq

sendMsg(msg, orderId);

return "success";

}

return "fail";

}

/**

* @description: 发送mq消息

* @param:

* @param: msg

* @param: orderId

* @return: void

* @author xiaojie

* @date: 2021/10/8 22:33

*/

@Async //异步线程发送 ,此处需要单独创建一个类去创建该方法,不然该异步线程可能不会生效

public void sendMsg(String msg, String orderId) {

rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

//设置过期时间30s

message.getMessageProperties().setExpiration("30000");

// message.getMessageProperties().setMessageId(orderId);

return message;

}

});

}

@Override

public Order getByOrderId(String orderId) {

return orderMapper.getOrder(orderId);

}

@Override

public Integer updateOrderStatus(String orderId) {

return orderMapper.updateOrder(orderId);

}

}

死信队列消费者

package com.xiaojie.springboot.service;

import com.alibaba.fastjson.JSONObject;

import com.rabbitmq.client.Channel;

import com.xiaojie.springboot.entity.Order;

import com.xiaojie.springboot.myenum.OrderStatus;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.*;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.handler.annotation.Headers;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.Map;

/**

* @author xiaojie

* @version 1.0

* @description: 死信队列解决订单超时问题

* @date 2021/10/8 22:43

*/

@Component

@RabbitListener(bindings = @QueueBinding(

value = @Queue("xiaojie_order_dlx_queue"),

exchange = @Exchange(value = "xiaojie_order_dlx_exchange", type = ExchangeTypes.DIRECT),

key = "order.dlx"))

@Slf4j

public class Consumer {

@Autowired

private OrderService orderService;

/*

* @param msg

* @param headers

* @param channel

* @死信队列消费消息,如果订单状态是未支付,则修改订单状态

* @author xiaojie

* @date 2021/10/9 13:49

* @return void

*/

@RabbitHandler

public void handlerMsg(@Payload String msg, @Headers Map headers,

Channel channel) throws IOException {

log.info("接收到的消息是direct:{}" + msg);

try {

Order orderEntity = JSONObject.parseObject(msg, Order.class);

if (orderEntity == null) {

return;

}

// 根据订单号码查询该笔订单是否存在

Order order = orderService.getByOrderId(orderEntity.getOrderId());

if (order == null) {

return;

}

//判读订单状态

if (OrderStatus.UNPAY.getStatus() == order.getStatus()) {

//未支付,修改订单状态

orderService.updateOrderStatus(orderEntity.getOrderId());

//库存+1

System.out.println("库存+1");

}

//delivery tag可以从消息头里边get出来

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

//手动应答,消费者成功消费完消息之后通知mq,从队列移除消息,需要配置文件指明。第二个参数为是否批量处理

channel.basicAck(deliveryTag, false);

} catch (IOException e) {

e.printStackTrace();

//补偿机制

}

}

}

完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码

相关推荐

寒冰王座1.20e官方下载方法,详细步骤看这里!
365app下载安装官方免费下载

寒冰王座1.20e官方下载方法,详细步骤看这里!

📅 07-01 👁️ 3654
十大不需要付费的扫描app推荐
365bet体育开户官网

十大不需要付费的扫描app推荐

📅 06-28 👁️ 1994