MQ基础与运用

2018-08-24

MQ基础与运用

[TOC]

一.MQ原理

待整理

二.ActiveMQ部分

1.概念

消息队列:即时消息通信延时消息通信

MQ 是一个消息中间件,常见的消息中间件有 ActiveMQ | RabbitMQ | kafka

ActiveMQ底层基于java的JMS实现,在没有JMS之前的系统存在很多缺陷:

  1. 前后端同步问题,如果后台没有响应,则前段会一直阻塞等待
  2. 前后端生命周期耦合性太强,一方崩了则另一方也会崩
  3. 点对点通信,前段一次只能发送给某一个单独的服务对象,无法群发

JMS: (Java Message Service ) 通过消息中间件(MOM:Message Oriented Middleware )

将消息发送给单独的消息服务器中,消息服务器会将消息存放在若干的队列/主题中,在合适的时候将消息发送给接收者.发送和接收是异步的,无需阻塞等待 在pub/sub的模式下,可以将消息发送给多个接收者

JMS类中定义了java访问中间件的接口,除此之外都是异常定义

  1. Provider/MessageProvider:生产者
  2. Consumer/MessageConsumer:消费者
  3. PTP:Point To Point,点对点通信消息模型
  4. Pub/Sub:Publish/Subscribe,发布订阅消息模型
  5. Queue:队列,目标类型之一,和PTP结合
  6. Topic:主题,目标类型之一,和Pub/Sub结合
  7. ConnectionFactory:连接工厂,JMS用它创建连接
  8. Connnection:JMS Client到JMS Provider的连接
  9. Destination:消息目的地,由Session创建
  10. Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的

2.应用

| 异步处理 | 应用解耦 | 流量削锋 | 消息通讯 |

详情参考:https://blog.csdn.net/kingcat666/article/details/78660535

3.消息模式

  • P2P模式(点对点) Queue
  • Pub/Sub模式(发布订阅) Topic
  • Push模式(推拉模式,消息更新C/S中)

Topic根据业务需求,也可以持久化

客户端启动时设置一个ClientID作为编号在服务器注册

可以将消息一致保存在服务器(可以持久化)

环境搭建文件中存放该操作方法

4.五种不同的消息正文格式

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  •   StreamMessage – Java原始值的数据流
  •   MapMessage–一套名称-值对
  •   TextMessage–一个字符串对象(常用)
  •   ObjectMessage–一个序列化的 Java对象
  •   BytesMessage–一个字节的数据流

5.java中与Solr结合

搭建..(省略)

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。

(5.12.0中许多包和spring相同,而且少方法,坑)

建议使用5.11.2

进入管理后台:

5-1.JMS规范下使用套路 基础原理

导包

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>

提供方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//1.创建工厂对象ConnectionFactory,需要指定IP和端口
ConnectionFactory connectionFactory= new ActiveMQConnectionFactory("tcp://www.fzs.com:61616");
//2.使用工厂对象创建Connection连接对象
Connection connection=connectionFactory.createConnection();
//3.开启连接,调用Connection对象的start方法
connection.start();
//4.创建Session对象
//(两个参数,|1.是否开启分布式事务(少,一般不开),如果开启,第二个参数无意义 | 2.应答模式(自动/手动)一般自动)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session创建目的模式Destination (queue(点对点)|topic(广播一对多))
Queue queue = session.createQueue("DonY15_ActiveMQ_Message");
//6.使用Session创建生产者Producer
MessageProducer producer = session.createProducer(queue);
//7.创建Message对象(一般textMessage)
TextMessage textMessage = session.createTextMessage("发出命令:全军粗鸡!✧*。٩(ˊᗜˋ*)و✧*。");
//8.发送消息(Message放到Producer)
producer.send(textMessage);
//9.关闭资源(Producer|Session|Connection)
producer.close();
session.close();
connection.close();

接收方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//1.创建工厂对象ConnectionFactory连接MQ服务器
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://www.fzs.com:61616");
//2.使用工厂对象创建Connection连接对象
Connection connection=connectionFactory.createConnection();
//3.开启连接
connection.start();
//4.创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session创建目的模式Destination (queue(点对点)|topic(广播一对多))
Queue queue = session.createQueue("DonY15_ActiveMQ_Message");
//6.使用Session创建消费者对象
MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息(监听)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//8.打印结果
TextMessage textMessage= (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}

}
});
//9.等待接收消息
System.in.read();
//9.关闭连接
consumer.close();
session.close();
connection.close();