ActiveMQ主题模式传递消息

订阅发布方式传递消息:Topic ,就类似微信公众号一样,可以多个订阅者接收一样的消息

补充:由于topic传递消息的特点是,一个生产者发送给多个消费者,生产者生产的消息在没有被消费者消费之前,并不会将消息持久化到activemq的服务端,发送的消息会自动消失。所以 测试的时候需要先创建消费者对象,然后再发送消息,防止消息丢失。

生产者实现步骤:

步骤和PTP的方式完全一样,不同的是在创建Destination对象的时候,需要创建topic对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class  {
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName="topic-test";

public static void main(String[] args) {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
try {
//2.创建Connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标(注意:此处变成了Topic)
Destination destination = session.createTopic(topicName);
//6.创建一个生产者
MessageProducer producer = session.createProducer(destination);

for (int i = 0; i < 100; ++i) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
//8.发布消息
producer.send(textMessage);

System.out.println("发送消息" + textMessage.getText());
}
//9.关闭连接
connection.close();
}catch (JMSException e)
{
e.printStackTrace();
}
}
}

消费者实现的步骤:

步骤和PTP消费者实现的步骤一样,唯一不同的是在创建Destination对象的时候,创建topic对象,同时要和发布订阅的生产者的topic一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class AppConsumer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName="topic-test";

public static void main(String[] args) {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
try {
//2.创建Connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createTopic(topicName);
//6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接受消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.关闭连接(上面的监听是异步的,如果关闭则监听就停止了)
//connection.close();
}catch (JMSException e)
{
e.printStackTrace();
}
}
}