HornetQ做个例子一直报错,郁闷求帮助别人的例子

24.8.分页转存与浏览器;Note;请注意浏览器只对内存中的消息进行操作,它不对转存;24.9.分页转存与未通知的消息;Note;请注意如果消息没有被通知,它会一直留在服务器的内;24.10.例子;Section11.1.34,“分页(pagin;Chapter25.队列属性;有两种方法可以设置队列的属性;25.1.预定义的队列;通过配置可以定义队列;下面
24.8. 分页转存与浏览器
请注意浏览器只对内存中的消息进行操作,它不对转存到磁盘中的消息进行操作。 消息是在被路由到任何队列之前进行转存的,所以在转存时刻,它们还没有进入到任何队列中, 自然也就不可能出现在对某个队列浏览的Y果中。
24.9. 分页转存与未通知的消息
请注意如果消息没有被通知,它会一直留在服务器的内存中,占用着内存资源。只要消息在被接收者收到并通知后,它才 会在服务器端被清除,空出内存空间以便转存在磁盘上的消息被装载到内存进行传递。如果没有通知,消息不会被清除, 也就不会空出内存空间,转存到磁盘上的消息也就无法装载到内存进行传递。于是在接收端就会呈现出死机的现象。 如果消息的通知是依靠ack-batch-size的设定进行的批量通知,那么一定要注意不要将 分页转存的消息临界值设得小于ack-batch-size,否则你的系统可能会发生死机现象!
24.10. 例子
Section 11.1.34, “分页(paging)”是一个说明如何使用HornetQ的分页转发功能的例子。
Chapter 25. 队列属性
有两种方法可以设置队列的属性。一种使用配置文件,另一种使用核心接口(core API)。 本章讲述这些属性的配置以及这些属性的作用。
25.1. 预定义的队列
通过配置可以定义队列。队列的定义可以在核心层定义,也可以在JMS层来定义。首先我们看一下JMS层。
下面就是一个在hornetq-jms.xml中定义的一个队列的例子:
&queue name=&selectorQueue&&
&entry name=&/queue/selectorQueue&/&
&selector string=&color='red'&/&
&durable&true&/durable&
这个队列的name属性定义了队列的名字。例子中我们采用了一种命名的惯例,因些对应的
核心队列的名字是 jms.queue.selectorQueue。
在entry单元内定义的名字用来将队列绑定于JNDI。这是必不可少的。一个队列可以有多个entry定义,每个 定义中的名字都绑定到同一个队列。
selector单元定义的是队列的选择器。定义了选择器后,只有与选择器相匹配的消息才能被加到队列中。 这是一个可选项。如果没有定义选择器,队列将默认没有选择器。
durable定义了队列是否是一个可持久的队列。这也是一个可选项,默认值是true。
如果在核心层定义队列,则使用hornetq-configuration.xml文件。 下面是一个例子:
&queue name=&jms.queue.selectorQueue&&
&address&jms.queue.selectorQueue&/address&
&filter string=&color='red'&/&
&durable&true&/durable&
它的配置与JMS的配置很相似,但有三个不同之处:
队列的name属性是队列的真正名字,不是JMS中的名字。
address一项定义了消息路由的地址。
没有entry单元。
filter的定义使用核心过滤器语法 (在 Chapter 14, 过滤器表达式中描述),不是JMS的选择器语法。
25.2. 使用接口(API)创建队列
队列还可以使用核心接口或管理接口来创建。
核心接口的org.hornetq.api.core.client.ClientSession接口可以用来 创建队列。它有几个createQueue方法,可以在创建队列时对上述的属性进行设置。 除此之外,还有一个额外的
属性temporary可以设置。如果将其设为true, 那么队列在会话断开时将被删除。
在Chapter 30, 管理中讲述了如何用管理接口来创建队列。
25.3. 通过地址设置来配置队列属性
有些属性的定义中地址可以使用通配符。下面是hornetq-configuration.xml 文件中的一个address-setting的配置例子。
&address-settings&
&address-setting match=&jms.queue.exampleQueue&&
&dead-letter-address&jms.queue.deadLetterQueue&/dead-letter-address&
&max-delivery-attempts&3&/max-delivery-attempts&
&redelivery-delay&5000&/redelivery-delay&
&expiry-address&jms.queue.expiryQueue&/expiry-address&
&last-value-queue&true&/last-value-queue&
&max-size-bytes&100000&/max-size-bytes&
&page-size-bytes&20000&/page-size-bytes&
&redistribution-delay&0&/redistribution-delay&
&send-to-dla-on-no-route&true&/send-to-dla-on-no-route&
&address-full-policy&PAGE&/address-full-policy&
&/address-setting&
&/address-settings&
通过上述的地址设定可以将多个属性应用于所有与match属性相匹配的地址。 上面例子中所定义的属性应用于jms.queue.exampleQueue的地址。如果使用 通配符,就可以将这些属性应用于一组匹配的地址。通配符的详细说明在这里。
例如在match中定义字符串jms.queue.#,那么 定义的属性就会应用于所有以jms.queue.开头的地址--即所有的JMS队列。
这些属性在本手册的各个地方有相应的介绍。在此处给出了简单的解释各它所在章的连接。
max-delivery-attempts定义了最大重传递的次数。一个消息如果反复传递超过 了这个值将会被发往死信地址dead-letter-address。相关的完整的解释在 这里。
redelivery-delay定义了重新传递的延迟。它控制HornetQ在重新 传递一个被取消的消息时要等待的时间。参见这里。
expiry-address定义了过期消息的发送地址。参见这里。
last-value-queue 定义一个队列是否使用最新值。参见这里。
max-size-bytes和page-size-bytes用来设置地址的分页转存功能。 它们在这里有详细的解释。
redistribution-delay定义了当最后一个接收者关闭时重新分配队列消息前所等待的时间。 参见这里。
send-to-dla-on-no-route。当一个消息被送到某个地址时,可能不会被路由到任何一个队列。 例如该地址没有绑定任何队列的情况,或者它所有的队列的选择器与该消息不匹配时。这样的消息通常情况下会被丢弃。这时 如果将这个参数设为true,则如果这个地址配置了死信地址的话,这样的消息就会被发送到该地址的死信地址(DLA)。
address-full-policy。这个属性有三个可能的值:PAGE、 DROP 或 BLOCK。它决定了 如果地址的消息所占用的内存达到了max-size-bytes所定义的值时,如何处理后继到来的消息。 默认值是PAGE,就是将后续的消息分页转存到磁盘上。DROP则表示丢弃后续的消息。BLOCK表示阻塞消息的发送方发送后续 的消息。参见Chapter 19, 流控制和Chapter 24, 分页转存。
Chapter 26. 定期消息
与普通消息不同,定期消息是在未来某个指定时间发送的消息。
为了创建定期消息,需要设定一个特殊的参数.
26.1. 定期传递参数
用来标识一个定期消息的参数是&_HQ_SCHED_DELIVERY& (相当于常量Message.HDR_SCHEDULED_DELIVERY_TIME)。
这个参数的值必须是一个大于零的长整型,单位是毫秒。下面例子给出了使用JMS接口创建定期消息的方法:
TextMessage message =
session.createTextMessage(&This is a scheduled message message which will be delivered
in 5 sec.&);
message.setLongProperty(&_HQ_SCHED_DELIVERY&, System.currentTimeMillis() + 5000);
producer.send(message);
// message will not be received immediately but 5 seconds later
TextMessage messageReceived = (TextMessage) consumer.receive();
也可以使用核心接口来发送定期消息。它只需要将同样的参数设定到核心消息上即可。
26.2. 例子
参见Section 11.1.43, “定时消息”,它是一个JMS使用定期消息的例子。
Chapter 27. 最新值队列(Last-Value Queues)
最新值队列是一种特殊的队列。当一个新消息到达一个最新值队列时,它会将所有与该消息定义的Last-Value相同的旧消息 抛弃。换句话说,只有最新的消息被保留下来。
一个典型的用例是股价信息,通常你只关心一支股票的最新价格。
27.1. 最新值队列的配置
最新值队列的配置在address-setting内:
&address-setting match=&jms.queue.lastValueQueue&&
&last-value-queue&true&/last-value-queue&
&/address-setting&
默认的last-value-queue值是false。可以使用通配符来匹配地址。 (参见 Chapter 13, 了解 HornetQ 通配符的语法)。
27.2. 使用Last-Value参数
用来标识最新值的参数名是&_HQ_LVQ_NAME& (相当于核心API中定义的常量Message.HDR_LAST_VALUE_NAME)。
如果两个消息具有相同的Last-Value值,那么较新的消息就会保留,另外一个被丢弃:
包含各类专业文献、中学教育、专业论文、外语学习资料、高等教育、行业资料、15HornetQ2.1中文用户手册等内容。 HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。还支持RESTful API、STOMP(Stomp的客户端可以用多种编程语言来实现 )、AMQP(HornetQ will shortly be implementing AMQP)。HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!
HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的以来第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。
HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中。 用途:松散地联系各系统,不用受其它服务器的制约,有效的减少线程Block的时间. 不同于RPC , 采用的Request/Reponse 的方式. hornetq支持内容Body Stream -- StreamMessage 包含顺序读取值的流 Text -- TextMessage) Map -- MapMessage (key/value)) Object -- ObjectMessage Support Serializable序列化的对象. Bytes -- BytesMessage 字节信息(如存放图像) 下载:wget http://downloads.jboss.org/hornetq/hornetq-2.2.14.Final.zip yum install libaio 中文文档: http://www.jboss.org/hornetq/chinesedocs.html 1.单机配置: 1.1编写启动脚本:start.sh IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1` export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"echo $CLUSTER_PROPS sh run.sh &1.2或者修改配置文件 以下两个文件把localhost替换为本机IP config/stand-alone/non-clustered/hornetq-configuration.xml
config/stand-alone/non-clustered/hornetq-beans.xml bindAddress">${jnp.host:192.168.100.241} rmiBindAddress">${jnp.host:192.168.100.241} ${hornetq.remoting.netty.host:192.168.100.241} .... 1.3客户端需要的包 hornetq-core-client.jar netty.jar hornetq-jms-client.jar jboss-jms-api.jar jnp-client.jar1.4配置一个队列,添加配置onfig/stand-alone/non-clustered/hornetq-jms.xml
配置一个主题
hornetq-configuration.xml 在节点下增加 false 1.5收发消息demo public void sendToQueue(String destinationName,Serializable payload) throws Exception { InitialContext ic = new InitialContext(); ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory"); Queue queue = (Queue)ic.lookup(destinationName); Connection connection = cf.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer publisher = session.createProducer(queue); connection.start(); ObjectMessage message = session.createObjectMessage(payload); message.setObject(payload); publisher.send(message); if (connection != null) { connection.close(); } } @TransactionAttribute(value = TransactionAttributeType.REQUIRED) public void onMessage(Message message) { ObjectMessage obj = (ObjectMessage) try { Serializable ser = obj.getObject(); ("[NotificationInbound] onMessage!"); } catch (Exception e) { log.error("[NotificationInbound] ERROR[" + e.getMessage() + "]!!!****"); throw new IllegalStateException(); } } 2 集群配置 2.1单机集群启动脚本 start-cluster0.bat set CLUSTER_PROPS=-Ddata.dir=../data-server2 -Djnp.port=2099 -Djnp.rmiPort=2098 -Dhornetq.remoting.netty.port=6445run ../config/stand-alone/clustered start-cluster1.bat set CLUSTER_PROPS=-Ddata.dir=../data-server3 -Djnp.port=3099 -Djnp.rmiPort=3098 -Dhornetq.remoting.netty.port=7445run ../config/stand-alone/clustered 2.2集群节点启动脚本 start-node.sh IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1` export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"echo $CLUSTER_PROPS sh run.sh ../config/stand-alone/clustered 2.2.1集群节点停止脚本 stop-node.sh sh stop.sh ../config/stand-alone/clustered 2.3 .集群配置说明 2.3.1集群发现使用udp协议进行组播 hornetq-configuration.xml 172.16.9.7231.7.7.7987610000 2.3.2客户端连接代码 : final String groupAddress = "231.7.7.7"; final int groupPort = 9876; ConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactory(groupAddress, groupPort); Connection jmsConnection1 = jmsConnectionFactory.createConnection(); Connection jmsConnection2 = jmsConnectionFactory.createConnection();2.3.3Server Side load balancing hornetq-configuration.xml jms500truefalse12.3.4Client Side load balancing hornetq-jms.xml true org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy
3.与spring集成示例 3.1spring配置 &?xml version="1.0" encoding="UTF-8"?>
package mon. public interface MessageService { public boolean sendMessage(SerializableObject message) ; } 3.2发送消息 package mon. import java.io.S public class SerializableObject implements Serializable{ /** * */private static final long serialVersionUID = 1L; private O private Boolean isRetry = public Object getObj() { } public void setObj(Object obj) { this.obj = } public Boolean getIsRetry() { return isR } public void setIsRetry(Boolean isRetry) { this.isRetry = isR } } package mon. import javax.jms.JMSE import javax.jms.M import javax.jms.ObjectM import javax.jms.Q import javax.jms.S import org.apache.log4j.L import org.springframework.jms.core.JmsT import org.springframework.jms.core.MessageC public class SendMessageServiceImpl implements MessageService { private static final Logger logger = Logger.getLogger(SendMessageServiceImpl.class); private JmsTemplate jmsT private Queue searchAddMessageQ @Override public boolean sendMessage(SerializableObject message) { return sendQueue(message); } private boolean sendQueue(final SerializableObject so) { try { ("start to send queue to " + searchAddMessageQueue.getQueueName() + ", message : " + so); jmsTemplate.send(searchAddMessageQueue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { ObjectMessage om = session.createObjectMessage(so); } }); } catch (Exception e) { logger.error("Error: send topic failure:" + e.getMessage(), e); } } public JmsTemplate getJmsTemplate() { return jmsT } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsT } public Queue getSearchAddMessageQueue() { return searchAddMessageQ } public void setSearchAddMessageQueue(Queue searchAddMessageQueue) { this.searchAddMessageQueue = searchAddMessageQ } } 3.3接收消息 package mon. import java.util.concurrent.atomic.AtomicI import javax.jms.JMSE import javax.jms.M import javax.jms.MessageL import javax.jms.ObjectM import org.apache.log4j.L public class ReceiveMessageListenerImpl implements MessageListener { private AtomicInteger count = new AtomicInteger(0); private static Logger logger = Logger.getLogger(ReceiveMessageListenerImpl.class); @Override public void onMessage(Message message) { try{ if(message instanceof ObjectMessage){ ObjectMessage objectMessage = (ObjectMessage) if(objectMessage.getObject() instanceof SerializableObject){ SerializableObject so = (SerializableObject) objectMessage.getObject(); (so.getObj()); }else{ (objectMessage); } }else{ System.out.println(message); } } catch (JMSException e) { logger.error( "Error: receive message from topic failure: "+ e.getMessage(), e); }finally{ System.out.println(count.incrementAndGet()); } } } 3.4调用示例 package mon. import java.io.F import java.util.HashM import java.util.M import org.springframework.context.ApplicationC import org.springframework.context.support.FileSystemXmlApplicationC public class Test { private static ApplicationC private static Test instance=new Test(); publicstatic Test getInstance(){ } private Test() { if(ctx == null) { String location = if(System.getProperty("os.name").toLowerCase().contains("windows")){ location = "conf/applicationContext.xml"; }else{ location = "../conf/applicationContext.xml"; } File file = new File(location); ctx = new FileSystemXmlApplicationContext(location); } } /** * @param args */public static void main(String[] args) { getInstance(); MessageService service = ctx.getBean("sendMessageService", MessageService.class); for(int i=0;i<3000;i++){ Map map = new HashMap(); map.put("ooxx", i); SerializableObject so = new SerializableObject(); so.setObj(map); service.sendMessage(so); } } } 4.其它功能 4.1Message expire HornetQ will not deliver a message to a consumer after it's time to live has been exceeded. If the message hasn't been delivered before the time to live is reached, the server can discard it. // message will expire in 5000ms from now message.setExpiration(System.currentTimeMillis() + 5000); Expiry-address
jms.queue.expiryQueue
4.2 Scheduled messages TextMessage message = session.createTextMessage("MSG"); message.setLongProperty("_HQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000); producer.send(message); ... // message will not be received immediately but 5 seconds later TextMessage messageReceived = (TextMessage) consumer.receive(); 4.3Message group Message groups are sets of messages that have the following characteristics: o Messages in a message group sha that is, they have the same group identifier property (JMSXGroupID for JMS, _HQ_GROUP_ID for HornetQ Core API). o Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. They pin all messages with the same group id to the same consumer. If that consumer closes another consumer is chosen and will receive all messages with the samegroup id. Based on message Message message = ... message.setStringProperty("JMSXGroupID", "Group-0"); producer.send(message); message = ... message.setStringProperty("JMSXGroupID", "Group-0"); producer.send(message); Based on connection factory...
最新教程周点击榜
微信扫一扫HornetQ的简单例子 - kanpiaoxue - ITeye技术网站
博客分类:
最近对MQ(Message Queue)很感兴趣,准备用到项目上。因为是做Java开发的,所以我找了JMS的MQ。开始看了ActiveMQ和HornetQ。我选择了HornetQ,因为看了一些性能测试,我觉得这款JBoss的MQ相当出色。发现国内关于HornetQ的资料很少,国外有一部分,但是版本都很久了。
自己写了一个例子,环境如下:
1、HornetQ的版本是 hornetq-2.2.5.Final
2、JDK1.6.0_30-b12
3、HornetQ自带example里面的config/stand-alone/non-clustered的配置文件
4、HornetQ作为独立的服务器,运行在一台Dell1950(2CPU,16G内存)上
5、自己写了两个小例子,很简单,就是一个Producer和Consumer
6、发送Person类的实例(必须实现Serializable接口)
7、注意:Producer和Consumer用到的Person类,必须在各个项目的相同的package下面,具有相同的serialVersionUID
8、在classpath下面有jndi.properties文件,里面放置着寻找服务器上面JNDI Server必须的配置
9、在classpath下面有个log4j的配置文件,用来答应日志
代码如下:
public class Producer {
private static Logger logger = Logger.getLogger(Producer.class);
* @param args
public static void main(String[] args) {
runExample();
} catch (Exception e) {
e.printStackTrace();
private static void runExample() throws NamingException, JMSException {
InitialContext ic = new InitialContext();
ConnectionFactory cf = (ConnectionFactory) ic
.lookup("/ConnectionFactory");
Queue orderQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
Connection connection = cf.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(orderQueue);
connection.start();
int count = 0;
while (true) {
Person one = new Person(count++, "xuepeng_" + count);
ObjectMessage msg = session.createObjectMessage(one);
producer.send(msg);
(Producer.class.getName()
+ " start to sent message: " + one);
} finally {
session.close();
public class Person implements Serializable {
private static final long serialVersionUID = 7459001L;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private String time = format.format(new Date());
* @param id
* @param name
public Person(Integer id, String name) {
this.name =
* @return the id
public Integer getId() {
* @param id
the id to set
public void setId(Integer id) {
* @return the name
public String getName() {
* @param name
the name to set
public void setName(String name) {
this.name =
/* (non-Javadoc)
* @see java.lang.Object#toString()
public String toString() {
return "Person [id=" + id + ", name=" + name + ", time=" + time + "]";
* (non-Javadoc)
* @see java.lang.Object#hashCode()
public int hashCode() {
final int prime = 37;
int result = 17;
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
public boolean equals(Object obj) {
if (this == obj)
if (obj == null)
if (getClass() != obj.getClass())
Person other = (Person)
if (id == null) {
if (other.id != null)
} else if (!id.equals(other.id))
if (name == null) {
if (other.name != null)
} else if (!name.equals(other.name))
public class Consumer {
private static Logger logger = Logger.getLogger(Consumer.class);
* @param args
public static void main(String[] args) {
runExample();
} catch (Exception e) {
e.printStackTrace();
private static void runExample() throws NamingException, JMSException {
InitialContext ic = new InitialContext();
ConnectionFactory cf = (ConnectionFactory) ic
.lookup("/ConnectionFactory");
Queue orderQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
Connection connection = cf.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(orderQueue);
connection.start();
while (true) {
ObjectMessage messageReceived = (ObjectMessage) consumer.receive();
org.hornetq.jms.example.Person one = (org.hornetq.jms.example.Person)messageReceived.getObject();
(Consumer.class.getName()
+ " start to receive message: " + one);
} finally {
session.close();
启动Linux上面的HorneQ服务之后,运行上面的Producer和Consumer均可以实现通讯。
kanpiaoxue
浏览: 420678 次
来自: 北京
理解错了,原来还是计数,不是时间
TimeoutTerminationPolicy 是毫秒 默认 ...
实在没有理解你说的是什么意思。我的这个rar里面有这个包呀。如 ...
你的工程中有jetty-all-9.2.2.v ...

我要回帖

更多关于 帮助他人的例子 的文章

 

随机推荐