前言
官网 http://activemq.apache.org/
Apache ActiveMQ是最流行、最强大的开放源码消息传递和集成模式服务器。 Apache ActiveMQ是快速的,它支持许多跨语言客户端和协议,在完全支持JMS 1.1和J2EE 1.4的同时,可以轻松地使用企业集成模式和许多高级特性。Apache ActiveMQ是在Apache 2.0许可下发布的。
1. 特性
- 支持各种各样的跨语言客户端和协议,包括Java、C、C++、c、Ruby、Perl、Python、PHP
Java、C、C++、C、C的高性能客户机的OpenWire Stomp支持,使客户可以轻松地用C、Ruby、Perl、Python、PHP、actionscript/flash、Smalltalk与ActiveMQ以及任何其他流行的Message Broker进行通信。 AMQP v1.0支持 MQTT v3.1支持在物联网环境中进行连接。
- 在JMS客户端和消息代理中对企业集成模式的完全支持
- 支持许多高级功能,如消息组、虚拟目的地、通配符和组合目的地
- 完全支持JMS 1.1和J2EE 1.4,支持临时、持久性、事务和XA消息传递
- Spring支持,使ActiveMQ可以很容易地嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
- 在流行的J2EE服务器中进行测试,如TomEE、Geronimo、JBoss、GlassFish和WebLogic。
包含用于入站和出站消息的JCA 1.5资源适配器,以便ActiveMQ应该在任何符合J2EE 1.4的服务器上自动部署。
- 支持可插入的传输协议,如:vm、TCP、SSL、NIO、UDP、多播、JGroups和JXTA传输协议
- 支持使用JDBC和高性能日志的快速持久性
- 为高性能集群设计,客户机-服务器,基于对等的通信
- REST API为消息传递提供技术无关的和基于语言的基于web的API
- 使用纯DHTML支持web浏览器的web流支持,允许web浏览器成为消息传递结构的一部分
- CXF和Axis支持,以便ActiveMQ可以轻松地放入这些web服务栈中,以提供可靠的消息传递
- 可以作为内存JMS提供程序使用,用于单元测试JMS
2. 应用场景
多个项目之间集成 (1) 跨平台 (2) 多语言 (3) 多项目 降低系统间模块的耦合度,解耦 (1) 软件扩展性 系统前后端隔离 (1) 前后端隔离,屏蔽高安全区
实践
官网上下载最新二进制包,解压到某个目录下,打开mac终端,输入以下命令切换到ActiveMQ主目录 cd /../apache-activemq-5.15.1/bin/macosx 输入命令 ./activemq start 看到如下提示表示启动成功 Starting ActiveMQ Broker… 浏览器输入 http://localhost:8161/admin/ 然后输入用户名 admin 密码 admin便可登录

运行消息发送,控制台打印如下
发送消息:Activemq 发送消息0
发送消息:Activemq 发送消息1
发送消息:Activemq 发送消息2
发送消息:Activemq 发送消息3
发送消息:Activemq 发送消息4
发送消息:Activemq 发送消息5
发送消息:Activemq 发送消息6
发送消息:Activemq 发送消息7
发送消息:Activemq 发送消息8
发送消息:Activemq 发送消息9

运行消费者,控制台打印如下
收到的消息:ActiveMQ 发送消息0
收到的消息:ActiveMQ 发送消息1
收到的消息:ActiveMQ 发送消息2
收到的消息:ActiveMQ 发送消息3
收到的消息:ActiveMQ 发送消息4
收到的消息:ActiveMQ 发送消息5
收到的消息:ActiveMQ 发送消息6
收到的消息:ActiveMQ 发送消息7
收到的消息:ActiveMQ 发送消息8
收到的消息:ActiveMQ 发送消息9

附录
1. 生产者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 生产者
*
* @author nanfang
* @version V1.0
* @since 2017-10-22 20:11
*/
public class Producer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话 接受或者发送消息的线程
Session session;
//消息的目的地
Destination destination;
//消息生产者
MessageProducer messageProducer;
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(Producer.USERNAME, Producer.PASSWORD, Producer.BROKEURL);
try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建一个名称为HelloWorld的消息队列
destination = session.createQueue("HelloWorld");
//创建消息生产者
messageProducer = session.createProducer(destination);
//发送消息
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer 消息生产者
* @throws Exception
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < Producer.SENDNUM; i++) {
//创建一条文本消息
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
//通过消息生产者发出消息
messageProducer.send(message);
}
}
}
2. 消费者
/*
* Copyright (c) 2001-2017 GuaHao.com Corporation Limited. All rights reserved.
* This software is the confidential and proprietary information of GuaHao Company.
* ("Confidential Information").
* You shall not disclose such Confidential Information and shall use it only
* in accordance with the terms of the license agreement you entered into with GuaHao.com.
*/
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消费者
*
* @author nanfang
* @version V1.0
* @since 2017-10-22 20:26
*/
public class Consumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(Consumer.USERNAME, Consumer.PASSWORD, Consumer.BROKEURL);
try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个连接HelloWorld的消息队列
destination = session.createQueue("HelloWorld");
//创建消息消费者
messageConsumer = session.createConsumer(destination);
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}