springCloud整合RabbitMQ实现消息中间件
发布日期:2025-05-02 00:05:18 浏览次数:14 分类:精选文章

本文共 3590 字,大约阅读时间需要 11 分钟。

RabbitMQ入门及Spring Cloud整合实践指南

RabbitMQ概述

RabbitMQ是一款开源的消息中间件,基于AMQP协议实现,广泛应用于系统间消息传递的解耦。与Kafka、RocketMQ等工具不同,RabbitMQ在实时性要求不高或并发需求较高的场景中表现优异。以下是RabbitMQ的核心概念:

Broker、Exchange、Queue的关系

  • Broker:消息队列服务器实体,负责接收、存储和转发消息。
  • Exchange:消息交换机,消息首次到达Exchange,通过路由规则将消息发送至目标Queue。
  • Queue:消息队列,消息到达Queue后进入等待状态,等待消费者处理。

Exchange路由规则

  • Direct:严格按照Routing Key路由消息,需明确指定Key值。
  • Topic:基于关键字进行模糊匹配,支持使用通配符(如#*)。
  • Fanout:广播模式,无需配置Key,消息直接发送至所有绑定的Queue。

消息持久化配置

RabbitMQ支持消息持久化,以确保消息安全存储:

  • Exchange持久化:配置durable="true"
  • Queue持久化:配置durable="true"
  • 消息持久化:在发送消息时设置delivery_mode="PERSISTENT"

Spring Cloud整合RabbitMQ

1. 引入依赖

在项目依赖中添加RabbitMQ相关组件:

org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-test

2. 配置RabbitMQ

application.properties中配置RabbitMQ参数:

spring.application.name=rabbitmq-hello
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3. 实现消息生产者

@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitmqTemplate;
public void send() {
String content = "hello" + new Date();
System.out.println("Sender: " + content);
rabbitmqTemplate.convertAndSend("hello", content);
}
}

4. 实现消息消费者

@Component
@RabbitListener(queues = "hello")
public class Receiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver: " + hello);
}
}

5. RabbitMQ配置类

@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}

6. 主类与测试类

@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
public class HelloApplicationTests {
@Autowired
private Sender sender;
@Test
public void hello() {
sender.send();
}
}

Direct和Topic交换机的应用

Direct交换机

@Bean
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean
public Queue queueMessages() {
return new Queue("topic.messages");
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
public void send() {
String content = "hello" + new Date();
rabbitmqTemplate.convertAndSend("exchange", "topic.message", "topic_message");
rabbitmqTemplate.convertAndSend("exchange", "topic.messages", "topic_messages");
}

Topic交换机

@Component
@RabbitListener(queues = "topic_message")
public class Receiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1: " + hello);
}
}
@Component
@RabbitListener(queues = "topic_messages")
public class Receiver2 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver2: " + hello);
}
}

Fanout交换机

@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange);
}
public void send() {
String content = "hello" + new Date();
rabbitmqTemplate.convertAndSend("exchange");
}

通过以上配置,可以灵活地实现消息路由策略,充分发挥RabbitMQ的优势。

上一篇:pdo sqlserver
下一篇:springMvc 3.0 使用基本原理

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2026年06月13日 20时26分35秒