1. ActiveMQ发送和接收消息

生产者示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Producer {
public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.139.129:61616");
// 创建一条连接
Connection conn = factory.createConnection();
// 启动连接
conn.start();
// 创建会话
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 发送目的地
Destination destination = session.createQueue("TEST.QUEUE");
// 消息生产者
MessageProducer producer = session.createProducer(destination);
// 持久化(MQ重启后消息不会丢失)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 文本内容消息
TextMessage message = session.createTextMessage("=== Hello ActiveMQ ===");
// 发送到目的地
producer.send(message);
// 提交事务
session.commit();
// 关闭连接
conn.close();
}
}

消费者示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Consumer {
public static void main(String[] args) throws Throwable {
// 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.139.129:61616");
// 创建一条连接
Connection conn = factory.createConnection();
// 启动连接
conn.start();
// 创建会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 目的地
Destination destination = session.createQueue("TEST.QUEUE");
// 消息消费者
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
// 接收消息
TextMessage message = (TextMessage) consumer.receive();
// 打印接收到的消息
System.out.println(String.format("[接收消息]: %s", message.getText()));
}
}
}

2. 与Spring整合

pom.xml 配置:

# pom.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<properties>
<activemq.version>5.15.2</activemq.version>
<spring.version>4.3.7.RELEASE</spring.version>
</properties>
<dependencies>
<!-- MQ依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq.version}</version>
</dependency>
<!-- SPRING依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

Spring提供了一个方便的JmsTemplate来屏蔽发送消息时的JMS细节,但是JmsTemplate在发送每条消息时,都会创建一个新的Connection,Session,MessageProducer实例,当消息发送出去之后再关闭它们。这样效率非常低下。PooledConnectionFactory支持Connection,Session,MessageProducer实例的池化,使得Connection,Session,MessageProducer实例在使用完之后可以返回池中,以便以后可以重复使用它们。使用PooledConnectionFactory,需要添加activemq-pool依赖包的支持。

生产者配置:

# spring-activemq-producer.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 连接工厂 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
</bean>
<!-- 连接池工厂 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="activeMQConnectionFactory"/>
<property name="maxConnections" value="${mq.maxConnections}"/>
<property name="maximumActiveSessionPerConnection" value="${mq.maximumActiveSessionPerConnection}"/>
<property name="idleTimeout" value="${mq.idleTimeout}"/>
</bean>
<!-- 消息模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
</bean>
<!-- 消息队列 -->
<bean id="mqQueue1" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="TEST.QUEUE1" />
</bean>
<bean id="mqQueue2" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="TEST.QUEUE2" />
</bean>
</beans>

# activemq.properties


1
2
3
4
5
6
7
8
# 链接地址, 采用故障转移策略
mq.brokerURL = failover:(tcp://192.168.139.129:61616,tcp://192.168.139.130:61616)?randomize=false&timeout=3000
# 最大的连接数, 默认值是1, 可以适当调大一些
mq.maxConnections = 100
# 连接的空闲时间, 超时则回收, 默认是30秒, 可以适当调大一些, 提高连接的重用时间
mq.idleTimeout = 180000
# 每个连接最大的会话数, 默认是500, 可以适当调小一些
mq.maximumActiveSessionPerConnection = 300

故障转移配置语法:

1
failover:(uri1,...,uriN)?transportOptions&nestedURIOptions

或者:

1
failover:uri1,...,uriN

如果某台消息服务器宕机等异常情况导致uri1地址失效,ActiveMQ的故障转移机制会从配置的地址列表中选取另外一个地址进行重连。需要注意的是,ActiveMQ并不是按照配置的地址列表来选取地址的,而是随机进行选取的。如果想要关闭随机选取的机制,可以添加randomize=false参数。想要了解更多的配置信息可以链接到官网查阅 failover-transport-reference

Spring 配置:

# spring-context.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<context:component-scan base-package="org.fanlychie"/>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:activemq.properties</value>
</list>
</property>
</bean>
<import resource="spring-activemq-producer.xml"/>
</beans>

消息生产者:

# MessageProducer.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
public class MessageProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("mqQueue1")
private Destination mqQueue1;
@Autowired
@Qualifier("mqQueue2")
private Destination mqQueue2;
public void sendQueue1Message(final String message) {
jmsTemplate.send(mqQueue1, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
System.out.println("<=== [TEST.QUEUE1] 发送消息:" + message);
return session.createTextMessage(message);
}
});
}
public void sendQueue2Message(final String message) {
jmsTemplate.send(mqQueue2, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
System.out.println("<=== [TEST.QUEUE2] 发送消息:" + message);
return session.createTextMessage(message);
}
});
}
}

Junit单元测试:

