SpringBoot RabbitMQ开发流程

技术分享
204 0

使用生产者/消费者确认机制

生产者确认机制,确保消息可以成功发送到队列

spring:
  rabbitmq:
    host: 192.168.134.171 # rabbitMQ的ip地址
    port: 5673 # 端口,默认是5672这里我用的docker映射的5673
    username: hmall
    password: 123
    virtual-host: /hmall
    connection-timeout: 1s #设置mq超时时间
    template:
      retry:
        initial-interval: 1000ms #失败后的初始等待时间
        multiplier: 1 #下载失败hou等待时长倍数
        max-attempts: 3 #最大重试次数
        enabled: true # 开启重试机制
    #下面配置用于在消息发送者中使用用于监听回调信息
    #publisher-confirm-type: correlated #开启publisher-confirm 并设置类型为correlated || 一般不用会降低性能
    #他有三种类型
    #none 关闭  / simple 同步阻塞 / correlated mq异步方式返回回执消息
    publisher-returns: true #开启这个publisher-returns机制,一般用于返回 路由是否存在的,如果不存在是开发者自己导致的一般无需开启

消费者确认机制,确保消息可以成功执行

spring:
  rabbitmq:
    host: 192.168.134.170 # rabbitMQ的ip地址
    port: 5673 # 端口
    username: hmall
    password: 123
    virtual-host: /hmall
    listener:
      simple:
        prefetch: 1
        #这里就是使用预取一次只能取一个,提高性能,让性能低的需要处理完才能接受下一个,不影响其他的使用
        acknowledge-mode: auto #确认机制
        #如果确认机制使用none的时候,等监听到这个消息后就算后面发生异常了也会把消息删除了,如果使用auto则是如果遇到异常自动再次从新发送消息,如果一直异常可能会一直抛出异常
        retry:
          enabled: true #开启重试机制

    connection-timeout: 1s #设置mq超时时间
    template:
      retry:
        initial-interval: 1000ms #失败后的初始等待时间
        multiplier: 1 #下载失败hou等待时长倍数
        max-attempts: 3 #最大重试次数
        enabled: true # 开启重试机制
        #stateless: true #true 无状态 false有状态。如果业务中包含事务,这里改为false

需要使用消息重试【加入幂等判断防止消息重复消费】

如果在并发情况下可能会多次更新数据状态,就需要保证业务幂等性,就比如多次执行这个sql操作不会影响结果,比如多次删除购物车

需要使用消费失败处理

在消息重试几次后业务消息还是执行报错失败时,可以将失败消息发送到error.derect这个交换机中,在用交换机反送到队列,可以将错误进行人工处理等方法

需要考虑业务幂等性

2023-11-28T14:29:36.png

2023-11-28T14:29:45.png

在业务中进行判断

可以在业务中进行业务幂等的判断,比如用户下单时,已经支付了,进行通知mq将数据库中改为已支付,就可以判断订单是否为 未支付 ,只要是未支付,就说明数据库没被修改,可以进行修改,但是如果之前已经收到消息将状态已经改为已支付了,就无需做修改数据库的操作了

2023-11-28T14:31:00.png

或者使用一条sql语句也可以实现,和上面可以实现同样效果

update order set status = 2 where id = ? AND status = 1

2023-11-28T14:31:14.png

使用延迟消息

比如订单状态需要给用户30分钟的支付时间,等三十分钟后在进行发送mq消息,进行执行更新状态语句,如果支付了就给更新,未支付就不更新

2023-11-28T14:31:28.png

使用死信交换机实现消息延时

实际上就是将原交换机设置死信时间,在将原交换机的死信后消息发送至死信交换机中,在接受死信交换机的内容,达到接受延时消息的方法

2023-11-28T14:31:43.png

流程

2023-11-28T14:32:06.png

当队列消息没有consumer接受时,将会被传到死信交换机中,配置一个交换机和队列,将其队列绑定至死信交换机中

2023-11-28T14:32:53.png

延时消息插件

2023-11-28T14:33:05.png

下载消息延时插件

1.打开官网下载插件

https://www.rabbitmq.com/community-plugins.html

2023-11-28T14:33:17.png

2.下载延时插件

需要指定版本

2023-11-28T14:33:27.png

下载到plugins文件目录中,这个文件可以自行找你下载的rabbitmq的文件中

我这里已docker启动的rabbitmq为例

docker run -d --hostname my-rabbit -v /mq/plugins:/plugins --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
  • 如果用的是这个命令安装,那么文件目录已经挂载了,可以直接到宿主机的 /mq/plugins目录下使用
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
  • 如果启动docker的时候没有进行挂载目录也可以直接下载到宿主机中,使用docker cp复制到plugins目录下也可以
[root@localhost ~]# docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 2a:/plugins
Successfully copied 38.4kB to 2a:/opt/rabbitmq/plugins

进行下载,因为目录已经挂载了,所以只需要进入到容器中进行配置一下就可以了

docker exec -it 2a49c8420635 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
[root@localhost ~]# docker exec -it 2a49c8420635 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Enabling plugins on node rabbit@my-rabbit:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@my-rabbit...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange
started 1 plugins.
# 重启docker容器即可
[root@localhost ~]# docker restart 2a49c8420635
2a49c8420635

现在去MQ可视化后台管理添加交换机,发现多了一个选项,则配置成功

2023-11-28T14:33:42.png

java代码实现

//测试类模拟发送方 
@Test
    void delayMessage() {
        rabbitTemplate.convertAndSend("delay.derect","hi","hello",new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        });
        log.info("消息发送成功");
    }
//模拟接受方
//声明延迟交换机
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", declare = "true"),
        exchange = @Exchange(name = "delay.derect", delayed = "true"),
        key = "hi" ))
public void lenstener(String msg) {
    log.info("消息接受成功"+msg);
}

可以发现正好相差10秒后收到消息

2023-11-28T14:33:54.png

2023-11-28T14:34:22.png

取消超时订单

延时消息只适用于延时消息交短的业务,若延时时间较长比如几个小时以上可能会造成消息对接会影响cpu

2023-11-28T14:34:36.png

以上只是示例,可以在使用的时候把代码适当封装一下,提高使用便捷度

例如

@Test
void delayMessage() {
    rabbitTemplate.convertAndSend("delay.derect","hi","hello",new DelayMessageProcessor(1000));
    log.info("消息发送成功");
}
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {
    private final int delay;
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }
}
最后更新 2023-11-28
评论 ( 0 )
OωO
隐私评论