一. 需求
一处产生消息,两处消费,故采取Topic模式的activemq.
二. 下载及安装
http://activemq.apache.org/下载最新版本。
解压, 进入安装目录 bin/activemq start xbean:conf/activemq.xml
http://localhost:8161/admin查看mq状态,用户名和密码在conf/jetty-realm.properties,默认是admin, admin
至此server已启动。
三. server配置
conf/activemq.xml
1. message cursors,
引用
介绍的比较详细,大部分情况下用默认的Store-based cursor就能满足需求
2. producer-flow-control
http://activemq.apache.org/producer-flow-control.html
生产者流量控制,当内存和硬盘空间到达的时候控制流量,抛出异常或挂起等待,个人觉得在有message cursors的时候可以禁用
3. slow consumer handling
对于nondurable 的consumer,要求broker未其保存未消费的message在内存中,会导致生产者阻塞,其他的快速消费者变慢,所以可以设置一个限制大小的消息队列。
constantPendingMessageLimitStrategy
4. jmx enable
未研究todo
四. ps
about durable topics, non-durable模式下,如果订阅者不在线,那么其不在线期间topic上收到的消息将不会被该订阅者收到,如果想要一个订阅者收到topic上的所有消息,请使用durable模式, 推荐使用durable模式,可以免去生产者流量控制和慢消费者处理的麻烦
about transaction mode, server 有transactionstore, 缓存所有的message和acks不执行,直到commit 和 rollback到达,另外也不把commit 和 rollback之前的message和acks放到分发的队列上去
五. durable subscriber样例代码
public class AMQConnectionFactory {
private String url;
private String user;
private String pwd;
private ActiveMQConnectionFactory activeMQConnectionFactory;
public AMQConnectionFactory(String user, String pwd, String url){
this.url = url;
this.user = user;
this.pwd = pwd;
this.activeMQConnectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
}
public Connection getConnection(){
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
return connection;
}
public Connection getDurableConnection(String clientId){
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.setClientID(clientId);
connection.start();
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
return connection;
}
public Session getACKSession(){
Session session = null;
Connection connection = getConnection();
if(connection != null){
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
return session;
}
public Session getDurableACKSession(String clientId){
Session session = null;
Connection connection = getDurableConnection(clientId);
if(connection != null){
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
return session;
}
}
producer:
public class TestProducer {
public void run() {
ApplicationContext ctx=new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"});
AMQConnectionFactory amqConnectionFactory = (AMQConnectionFactory) ctx.getBean("amqConnectionFactory");
Session session = amqConnectionFactory.getACKSession();
try{
Topic topic = session.createTopic("inc.message");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
produce(producer);
}catch (JMSException jm){
jm.printStackTrace();
}
System.exit(0);
}
public void produce(MessageProducer producer) {
Message message = new ActiveMQMapMessage();
try{
message.setStringProperty("message", "hello");
producer.send(message);
int i = 0;
while(true){
i++;
message.setStringProperty("message", "hello" +i);
producer.send(message);
System.out.println("produce message: " + message.getStringProperty("message"));
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}catch (JMSException exception){
exception.printStackTrace();
}
}
}
consumer
public class TestConsumer {
public void run(String clientId) {
ApplicationContext ctx=new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"});
AMQConnectionFactory amqConnectionFactory = (AMQConnectionFactory) ctx.getBean("amqConnectionFactory");
Session session2 = amqConnectionFactory.getDurableACKSession(clientId);
try{
Destination destination = session2.createTopic("inc.message");
IncConsumerListener incConsumerListener = (IncConsumerListener)
ctx.getBean("incConsumerListener");
MessageConsumer consumer = session2.createDurableSubscriber((Topic) destination, "bbbb");
consumer.setMessageListener(incConsumerListener);
}catch (JMSException jmse){
jmse.printStackTrace();
}
}
public static void main (String args[]){
TestConsumer testConsumer = new TestConsumer();
testConsumer.run("iiii");
}
}
分享到:
相关推荐
一份详细ActiveMQ的使用教程
java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)
该demo主要用于activeMQ初学,队列消息监听和Topic消息监听
ActiveMQ的安装与使用ActiveMQ的安装与使用ActiveMQ的安装与使用
activeMq in action 使用activeMq开发JMS的简单讲述,activeMq in action 使用activeMq开发JMS的简单讲述
ActiveMQ_使用failover模式进行连接切换时,线程断开 ,ActiveMQ_使用failover模式进行连接切换时,线程断开
activemq消息测试工具
ActiveMQ 的安装与使用,ActiveMQ与spring整合,生产都、消费者、测试类等。
spring整合使用activemq,代码主要是介绍如何使用spring整合使用activemq,可以直接拿过去使用
此demo结合了网上纷乱的资源进行讲解。少量操作即可直接跑通,适合初学者进行学习。所需jar包在demo中也都具有。
ActiveMQ开发库文件(Release版),需配合ActiveMQ开发实例系列其他资源使用
使用ActiveMQ示例.pdf
结合博客提供spring+activeMQ的demo源码
activemq activeMq笔记.docx
适用于Java后台开发消息队列ActiveMQ使用者,包括服务器的搭建以及ActiveMQ的三种使用模式
ActiveMQ使用Ajax实现多人聊天室。
使用activemq依赖库连接, 该项目为java工程,内有ssl证书生成方式链接,不清楚可私信
JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。 本文档介绍ActiveMQ的基本使用和部分性能优化。