NipGeihou's blog NipGeihou's blog
  • Java

    • 开发规范
    • 进阶笔记
    • 微服务
    • 快速开始
    • 设计模式
  • 其他

    • Golang
    • Python
    • Drat
  • Redis
  • MongoDB
  • 数据结构与算法
  • 计算机网络
  • 应用

    • Grafana
    • Prometheus
  • 容器与编排

    • KubeSphere
    • Kubernetes
    • Docker Compose
    • Docker
  • 组网

    • TailScale
    • WireGuard
  • 密码生成器
  • 英文单词生成器
🍳烹饪
🧑‍💻关于
  • 分类
  • 标签
  • 归档

NipGeihou

我见青山多妩媚,料青山见我应如是
  • Java

    • 开发规范
    • 进阶笔记
    • 微服务
    • 快速开始
    • 设计模式
  • 其他

    • Golang
    • Python
    • Drat
  • Redis
  • MongoDB
  • 数据结构与算法
  • 计算机网络
  • 应用

    • Grafana
    • Prometheus
  • 容器与编排

    • KubeSphere
    • Kubernetes
    • Docker Compose
    • Docker
  • 组网

    • TailScale
    • WireGuard
  • 密码生成器
  • 英文单词生成器
🍳烹饪
🧑‍💻关于
  • 分类
  • 标签
  • 归档
  • 设计模式

  • 开发规范

  • 经验分享

  • 记录

  • 快速开始

    • Spring Boot整合RabbitMQ
      • 前言
      • 依赖
        • pom.xml
      • 配置
        • application.yml
        • RabbitMqConfig.java
      • 生产者
        • RabbitMqTest.java
      • 消费者
        • 手动确认
      • 工具类
      • 问题
      • 参考
    • Spring Boot整合Email邮件发送
    • Spring Boot整合jasypt加密配置文件
    • Spring Boot整合单元测试
    • Spring Boot整合优雅关机
    • Spring Boot整合Redis分布式锁
    • Spring Boot整合MyBatis-plus
    • XXL-JOB快速上手
    • Spring Boot整合WebSocket(stomp协议)
    • SpringBoot整合i18n(多语言)
    • 第三方登录 - Google
    • 第三方登录 - Facebook
    • Spring Boot 整合Elasticsearch
  • 笔记

  • 面试题

  • 微服务

  • 踩过的坑

  • Java
  • 快速开始
NipGeihou
2022-01-12
目录

Spring Boot整合RabbitMQ

RabbitMQ 相关概念

# 前言

在一些基础教程中,常常把生产者和消费者写在一起,在实际开发中与其他服务交互时,这两者多为分开的,因此在整合 RabbitMQ 时,应区分哪些配置是消费者那些是生产者的。

# 依赖

# pom.xml

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

笔记

无需定义 version, org.springframework.boot 包版本均由 spring-boot-starter-parent 里的 spring-boot-dependencies 维护。

# 配置

# application.yml

更多配置信息查看:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.amqp (opens new window)

spring:
  # 【消费者、生产者共用】配置rabbitMq 服务器
  rabbitmq:
    host: mymq.xxxxx.com
    port: 5672
    username: admin
    password: xxxxxxx
    # 虚拟host 可以不设置,使用server默认host(/)
    virtual-host: xxxxxx
    # 手动提交消息
    listener:
      simple:
        # 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
        missing-queues-fatal: false
        # 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
        acknowledge-mode: manual
        # 消费者并发数
        concurrency: 3
        # 消费者最大并发数
        max-concurrency: 10
      direct:
        # 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
        missing-queues-fatal: false
        # 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
        acknowledge-mode: manual
    # 生产者:确认类型,correlated,关联确认(更可靠);simple,简单确认
    publisher-confirm-type: correlated
    # 生产者:开启队列应答
    publisher-returns: true

消费者(Consumer)设置自动重连

在实际开发中发现,会发现默认配置下,RabbitMQ 重启后,Consumer 丢失,需重启 Consumer 服务。

解决办法: missing-queues-fatal: false

# RabbitMqConfig.java

  • 注入消息队列:用于在 mq 中间件中注册队列
package com.xxx.common.config;

import com.xxx.common.constant.QueueNameConsts;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @author NipGeihou
 * @create 2022-01-14 14:33
 */
@Slf4j
@Configuration
public class RabbitMqConfig {
    
    /**
     * 生产者配置
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        // 开启发布确认模式
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 开启消息返回模式
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 发送消息时设置强制标志;仅当提供了 returnCallback 时才适用
        rabbitTemplate.setMandatory(true);
        // 确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        // 返回回调
        rabbitTemplate.setReturnsCallback(returned -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText(), returned.getMessage()));
        // 消息转换:JSON
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * 用于消费者将消息转换JSON为对象
     * @param objectMapper
     * @return
     */
    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }
    
    /**
     * 注入此对象的目的是为了在rabbitmq中注册这个队列,如果没有注入,只能在已有队列或手动在mq的web页面创建
     *
     * @return
     */
    @Bean
    public Queue queue() {
        // queue的name可以存储在配置文件、常量、局部变量等
        return new Queue(QueueNameConsts.CARD_ACCOUNT_ORDER_PAID);
    }
}

confirm和return的区别和联系

  • confirmCallBack:消息从生产者到达 exchange 时返回 ack ,消息未到达 exchange 返回 nack ;
  • returnCallBack:消息进入 exchange 但未进入 queue 时会被调用。

# 生产者

# RabbitMqTest.java

package com.xxx.test;

import com.alibaba.fastjson.JSON;
import com.xxx.BaseTest;
import com.xxx.mall.domain.MallProductOrder;
import com.xxx.mall.service.IMallProductOrderService;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;


/**
 * @author NipGeihou
 * @create 2021-12-10 16:38
 */
public class RabbitMqTest extends BaseTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void queueTest() {
        MallProductOrder productOrder = productOrderService.getById("1469258256359624707");
        rabbitTemplate.convertAndSend("paid_orders", JSON.toJSONString(productOrder));
    }

}

# 消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;

@Component
@RequiredArgsConstructor
@Slf4j
public class WebsocketListener {

    // private final

    /**
     * 消息队列:处理用户消息
     *
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queuesToDeclare = @Queue(name = MqConstant.QUEUE_WEBSOCKET_FISH_USER_MESSAGE, durable = "true"))
    public void handleFishUserMessage(String content, Message message, Channel channel) throws IOException {
        try {
            WebsocketMessage websocketMessage = JSON.parseObject(message.getBody(), WebsocketMessage.class);

            String idStr = new String(message.getBody());
            log.info("收到消息:{}", idStr);
        } catch (Exception e) {
            log.error("处理用户消息异常:{}", e.getMessage());
        } finally {
            // 确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    
    // 直接获得生产者的SandpayNotifyReq req,无需toObject
    @RabbitListener(queuesToDeclare = @Queue(name = MqConstant.QUEUE_SANDPAY_C2B_ORDER_NOTIFY, durable = "true"))
    public void handleSandpayC2BOrder(SandpayNotifyReq req, Message message, Channel channel) throws IOException {}

}

# 手动确认

开启手动确认

// 方式一:在配置队列监听时配置ackMode = "MANUAL"
@RabbitListener(queues = "topic_queue",ackMode = "MANUAL")

// 方式二: 配置文件application.yml
spring:
  rabbitmq:
	listener:
      simple:
        acknowledge-mode: manual #手动应答

手动确认代码

@RabbitListener(queues = "topic_queue",ackMode = "MANUAL")
public void printMessage(Message message, Channel channel) throws IOException {
    // 消息标识
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // 1. 接收消息
        System.out.println(new String(message.getBody()));
        // 2.处理业务
        //模拟异常
        int i = 2 / 0;
        // 3.手动签收
        /**
         * 参数说明:
         *  deliveryTag:消息唯一标识
         *  multiple:boolean类型 是否应用多消息 为true时,会同时签收之前的多个消息
         */
        channel.basicAck(deliveryTag, true);
    }catch (Exception e){
        // 发生异常重回队列
        /**
         * 参数说明:
         *  deliveryTag:
         *  multiple:
         *  requeue:是否重回队列
         */
        channel.basicNack(deliveryTag,true,true);
    }
}

# 工具类

import cn.hutool.extra.spring.SpringUtil;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * @author NipGeihou
 * @date 2023-10-13
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RabbitMqUtil {

    private static final RabbitTemplate RABBIT_TEMPLATE = SpringUtil.getBean(RabbitTemplate.class);

    /**
     * 发送消息
     *
     * @param routingKey
     * @param object
     */
    public static void send(String routingKey, final Object object) {
        RABBIT_TEMPLATE.convertAndSend(routingKey, object);
    }

    /**
     * 发送消息
     *
     * @param exchange
     * @param routingKey
     * @param data
     */
    public static void send(String exchange, String routingKey, Object data) {
        RABBIT_TEMPLATE.convertAndSend(exchange, routingKey, data);
    }


    /**
     * 发送延迟消息
     *
     * @param exchange
     * @param routingKey
     * @param data
     * @param delayMs
     */
    public static void send(String exchange, String routingKey, Object data, long delayMs) {
        if (delayMs <= 0) {
            send(exchange, routingKey, data);
            return;
        }
        RABBIT_TEMPLATE.convertAndSend(exchange, routingKey, data,
                message -> {
                    // 设置延迟时间
                    message.getMessageProperties().setHeader("x-delay", delayMs);
                    return message;
                });
    }

}

# 问题

  1. 在实践中 IDEA 的 debug 模式下,在生产者和消费者都打了断点的情况下,会出现消费者与生产者逻辑在同一线程下执行,也就是说消费者的代码已经执行完,断点在生产者中阻塞,期望下消费者应结束线程,而实际上会一直阻塞知道消费者代码执行完。

    在 https://github.com/spring-projects/spring-amqp/issues/574 (opens new window) 中可知,这并不是 Rabbit 的设计,也许是 debug 的原因,也许是 I guess it will block if the broker can't keep up and applies flow control to the connection. 的原因。

# 参考

  • Getting Started | Messaging with RabbitMQ (opens new window)
  • 集成 RabbitMQ - 廖雪峰的官方网站 (opens new window)
#RabbitMQ#Spring Boot
上次更新: 2023/12/09, 10:33:55
RuoYi-Vue-Plus
Spring Boot整合Email邮件发送

← RuoYi-Vue-Plus Spring Boot整合Email邮件发送→

最近更新
01
iSCSI服务搭建
05-10
02
磁盘管理与文件系统
05-02
03
网络测试 - iperf3
05-02
更多文章>
Theme by Vdoing | Copyright © 2018-2025 NipGeihou | 友情链接
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式