初始化消息队列
Channel 频道:理解为操作消息队列的 client(比如 jdbcClient、redisClient),提供了和消息队列 server 建立通信的传输方法(为了复用连接,提高传输效率)。程序通过 channel 操作 rabbitmq(收发消息)
创建消息队列参数:
queueName:消息队列名称(注意,同名称的消息队列,只能用同样的参数创建一次)
durabale:消息队列重启后,消息是否丢失
exclusive:是否只允许当前这个创建消息队列的连接操作消息队列
autoDelete:没有人用队列后,是否要删除队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
@Slf4j @Component public class InitRabbitMqBean {
@Value("${spring.rabbitmq.host:localhost}") private String host;
@PostConstruct public void init() { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "code_exchange"; channel.exchangeDeclare(exchangeName, "direct"); String queueName = "code_queue"; channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, "my_routingKey"); log.info("消息队列启动成功"); } catch (Exception e) { log.error("消息队列启动失败", e); } } }
|
生产者
1 2 3
| public void sendMessage(String exchange, String routingKey, String message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); }
|
rabbitTemplate 的 convertAndSend()方法可以给指定队列发送消息,函数有三个参数,第一个是交换机(exchange)的名字,第二个是路由键(routing-key)的名字,第三个则为消息的内容。
消费者
持久化
1)队列持久化
durable 参数设置为 true,服务器重启后队列不丢失:
1
| channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
2)消息持久化
指定 MessageProperties.PERSISTENT_TEXT_PLAIN 参数:
1 2 3
| channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
|
控制单个消费者的处理任务积压数:
每个消费者最多同时处理 1 个任务
消息确认机制:
为了保证消息成功被消费(快递成功被取走),rabbitmq 提供了消息确认机制,当消费者接收到消息后,比如要给一个反馈:
- ack:消费成功
- nack:消费失败
- reject:拒绝
如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
支持配置 autoack,会自动执行 ack 命令,接收到消息立刻就成功了。
1 2
| channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
|
建议 autoack 改为 false,根据实际情况,去手动确认。
指定确认某条消息:
1
| channel.basicAck(delivery.getEnvelope().getDeliveryTag(), );
|
第二个参数 multiple(多个) 批量确认:是指是否要一次性确认所有的历史消息直到当前这条
指定拒绝某条消息:
1
| channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
|
第 3 个参数表示是否重新入队,可用于重试
消息过期机制
官方文档:https://www.rabbitmq.com/ttl.html
可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了。
示例场景:消费者(库存系统)挂了,一个订单 15 分钟还没被库存系统处理,这个订单其实已经失效了,哪怕库存系统再恢复,其实也不用扣减库存。
适用场景:清理过期数据、模拟延迟队列的实现(不开会员就慢速)、专门让某个程序处理过期请求
1)给队列中的所有消息指定过期时间
1 2 3 4 5
| Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 5000);
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
|
2)给某条消息指定过期时间
语法:
1 2 3 4 5
| AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("1000") .build(); channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));
|
如果在过期时间内,还没有消费者取消息,消息才会过期。
注意,如果消息已经接收到,但是没确认,是不会过期的。
如果消息处于待消费状态并且过期时间到达后,消息将被标记为过期。但是,如果消息已经被消费者消费,并且正在处理过程中,即使过期时间到达,消息仍然会被正常处理。
交换机
一个生产者给 多个 队列发消息,1 个生产者对多个队列。
交换机的作用:提供消息转发功能,类似于网络路由器
要解决的问题:怎么把消息转发到不同的队列上,好让消费者从不同的队列消费。
绑定:交换机和队列关联起来,也可以叫路由,算是一个算法或转发策略
绑定代码:
1
| channel.queueBind(queueName, EXCHANGE_NAME, "绑定规则");
|