JMS(java消息服务) 与ActiveMQ(消息队列)实战
发布日期:2021-04-30 21:01:42 浏览次数:115 分类:精选文章

本文共 3865 字,大约阅读时间需要 12 分钟。

1、基础知识:

JMS(Java Message Service)是Java平台上的一个消息服务协议,用于两个Java服务之间的消息传输。 本文将介绍P2P(Point-to-Point)消息模型,类似于HTTP协议的工作原理。 每个消息都会发送到特定的队列,接收者从队列中获取消息,直到消息被消费或超时。 与其他消息系统不同的是,每个消息只能有一个消费者,发送者和接收者之间没有依赖关系。 发送者可以通过JMS进行异步消息传输,无需等待接收者的状态。 接收者接收消息后,会向队列报告接收成功。

2、环境准备:安装ActiveMQ

ActiveMQ是最好的JMS实现之一,支持多线程并发,资源消耗较大。 安装ActiveMQ的过程相对复杂,但使用Docker部署变得非常简单。 以下是通过Docker快速部署ActiveMQ的命令: ```bash docker run -d --name activemq -p 61617:61616 -p 8162:8161 webcenter/activemq ``` 启动后,浏览器访问IP地址加上端口8162即可进入ActiveMQ管理界面,默认用户名密码均为admin。

3、下面开始写代码

使用Spring Boot创建项目,添加ActiveMQ依赖。以下是示例POM文件: ```xml
org.springframework.boot
spring-boot-starter-activemq
``

4、消息发送者代码

以下是发送消息的Java代码: ```java package com.example.demo.jms; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*;

public class Sender {

private static final int SEND_NUMBER = 5;
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61617");
Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;

try {          connection = connectionFactory.createConnection();          connection.start();          session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);          destination = session.createQueue("FirstQueue");          producer = session.createProducer(destination);          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);          sendMessage(session, producer);          session.commit();      } catch (Exception e) {          e.printStackTrace();      } finally {          try {              if (null != connection) {                  connection.close();              }          } catch (Throwable ignore) {          }      }  }  private static void sendMessage(Session session, MessageProducer producer) throws Exception {      for (int i = 1; i <= SEND_NUMBER; i++) {          TextMessage message = session.createTextMessage("ActiveMq 发送方发送: " + i);          System.out.println("发送方发送了" + i);          producer.send(message);      }  }

}

5、消息接收者代码

以下是接收消息的Java代码: ```java package com.example.demo.jms; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Receiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61617"); Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(500000); if (null != message) { System.out.println("收到的消息有" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) { connection.close(); } } catch (Throwable ignore) { } } } }

6、验证

运行发送者和接收者代码,观察控制台输出。发送者应输出5条消息,接收者也应显示接收到的5条消息。 通过ActiveMQ管理界面可以实时查看消息队列状态,确保消息传输正常。
上一篇:string类insert函数
下一篇:java8 LocalDate 基础用法

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2026年06月06日 01时17分09秒