RabbitMQ - 交换机
# 概念
- Exchange 是 RabbitMQ 的消息路由代理。
- 生产者向 RabbitMQ 发送消息时,不会直接将消息发送到 Queue,而是先将消息发送到 Exchange,由 Exchange 将消息路由到一个或多个 Queue。
- Exchange 根据 Binding Key、Routing Key 以及 Headers 属性路由消息。
不同的类型的 Exchange 主要区别为对 Queue 传递消息的不同,如:
direct
:生产者消息 key 必须与队列的 key完全匹配,才能投递。topic
:生产者消息 key 与队列表达式 key 匹配,即可投递。fanout
:生产者消息会投递到每一个绑定的 Queue。headers
:是通过生产者消息header 精准或模糊匹配投递。
# direct 精准匹配 key
# 绑定
/**
* 绑定
* @param exchangeName 交换机名称
* @param queueName 消息队列名称
* @param routingKey 完全匹配的key,如email
*/
function bind(exchangeName, queueName, routingKey);
# 发送消息
/**
* 精准匹配key
* @param exchangeName 交换机名称
* @param routingKey 完全匹配的key
* @param payload 消息体
* @param headers
* @param properties
*/
function direct(exchangeName, routingKey, payload, headers, properties);
监听了对应 key 的 queue 会收到消息。
笔记
- 当 exchangeName 传递的是一个空串时,则会使用默认的消息队列,在官方文档(参考资料 2)中默认的 Direct exchange 应为
amq.direct
,但在实践中发现为(AMQP default)
。 - 当使用默认交换机时,「创建的每个队列都自动绑定到一个与队列名称相同的 routingKey」
# topic 模糊匹配 key
#
:代表匹配一个多或多个、或者一个也匹配不到,支持多级
*
:代表必须匹配一个,且只能是一级,即如果 binding key 为 *.sms.*
,则只有发送消息时指定 routing key 为类似 a.sms.b
等的消息才会被投递到该队列,如果 routing key 为 sms.b
或者 a.sms
或者 a.b.sms
,则都不会被投递到该队列。
# 绑定
/**
* 绑定
* @param exchangeName 交换机名称
* @param queueName 消息队列名称
* @param routingKey 模糊的key,如*.sms.*
*/
function bind(exchangeName, queueName, routingKey);
# 发送消息
/**
* 模糊匹配key
* @param exchangeName 交换机名称
* @param routingKey key,如a.sms.b
* @param payload 消息体
* @param headers
* @param properties
*/
function topic(exchangeName, routingKey, payload, headers, properties);
# fanout 广播
# 绑定
/**
* 绑定
* @param exchangeName 交换机名称
* @param queueName 消息队列名称
*/
function bind(exchangeName, queueName)
# 发送消息
/**
* 广播
* @param exchangeName 交换机名称
* @param payload 消息体
* @param headers
* @param properties
*/
function fanout(exchangeName, payload, headers, properties);
只要是绑定了 fanout 类型交换机的 queue 都可以收到消息。
# headers 模糊匹配 header
# x-delayed-message 延迟队列
第三方插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
声明该类 Exchange,并自定义消息的 Header 属性 x-delay
来指定消息延时投递的时间段,单位为毫秒。消息将在 x-delay
定义的时间段后被投递到对应的 Queue
# 声明
@Bean
public CustomExchange xDelayExchange() {
// 交换机名称
String exchangeName = "x_delay_exchange";
Map<String, Object> args = new HashMap<String, Object>(1);
// 这里使用直连方式的路由,如果想使用不同的路由行为,可以修改,如 topic
args.put("x-delayed-type", "direct");
return new CustomExchange(exchangeName, "x-delayed-message", true, false, args);
}
# 参考资料
上次更新: 2024/03/11, 22:37:05