使用生产者/消费者确认机制
生产者确认机制,确保消息可以成功发送到队列
spring:
rabbitmq:
host: 192.168.134.171 # rabbitMQ的ip地址
port: 5673 # 端口,默认是5672这里我用的docker映射的5673
username: hmall
password: 123
virtual-host: /hmall
connection-timeout: 1s #设置mq超时时间
template:
retry:
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #下载失败hou等待时长倍数
max-attempts: 3 #最大重试次数
enabled: true # 开启重试机制
#下面配置用于在消息发送者中使用用于监听回调信息
#publisher-confirm-type: correlated #开启publisher-confirm 并设置类型为correlated || 一般不用会降低性能
#他有三种类型
#none 关闭 / simple 同步阻塞 / correlated mq异步方式返回回执消息
publisher-returns: true #开启这个publisher-returns机制,一般用于返回 路由是否存在的,如果不存在是开发者自己导致的一般无需开启
消费者确认机制,确保消息可以成功执行
spring:
rabbitmq:
host: 192.168.134.170 # rabbitMQ的ip地址
port: 5673 # 端口
username: hmall
password: 123
virtual-host: /hmall
listener:
simple:
prefetch: 1
#这里就是使用预取一次只能取一个,提高性能,让性能低的需要处理完才能接受下一个,不影响其他的使用
acknowledge-mode: auto #确认机制
#如果确认机制使用none的时候,等监听到这个消息后就算后面发生异常了也会把消息删除了,如果使用auto则是如果遇到异常自动再次从新发送消息,如果一直异常可能会一直抛出异常
retry:
enabled: true #开启重试机制
connection-timeout: 1s #设置mq超时时间
template:
retry:
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #下载失败hou等待时长倍数
max-attempts: 3 #最大重试次数
enabled: true # 开启重试机制
#stateless: true #true 无状态 false有状态。如果业务中包含事务,这里改为false
需要使用消息重试【加入幂等判断防止消息重复消费】
如果在并发情况下可能会多次更新数据状态,就需要保证业务幂等性,就比如多次执行这个sql操作不会影响结果,比如多次删除购物车
需要使用消费失败处理
在消息重试几次后业务消息还是执行报错失败时,可以将失败消息发送到error.derect这个交换机中,在用交换机反送到队列,可以将错误进行人工处理等方法
需要考虑业务幂等性
在业务中进行判断
可以在业务中进行业务幂等的判断,比如用户下单时,已经支付了,进行通知mq将数据库中改为已支付,就可以判断订单是否为 未支付 ,只要是未支付,就说明数据库没被修改,可以进行修改,但是如果之前已经收到消息将状态已经改为已支付了,就无需做修改数据库的操作了
或者使用一条sql语句也可以实现,和上面可以实现同样效果
update order set status = 2 where id = ? AND status = 1
使用延迟消息
比如订单状态需要给用户30分钟的支付时间,等三十分钟后在进行发送mq消息,进行执行更新状态语句,如果支付了就给更新,未支付就不更新
使用死信交换机实现消息延时
实际上就是将原交换机设置死信时间,在将原交换机的死信后消息发送至死信交换机中,在接受死信交换机的内容,达到接受延时消息的方法
流程
当队列消息没有consumer接受时,将会被传到死信交换机中,配置一个交换机和队列,将其队列绑定至死信交换机中
延时消息插件
下载消息延时插件
1.打开官网下载插件
https://www.rabbitmq.com/community-plugins.html
2.下载延时插件
需要指定版本
下载到plugins文件目录中,这个文件可以自行找你下载的rabbitmq的文件中
我这里已docker启动的rabbitmq为例
docker run -d --hostname my-rabbit -v /mq/plugins:/plugins --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
- 如果用的是这个命令安装,那么文件目录已经挂载了,可以直接到宿主机的
/mq/plugins
目录下使用
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
- 如果启动docker的时候没有进行挂载目录也可以直接下载到宿主机中,使用docker cp复制到plugins目录下也可以
[root@localhost ~]# docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 2a:/plugins
Successfully copied 38.4kB to 2a:/opt/rabbitmq/plugins
进行下载,因为目录已经挂载了,所以只需要进入到容器中进行配置一下就可以了
docker exec -it 2a49c8420635 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
[root@localhost ~]# docker exec -it 2a49c8420635 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@my-rabbit:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@my-rabbit...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
# 重启docker容器即可
[root@localhost ~]# docker restart 2a49c8420635
2a49c8420635
现在去MQ可视化后台管理添加交换机,发现多了一个选项,则配置成功
java代码实现
//测试类模拟发送方
@Test
void delayMessage() {
rabbitTemplate.convertAndSend("delay.derect","hi","hello",new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
});
log.info("消息发送成功");
}
//模拟接受方
//声明延迟交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", declare = "true"),
exchange = @Exchange(name = "delay.derect", delayed = "true"),
key = "hi" ))
public void lenstener(String msg) {
log.info("消息接受成功"+msg);
}
可以发现正好相差10秒后收到消息
取消超时订单
延时消息只适用于延时消息交短的业务,若延时时间较长比如几个小时以上可能会造成消息对接会影响cpu
以上只是示例,可以在使用的时候把代码适当封装一下,提高使用便捷度
例如
@Test
void delayMessage() {
rabbitTemplate.convertAndSend("delay.derect","hi","hello",new DelayMessageProcessor(1000));
log.info("消息发送成功");
}
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {
private final int delay;
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return message;
}
}
版权属于:戏人看戏博客网
本文链接:https://day.nb.sb/archives/1403.html
若无注明均为戏人看戏原创,转载请注明出处,感谢您的支持!