JMS是一个消息中间件标准接口,它定义了一个使用消息代理的公共API。与JDBC为关系数据库操作提供公共接口的方式相似。JMS为每个消息代理提供了公共接口,解决了JMS出现前每个消息代理都有专用API移植性差的痛点。
JMS主要有两种消息模型:
- 点对点(P2P):消息发送到Queue,每个消息只能被一个消费者接收
- 发布订阅(Pub/Sub):消息发布到Topic,多个订阅者都能收到消息
常见的JMS实现有如下几种:ActiveMQ,RabbitMQ,Kafka,IBM MQ等。
Spring通过JmsTemplate的基于模版的抽象来支持JMS,使用JmsTemplate,很容易从生产者端跨队列和主题发送消息,并在消费者端接收这些消息。
JmsTemplate是Spring JMS集成支持的核心。和其他面向模版的组件非常相似,JmsTemlate消除了大量与JMS协同工作所需的样板代码。如果没有JmsTemplate,那么创建与消息代理的连接和绘画或者各种异常都需要自己来处理。使用JmsTemplate将专注于真正想做的事情:发送消息。
接下来将介绍集成activemq的具体操作。
1.添加依赖
<!-- JMS 基础支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jms</artifactId>
</dependency>
<!-- ActiveMQ 客户端 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.18.4</version> <!-- 兼容 Spring Boot 3.x 的稳定版本 -->
</dependency>
<!-- 可选:JMS 连接池(提升性能) -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.3.9</version>
</dependency>2.配置ActiveMQ的连接参数
spring.jms.pub-sub-domain=false
spring.jms.listener.acknowledge-mode=AUTO
spring.jms.template.default-destination=my-queue
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=103.根据需要配置JMS连接工厂
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;
@Configuration
public class JmsConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, brokerUrl);;
factory.setTrustAllPackages(true);
factory.setBrokerURL(brokerUrl);
factory.setUserName(username);
factory.setPassword(password);
// PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(factory);
//pooledConnectionFactory.setMaxConnection(10);
return factory;
}
//配置JmsTemplate
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
//启用消息持久化 防止消息丢失
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
return jmsTemplate;
}
}
二、创建消息生产者示例
在JmsTemplate的类中发送原始消息的方法是有三个,分别如下:
发送转换自对象和经过处理后从对象转换而来的消息的方法如下:
核心就是send()和convertAndSend(),通过重载生成了不同的方法。
创建消息生产者,使用JmsTemplate发送文本消息、对象消息等。也可以自定义消息。
发送文本消息的示例如下:
import jakarta.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import com.demo.bean.UserMessage;
@Service
public class JmsMessageProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendTextMessage(String queueName,String message){
//方式1:使用convertAndSend 直接转换为对象发送消息
jmsTemplate.convertAndSend(queueName,message);
jmsTemplate.send(queueName,session -> {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(message);
textMessage.setStringProperty("sender", "system");
return textMessage;
}
);
}
public void sendObjectMessage(String queueName, UserMessage message){
//自动将对象转换为ObjectMessage
jmsTemplate.convertAndSend(queueName,message);
}
}
接下来是自定义对象,该对象必须实现Serializable接口:
//自定义消息对象 必须实现Serizable
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage implements Serializable {
private String username;
private String content;
}具体的使用在上面的代码中已经体现,不再做赘述。
三、消息消费者示例
在JmsTemplate中接收原始消息的方法有3个,分别如下:
与发送消息类似,接收消息也有对应的将消息转换为域类型的方法,如下:
主要要使用@JmsListener注解来监听队列,以便于接收消息。
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import lombok.extern.java.Log;
import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import com.demo.bean.UserMessage;
@Log
@Service
public class JmsMessageConsumer {
//监听指定队列的文本消息
@JmsListener(destination = "my-queue")
public void receiveTextMessage(String message){
log.info(message);
}
//监听对象消息
@JmsListener(destination = "user-queue")
public void receiveObjectMessage(UserMessage message){
log.info(message.getContent());
}
public void receiveWithAcknowledge(Message message, Session session) throws JMSException {
try {
String text = ((TextMessage) message).getText();
log.info("处理消息:"+text);
//手动确认消息
message.acknowledge();
}catch (Exception e) {
//处理失败 回滚消息也就是让消息重新入队
session.recover();
}
}
}
在上述代码中对消息接收、消息手动确认以及消息处理失败等情况都做了处理。
使用JMS切换其它中间件非常简单,比如换成RabbitMQ和Kafka只需要修改依赖和配置,业务代码基本是不需要修改的。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>需要特别注意的有以下几点:
- 消息序列化
发送消息对象时,对象必须实现Serializable,否则的话会报序列化异常
- 连接池配置
实际使用过程中建议启用连接池,来提升性能
- 消息确认机制
- Auto:默认模式。消息处理成功后会自动进行确认
- CLIENT_ACKNOWLEDGE:需要手动调用message.acknowledge()确认
- Topic和Queue切换
通过修改配置文件中的spring.jms.pub-sub-domain的配置可以进行修改,其中true=Topic,false =Queue
