RabbitMQ - 基于 SpringAMQP 带你实现五种消息队列模型
发布日期:2025-05-04 11:46:41
浏览次数:4
分类:精选文章
本文共 6756 字,大约阅读时间需要 22 分钟。
Spring AMQP 开发指南
目录
1.1 概念
- AMQP是什么? AMQP(Advanced Message Queuing Protocol)是一种用于在应用程序之间传递业务消息的开放标准协议,与语言和平台无关,更符合微服务架构下的独立性要求。
- Spring AMQP是什么? Spring AMQP 是基于 AMQP 协议定义的一套 API 规范,提供了模板用于发送和接收消息,包含两部分:
spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
1.2 前置知识(实现案例前必看)
1.2.1 创建队列
-
在实际开发中,创建交换机和队列有两种方式,其中
Bean方式最为常用和推荐。 -
队列的创建:
name:队列名称。durable:是否持久化。exclusive:是否独占。true表示队列只能由一个消费者使用,false表示所有消费者均可使用。autoDelete:自动删除。true表示没有消费者使用后,队列会自动删除。arguments:扩展参数,可自定义队列选项,如队列过期、消息过期、死信队列等。
-
示例代码:
在Configuration层中创建队列:@Configurationpublic class MqConfig { @Bean public Queue simpleQueue() { return new Queue("simple.queue", true); }}
1.2.2 创建交换机
-
直接交换机:
直接交换机的创建方法:@Configurationpublic class MqConfig { @Bean public DirectExchange directExchange() { return new DirectExchange("direct", true, true); }} -
扇出交换机和主题交换机:
扇出交换机和主题交换机的创建方式与直接交换机类似,主要区别在于不同的路由规则。 -
交换机的创建属性:
name:交换机名称。durable:是否持久化。autoDelete:自动删除。arguments:扩展参数,可配置延时消息、过期时间、死信交换机等。
1.2.3 创建绑定
- 绑定交换机和队列的方式是通过
BindingBuilder对象实现的。 - 示例代码:
@Configurationpublic class MqConfig { @Bean public Binding simpleBinding() { return BindingBuilder.bind(simpleQueue()) .to(directExchange()) .with("simple"); }} - 注意事项:
- 如果使用扇出交换机(
FanoutExchange),则不需要通过with指定bindingKey。
- 如果使用扇出交换机(
1.2.4 @RabbitListener 注解
- 情况一:队列已存在
- 如果队列已存在,可以直接通过
@RabbitListener注解监听队列消息,成为消费者。 - 示例代码:
@RabbitListener(queues = "simple.queue")public void ListenQueue(String msg) { System.out.println("消费者接收到 simple.queue 的消息: " + msg);} - 如果队列已存在,可以直接通过
- 情况二:队列不存在
- 如果队列不存在,可以通过
@RabbitListener注解创建交换机、队列和绑定关系。 - 示例代码:
@RabbitListener(bindings = @QueueBinding( value = @Queue("queue1"), exchange = @Exchange(name = "exchange1", type = ExchangeTypes.TOPIC), key = "test"))public void testListener(String msg) { System.out.println("消费者接收到消息: " + msg);} - 如果队列不存在,可以通过
1.2.5 为什么更推荐使用 @Bean 注解?
- 优点:
- 解耦合:队列、交换机、绑定关系的创建与业务逻辑分离。
- 更好地控制 RabbitMQ 监听器的创建和销毁,避免性能问题和内存泄漏。
- 管理连接和通道,更有助于提高可靠性和性能。
1.3 案例实现
1.3.1 基础消息队列(BasicQueue)
- 队列创建:
@Configurationpublic class MqConfig { @Bean public Queue simpleQueue() { return new Queue("simple.queue", true); }} - 消息发送: 在
publisher服务中添加测试类:@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessageSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); }} - 消息消费: 在
consumer服务中添加消费逻辑:@Componentpublic class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void ListenQueue(String msg) { System.out.println("消费者接收到 simple.queue 的消息: " + msg); }}
1.3.2 工作队列(Work Queue)
-
工作队列:
Work Queue 用于提高消息处理速度,避免消息堆积。 -
消息预取问题:
消费者可能会先进行消息预取(prefetch),以避免同时处理过多消息。- 解决方法:设置
prefetch为 1,以适应消费者处理能力。
- 解决方法:设置
-
实现步骤:
- 在
publisher服务中循环发送消息:@Testpublic void testSendMessageWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, workQueue!"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); }} - 在
consumer服务中添加多个消费者:@Componentpublic class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void ListenQueue(String msg) throws InterruptedException { System.err.println("消费者接收到 simple.queue 的消息: " + msg + LocalDate.now()); Thread.sleep(100); } @RabbitListener(queues = "simple.queue") public void ListenQueue2(String msg) throws InterruptedException { System.out.println("消费者接收到 simple.queue 的消息: " + msg + LocalDate.now()); Thread.sleep(25); }}
1.3.3 广播交换机(Fanout Exchange)
- 广播消息: Fanout Exchange 将消息广播到所有绑定的队列。
- 实现步骤:
- 创建 FanoutExchange 和 绑定队列:
@Configurationpublic class FanoutConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1) .to(fanoutExchange); } @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2) .to(fanoutExchange); }} - 编写测试方法:
@Testpublic void testFanoutMessage() { String exchangeName = "itcast.fanout"; String message = "hello! fanout"; rabbitTemplate.convertAndSend(exchangeName, "", message);}
1.3.4 路由交换机(Direct Exchange)
- 路由规则: Direct Exchange 根据路由键(
RoutingKey)将消息路由到指定队列。 - 实现步骤:
- 在
consumer服务中添加多个消费者:@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"java", "C++"}))public void listenDirectQueue1(String msg) { System.out.println("消费者 1 收到 Direct 消息:" + msg);}@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"java", "GO"}))public void listenDirectQueue2(String msg) { System.out.println("消费者 2 收到 Direct 消息:" + msg);} - 编写测试方法:
@Testpublic void testDirectMessage() { String exchangeName = "itcast.direct"; String message = "hello! direct"; rabbitTemplate.convertAndSend(exchangeName, "C++", message);}
1.3.5 主题交换机(Topic Exchange)
- 主题交换机: Topic Exchange 的路由键使用通配符(
#和*),以实现更灵活的路由。 - 实现步骤:
- 创建主题交换机和队列:
@Configurationpublic class TopicConfig { @Bean public TopicExchange topicExchange() { return new TopicExchange("itcast.topic"); } @Bean public Queue topicQueue1() { return new Queue("topic.queue1"); } @Bean public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue1) .to(topicExchange) .with("china.#"); } @Bean public Queue topicQueue2() { return new Queue("topic.queue2"); } @Bean public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue2) .to(topicExchange) .with("#.good"); }} - 编写测试方法:
@Testpublic void testTopicMessage() { String exchangeName = "itcast.topic"; String message = "hello! topic"; rabbitTemplate.convertAndSend(exchangeName, "china.good", message);}
通过以上步骤,可以实现 Spring AMQP 在不同场景下的消息队列开发,满足各种业务需求。
发表评论
最新留言
表示我来过!
[***.240.166.169]2026年06月11日 04时48分40秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
PHP——封装Curl请求方法支持POST | DELETE | GET | PUT 等
2023-02-28
PHP——底层运行机制与原理
2023-02-28
php一句话图片运行,【后端开发】php一句话图片木马怎么解析
2023-02-28
PHP三方登录,移动端与服务端交互
2023-02-28
php上传文件找不到临时文件夹
2023-02-28
PHP下curl用法分析
2023-02-28
php与web服务器关系
2023-02-28
redis事务操作
2023-02-28
PHP中array_merge和array相加的区别分析
2023-02-28
PHP中curl特性
2023-02-28
PHP中date时间不对
2023-02-28
PHP中dirname(__FILE__)的意思
2023-02-28
PHP中extract()函数的妙用
2023-02-28
PHP中fileinfo的作用以及怎么开启fileinfo
2023-02-28
PHP中file_get_contents如何带上cookies
2023-02-28
PHP中header的作用
2023-02-28
PHP中implode()和explode()
2023-02-28
PHP中ob系列函数讲解(浏览器缓存技术)
2023-02-28
PHP中serialize和json序列化与反序列化的区别
2023-02-28