JMS

ActiveMQ

MessageQueue--消息队列,一个消息的接受和转发的容器,可用于消息推送。

Posted by 南方 on October 23, 2017

前言

官网 http://activemq.apache.org/

Apache ActiveMQ是最流行、最强大的开放源码消息传递和集成模式服务器。 Apache ActiveMQ是快速的,它支持许多跨语言客户端和协议,在完全支持JMS 1.1和J2EE 1.4的同时,可以轻松地使用企业集成模式和许多高级特性。Apache ActiveMQ是在Apache 2.0许可下发布的。

1. 特性

  • 支持各种各样的跨语言客户端和协议,包括Java、C、C++、c、Ruby、Perl、Python、PHP

Java、C、C++、C、C的高性能客户机的OpenWire Stomp支持,使客户可以轻松地用C、Ruby、Perl、Python、PHP、actionscript/flash、Smalltalk与ActiveMQ以及任何其他流行的Message Broker进行通信。 AMQP v1.0支持 MQTT v3.1支持在物联网环境中进行连接。

  • 在JMS客户端和消息代理中对企业集成模式的完全支持
  • 支持许多高级功能,如消息组、虚拟目的地、通配符和组合目的地
  • 完全支持JMS 1.1和J2EE 1.4,支持临时、持久性、事务和XA消息传递
  • Spring支持,使ActiveMQ可以很容易地嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
  • 在流行的J2EE服务器中进行测试,如TomEE、Geronimo、JBoss、GlassFish和WebLogic。

包含用于入站和出站消息的JCA 1.5资源适配器,以便ActiveMQ应该在任何符合J2EE 1.4的服务器上自动部署。

  • 支持可插入的传输协议,如:vm、TCP、SSL、NIO、UDP、多播、JGroups和JXTA传输协议
  • 支持使用JDBC和高性能日志的快速持久性
  • 为高性能集群设计,客户机-服务器,基于对等的通信
  • REST API为消息传递提供技术无关的和基于语言的基于web的API
  • 使用纯DHTML支持web浏览器的web流支持,允许web浏览器成为消息传递结构的一部分
  • CXF和Axis支持,以便ActiveMQ可以轻松地放入这些web服务栈中,以提供可靠的消息传递
  • 可以作为内存JMS提供程序使用,用于单元测试JMS

2. 应用场景

多个项目之间集成 (1) 跨平台 (2) 多语言 (3) 多项目 降低系统间模块的耦合度,解耦 (1) 软件扩展性 系统前后端隔离 (1) 前后端隔离,屏蔽高安全区

实践

官网上下载最新二进制包,解压到某个目录下,打开mac终端,输入以下命令切换到ActiveMQ主目录 cd /../apache-activemq-5.15.1/bin/macosx 输入命令 ./activemq start 看到如下提示表示启动成功 Starting ActiveMQ Broker… 浏览器输入 http://localhost:8161/admin/ 然后输入用户名 admin 密码 admin便可登录

title

运行消息发送,控制台打印如下

发送消息:Activemq 发送消息0
发送消息:Activemq 发送消息1
发送消息:Activemq 发送消息2
发送消息:Activemq 发送消息3
发送消息:Activemq 发送消息4
发送消息:Activemq 发送消息5
发送消息:Activemq 发送消息6
发送消息:Activemq 发送消息7
发送消息:Activemq 发送消息8
发送消息:Activemq 发送消息9
title

运行消费者,控制台打印如下

收到的消息:ActiveMQ 发送消息0
收到的消息:ActiveMQ 发送消息1
收到的消息:ActiveMQ 发送消息2
收到的消息:ActiveMQ 发送消息3
收到的消息:ActiveMQ 发送消息4
收到的消息:ActiveMQ 发送消息5
收到的消息:ActiveMQ 发送消息6
收到的消息:ActiveMQ 发送消息7
收到的消息:ActiveMQ 发送消息8
收到的消息:ActiveMQ 发送消息9
title

附录

1. 生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 生产者
 *
 * @author nanfang
 * @version V1.0
 * @since 2017-10-22 20:11
 */
public class Producer {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(Producer.USERNAME, Producer.PASSWORD, Producer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 发送消息
     *
     * @param session
     * @param messageProducer 消息生产者
     * @throws Exception
     */

    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < Producer.SENDNUM; i++) {
            //创建一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息
            messageProducer.send(message);
        }
    }
}

2. 消费者

/*
 * Copyright (c) 2001-2017 GuaHao.com Corporation Limited. All rights reserved. 
 * This software is the confidential and proprietary information of GuaHao Company. 
 * ("Confidential Information"). 
 * You shall not disclose such Confidential Information and shall use it only 
 * in accordance with the terms of the license agreement you entered into with GuaHao.com.
 */

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消费者
 *
 * @author nanfang
 * @version V1.0
 * @since 2017-10-22 20:26
 */
public class Consumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(Consumer.USERNAME, Consumer.PASSWORD, Consumer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if (textMessage != null) {
                    System.out.println("收到的消息:" + textMessage.getText());
                } else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}