`
yinbinhome
  • 浏览: 120613 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类

activemq+spring 持久化发送消息

    博客分类:
  • java
 
阅读更多

最近的一个项目里,访问压力过大,需要异步处理一些不需要即使处理的请求。

于是考虑用JMS,开始考察了几个jms服务,显示看了openJMS,实验了一下,发现他和spring的结合不是很好,而且只支持JMS 1.02,已经很长时间没有更新了,就不考虑使用了。

后来转到了activemq。

在网上找了好几天资料,最后整理一下,还算比较全面的,大家可以参考一下。

1、先是下载activeMq。

2、由于默认的activemq是日志文件的持久订阅,需要修改activemq的配置文件才能持久化到数据库里,在activemq的conf目录下的activemq.xml,找到

 

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>

 

 把它注释掉换成

 

	<persistenceAdapter>
		 <jdbcPersistenceAdapter dataSource="#oracle-ds"/>
	</persistenceAdapter>

 

 还要在activemq.xml的<bean></beans>里面增加数据库的配置,并且把oracle的驱动jar包,复制到activemq下的lib目录里,由于我使用的是oracle我的配置如下:

 

 

    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
		  <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
		  <property name="url" value="jdbc:oracle:thin:@10.60.30.31:1521:orcl"/>
		  <property name="username" value="activemq"/>
		  <property name="password" value="activemq"/>
		  <property name="maxActive" value="200"/>
		  <property name="poolPreparedStatements" value="true"/>
	</bean>

 

 到此你的activemq发送消息就可以持久化了(注:前提是你在spring里配置activemq时要配置消息的持久化)

 

3、接下来是spring+activemq 使用的jar包,都在activemq目录下的lib里

 

4、下来是具体一些配置文件和类文件

 

发送消息类

 

 

/**
 * message sender
 * @description <p></p>
 * @author quzishen
 * @project NormandyPositionII
 * @class MessageSender.java
 * @version 1.0
 * @time 2011-1-11
 */
public class MessageSender {
	// ~~~ jmsTemplate
	public JmsTemplate jmsTemplate;
	
	/**
	 * send message
	 */
	public void sendMessage(){
		jmsTemplate.convertAndSend("hello jms!");
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
}
 

 

接收消息类

 

 

/**
 * message receiver
 * @description <p></p>
 * @author quzishen
 * @project NormandyPositionII
 * @class MessageReceiver.java
 * @version 1.0
 * @time 2011-1-11
 */
public class MessageReceiver implements MessageListener {
	/* (non-Javadoc)
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	public void onMessage(Message m) {
		System.out.println("[receive message]");
		
		ObjectMessage om = (ObjectMessage) m;
		try {
			String key1 = om.getStringProperty("key1");
			
			System.out.println(key1);
			
			System.out.println("model:"+om.getJMSDeliveryMode());
			System.out.println("destination:"+om.getJMSDestination());
			System.out.println("type:"+om.getJMSType());
			System.out.println("messageId:"+om.getJMSMessageID());
			System.out.println("time:"+om.getJMSTimestamp());
			System.out.println("expiredTime:"+om.getJMSExpiration());
			System.out.println("priority:"+om.getJMSPriority());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

 

 

在发送消息和接收消息前可以做一些自定的处理,就是这个类

 

 

/**
 * message converter
 * @description <p></p>
 * @author quzishen
 * @project NormandyPositionII
 * @class MessageConvertForSys.java
 * @version 1.0
 * @time 2011-1-11
 */
public class MessageConvertForSys implements MessageConverter {
	/* (non-Javadoc)
	 * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, javax.jms.Session)
	 */
	public Message toMessage(Object object, Session session)
			throws JMSException, MessageConversionException {
		System.out.println("[toMessage]");
		ObjectMessage objectMessage = session.createObjectMessage();
		
		objectMessage.setJMSExpiration(1000);
		objectMessage.setStringProperty("key1", object+"_add");
		
		return objectMessage;
	}
	/* (non-Javadoc)
	 * @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)
	 */
	public Object fromMessage(Message message) throws JMSException,
			MessageConversionException {
		System.out.println("[fromMessage]");
		ObjectMessage objectMessage = (ObjectMessage) message;
		
		return objectMessage.getObjectProperty("key1");
	}
}
 

 

第一种,PTP方式的配置:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
	http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-2.5.xsd "
	default-autowire="byName">
	
	<!-- JMS PTP MODEL -->
	<!-- PTP链接工厂 -->
	<bean id="queueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://127.0.0.1:61616" />
		<!-- <property name="brokerURL" value="vm://normandy.notify" /> -->
		<property name="useAsyncSend" value="true" />
	</bean>
	<!-- 定义消息队列 -->
	<bean id="dest" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="queueDest" />
	</bean>
	<!-- PTP jms模板 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="queueConnectionFactory"></property>
		<property name="defaultDestination" ref="dest" />
		<property name="messageConverter" ref="messageConvertForSys" />
		<property name="pubSubDomain" value="false" />
	</bean>
	
	<!-- 消息转换器 -->
	<bean id="messageConvertForSys" class="com.normandy.tech.test.MessageConvertForSys"></bean>
	
	<!-- 消息发送方 -->
	<bean id="messageSender" class="com.normandy.tech.test.MessageSender"></bean>
	<!-- 消息接收方 -->
	<bean id="messageReceiver" class="com.normandy.tech.test.MessageReceiver"></bean>
	
	<!-- 消息监听容器 -->
	<bean id="listenerContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="queueConnectionFactory" />  
        <property name="destination" ref="dest" />  
        <property name="messageListener" ref="messageReceiver" />  
    </bean>
</beans>
 

 

 

第二种:PUB/SUB方式的配置

我们配置两个消息订阅者,分别订阅不同的消息,这样用于判断是否成功执行了消息的发布和消息的订阅

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
	http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-2.5.xsd "
	default-autowire="byName">
	
    <!-- JMS TOPIC MODEL -->
    <!-- TOPIC链接工厂 -->
    <bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://127.0.0.1:61616" />
		<property name="useAsyncSend" value="true" />
	</bean>
	
	<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://127.0.0.1:61616" />
	</bean>
	
	<!-- 定义主题 -->
	<bean id="myTopic"  class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg value="normandy.topic"/>
   	</bean>
   	
   	<bean id="myTopic2"  class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg value="normandy.topic2"/>
   	</bean>
   	
   	<!-- 消息转换器 -->
	<bean id="messageConvertForSys" class="com.normandy.tech.test.MessageConvertForSys"></bean>
   	
   	<!-- TOPIC send jms模板 -->
	<bean id="topicSendJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="topicSendConnectionFactory"></property>
		<property name="defaultDestination" ref="myTopic" />
		<property name="messageConverter" ref="messageConvertForSys" />
		<!-- 开启订阅模式 -->
		<property name="pubSubDomain" value="true"/>
	</bean>
	
	<!-- 消息发送方 -->
	<bean id="topicMessageSender" class="com.normandy.tech.test.MessageSender">
		<property name="jmsTemplate" ref="topicSendJmsTemplate"></property>
	</bean>
	
	<!-- 消息接收方 -->
	<bean id="topicMessageReceiver" class="com.normandy.tech.test.MessageReceiver">
	</bean>
	
	<!-- 主题消息监听容器 -->
	<bean id="listenerContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="topicListenConnectionFactory" />  
        <property name="pubSubDomain" value="true"/><!-- default is false -->
        <property name="destination" ref="myTopic" />  <!-- listen topic: myTopic -->
        <property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_001"/><!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉-->
        <property name="messageListener" ref="topicMessageReceiver" />  
    </bean>
    
    <!-- 主题消息监听容器2 -->
    <bean id="listenerContainer2"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="topicListenConnectionFactory" />  
        <property name="pubSubDomain" value="true"/><!-- default is false -->
        <property name="destination" ref="myTopic2" />  <!-- listen topic: myTopic2 -->
        <property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_002"/>!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉-->
        <property name="messageListener" ref="topicMessageReceiver" />  
    </bean>
</beans>

 

测试一下是否能发送和接收消息,我是在main方法里测试的

 

	public static void main(String[] args) throws HttpException, IOException {
		System.out.println("初始化spring!准备开始接收!");
		ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-jms-topic-receiver.xml");
		MessageSender t=(MessageSender) context.getBean("topicMessageSender");
		t.sendMessage();
	}

 

 

 

 

 

Activemq支持两种消息传送模式:PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息)

从字面意思就可以了解,这是两种正好相反的模式。

1、PERSISTENT 持久消息

是activemq默认的传送方式,此方式下的消息在配合activemq.xml中配置的消息存储方式,会被存储在特定的地方,直到有消费者将消息消费或者消息过期进入DLQ队列,消息生命周期才会结束。

此模式下可以保证消息只会被成功传送一次和成功使用一次,消息具有可靠性。在消息传递到目标消费者,在消费者没有成功应答前,消息不会丢失。所以很自然的,需要一个地方来持久性存储。

如果消息消费者在进行消费过程发生失败,则消息会被再次投递。


2、NON_PERSISTENT 非持久消息

非持久的消息适用于不重要的,可以接受消息丢失的哪一类消息,这种消息只会被投递一次,消息不会在持久性存储中存储,也不会保证消息丢失后的重新投递。



在spring提供的JmsTemplate中,同样提供了针对于当前功能的配置选项:

 

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="cachingConnectionFactory"></property>
		<property name="defaultDestination" ref="dest" />
		<property name="messageConverter" ref="messageConverter" />
		<property name="pubSubDomain" value="false" />
		<property name="explicitQosEnabled" value="true" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false-->
		<property name="deliveryMode" value="1" /> <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
	</bean>
 

 

消息的签收模式:

客户端成功接收一条消息的标志是一条消息被签收,成功应答。

消息的签收情形分两种:

1、带事务的session

 如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。

2、不带事务的session

 不带事务的session的签收方式,取决于session的配置。

Activemq支持一下三种模式:

  Session.AUTO_ACKNOWLEDGE  消息自动签收
  Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
  Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

spring提供的JmsTemplate中的配置方式:

 

 

<!-- PTP jms模板 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="cachingConnectionFactory"></property>
		<property name="defaultDestination" ref="dest" />
		<property name="messageConverter" ref="messageConverter" />
		<property name="pubSubDomain" value="false" />
		<property name="sessionAcknowledgeMode" value="1" /> 
<!-- 消息应答方式
		Session.AUTO_ACKNOWLEDGE  消息自动签收
		Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
		Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
		-->
	</bean>
 

 

最后补充一下,在复制代码的时候,就是xml代码,一定要保证你项目的其他spring的xml的命名空间和你复制的新建的xml(我的新建spring配置activemq的名是:applicationContext-jms-topic.xml)命名空间保持一致,不然会出问题。我就在这挣扎了两天。希望大家注意。

就是这个地方:

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
 

 

 

以上资料来自:

http://blog.csdn.net/quzishen/article/details/6128781

http://blog.csdn.net/quzishen/article/details/6131222

 

 

 

分享到:
评论
2 楼 javatozhang 2014-09-17  
写的不错嘛
1 楼 sunaijia 2014-03-20  
难得的好文章

相关推荐

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    activeMQ mysql 持久化

    NULL 博文链接:https://showlike.iteye.com/blog/2000117

    spring集成activemq演示queue和topic 持久化

    本人在学习activemq,然后 测试完成的demo, 包含了queue,topic,持久化到mysql,订阅模式,包好用

    消息队列中间件ActiveMQ入门到精通视频教程及资料

    003-顺序消费+消息过滤SELECTOR+MessageConsumer+MySql持久化;004-p2p模式+pulish-subscribe发布订阅模式+与spring集成;005-集群部署1;006-集群部署2;007-集群部署3;activemq集群配置文档.pdf;ActiveMQ(中文)...

    activemq新手大全

    一、JMS基本概念 二、activemq介绍及安装 1、消息中间件简介 2、activemq 2.1、activemq简介 2.2、activemq下载 2.3、运行activemq服务 2.4、测试 2.5、监控 3、activemq特性 ...5.3 activemq 持久化机制

    activemq-demo

    集成activemq和spring 的消息推送 包含普通队列消息和持久化订阅消息

    采用Spring整合activeMQ与quartz的JMS数据同步实例

    支持持久化的采用Spring整合activeMQ与quartz的JMS数据同步实例,包含依赖的jar包

    apache-activemq-5.11.2

    ⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) ⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,...

    实战ActiveMQ集群与应用视频教程.zip

    网盘文件永久链接 1:ActiveMQ入门和消息中间件 2:JMS基本概念和模型 3:JMS的可靠性机制 4:JMS的API结构和开发步骤 ...8:ActiveMQ消息存储持久化 9:ActiveMQ的静态网络链接 10:多线程consumer访问集群 ..........

    ActiveMQ.chm

    activeMQ集成流程: activceMQ从了解过程,安装,消息发送模式的讲解,从点对点模式到订阅模式,及深入了解JMS,再到集成到spring,.消息的持久化,高级特性,应用场景,面试常用问题

    ActiveMQ.rar

    包括:独有消费者、消息异步分发、消息优先级、管理持久化消息、消息分组、 消息选择器、消息重递策略、慢消费者处理等 n 十三:杂项技术 包括:监控和管理Broker、集成ActiveMQ和Tomcat、什么时候使用ActiveMQ等 n ...

    ActiveMQ从入门到精通(二)

    本文来自于jianshu,介绍了消息的顺序消费、JMSSelectors、消息的同步/异步接受方式、Message、P2P/PubSub、持久化订阅、持久化消息到MySQL以及与Spring整合等知识。接上一篇,本篇主要讨论的话题是:消息的顺序消费...

    ActiveMQ消息服务器-其他

    2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) 3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 4、通过了常见J2EE服务器(如 Geronimo,JBoss 4,...

    springcloud入门

    springcloud-zipkin:链路跟踪工具,监控并就持久化微服务集群中调用链路的通畅情况,采用rabbitmq异步传输、elasticsearch负责持久化的方式集成。 #### 软件架构 1、JDK:jdk-8u181-windows-x64。 2、MAVEN:...

    ActiveMQ应用与实例1

    1.多种语言和协议编写客户端 2.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) 3.对Spring的支持,ActiveMQ可以很容易内

    Spring in Action(第2版)中文版

    10.1.3在spring中安装activemq 10.2协同使用jms和spring 10.2.1处理冗长失控的jms代码 10.2.2使用jms模板 10.2.3转换消息 10.2.4将spring的网关支持类应用于jms 10.3创建消息驱动pojo 10.3.1创建消息监听器 ...

    Spring in Action(第二版 中文高清版).part2

    10.1.3 在Spring中安装ActiveMQ 10.2 协同使用JMS和Spring 10.2.1 处理冗长失控的JMS代码 10.2.2 使用JMS模板 10.2.3 转换消息 10.2.4 将Spring的网关支持类应用于JMS 10.3 创建消息驱动POJO 10.3.1 创建...

    Spring in Action(第二版 中文高清版).part1

    10.1.3 在Spring中安装ActiveMQ 10.2 协同使用JMS和Spring 10.2.1 处理冗长失控的JMS代码 10.2.2 使用JMS模板 10.2.3 转换消息 10.2.4 将Spring的网关支持类应用于JMS 10.3 创建消息驱动POJO 10.3.1 创建...

Global site tag (gtag.js) - Google Analytics