消息队列实战

初始化消息队列

Channel 频道:理解为操作消息队列的 client(比如 jdbcClient、redisClient),提供了和消息队列 server 建立通信的传输方法(为了复用连接,提高传输效率)。程序通过 channel 操作 rabbitmq(收发消息)
创建消息队列参数:
queueName:消息队列名称(注意,同名称的消息队列,只能用同样的参数创建一次)
durabale:消息队列重启后,消息是否丢失
exclusive:是否只允许当前这个创建消息队列的连接操作消息队列
autoDelete:没有人用队列后,是否要删除队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)
*/
@Slf4j
@Component
public class InitRabbitMqBean {

@Value("${spring.rabbitmq.host:localhost}")
private String host;

@PostConstruct
public void init() {
try {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
// 建立连接、创建频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机
String exchangeName = "code_exchange";
channel.exchangeDeclare(exchangeName, "direct");
// 创建队列
String queueName = "code_queue";
// 参数
// durable: 是否开启持久化。消息队列重启后,消息是否丢失
// exclusive: 是否允许当前这个创建消息队列的连接操作消息队列
// autoDelete: 没有人用队列后,是否要删除队列
channel.queueDeclare(queueName, true, false, false, null);
// 交换机与消息队列进行绑定
// routingKey: 控制消息要转发到哪个队列
channel.queueBind(queueName, exchangeName, "my_routingKey");
log.info("消息队列启动成功");
} catch (Exception e) {
log.error("消息队列启动失败", e);
}
}
}

生产者

1
2
3
public void sendMessage(String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

rabbitTemplate 的 convertAndSend()方法可以给指定队列发送消息,函数有三个参数,第一个是交换机(exchange)的名字,第二个是路由键(routing-key)的名字,第三个则为消息的内容。

消费者

持久化

1)队列持久化
durable 参数设置为 true,服务器重启后队列不丢失:

1
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

2)消息持久化
指定 MessageProperties.PERSISTENT_TEXT_PLAIN 参数:

1
2
3
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));

控制单个消费者的处理任务积压数:

每个消费者最多同时处理 1 个任务

1
channel.basicQos(1);

消息确认机制:

为了保证消息成功被消费(快递成功被取走),rabbitmq 提供了消息确认机制,当消费者接收到消息后,比如要给一个反馈:

  • ack:消费成功
  • nack:消费失败
  • reject:拒绝

如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
支持配置 autoack,会自动执行 ack 命令,接收到消息立刻就成功了。

1
2
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});

建议 autoack 改为 false,根据实际情况,去手动确认。
指定确认某条消息:

1
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), );

第二个参数 multiple(多个) 批量确认:是指是否要一次性确认所有的历史消息直到当前这条

指定拒绝某条消息:

1
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);

第 3 个参数表示是否重新入队,可用于重试

消息过期机制

官方文档:https://www.rabbitmq.com/ttl.html
可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了。
示例场景:消费者(库存系统)挂了,一个订单 15 分钟还没被库存系统处理,这个订单其实已经失效了,哪怕库存系统再恢复,其实也不用扣减库存。
适用场景:清理过期数据、模拟延迟队列的实现(不开会员就慢速)、专门让某个程序处理过期请求
1)给队列中的所有消息指定过期时间

1
2
3
4
5
// 创建队列,指定消息过期参数
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);

2)给某条消息指定过期时间
语法:

1
2
3
4
5
// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("1000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));

如果在过期时间内,还没有消费者取消息,消息才会过期。
注意,如果消息已经接收到,但是没确认,是不会过期的。

如果消息处于待消费状态并且过期时间到达后,消息将被标记为过期。但是,如果消息已经被消费者消费,并且正在处理过程中,即使过期时间到达,消息仍然会被正常处理。

交换机

一个生产者给 多个 队列发消息,1 个生产者对多个队列。
交换机的作用:提供消息转发功能,类似于网络路由器
要解决的问题:怎么把消息转发到不同的队列上,好让消费者从不同的队列消费。
绑定:交换机和队列关联起来,也可以叫路由,算是一个算法或转发策略
绑定代码:

1
channel.queueBind(queueName, EXCHANGE_NAME, "绑定规则");

消息队列实战
https://xmas-nnnut.github.io/2024/01/02/消息队列实战/
作者
Xmas-nnnut
发布于
2024年1月2日
许可协议