Java API 开发中使用 HornetQ 进行消息处理
随着互联网的快速发展,大量的信息交互涌现出来,消息队列成为解决高并发、高可用、异步处理等问题的重要手段。HornetQ 是由 JBoss 开发的基于 JMS 协议的高性能、高可用的开源消息中间件。本文将介绍如何在 Java API 开发中使用 HornetQ 进行消息处理。
一、快速入门
- 下载 HornetQ
HornetQ的官网(http://hornetq.apache.org/downloads.html)提供了多种格式的下载包,这里选择 HornetQ-2.4.0.Final-bin.tar.gz。
- 安装 HornetQ
下载完成后,将 HornetQ-2.4.0.Final-bin.tar.gz 解压缩到本地文件夹中。
- 启动 HornetQ
进入 HornetQ 的 bin 目录,执行以下命令:
./run.sh
出现以下日志信息则表示成功启动 HornetQ 服务:
11:14:21,867 INFO [ServerImpl] Starting HornetQ Server
11:14:21,986 INFO [JournalStorageManager] Using NIO Journal
11:14:22,626 INFO [NettyAcceptor] Started Netty Acceptor version #{version}
11:14:22,697 INFO [HornetQServerImpl] HornetQ Server version #{version} [${name}] started
- 部署 HornetQ 控制台
将 HornetQ 的 hornetq-console.war 放入 Tomcat 的 webapps 目录下,启动 Tomcat,通过 http://localhost:8080/hornetq-console 访问 HornetQ 控制台。
二、HornetQ 的使用
- 发布和接收消息
HornetQ 的发布订阅模式是基于 Topic 的,发布端向某个 Topic 发布消息,而多个接收端可以同时订阅这个 Topic,接收端就可以接收到多个发布端发布的消息。
(1)消息发布端
先创建一个发布端(Publisher)发送消息,代码如下:
public class Publisher {
public static void main(String[] args) throws Exception {
// 初始化连接工厂等配置信息
ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
JMSContext jmsContext = connectionFactory.createContext();
// 发送消息
JMSProducer producer = jmsContext.createProducer();
Destination destination = HornetQJMSClient.createTopic("exampleTopic");
producer.send(destination, "Hello, HornetQ!");
// 关闭连接
jmsContext.close();
}
}
(2)消息接收端
再创建一个接收端(Subscriber)去接收消息,并将消息打印出来,代码如下:
public class Subscriber {
public static void main(String[] args) throws Exception {
// 初始化连接工厂等配置信息
ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
JMSContext jmsContext = connectionFactory.createContext();
// 创建消费者
Destination destination = HornetQJMSClient.createTopic("exampleTopic");
JMSConsumer consumer = jmsContext.createConsumer(destination);
// 接收消息并打印
String message = null;
do {
message = consumer.receiveBody(String.class, 1000);
System.out.println("Received message: " + message);
} while (message != null);
// 关闭连接
jmsContext.close();
}
}
在运行发布端和接收端之后,可以在 HornetQ 控制台上查看发布端发送的消息,如下图所示:
- 消息持久化
HornetQ 支持将消息进行持久化存储,这意味着即使 HornetQ 宕机了,也能保证消息不丢失。
(1)发送者
我们需要将消息的持久性设置为 DeliveryMode.PERSISTENT,如下所示:
public class Publisher {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
JMSContext jmsContext = connectionFactory.createContext();
// 设定持久性
JMSProducer producer = jmsContext.createProducer();
destination = HornetQJMSClient.createTopic("exampleTopic");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送消息
producer.send(destination, "Hello, HornetQ!");
jmsContext.close();
}
}
(2)接收者
HornetQ 默认已将消息持久化存储,因此不需要在接收者端进行特定的配置,继续使用上一节中的 Subscriber 类即可。
- 集群模式
HornetQ 具有高可用性的特点,可以以集群模式运行,以确保消息的可靠性和高并发性。以下是实现 HornetQ 集群模式的步骤:
(1)拷贝 HornetQ 目录并新建文件夹
将 HornetQ 目录拷贝一份并重命名为 HornetQ2,然后新建一个名为 cluster 的文件夹,并将 HornetQ2 目录下的 data 目录、log 目录、tmp 目录等文件夹全部复制到 cluster 文件夹下。
(2)修改配置文件
在 HornetQ 目录下的 examples/configs/clustered 配置文件夹中,将 hq-configuration.xml 文件及 server0 和 server1 文件夹复制到 HornetQ2 目录中,并按照下面的方式修改 server0 文件夹中的 hornetq-configuration.xml 文件:
(a)将节点名修改为 server0
(b)将 cluster-connections 中的 server-username 和 server-password 修改为"guest"
(c)修改 connector 地址为本机 IP 地址,如 192.168.1.1
(d)将 jms-configuration 下的 use-ha 设为 true
如下所示:
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<cluster-password>guest</cluster-password>
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<journal-type>NIO</journal-type>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-file-size>10240</journal-file-size>
<journal-buffer-timeout>28000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>1628000</page-sync-timeout>
<global-max-size>100Mb</global-max-size>
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="192.168.1.1"/>
<param key="port" value="5445"/>
</connector>
</connectors>
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="192.168.1.1"/>
<param key="port" value="5545"/>
</acceptor>
</acceptors>
<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
<connector-ref>netty</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>true</forward-when-no-consumers>
<max-hops>1</max-hops>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
<static-connectors>
<connector-ref&
.........................................................