`
haoran_10
  • 浏览: 436254 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

消息中间件(2)-ActiveMq & Spring 技术集成

阅读更多
  • 一、为什么需要spring + activeMq 集成?

1、原生的JMS使用起来,太繁琐,需要封装很多层才能在正式代码中使用,

2、activemq一套开源的JMS实现方案,实现了服务端和客户端,开箱即用

3、spring一整套组件,直接拿来主义

 

  • 二、实现步骤

1、开启activemq服务端

直接从官网下载 http://activemq.apache.org/,开箱即用的东西,略过。

 

2、客户端调用所需要的关键jar包,使用的maven管理,其他包管理类似

 

<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>4.0.6.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-core</artifactId>
	<version>5.7.0</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.7.0</version>
</dependency>

 

 

实际使用的时候,可以适当调整版本号

 

3、在JMS规范 中,使用JMS几个步骤,

   (1)、用JNDI 得到ConnectionFactory对象;

    (2)、用JNDI 得到目标队列或主题对象,即Destination对象;

    (3)、用ConnectionFactory创建Connection 对象;

    (4)、用Connection对象创建一个或多个JMS Session;

    (5)、用Session 和Destination 创建MessageProducer和MessageConsumer;

    (6)、通知Connection 开始传递消息。

而activemq使用的步骤也是大同小异

 

   3.1、建立ConnectionFactory以及spring高度封装的JmsTemplate

     

@Bean
public PooledConnectionFactory PooledConnectionFactory(){
	ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
	connectionFactory.setBrokerURL(MQ_URL);//配置MQ_URL,例如tcp://localhost:61616
	
	PooledConnectionFactory PooledConnectionFactory = new PooledConnectionFactory();
	PooledConnectionFactory.setConnectionFactory(connectionFactory);
	
	return PooledConnectionFactory;
}

@Bean
public JmsTemplate JmsTemplate(){
	JmsTemplate JmsTemplate = new JmsTemplate();
	JmsTemplate.setConnectionFactory(PooledConnectionFactory());
	
	return JmsTemplate;
}

  3.2 建立Queue或者Topic

 

 

@Bean(name="queue1")
public Destination queue1Destination(){
	Destination destination = new ActiveMQQueue(queue1);//队列 定义queue1名称即可
       //Destination destination = new ActiveMQTopic(queue1);//主题
	
	return destination;
}

   

 

   第3,4步直接省略,JmsTemplate已经做好了

   3.3 发送消息

   

@Component
public class MyMessageSender {
	@Autowired JmsTemplate jmsTemplate;
	@Resource(name="queue1") Destination destination;
	
	public void send(String msg){
		jmsTemplate.send(destination, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage message = session.createTextMessage();
				message.setText(msg);
				return message;
			}
		});
	}
}

  发送消息是如此的简单,制定队列名称,就可以直接发送消息到队列或者主题

 

  3.4消息监听,有了消息发送就得有Consumer处理消息

  

@Component
public class MyMessageListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		System.out.println("==========Thread:"+Thread.currentThread().getId());
		System.out.println("==========MyMessageListener"+message.toString());
	}

}

   以及支持该消息监听的消息容器

    

@Autowired MyMessageListener messageListener;

@Bean(name="queue1ListenerContainer")
	public DefaultMessageListenerContainer DefaultMessageListenerContainer(){
		DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
		listenerContainer.setConnectionFactory(PooledConnectionFactory());
		listenerContainer.setDestination(queue1Destination());
		listenerContainer.setMessageListener(messageListener);
		listenerContainer.setConcurrentConsumers(2);//可以理解为起两个线程去消费消息
		return listenerContainer;
	}

 

至此主流程开发结束。

 

 

 

1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics