NipGeihou's blog NipGeihou's blog
  • 开发规范
  • 进阶笔记
  • 微服务
  • 快速开始
  • 设计模式
  • NoSQL 数据库

    • Redis
  • 数据结构与算法
  • 计算机网络
烹饪
关于
  • 分类
  • 标签
  • 归档

NipGeihou

我见青山多妩媚,料青山见我应如是
  • 开发规范
  • 进阶笔记
  • 微服务
  • 快速开始
  • 设计模式
  • NoSQL 数据库

    • Redis
  • 数据结构与算法
  • 计算机网络
烹饪
关于
  • 分类
  • 标签
  • 归档
  • 设计模式

  • 开发规范

  • 经验分享

  • 记录

  • 快速开始

    • Spring Boot整合RabbitMQ
      • 前言
      • 依赖
        • pom.xml
      • 配置
        • application.yml
        • RabbitMqConfig.java
      • 生产者
        • RabbitMqTest.java
      • 消费者
      • 参考
    • Spring Boot整合Email邮件发送
    • Spring Boot整合jasypt加密配置文件
    • Spring Boot整合单元测试
    • Spring Boot整合优雅关机
    • Spring Boot整合Redis分布式锁
    • Spring Boot整合MyBatis-plus
    • XXL-JOB快速上手
    • Spring Boot整合WebSocket(stomp协议)
    • SpringBoot整合i18n(多语言)
    • 第三方登录 - Google
    • 第三方登录 - Facebook
  • 笔记

  • 面试题

  • 微服务

  • Java
  • 快速开始
NipGeihou
2022-01-12
目录

Spring Boot整合RabbitMQ

RabbitMQ 相关概念

# 前言

在一些基础教程中,常常把生产者和消费者写在一起,在实际开发中与其他服务交互时,这两者多为分开的,因此在整合 RabbitMQ 时,应区分哪些配置是消费者那些是生产者的。

# 依赖

# pom.xml

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

笔记

无需定义 version, org.springframework.boot 包版本均由 spring-boot-starter-parent 里的 spring-boot-dependencies 维护。

# 配置

# application.yml

更多配置信息查看:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.amqp (opens new window)

spring:
  # 【消费者、生产者共用】配置rabbitMq 服务器
  rabbitmq:
    host: mymq.xxxxx.com
    port: 5672
    username: admin
    password: xxxxxxx
    # 虚拟host 可以不设置,使用server默认host(/)
    virtual-host: xxxxxx
    # 手动提交消息
    listener:
      simple:
        # 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
        missing-queues-fatal: false
        # 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
        acknowledge-mode: manual
        # 消费者并发数
        concurrency: 3
        # 消费者最大并发数
        max-concurrency: 10
      direct:
        # 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
        missing-queues-fatal: false
        # 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
        acknowledge-mode: manual

消费者(Consumer)设置自动重连

在实际开发中发现,会发现默认配置下,RabbitMQ 重启后,Consumer 丢失,需重启 Consumer 服务。

解决办法: missing-queues-fatal: false

# RabbitMqConfig.java

  • 注入消息队列:用于在 mq 中间件中注册队列
package com.xxx.common.config;

import com.xxx.common.constant.QueueNameConsts;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @author NipGeihou
 * @create 2022-01-14 14:33
 */
@Configuration
public class RabbitMqConfig {
    
    
	@Bean
	public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
		connectionFactory.setPublisherConfirms(true);
        // 
		connectionFactory.setPublisherReturns(true);
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		rabbitTemplate.setMandatory(true);
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
		rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
		return rabbitTemplate;
	}

    /**
     * 注入此对象的目的是为了在rabbitmq中注册这个队列,如果没有注入,只能在已有队列或手动在mq的web页面创建
     *
     * @return
     */
    @Bean
    public Queue queue() {
        // queue的name可以存储在配置文件、常量、局部变量等
        return new Queue(QueueNameConsts.CARD_ACCOUNT_ORDER_PAID);
    }
}

# 生产者

# RabbitMqTest.java

package com.xxx.test;

import com.alibaba.fastjson.JSON;
import com.xxx.BaseTest;
import com.xxx.mall.domain.MallProductOrder;
import com.xxx.mall.service.IMallProductOrderService;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;


/**
 * @author NipGeihou
 * @create 2021-12-10 16:38
 */
public class RabbitMqTest extends BaseTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void queueTest() {
        MallProductOrder productOrder = productOrderService.getById("1469258256359624707");
        rabbitTemplate.convertAndSend("paid_orders", JSON.toJSONString(productOrder));
    }

}

# 消费者

@Component
@RequiredArgsConstructor
@Slf4j
public class WebsocketListener {

    // private final

    /**
     * 消息队列:处理用户消息
     *
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queuesToDeclare = @Queue(name = MqConstant.QUEUE_WEBSOCKET_FISH_USER_MESSAGE, durable = "true"))
    public void handleFishUserMessage(Message message, Channel channel) throws IOException {
        try {
            WebsocketMessage websocketMessage = JSON.parseObject(message.getBody(), WebsocketMessage.class);

            String idStr = new String(message.getBody());
            log.info("收到消息:{}", idStr);
        } catch (Exception e) {
            log.error("处理用户消息异常:{}", e.getMessage());
        } finally {
            // 确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

}

# 参考

  • Getting Started | Messaging with RabbitMQ (opens new window)
  • 集成 RabbitMQ - 廖雪峰的官方网站 (opens new window)
#RabbitMQ#Spring Boot
上次更新: 2023/06/10, 18:45:20
maven报错:'parent.relativePath' points at no local
Spring Boot整合Email邮件发送

← maven报错:'parent.relativePath' points at no local Spring Boot整合Email邮件发送→

最近更新
01
概念
06-10
02
第三方登录 - Facebook
06-09
03
防火墙 - iptables
06-08
更多文章>
Theme by Vdoing | Copyright © 2018-2023 NipGeihou | 友情链接
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式