Spring Boot整合RabbitMQ
# 前言
在一些基础教程中,常常把生产者和消费者写在一起,在实际开发中与其他服务交互时,这两者多为分开的,因此在整合 RabbitMQ 时,应区分哪些配置是消费者那些是生产者的。
# 依赖
# pom.xml
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
笔记
无需定义 version, org.springframework.boot
包版本均由 spring-boot-starter-parent
里的 spring-boot-dependencies
维护。
# 配置
# application.yml
spring:
# 【消费者、生产者共用】配置rabbitMq 服务器
rabbitmq:
host: mymq.xxxxx.com
port: 5672
username: admin
password: xxxxxxx
# 虚拟host 可以不设置,使用server默认host(/)
virtual-host: xxxxxx
# 手动提交消息
listener:
simple:
# 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
missing-queues-fatal: false
# 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
acknowledge-mode: manual
# 消费者并发数
concurrency: 3
# 消费者最大并发数
max-concurrency: 10
direct:
# 当监听的队列不存在时的行为:true,监听的队列必须存在;false,监听的队列可以不存在
missing-queues-fatal: false
# 消息确认模式:none,不确认;auto,自动确认;manual,手动确认
acknowledge-mode: manual
# 生产者:确认类型,correlated,关联确认(更可靠);simple,简单确认
publisher-confirm-type: correlated
# 生产者:开启队列应答
publisher-returns: true
消费者(Consumer)设置自动重连
在实际开发中发现,会发现默认配置下,RabbitMQ 重启后,Consumer 丢失,需重启 Consumer 服务。
解决办法: missing-queues-fatal: false
# RabbitMqConfig.java
- 注入消息队列:用于在 mq 中间件中注册队列
package com.xxx.common.config;
import com.xxx.common.constant.QueueNameConsts;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author NipGeihou
* @create 2022-01-14 14:33
*/
@Slf4j
@Configuration
public class RabbitMqConfig {
/**
* 生产者配置
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
// 开启发布确认模式
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 开启消息返回模式
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 发送消息时设置强制标志;仅当提供了 returnCallback 时才适用
rabbitTemplate.setMandatory(true);
// 确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
// 返回回调
rabbitTemplate.setReturnsCallback(returned -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText(), returned.getMessage()));
// 消息转换:JSON
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* 用于消费者将消息转换JSON为对象
* @param objectMapper
* @return
*/
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
/**
* 注入此对象的目的是为了在rabbitmq中注册这个队列,如果没有注入,只能在已有队列或手动在mq的web页面创建
*
* @return
*/
@Bean
public Queue queue() {
// queue的name可以存储在配置文件、常量、局部变量等
return new Queue(QueueNameConsts.CARD_ACCOUNT_ORDER_PAID);
}
}
confirm和return的区别和联系
- confirmCallBack:消息从生产者到达
exchange
时返回ack
,消息未到达exchange
返回nack
; - returnCallBack:消息进入
exchange
但未进入queue
时会被调用。
# 生产者
# RabbitMqTest.java
package com.xxx.test;
import com.alibaba.fastjson.JSON;
import com.xxx.BaseTest;
import com.xxx.mall.domain.MallProductOrder;
import com.xxx.mall.service.IMallProductOrderService;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author NipGeihou
* @create 2021-12-10 16:38
*/
public class RabbitMqTest extends BaseTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void queueTest() {
MallProductOrder productOrder = productOrderService.getById("1469258256359624707");
rabbitTemplate.convertAndSend("paid_orders", JSON.toJSONString(productOrder));
}
}
# 消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
@Component
@RequiredArgsConstructor
@Slf4j
public class WebsocketListener {
// private final
/**
* 消息队列:处理用户消息
*
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queuesToDeclare = @Queue(name = MqConstant.QUEUE_WEBSOCKET_FISH_USER_MESSAGE, durable = "true"))
public void handleFishUserMessage(String content, Message message, Channel channel) throws IOException {
try {
WebsocketMessage websocketMessage = JSON.parseObject(message.getBody(), WebsocketMessage.class);
String idStr = new String(message.getBody());
log.info("收到消息:{}", idStr);
} catch (Exception e) {
log.error("处理用户消息异常:{}", e.getMessage());
} finally {
// 确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
// 直接获得生产者的SandpayNotifyReq req,无需toObject
@RabbitListener(queuesToDeclare = @Queue(name = MqConstant.QUEUE_SANDPAY_C2B_ORDER_NOTIFY, durable = "true"))
public void handleSandpayC2BOrder(SandpayNotifyReq req, Message message, Channel channel) throws IOException {}
}
# 手动确认
开启手动确认
// 方式一:在配置队列监听时配置ackMode = "MANUAL"
@RabbitListener(queues = "topic_queue",ackMode = "MANUAL")
// 方式二: 配置文件application.yml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #手动应答
手动确认代码
@RabbitListener(queues = "topic_queue",ackMode = "MANUAL")
public void printMessage(Message message, Channel channel) throws IOException {
// 消息标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 接收消息
System.out.println(new String(message.getBody()));
// 2.处理业务
//模拟异常
int i = 2 / 0;
// 3.手动签收
/**
* 参数说明:
* deliveryTag:消息唯一标识
* multiple:boolean类型 是否应用多消息 为true时,会同时签收之前的多个消息
*/
channel.basicAck(deliveryTag, true);
}catch (Exception e){
// 发生异常重回队列
/**
* 参数说明:
* deliveryTag:
* multiple:
* requeue:是否重回队列
*/
channel.basicNack(deliveryTag,true,true);
}
}
# 工具类
import cn.hutool.extra.spring.SpringUtil;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author NipGeihou
* @date 2023-10-13
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RabbitMqUtil {
private static final RabbitTemplate RABBIT_TEMPLATE = SpringUtil.getBean(RabbitTemplate.class);
/**
* 发送消息
*
* @param routingKey
* @param object
*/
public static void send(String routingKey, final Object object) {
RABBIT_TEMPLATE.convertAndSend(routingKey, object);
}
/**
* 发送消息
*
* @param exchange
* @param routingKey
* @param data
*/
public static void send(String exchange, String routingKey, Object data) {
RABBIT_TEMPLATE.convertAndSend(exchange, routingKey, data);
}
/**
* 发送延迟消息
*
* @param exchange
* @param routingKey
* @param data
* @param delayMs
*/
public static void send(String exchange, String routingKey, Object data, long delayMs) {
if (delayMs <= 0) {
send(exchange, routingKey, data);
return;
}
RABBIT_TEMPLATE.convertAndSend(exchange, routingKey, data,
message -> {
// 设置延迟时间
message.getMessageProperties().setHeader("x-delay", delayMs);
return message;
});
}
}
# 问题
在实践中 IDEA 的 debug 模式下,在生产者和消费者都打了断点的情况下,会出现消费者与生产者逻辑在同一线程下执行,也就是说消费者的代码已经执行完,断点在生产者中阻塞,期望下消费者应结束线程,而实际上会一直阻塞知道消费者代码执行完。
在 https://github.com/spring-projects/spring-amqp/issues/574 (opens new window) 中可知,这并不是 Rabbit 的设计,也许是 debug 的原因,也许是
I guess it will block if the broker can't keep up and applies flow control to the connection.
的原因。
# 参考
上次更新: 2023/12/09, 10:33:55