Spring Boot整合RabbitMQ
# 前言
在一些基础教程中,常常把生产者和消费者写在一起,在实际开发中与其他服务交互时,这两者多为分开的,因此在整合 RabbitMQ 时,应区分哪些配置是消费者那些是生产者的。
# 依赖
# pom.xml
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
笔记
无需定义 version, org.springframework.boot
包版本均由 spring-boot-starter-parent
里的 spring-boot-dependencies
维护。
# 配置
# application.yml
spring:
# 【消费者、生产者共用】配置rabbitMq 服务器
rabbitmq:
host: mymq.xxxxx.com
port: 5672
username: admin
password: xxxxxxx
# 虚拟host 可以不设置,使用server默认host(/)
virtual-host: xxxxxx
# 手动提交消息
listener:
simple:
# 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
missing-queues-fatal: false
# 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
acknowledge-mode: manual
# 消费者并发数
concurrency: 3
# 消费者最大并发数
max-concurrency: 10
direct:
# 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
missing-queues-fatal: false
# 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
acknowledge-mode: manual
消费者(Consumer)设置自动重连
在实际开发中发现,会发现默认配置下,RabbitMQ 重启后,Consumer 丢失,需重启 Consumer 服务。
解决办法: missing-queues-fatal: false
# RabbitMqConfig.java
- 注入消息队列:用于在 mq 中间件中注册队列
package com.xxx.common.config;
import com.xxx.common.constant.QueueNameConsts;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author NipGeihou
* @create 2022-01-14 14:33
*/
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirms(true);
//
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
/**
* 注入此对象的目的是为了在rabbitmq中注册这个队列,如果没有注入,只能在已有队列或手动在mq的web页面创建
*
* @return
*/
@Bean
public Queue queue() {
// queue的name可以存储在配置文件、常量、局部变量等
return new Queue(QueueNameConsts.CARD_ACCOUNT_ORDER_PAID);
}
}
# 生产者
# RabbitMqTest.java
package com.xxx.test;
import com.alibaba.fastjson.JSON;
import com.xxx.BaseTest;
import com.xxx.mall.domain.MallProductOrder;
import com.xxx.mall.service.IMallProductOrderService;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author NipGeihou
* @create 2021-12-10 16:38
*/
public class RabbitMqTest extends BaseTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void queueTest() {
MallProductOrder productOrder = productOrderService.getById("1469258256359624707");
rabbitTemplate.convertAndSend("paid_orders", JSON.toJSONString(productOrder));
}
}
# 消费者
@Component
@RequiredArgsConstructor
@Slf4j
public class WebsocketListener {
// private final
/**
* 消息队列:处理用户消息
*
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queuesToDeclare = @Queue(name = MqConstant.QUEUE_WEBSOCKET_FISH_USER_MESSAGE, durable = "true"))
public void handleFishUserMessage(Message message, Channel channel) throws IOException {
try {
WebsocketMessage websocketMessage = JSON.parseObject(message.getBody(), WebsocketMessage.class);
String idStr = new String(message.getBody());
log.info("收到消息:{}", idStr);
} catch (Exception e) {
log.error("处理用户消息异常:{}", e.getMessage());
} finally {
// 确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
# 参考
上次更新: 2023/06/10, 18:45:20