博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ点对点消息通信demo
阅读量:6440 次
发布时间:2019-06-23

本文共 4730 字,大约阅读时间需要 15 分钟。

hot3.png

1.下载ActiveMQ,安装并启动activeMQ,

2.登录控制台,     default: admin/admin

新建queue名为:FirstQueue

依赖包:

    
org.apache.activemq
    
activemq-core
    
5.7.0

package org.jun.util;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class ActiveMQSenderUtil {		public static void main(String arg[]) {		Sender("FirstQueue", "通知:everybody,高性能activeMQ!");	}    /**     * 发送消息     *       * @param content	发送内容     * @param queueName	 队列名     */    public static void Sender(String queueName, String content) {    	// ConnectionFactory :连接工厂,JMS 用它创建连接工厂实例,此处采用ActiveMq的实现jar        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(        		ActiveMQConnection.DEFAULT_USER,        		ActiveMQConnection.DEFAULT_PASSWORD,        		"tcp://localhost:61616"); //tcp地址                // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // MessageProducer:消息发送者        MessageProducer producer;        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置            destination = session.createQueue(queueName);            // 得到消息生成者【发送者】            producer = session.createProducer(destination);            // 设置不持久化,此处学习,实际根据项目决定            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            // 构造消息,此处写死,项目就是参数,或者方法获取            TextMessage message = session.createTextMessage(content);            System.out.println("ActiveMq 发送的消息:" + content);            producer.send(message);// 发送消息到目的地方            session.commit();        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection) {                	connection.close();                }            } catch (Throwable ignore) {            }        }    }}
package org.jun.util;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class ActiveMQReceiverUtil {		public static void main(String args[]) {		queueReceiver("FirstQueue");	}	/**	 * 接收消息	 * 	 * @param quequName	队列名	 * @return	 */	public static String queueReceiver(String quequName){    	// ConnectionFactory :连接工厂,JMS 用它创建连接        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://localhost:61616");//tcp地址        // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // 消费者,消息接收者        MessageConsumer consumer;        String receiveMsg = "";        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置            destination = session.createQueue(quequName);            consumer = session.createConsumer(destination);            while (true) {                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s                TextMessage message = (TextMessage) consumer.receive(100000);                if (null != message) {                	receiveMsg = message.getText();                	System.out.println("收到消息:" + message.getText());                                    } else {                    break;                }            }        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection) {                	connection.close();                	                }            } catch (Throwable ignore) {            }        }		return receiveMsg;    }}

运行结果:

113729_Afz1_1789904.png

113729_Aak4_1789904.png

转载于:https://my.oschina.net/xiejunbo/blog/505252

你可能感兴趣的文章
基于Zabbix IPMI监控服务器硬件状况
查看>>
Go语言之并发资源竞争
查看>>
GNS帧中继配置,帧中继环境搭建
查看>>
MySql 数据同步
查看>>
awk学习笔记(9) - 表达式
查看>>
版本号大小比较算法
查看>>
在linux日常工作中touch的用法
查看>>
linux
查看>>
SELF4j没什么具体大的报错异常
查看>>
Linux中grub使用技巧
查看>>
win7关闭UAC的方法
查看>>
8-Shell的整数表达式介绍-实践及企业案例脚本剖析
查看>>
linux man命令汉化 操作
查看>>
redis-cli, the Redis command line interface
查看>>
Leetcode 203. Remove Linked List Elements JAVA语言
查看>>
python paramiko实现多线程远程执行命令、多线程远程上传文件、多线程远程下载文件...
查看>>
如何连接Rancher与阿里云NAS存储
查看>>
vCenter Server 所需的端口
查看>>
logrotate日志切割配置
查看>>
rhel7.4安装oracle 11G 11.2.0.4.0 RAC
查看>>