方案:为每个 ActiveMQ 实例创建独立的连接工厂,并通过 @Bean 注册到 Spring 容器;消费端使用 @JmsListener 监听并消费消息,发送端通过 @Qualifier 指定对应的 JmsTemplate Bean 来发送消息。
支持配置多个 MQ,为每个 MQ 创建各自的一组 Bean 即可。本案例使用了三个 MQ,下面以第三个(three)为例进行说明,其余两个的配置方式完全相同。
<!-- ActiveMQ JMS client -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.9</version>
</dependency>
<!-- Generic object pool (version is expected to come from dependency management) -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>
<!-- ActiveMQ connection pool; kept on the same version as activemq-client so that
     all ActiveMQ modules on the classpath come from one release -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.9</version>
</dependency>
# Everything lives under the "config" root key because the Java code resolves
# ${config.activemq.*} and ${config.msgListener.*}.
config:
  activemq:
    one:
      #1
      brokerUrl: tcp://ip:61616
      user: admin
      password: 11111
      # true: topic mode, false: queue mode
      pub-sub-domain: false
    two:
      #2
      brokerUrl: tcp://ip:61616
      user: admin
      password: 111111
      # true: topic mode, false: queue mode
      pub-sub-domain: true
    three:
      #3
      brokerUrl: tcp://ip:61616
      user: admin
      password: 111111
      # true: topic mode, false: queue mode
      pub-sub-domain: false
  # Destination names used when receiving from the different MQs
  msgListener:
    oneMqName: 1
    twoMqName: 2
    threeMqName: 3
4.1 ActiveMqConfigs
以第三个 MQ(three)为例
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
/**
 * Multi-instance ActiveMQ configuration.
 *
 * <p>Defines the connection factory, the {@link JmsTemplate} and the listener
 * container factory for the broker configured under {@code config.activemq.three}.
 * Each bean has an explicit name so it can be selected with {@code @Qualifier}
 * or with the {@code containerFactory} attribute of {@code @JmsListener}.
 *
 * @author gyDeBug
 * @version 1.0
 * @date 2021/3/30 15:41
 */
@Configuration
public class ActiveMqConfigs {

    /**
     * Builds the connection factory for the "three" broker from its URL and credentials.
     *
     * @param brokerUrl broker URL, read from {@code config.activemq.three.brokerUrl}
     * @param username  user name, read from {@code config.activemq.three.user}
     * @param password  password, read from {@code config.activemq.three.password}
     * @return the connection factory registered as {@code threeConnectionFactory}
     */
    @Bean(name = "threeConnectionFactory")
    public ActiveMQConnectionFactory threeConnectionFactory(
            @Value("${config.activemq.three.brokerUrl}") String brokerUrl,
            @Value("${config.activemq.three.user}") String username,
            @Value("${config.activemq.three.password}") String password) {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
        return factory;
    }

    /**
     * Builds the {@link JmsTemplate} used to send messages to the "three" broker.
     *
     * @param connectionFactory the {@code threeConnectionFactory} bean
     * @param pubSubDmain       value of {@code config.activemq.three.pub-sub-domain};
     *                          {@code true} selects topic mode, {@code false} queue mode
     * @return the template registered as {@code threeJmsTemplate}
     */
    @Bean(name = "threeJmsTemplate")
    public JmsTemplate threeJmsTemplate(
            @Qualifier("threeConnectionFactory") ActiveMQConnectionFactory connectionFactory,
            @Value("${config.activemq.three.pub-sub-domain}") boolean pubSubDmain) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setPubSubDomain(pubSubDmain);
        return jmsTemplate;
    }

    /**
     * Builds the listener container factory referenced by {@code @JmsListener}
     * methods that consume from the "three" broker.
     *
     * @param connectionFactory the {@code threeConnectionFactory} bean
     * @param pubSubDmain       value of {@code config.activemq.three.pub-sub-domain};
     *                          {@code true} selects topic mode, {@code false} queue mode
     * @return the factory registered as {@code threeJmsListenerContainerFactory}
     */
    @Bean(name = "threeJmsListenerContainerFactory")
    public JmsListenerContainerFactory<?> threeJmsListenerContainerFactory(
            @Qualifier("threeConnectionFactory") ActiveMQConnectionFactory connectionFactory,
            @Value("${config.activemq.three.pub-sub-domain}") boolean pubSubDmain) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(pubSubDmain);
        return factory;
    }
}
4.2 ActiveMqSendMsgService
生产者发送消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
/**
* @author gyDeBug
* @version 1.0
* @date 2021/3/30 15:49
* @description:
*/
@Slf4j
@Service
public class ActiveMqSendMsgService {
@Autowired
@Qualifier("threeJmsTemplate")//或twoJmsTemplate
private JmsTemplate jmsTemplate;
/**
* @description:消息发送测试,提供MQ
* @author gyDeBug
* @param: * @param null
* @return:
* @date: 2021/3/30 15:50
*/
public void sendMq(String msg) {
try {
jmsTemplate.convertAndSend("test",msg);
log.info( "=========== send ActiveMq message OK============" );
} catch (Exception e) {
log.error("SendMsgController.sendMq()", e);
}
}
}
4.3 QueueListener1
消息监听
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
 * Message listener bound to the {@code threeJmsListenerContainerFactory} container factory.
 */
@Slf4j
@Component
public class QueueListener1 {

    /**
     * Receives messages from the destination configured under
     * {@code config.msgListener.threeMqName}.
     *
     * <p>A {@code null} or empty payload is reported with a warning and skipped;
     * any other payload is logged at info level.
     *
     * @param msg the text payload of the received message
     * @throws Exception declared for interface compatibility; not thrown by this implementation
     * @author gyDeBug
     * @date 2021/9/30
     */
    @JmsListener(destination = "${config.msgListener.threeMqName}", containerFactory = "threeJmsListenerContainerFactory")
    public void receiveQueueThree(String msg) throws Exception {
        if (!StringUtils.hasLength(msg)) {
            // Report through the logger instead of stdout so the event reaches the application log.
            log.warn("Received an empty message on the three MQ listener; ignoring it");
            return;
        }
        log.info("监听到的信息:{}", msg);
    }
}
因篇幅问题不能全部显示,请点此查看更多更全内容