用输出倒逼输入day3:在Spring中使用JMS发送消息

JMS是一个消息中间件标准接口,它定义了一个使用消息代理的公共API。与JDBC为关系数据库操作提供公共接口的方式相似。JMS为每个消息代理提供了公共接口,解决了JMS出现前每个消息代理都有专用API移植性差的痛点。

JMS主要有两种消息模型:

  • 点对点(P2P):消息发送到Queue,每个消息只能被一个消费者接收
  • 发布订阅(Pub/Sub):消息发布到Topic,多个订阅者都能收到消息

常见的JMS实现有如下几种:ActiveMQ,RabbitMQ,Kafka,IBM MQ等。

Spring通过JmsTemplate的基于模版的抽象来支持JMS,使用JmsTemplate,很容易从生产者端跨队列和主题发送消息,并在消费者端接收这些消息。

JmsTemplate是Spring JMS集成支持的核心。和其他面向模版的组件非常相似,JmsTemlate消除了大量与JMS协同工作所需的样板代码。如果没有JmsTemplate,那么创建与消息代理的连接和绘画或者各种异常都需要自己来处理。使用JmsTemplate将专注于真正想做的事情:发送消息

接下来将介绍集成activemq的具体操作。

1.添加依赖

<!-- JMS 基础支持 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jms</artifactId>
</dependency>

<!-- ActiveMQ 客户端 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.18.4</version> <!-- 兼容 Spring Boot 3.x 的稳定版本 -->
</dependency>

<!-- 可选:JMS 连接池(提升性能) -->
<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
    <version>1.3.9</version>
</dependency>

2.配置ActiveMQ的连接参数

spring.jms.pub-sub-domain=false 
spring.jms.listener.acknowledge-mode=AUTO
spring.jms.template.default-destination=my-queue

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=10

3.根据需要配置JMS连接工厂


import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class JmsConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;
    @Value("${spring.activemq.password}")
    private String password;
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, brokerUrl);;
        factory.setTrustAllPackages(true);
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
       // PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(factory);
        //pooledConnectionFactory.setMaxConnection(10);
        return factory;
    }

    //配置JmsTemplate
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        //启用消息持久化 防止消息丢失
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        return jmsTemplate;
    }

}

二、创建消息生产者示例

在JmsTemplate的类中发送原始消息的方法是有三个,分别如下:

发送转换自对象和经过处理后从对象转换而来的消息的方法如下:

核心就是send()和convertAndSend(),通过重载生成了不同的方法。

创建消息生产者,使用JmsTemplate发送文本消息、对象消息等。也可以自定义消息。

发送文本消息的示例如下:



import jakarta.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import com.demo.bean.UserMessage;

@Service
public class JmsMessageProducer {
    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendTextMessage(String queueName,String message){
        //方式1:使用convertAndSend 直接转换为对象发送消息
        jmsTemplate.convertAndSend(queueName,message);
        jmsTemplate.send(queueName,session -> {
                    TextMessage textMessage = session.createTextMessage();
                    textMessage.setText(message);
                    textMessage.setStringProperty("sender", "system");
                    return textMessage;
        }
        );
    }

    public void sendObjectMessage(String queueName, UserMessage message){
        //自动将对象转换为ObjectMessage
        jmsTemplate.convertAndSend(queueName,message);
    }


}

接下来是自定义对象,该对象必须实现Serializable接口:


//自定义消息对象 必须实现Serizable
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage implements Serializable {

    private String username;
    private String content;
}

具体的使用在上面的代码中已经体现,不再做赘述。

三、消息消费者示例

在JmsTemplate中接收原始消息的方法有3个,分别如下:

与发送消息类似,接收消息也有对应的将消息转换为域类型的方法,如下:

主要要使用@JmsListener注解来监听队列,以便于接收消息。


import jakarta.jms.JMSException;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import lombok.extern.java.Log;
import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import com.demo.bean.UserMessage;

@Log
@Service
public class JmsMessageConsumer {
    //监听指定队列的文本消息
    @JmsListener(destination = "my-queue")
    public void receiveTextMessage(String message){
        log.info(message);
    }

    //监听对象消息
    @JmsListener(destination = "user-queue")
    public void receiveObjectMessage(UserMessage message){
        log.info(message.getContent());
    }

    public void receiveWithAcknowledge(Message message, Session session) throws JMSException {

        try {
            String text = ((TextMessage) message).getText();
            log.info("处理消息:"+text);
            //手动确认消息
            message.acknowledge();
        }catch (Exception e) {
            //处理失败 回滚消息也就是让消息重新入队
            session.recover();
        }
    }

}

在上述代码中对消息接收、消息手动确认以及消息处理失败等情况都做了处理。

使用JMS切换其它中间件非常简单,比如换成RabbitMQ和Kafka只需要修改依赖和配置,业务代码基本是不需要修改的。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

需要特别注意的有以下几点:

  1. 消息序列化

发送消息对象时,对象必须实现Serializable,否则的话会报序列化异常

  1. 连接池配置

实际使用过程中建议启用连接池,来提升性能

  1. 消息确认机制
  • Auto:默认模式。消息处理成功后会自动进行确认
  • CLIENT_ACKNOWLEDGE:需要手动调用message.acknowledge()确认
  1. Topic和Queue切换

通过修改配置文件中的spring.jms.pub-sub-domain的配置可以进行修改,其中true=Topic,false =Queue

原文链接:,转发请注明来源!