一、概述
ActiveMQ 是 Apache 软件基金会所研发的一款开源消息中间件,它实现了 Java Message Service (JMS) 1.1 规范。消息中间件在分布式系统中扮演着至关重要的角色,主要用于解耦发送者和接收者,使得它们可以异步地进行通信。
消息传递模式
点对点(P2P)模式:消息生产者发送消息到一个特定的队列,消息消费者从这个队列中接收消息。一个消息只能被一个消费者接收。例如,在一个订单处理系统中,一个订单创建消息被发送到订单处理队列,只有一个订单处理程序会接收并处理这个订单消息。
发布 / 订阅(Pub/Sub)模式:消息生产者将消息发布到一个主题(Topic),多个消息消费者可以订阅这个主题来接收消息。当有消息发布到主题时,所有订阅该主题的消费者都能收到消息。就像新闻发布系统,新闻机构(生产者)发布新闻(消息)到一个新闻主题,多个订阅该新闻主题的客户端(消费者)都可以收到新闻消息。
二、核心组件
Broker(代理)
它是 ActiveMQ 的核心服务器,负责接收和分发消息。Broker 管理消息队列和主题,存储消息直到消费者接收它们。可以将 Broker 看作是一个邮局,消息生产者把信件(消息)送到邮局,邮局(Broker)再把信件分发给合适的收件人(消息消费者)。
ActiveMQ 支持多种 Broker 配置,包括单机模式、主从模式和集群模式。单机模式适用于开发和测试环境;主从模式提供了高可用性,当主 Broker 出现故障时,从 Broker 可以接管消息处理工作;集群模式则可以通过水平扩展来处理大量的消息流量。
Destination(目的地)
它是消息的发送目标,包括队列(Queue)和主题(Topic)两种类型。在 JMS 规范中,消息生产者将消息发送到目的地,消息消费者从目的地接收消息。
队列是点对点模式下的目的地,具有先进先出(FIFO)的特性。主题是发布 / 订阅模式下的目的地,用于发布消息并允许多个订阅者接收。
Producer(生产者)
生产者负责创建和发送消息到目的地。在代码中,通过 JMS API 来创建消息并发送到指定的队列或主题。例如,在一个基于 Java 的应用中,使用 ActiveMQ 的生产者可以通过以下步骤发送消息:
首先,获取一个连接工厂(ConnectionFactory),然后创建一个连接(Connection),接着从连接创建一个会话(Session),再从会话创建一个消息生产者(MessageProducer),最后使用消息生产者发送消息。
Consumer(消费者)
消费者用于从目的地接收消息。和生产者类似,消费者也需要通过连接工厂、连接、会话等步骤来创建。消费者可以采用同步或异步的方式接收消息。
同步接收消息时,消费者会阻塞线程,直到有消息到达。例如,在一个简单的消息处理应用中,消费者调用接收消息的方法,线程会暂停,直到消息被接收并返回。异步接收消息则是通过注册消息监听器来实现,当有消息到达时,监听器的回调方法会被自动调用,这样消费者就可以在不阻塞线程的情况下接收消息。
三、安装与配置
下载与安装
可以从 ActiveMQ 官方网站(http://activemq.apache.org/)下载适合操作系统的安装包。对于 Linux 系统,可以下载压缩包,解压后即可使用;对于 Windows 系统,有安装程序,按照安装向导进行安装。
基本配置
主要配置文件是 activemq.xml,位于 ActiveMQ 安装目录的 conf 文件夹下。在这个文件中,可以配置 Broker 的属性,如端口号、存储方式等。例如,可以修改 Broker 监听的端口号,默认情况下,ActiveMQ 的管理控制台端口是 8161,消息传输端口是 61616。还可以配置消息存储方式,如使用文件存储或者数据库存储。
配置用户和权限也是很重要的一部分。ActiveMQ 支持基于简单身份验证和授权机制。可以在 activemq.xml 文件中或者通过单独的 JAAS(Java Authentication and Authorization Service)配置文件来设置用户名和密码,以及用户对队列和主题的访问权限。
四、在 Java 中的使用示例
引入依赖
如果是使用 Maven 构建的项目,需要在 pom.xml 文件中添加 ActiveMQ 的依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq - all</artifactId>
<version>5.16.4</version>
</dependency>
发送消息(点对点模式)
以下是一个简单的 Java 代码示例,用于发送消息到队列:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QueueProducer {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(队列)
Destination destination = session.createQueue("testQueue");
// 创建消息生产者
MessageProducer producer = session.createMessageProducer(destination);
// 创建消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
// 关闭资源
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收消息(点对点模式)
相应的接收消息的代码示例:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QueueConsumer {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(队列)
Destination destination = session.createQueue("testQueue");
// 创建消息消费者
MessageConsumer consumer = session.createMessageConsumer(destination);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭资源
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
这只是 ActiveMQ 的一些基础内容,它还涉及到事务处理、消息持久化、性能优化等多个复杂的主题。
除了点对点(P2P)模式,ActiveMQ 还有发布 / 订阅(Pub/Sub)模式。
发布 / 订阅(Pub/Sub)模式的原理
在发布 / 订阅模式中,消息生产者(发布者)将消息发布到一个主题(Topic)。这个主题就像是一个公告板,发布者把消息 “张贴” 在上面。多个消息消费者(订阅者)可以订阅这个主题,当有消息发布到主题时,所有订阅该主题的消费者都能收到消息。例如,在一个股票行情系统中,股票数据发布者将股票价格变化等消息发布到一个 “股票行情” 主题。所有订阅这个主题的客户端,如股票交易软件、金融分析软件等(消费者)都会收到这些股票消息,这样多个不同的应用都能同时获取最新的股票行情信息。
与点对点模式的对比
消息接收者数量
点对点模式下,一个消息只能被一个消费者接收。就好像在一个私人邮箱(队列)中,一封信(消息)只能被一个收件人(消费者)收取。而在发布 / 订阅模式中,一个消息可以被多个订阅者接收,如同在公告板(主题)上张贴的通知,所有关注这个公告板的人(订阅者)都能看到通知(消息)。
消息目的地类型
点对点模式的目的地是队列(Queue),它具有先进先出(FIFO)的特性,消息严格按照发送顺序被消费者接收。发布 / 订阅模式的目的地是主题(Topic),消息发布到主题后,是广播式地发送给所有订阅者,没有 FIFO 的严格限制。
应用场景差异
点对点模式适用于需要保证消息被唯一处理的场景,比如任务队列。一个任务(消息)被发送到任务队列,只需要一个工作者(消费者)来处理这个任务。发布 / 订阅模式更适合于信息广播的场景,像新闻推送、事件通知等,需要将相同的信息同时发送给多个接收者。
在代码中的实现差异(Java 示例)
发布消息(发布 / 订阅模式)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicProducer {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(主题)
Destination destination = session.createTopic("testTopic");
// 创建消息生产者
MessageProducer producer = session.createMessageProducer(destination);
// 创建消息
TextMessage message = session.createTextMessage("This is a topic message.");
// 发送消息
producer.send(message);
// 关闭资源
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收消息(发布 / 订阅模式)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicConsumer {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(主题)
Destination destination = session.createTopic("testTopic");
// 创建消息消费者
MessageConsumer consumer = session.createMessageConsumer(destination);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
System.out.println("Received topic message: " + textMessage.getText());
}
// 关闭资源
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
这两种模式各有其适用场景,在实际的分布式系统开发中,需要根据具体的业务需求来选择合适的消息传递模式。
JMS中的角色
Broker
消息服务器,作为server提供消息核心服务,一个activemq实例,相当于一个服务端
点击Manage ActiveMQ broker可以查看当前这台服务器的信息。
MQ作用-结偶
MQ存储有一个目的地(类似mysql的表),然后有两种类型(queue/topic)
queue : 只可以消费一次
topic : 可以被多个不同的消费者消费 消费的时候要指定目的地
Queue简单使用案例
首先创建一个maven项目,然后将pom依赖配置到pom.xml文件中
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
</dependencies>
创建一个生产者java类文件
package com.world.mq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息发送者(生产者)
*
* @author daosen
*
*/
public class Sender {
public static void main(String[] args) throws JMSException {
// 1.获取连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// 2.获取一个向ActiveMQ的连接
Connection connection = connectionFactory.createConnection();
// 3.获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.找目的地,获取destination,消费端,也会从这个目的地取消息
Queue queue = session.createQueue("user");
// 5.消息创建者
MessageProducer producer = session.createProducer(queue);
// consumer -> 消费者
// producer -> 创建者
// 6.创建一个消息
for (int i = 0; i < 100; i++) {
TextMessage textMessage = session.createTextMessage("hello" + i);
// 7.向目的地写入消息
producer.send(textMessage);
}
// 7.关闭连接
connection.close();
System.out.println("System exit.");
}
}
创建一个消费者java类
package com.world.mq;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息发送者(消费者)
*
* @author daosen
*
*/
public class Receiver {
public static void main(String[] args) throws JMSException {
// 1.获取连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// 2.获取一个向ActiveMQ的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// 3.获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.找目的地,获取destination,消费端,也会从这个目的地取消息
Destination queue = session.createQueue("user");
// 5.获取消息
MessageConsumer consumer = session.createConsumer(queue);
// consumer -> 消费者
// producer -> 创建者
while (true) {
TextMessage message = (TextMessage) consumer.receive();
System.out.println("message:" + message.getText());
}
}
}
测试代码
启动消费者代码
启动生产者代码
查看消费者对应的控制台打印日志。
可以看出测试成功了,消费者把消息队列的数据取了出来。 然后我们可以从网页上查看Queues的信息
Number Of Pending Messages:挂起的消息数目
Number Of Consumers:消费者人数
Messages Enqueued:消息排队
Messages Dequeued:消息去排队
JMS消息由三部分构成
消息头
每个消息头字段都有相应的getter和setter方法。
消息属性
如果需要消息头字段以外的值,那么可以使用消息属性。
消息体
JMS定义的消息类型有TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage
一、ActiveMQ简介 📚
1.1 什么是ActiveMQ? 🤔
ActiveMQ是一个完全支持JMS(Java Message Service)规范的开源消息中间件,由Apache基金会维护。它能够提供企业级的消息服务功能,包括点对点的消息传递,发布/订阅模式等。它的主要作用是在分布式系统中实现消息的异步传递,解耦系统间的依赖关系。
1.2 ActiveMQ的作用和优势 👍
ActiveMQ具有以下主要优点:
可靠性:ActiveMQ支持持久化消息,即使在系统崩溃的情况下,消息也不会丢失。
高效性:ActiveMQ支持高并发,能够处理大量的消息传递。
灵活性:ActiveMQ支持多种消息传递模式,包括点对点、发布/订阅等。
易用性:ActiveMQ有良好的Java API支持,易于集成和使用。
二、ActiveMQ的架构 🏗
2.1 ActiveMQ的基本架构 📐
ActiveMQ的基本架构包括以下几个部分:
Broker:Broker是ActiveMQ的核心组件,负责接收、存储和转发消息。
Producer:Producer是消息的生产者,它将消息发送到Broker。
Consumer:Consumer是消息的消费者,它从Broker接收消息。
Destination:Destination是消息的目的地,可以是队列(Queue)或者主题(Topic)。
2.2 ActiveMQ的核心组件 🧩
ActiveMQ的核心组件包括:
Message:消息,是ActiveMQ进行通信的基本单元。
Session:会话,是发送和接收消息的一个单线程上下文。
Connection:连接,是客户端和Broker之间的网络连接。
三、ActiveMQ的安装与配置 💻
3.1 ActiveMQ的安装步骤 🛠
安装ActiveMQ的步骤如下:
下载ActiveMQ的安装包。
解压安装包到指定的目录。
进入bin目录,运行activemq start命令启动ActiveMQ。
3.2 ActiveMQ的配置指南 📝
ActiveMQ的主要配置文件是conf/activemq.xml,我们可以在这个文件中配置Broker、存储器、网络连接器等组件的详细信息。
四、ActiveMQ的基本使用 📖
4.1 如何创建和管理ActiveMQ的Broker 🚦
创建和管理ActiveMQ的Broker主要包括以下步骤:
创建BrokerService对象。
设置BrokerService的各项属性。
调用BrokerService的start方法启动Broker。
4.2 如何发送和接收消息 📩
发送消息的步骤如下:
创建ConnectionFactory对象。
通过ConnectionFactory创建Connection。
通过Connection创建Session。
通过Session创建Destination。
通过Session创建Producer。
通过Producer发送消息。
接收消息的步骤如下:
创建ConnectionFactory对象。
通过ConnectionFactory创建Connection。
通过Connection创建Session。
通过Session创建Destination。
通过Session创建Consumer。
通过Consumer接收消息。
五、ActiveMQ的高级功能 🎩
5.1 ActiveMQ的持久化 📀
ActiveMQ支持将消息持久化到磁盘中,以防止系统崩溃导致消息丢失。ActiveMQ的持久化策略包括KahaDB、LevelDB和JDBC等。
5.2 ActiveMQ的消息过滤 🕸
ActiveMQ支持通过选择器(Selector)对消息进行过滤,只接收符合特定条件的消息。
5.3 ActiveMQ的事务管理 🏦
ActiveMQ支持JMS事务,可以保证在一个事务中的所有消息操作要么全部成功,要么全部失败。
六、ActiveMQ的性能优化 🚀
6.1 ActiveMQ的性能调优策略 🏎
ActiveMQ的性能调优策略主要包括减少持久化的频率、增大网络连接的数量、使用更高效的序列化方式等。
6.2 ActiveMQ的监控和管理 📊
ActiveMQ提供了JMX和Web Console两种方式进行监控和管理。
七、ActiveMQ的实战应用 💼
7.1 ActiveMQ在分布式系统中的应用 🏢
在分布式系统中,ActiveMQ可以用来解耦系统间的依赖关系,实现高效的异步通信。
7.2 ActiveMQ在微服务架构中的应用 🏦
在微服务架构中,ActiveMQ可以用来实现服务间的消息传递,支持事件驱动和消息驱动的微服务设计。
八、ActiveMQ的未来发展趋势 🌈
随着微服务和云计算的发展,ActiveMQ的应用场景将会更加广泛。未来,ActiveMQ可能会加强对云平台的支持,提供更加强大和灵活的消息服务。
九、结论 🎯
ActiveMQ是一个强大、灵活的消息中间件,能够满足企业级的消息服务需求。通过深入理解和实践ActiveMQ,我们可以更好地利用它来解决实际问题。
十、参考文献 📚
ActiveMQ官方文档
Apache ActiveMQ实战
Java消息服务JMS(第二版)
基于JMS的ActiveMQ简单使用
基于Queue模式
一.队列模式特点
1.客户端包括生产者和消费者
2.队列中的消息只能被一个消费者消费
3.消费者可以随时消费队列中的消息
二.创建过程
1.创建连接Connection
2.创建会话Session
3.通过Session来创建其它的(MessageProducer、MessageConsumer、Destination、TextMessage)
4.将生产者 MessageProducer 和消费者 MessageConsumer 都会指向目标 Destination
5.生产者向目标发送TextMessage消息send()
6.消费者设置监听器,监听消息。
三.代码实现
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.14</version>
</dependency>
public class ActiveMQProducer<T> {
private static final String URL = "tcp://192.168.3.78:61616";
private static final String QUEUE_NAME = "queue-demo";
public void sendQueue(T t) throws JMSException {
Connection connection = null;
Session session = null;
try {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2.创建连接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(QUEUE_NAME);
// 6.创建生产者
MessageProducer producer = session.createProducer(destination);
String message = JSON.toJSONString(t);
TextMessage textMessage = session.createTextMessage(message);
System.out.println("发送Queue消息:" + message);
producer.send(textMessage);
} finally {
if (null != session) {
session.close();
}
if (null != connection) {
connection.stop();
connection.close();
}
}
}
}
消费者
public class ActiveMQConsumer {
private static final String URL = "tcp://192.168.3.78:61616";
private static final String QUEUE_NAME = "queue-demo";
public void consumerQueue() throws JMSException {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2.创建连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(QUEUE_NAME);
//6.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage= (TextMessage) message;
try {
System.out.println("消费Queue消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
基于Topic(主题)模式
主题模式又名发布订阅模式(Pub/Sub)
一.主题模式特点
1.客户端包括发布者和订阅者
2.Topic(主题)中消息被所有订阅者消费
3.消费者不能消费在订阅之前就发送到主题中的消息,也就是说,消费者要先于生产者启动
二.创建过程
1.创建连接Connection
2.创建会话Session
3.通过Session来创建其它的(MessageProducer、MessageConsumer、Destination、TextMessage)
4.将生产者 MessageProducer 和消费者 MessageConsumer 都会指向目标 Destination
5.生产者向目标发送TextMessage消息send()
6.消费者设置监听器,监听消息。
三.代码实现
生产者
public class ActiveMQProducer<T> {
private static final String URL = "tcp://192.168.3.78:61616";
private static final String TOPIC_NAME = "topic-demo";
public void sendTopic(T t) throws JMSException {
Connection connection = null;
Session session = null;
try {
//1.创建ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2.创建连接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//5.创建topic
Destination destination = session.createTopic(TOPIC_NAME);
// 6.创建生产者
MessageProducer producer = session.createProducer(destination);
String message = JSON.toJSONString(t);
TextMessage textMessage = session.createTextMessage(message);
System.out.println("发送Topic消息:" + message);
producer.send(textMessage);
session.commit();
} finally {
if (null != session) {
session.close();
}
if (null != connection) {
connection.stop();
connection.close();
}
}
}
}
消费者
public class ActiveMQConsumer {
private static final String URL = "tcp://192.168.3.78:61616";
private static final String TOPIC_NAME = "topic-demo";
public void consumerTopic() throws JMSException {
//1.创建ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2.创建连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createTopic(TOPIC_NAME);
//6.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage= (TextMessage) message;
try {
System.out.println("消费Topic消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
一、ActiveMQ是如何产生的?
产生背景
一开始消息中间件的厂商繁多,且各个厂商之间没有统一的规范,这就导致了各消息中间件非常难以整合协作,因此,后来陆续出现了如JMS和AMQP这样的消息队列规范,提供了统一的标准,而ActiveMQ就是完全遵循JMS规范开发的消息队列。
JMS规范
基本概念
什么是JMS(Java Message Service)规范?JMS是一个基于Java平台面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。在设计JMS时,设计师就计划能够结合现有消息队列的优点,如:
不同的消息传送模式或域,例如点对点消息传送和发布/订阅消息传送
支持同步和异步消息
支持可靠性消息的传输
支持常见的消息格式,如:文本、字节、流、对象等
JMS体系结构
上面是从百度找的一个图片,下面对其中各个对象分别进行说明:
ConnectionFactory:连接工厂,一般设为单例模式,一旦创建,就一直运行在应用容器内,客户端使用连接工厂创建一个JMS连接。
Connection:JMS连接表示JMS客户端和服务器端之间的一个活动的连接。
Session:JMS会话表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
Destination:消息管道,从生产端流向客户端,包括队列(PTP),主题(Pub/Sub)。
Message Producer和Message Consumer:生产者和消费者对象由Session对象创建,用于发送和接收消息。
Message:JMS 消息由以下几部分组成:消息头,属性,消息体。
消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由Routing。
属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。
消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
了解了基本概念后,下面就一起来看看如何使用ActiveMQ吧。
二、如何使用?
基本功能
本节主要讲解activeMQ的基本功能和使用,详细API请查阅官方文档。
消息传递
在上文也讲了ActiveMq支持P2P(点对点)传输和pub/sub模型,这两种传递方式的本质区别就是消息是否可重复消费。比如微信私聊和群聊,私聊就是P2P,除了私聊的双方其它人无法再获取消息,而群聊就相当于pub/sub模式,即群成员都订阅了该群的消息。下面首先我们来看看P2P传输。
P2P
先创建一个Producer生产消息:
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
Connection connection = null;
try {
// 创建并开启连接
connection = factory.createConnection();
connection.start();
// 创建会话,设置是否为事务型会话以及消息签收方式
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建发送队列
Destination destination = session.createQueue("queue");
// 创建消息发送者
MessageProducer producer = session.createProducer(destination);
// 创建消息并设置消息内容
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello");
// 发送消息
producer.send(textMessage);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
上面代码注释写的很清楚了,可以看到是完全符合JMS的体系结构的,首先创建一个连接工厂,并通过连接工厂创建连接,然后通过连接创建会话(在创建会话时可以指定是否为事务型会话以及设置消息的签收方式,相关概念在后面会详细讲解),之后再为本次会话创建管道,即传输队列(这里可以指定是创建队列(p2p)还是还是主题(pub/sub)),最后创建消息对象发送到管道提交即完成本次会话的消息生产。接下来看看消费者如何消费消息:
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建接收队列
Destination destination = session.createQueue("queue");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息
TextMessage message = (TextMessage) consumer.receive();
System.out.println(message.getText());
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
整个流程和生产者流程基本是一样的,只不过消费者不再需要自己生产消息,而是从消息队列中获取,这里是通过receive方法获取的,该方法相当于是客户端主动从队列中“拉”消息,并且在消息队列为空时会阻塞等待消息传入;另外还有一种队列“推”送的方式,通过监听器实现。
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageConsumer consumer = session.createConsumer(destination);
// 使用监听器监听队列
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
};
while (true) {
consumer.setMessageListener(listener);
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
需要注意的是listener不会阻塞等待,当消息到达时会主动调用onMessage方法,但它的生命周期和方法的生命周期是相同的,需要像上面一样死循环监听,同时receive和listener是互斥的,即同时只能使用其中一种方式来获取消息。
pub/sub
相对于P2P,发布订阅模式就是可以有多个消费者监听同一个队列,并可重复消费同一个消息,整个代码实现流程和上面的是一样的,只是将 Destination destination = session.createQueue(“queue”);改为Destination destination = session.createTopic(“topic”);即可。 这里需要思考一个问题,消费者能够订阅到哪个时间段的消息呢?是所有的消息还是自消费者注册监听之后的呢?很显然,肯定是只能获取到注册监听之后的消息。但是,若是消费者中途怠机再恢复,怠机过程中产生的消息能否接收到呢?AcitveMQ是支持获取怠机过程中的消息的,即持久订阅工功能。
持久订阅
什么是持久订阅?举个例子,相当于你在微博点击关注某个博主,无论你是否在线,博主发送的消息你都是可以获取到的,持久订阅就类似这样,在创建好连接后首先设置一个自身的身份标识clientId,这个id是唯一的:
connection.setClientID("lwj");
然后通过下面API创建消费者即可创建持久订阅:
MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "lwj");
需要注意持久订阅只有pub/sub模式下才支持。
消息传递的可靠性
在学习了基础的使用后,我们应该考虑一个问题,消息队列该如何保证消息传递的可靠性呢?即如何保证生产的消息正确被消费者签收或者被生产者销毁?这就牵涉到事务型会话和非事务型会话,JMS Session接口提供了 commit 和 rollback 方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。 事务型的会话总是牵涉到事务处理中,commit 或 rollback 方法一旦被调用,一个事务就结束了,而另一个事务被开始;关闭事务性会话将回滚其中的事务。
事务型会话与非事务型会话
JMS在创建session会话时通过第一个参数指定是否为事务型会话:
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
当为事务型会话时,调用commit方法前消息并不会真正的投递到消息中间件中去,而在调用commit后消息会自动确认,需要保证发送端和接收端都是事务型会话。 当为非事务型会话时,相当于生产者逐个投递到消息中间件,但是消息的确认取决于消费者如何设置ACK_MODE,即创建会话时的第二个参数,该参数有4个选项:
SESSION_TRANSACTED:当为事务型会话时的默认选项,若不是事务型会话设置该参数会抛出异常
AUTO_ACKNOWLEDGE:当消费者成功的从 receive 方法返回的时候,或者从MessageListenner.onMessage 方法成功返回的时候,会话自动确认客户收到消息。
CLIENT_ACKNOWLEDGE:消费者通过调用Message的 acknowledge 方法确认消息。需要注意该模式下何时调用acknowledge方法,那么在调用该方法之前收到的消息都会一起被确认,而在此之后收到的消息不会被确认。比如,发送10条消息,消费者在收到第5条消息时调用acknowledge方法,那么前5条都会被确认。
DUPS_OK_ACKNOWLEDGE:消息延迟批量确认,消息生产者在消费者没有确认消息时会重新发送消息。该模式可优化消费者确认消息的性能,但可能会导致消费者收到重复消息(这个参数在优化一节中还会详细讲解)。
需要注意第一个是和事务绑定,后面三个都是针对消费端的,即消息中间件需要接收到消费者的ack才会认为消息被正确处理。
持久化与非持久化消息的存储策略
消息队列为保证高效,消息首先肯定是存储在内存中的,那么一旦消息队列怠机或者消息过多超出内存,消息就会面临丢失的风险,所以需要有相关的手段来保证。 正常情况下,非持久化消息是存储在内存中的,能够存储的最大消息数据在/conf/activemq.xml文件中的systemUsage节点可配置:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
memoryUsage是设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过ActiveMQ本身设置的最大内存大小。其中的percentOfJvmHeap属性表示百分比。
storeUsage是设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。
tempUsage是设置临时文件大小。一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,非持久化消息就会被转储到 temp store区域,虽然我们说过非持久化消息不进行持久化存储,但是ActiveMQ为了防止数据洪峰出现时非持久化消息大量堆积致使内存耗尽的情况出现,还是会将非持久化消息写入到磁盘的临时区域——temp store。
从上文我们可以了解到ActiveMQ的存储策略,但是还有个问题,持久化消息是通过什么介质存储的呢?主要有以下5种:
KahaDB:默认的存储方式。在data/kahadb这个目录下,会生成四个文件:
db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增。
db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息。
db.redo 用来进行消息恢复
lock文件 锁,表示当前获得kahadb读写权限的broker
JDBC存储,需要配置JDBC连接以及引入相应的jar。会在数据库创建三张表:
ACTIVEMQ_MSGS:消息表,queue和topic都存在这个表中
ACTIVEMQ_ACKS:存储持久订阅的信息和最后一个持久订阅接收的消息ID
ACTIVEMQ_LOCKS:锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库
Memory存储:即内存
LevelDB存储:性能优于KahaDB,但官方不推荐使用。
JDBC Message store with ActiveMQ Journal:这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
详细配置方式参照官方文档。
消息发送策略
ActiveMQ支持同步、异步两种发送模式将消息发送到消息中间件上。 同步发送过程中,发送者发送一条消息会阻塞直到消息中间件反馈一个确认消息,表示消息已经被消息中间件处理。这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能。异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。 默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高,所以在发送持久化消息的时候,尽量去开启事务会话。除了持久化消息和非持久化消息的同步和异步特性以外,我们还可以通过以下几种方式来设置异步发送:
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616?jms.useAsyncSend=true");
((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
((ActiveMQConnection)connection).setUseAsyncSend(true);
三、原理浅析
ActiveMQ的上手非常简单,但仅仅只是会用肯定不行,只有了解其原理,才能对特定的场景做出优化和设计,而要了解其原理,只有通过分析其源码才能完全了解。限于篇幅原因,接下来只是针对发送消息、消费消息和消息重发机制的流程做一个概括性总结。
发送原理
上面就是整个发消息的流程图,当生产者调用send发送消息时,首先会判断producerWindowSize(这个稍后会详细讲解)是否还有空间,若没有了就阻塞等待空间;反之则继续判断是否是异步发送消息,如果是同步,则直接通过底层传输协议传输消息,并阻塞等待response结果;如果是异步发送,同样通过底层传输协议传输消息,但不再需要阻塞等待response,同时会去增加producerWindowSize的值。 什么是producerWindowSize?这个配置主要用来约束异步发送时producer端允许积压(未ack)的消息大小。当发送消息时,首先会判断producerWindowSize是否还有剩余空间,如果没有就阻塞等待空间释放,即等待broker(可以就当作是消息队列中间件)确认消息;如果有空间,就放入到该空间下,等待broker处理。可以通过以下两种方式配置:
在连接url中设置,对所有producer都有效:tcp://localhost:61616?jms.producerWindowSize=1048576
在destination名称中设置,仅对使用该destination的producer有效,并且优先级更高:test-queue?producer.windowSize=1048576
消费原理
消费消息流程
消费者在通过receive消费消息时,并不是直接去broker上获取的消息,而是从本地的unconsumerMessage队列中获取,而该队列则是每次批量从broker上拉取消息,每次拉取的数量就是由prefetchSize控制的。当队列中没有消息时,就会阻塞等待获取消息;反之则依次从unconsumerMessage队列中取出消息消费,并将应答放到delivered队列返回给broker,消费消息和ack是异步的。那消息是如何添加到unconsumerMessage队列中的呢?这个过程也是非常复杂的,这里就不详细分析了,感兴趣的读者可自行分析源码。下面我们来看看消息的确认过程。
消息确认及消息重发
看到上面这张图,可能会比较懵,没关系,我们首先来了解一下ACK_MODE和ACK_TYPE,ACK_MODE在上文已经讲过了,但仅仅是消费端确认了还不够,还需要让broker知道消息是否正常消费,因此在确认消息后消费者还会根据处理结果返回不同的ACK_TYPE给broker,ACK_TYPE一共有以下6种:
DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束。
POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者 加入DLQ(死信队列)
STANDARD_ACK_TYPE = 2 “标准"类型,通常表示为消息"处理成功”,broker 端可以删除消息了。
REDELIVERED_ACK_TYPE = 3 消息需"重发",比如 consumer 处理消息时抛出了异常,broker 稍后会重新发送此消息。
INDIVIDUAL_ACK_TYPE = 4 表示无论在任何 ACK_MODE 下只确认"单条消息"。
UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”时,发现此消息不符合 Selector 过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在 Broker 上确认了消息)。
EXPIRED_ACK_TYPE = 6 消息已过期。
清楚了ACK_TYPE所对应的意思后,再看这张图就很明了了。首先从unconsumerMessage队列中取出消息并处理,若消费消息出现异常失败,消费者就会返回REDELIVERED_ACK_TYPE给broker,broker就会重发该条消息,当超过次数限制消费者就会返回POSION_ACK_TYPE告诉broker该条消息是有毒的,broker根据配置将该条消息抛弃或是加入死信队列中(该队列可以被重新消费);若消费消息成功未出现异常,就会将ack message添加到delivered队列中,消费该队列的消息时,会进行一系列判断并根据结果返回不同的ACK_TYPE。 刚刚我们提到消息消费失败会导致消息重发,那究竟在哪些情况下会被重发呢?主要有以下几种情况:
在事务型会话中,若是没有调用session.commit提交确认消息或者调用session.rollback方法。
在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE 的情况下,没有调用 acknowledge 或者调用了 recover 方法。
处理消息时发生异常。
这就是整个消息的确认和重发原理。
四、基本优化
使用任何一个中间件并出现性能问题时,我们都会考虑如何去优化,本节只是简单讲讲消费端的优化。 在上文我们提到过prefetchSize配置,该配置表示消费者每次从队列中获取消息的条数,该配置为0时表示消费者通过pull方式从broker获取消息,另外不同类型的队列具有不同的默认值:
持久化队列和非持久化队列的默认值为1000
持久化 topic 默认值为 100
非持久化topic的默认值为 Short.MAX_VALUE-1
但是仅仅只有批量获取肯定是不够的,因为从上文我们知道,消息还有一个确认过程,如果还是单个单个的确认,那这个批量获取就没有什么意义了(除了第一次是批量获取消息,后面都是单个单个的获取消息),所以ActiveMQ还提供了optimizeAcknowledge配置,该参数为true时,消费者会延迟确认(默认是ack了0.65*prefetchSize个消息后才确认)。该配置可以直接在连接url中配置(其中optimizeAcknowledgeTimeOut是表示超过该时间也会自动确认):
ConnectionFactory connectionFactory= new w ActiveMQConnectionFactory( "tcp://192.168.0.106:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=10000" ");
因此,这两者协同配合才能起到优化的作用。另外,需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提升消费者的性能。如果消费者的消费性能本身就比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费性能的目的,因为过大的prefetchSize 会导致某一消费端积压消息,而其它的消费端却“无所事事”。同时,该方案需要消费端能够容忍重复消息,因为当消息还未确认时消费者就怠机了,那么broker就会将该消息重发给其它消费者,导致消息重复。
总结
通过以上学习,我们能看出ActiveMQ是非常简单易上手的,但它有以下缺点:
持久化消息存储需要建立索引,因此吞吐量低,不适合TPS要求高的业务。
不支持消息分片功能,只能自己实现。
由于消息队列产品众多,本文只是从基本概念和使用、核心机制原理以及优化等几方面对ActiveMQ做了一个概括性的引导和总结,并未涉及详细的源码分析,另具体的配置也请参照官方文档。
activemq监听不到消息
可能有以下原因导致activemq监听不到消息:
检查activemq服务是否启动。如果服务未启动,则无法监听到消息。
检查消息队列是否正确。如果消息队列名称不正确,则无法监听到消息。
检查消息监听器是否正确实现。如果监听器实现有误,则无法正确监听到消息。
检查消息过滤器是否正确设置。如果消息过滤器设置有误,则可能会错过需要监听的消息。
检查消息生产者是否正确发送消息。如果消息生产者未正确发送消息,则无法监听到消息。
检查消息消费者是否正确消费消息。如果消息消费者未正确消费消息,则无法监听到消息。
建议逐一排查以上问题,以确定导致activemq无法监听到消息的具体原因。