# TestMessageProducer.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring-context.xml")
public class TestMessageProducer {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSendQueue1Message() {
messageProducer.sendQueue1Message("=== QUEUE 1 ===");
}
@Test
public void testSendQueue2Message() {
messageProducer.sendQueue2Message("=== QUEUE 2 ===");
}
}

消费者配置:

# spring-activemq-consumer.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 连接工厂 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
</bean>
<!-- 连接池工厂 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="activeMQConnectionFactory"/>
<property name="maxConnections" value="${mq.maxConnections}"/>
<property name="maximumActiveSessionPerConnection" value="${mq.maximumActiveSessionPerConnection}"/>
<property name="idleTimeout" value="${mq.idleTimeout}"/>
</bean>
<!-- 消息队列 -->
<bean id="mqQueue1" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="TEST.QUEUE1"/>
</bean>
<bean id="mqQueue2" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="TEST.QUEUE2" />
</bean>
<!-- 消息队列监听器 -->
<bean id="mqQueue1MessageConsumerListener" class="org.fanlychie.mq.MqQueue1MessageConsumerListener"/>
<bean id="mqQueue2MessageConsumerListener" class="org.fanlychie.mq.MqQueue2MessageConsumerListener"/>
<!-- 消息监听容器 -->
<bean id="mqQueue1MessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="destination" ref="mqQueue1"/>
<property name="messageListener" ref="mqQueue1MessageConsumerListener"/>
<!--
<property name="concurrency" value="5-10"/>
-->
</bean>
<bean id="mqQueue2MessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="destination" ref="mqQueue2"/>
<property name="messageListener" ref="mqQueue2MessageConsumerListener"/>
</bean>
</beans>

<property name=”concurrency” value=”5-10”/>

concurrency属性可以用来设定消息监听器中的消费者数量的上限和下限。例如“5-10”意味着消息监听器初始会维护最小的5个消费者线程,在系统负载不断增加的情况下,最终的消费者数量会增长至最大数值10。并且当负载降低后,现有的消费者的数量也不会再主动减少。

# activemq.properties


1
2
3
4
5
6
7
8
# 链接地址, 采用故障转移策略
mq.brokerURL = failover:(tcp://192.168.139.129:61616,tcp://192.168.139.130:61616)?randomize=false&timeout=3000
# 最大的连接数, 默认值是1, 可以适当调大一些
mq.maxConnections = 100
# 连接的空闲时间, 超时则回收, 默认是30秒, 可以适当调大一些, 提高连接的重用时间
mq.idleTimeout = 180000
# 每个连接最大的会话数, 默认是500, 可以适当调小一些
mq.maximumActiveSessionPerConnection = 300

Spring 配置:

# spring-context.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<context:component-scan base-package="org.fanlychie"/>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:activemq.properties</value>
</list>
</property>
</bean>
<import resource="spring-activemq-consumer.xml"/>
</beans>

消息消费监听器:

# MqQueue1MessageConsumerListener.java


1
2
3
4
5
6
7
8
9
10
11
12
13
public class MqQueue1MessageConsumerListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("===> [TEST.QUEUE1] 接收消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

# MqQueue2MessageConsumerListener.java


1
2
3
4
5
6
7
8
9
10
11
12
13
public class MqQueue2MessageConsumerListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("===> [TEST.QUEUE2] 接收消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

测试示例:

1
2
3
4
5
6
7
public class TestMessageConsumerListener {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("/spring-context.xml");
}
}

3. 基于注解方式的消费者

# spring-activemq-consumer.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd">
<!-- 连接工厂 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
</bean>
<!-- 连接池工厂 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="activeMQConnectionFactory"/>
<property name="maxConnections" value="${mq.maxConnections}"/>
<property name="maximumActiveSessionPerConnection" value="${mq.maximumActiveSessionPerConnection}"/>
<property name="idleTimeout" value="${mq.idleTimeout}"/>
</bean>
<!-- 消息监听容器工厂 -->
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
</bean>
<!-- 开启JMS注解 -->
<jms:annotation-driven container-factory="jmsListenerContainerFactory"/>
</beans>

消息消费监听器:

# MessageConsumerListener.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class MessageConsumerListener {
@JmsListener(destination = "TEST.QUEUE1")
public void mqQueue1MessageConsumer(String message) {
System.out.println("===> [TEST.QUEUE1] 接收消息:" + message);
}
@JmsListener(destination = "TEST.QUEUE2")
public void mqQueue2MessageConsumer(String message) {
System.out.println("===> [TEST.QUEUE2] 接收消息:" + message);
}
}

示例项目开发环境:Java-8、Maven-3、IntelliJ IDEA-2017、Spring-4.7、ActiveMQ-5.15.2
完整示例项目链接:activemq-samples