入门概述
MQ 的产品种类和对比
MQ 就是消息中间件。MQ 是一种理念,ActiveMQ 是 MQ 的落地产品。不管是哪款消息中间件,都有如下一些技术维度:
(1) kafka
- 编程语言:scala。
- 大数据领域的主流 MQ。
(2) rabbitmq
- 编程语言:erlang
- 基于 erlang 语言,不好修改底层,不要查找问题的原因,不建议选用。
(3) rocketmq
- 编程语言:java
- 适用于大型项目。适用于集群。
(4) activemq
编程语言:java
适用于中小型项目。
MQ 的产生背景
系统之间直接调用存在的问题
微服务架构后,链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分成多个函数(或子模块),比如模块 A 调用模块 B,模块 B 调用模块 C,模块 C 调用模块 D。但在大型分布式应用中,系统间的 RPC 交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例。这些架构会有哪些问题?
系统之间接口耦合比较严重
每新增一个下游功能,都要对上游的相关接口进行改造;
举个例子:如果系统 A 要发送数据给系统 B 和系统 C,发送给每个系统的数据可能有差异,因此系统 A 对要发送给每个系统的数据进行了组装,然后逐一发送;
当代码上线后又新增了一个需求:把数据也发送给 D,新上了一个 D 系统也要接受 A 系统的数据,此时就需要修改 A 系统,让他感知到 D 系统的存在,同时把数据处理好再给 D。在这个过程你会看到,每接入一个下游系统,都要对系统 A 进行代码改造,开发联调的效率很低。其整体架构如下图:
面对大流量并发时,容易被冲垮
每个接口模块的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量(洪水)来临时,容易被冲垮。
举个例子秒杀业务:上游系统发起下单购买操作,就是下单一个操作,很快就完成。然而,下游系统要完成秒杀业务后面的所有逻辑(读取订单,库存检查,库存冻结,余额检查,余额冻结,订单生产,余额扣减,库存减少,生成流水,余额解冻,库存解冻)。
等待同步存在性能问题
RPC 接口上基本都是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。比如 A 调用 B/C/D 都是 50ms,但此时 B 又调用了 B1,花费 2000ms,那么直接就拖累了整个服务性能。
根据上述的几个问题,在设计系统时可以明确要达到的目标:
1,要做到系统解耦,当新的模块接进来时,可以做到代码改动最小;能够解耦
2,设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能削峰
3,强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步
MQ 的主要作用
(1) 异步。调用者无需等待。
(2) 解耦。解决了系统之间耦合调用的问题。
(3) 削峰。抵御洪峰流量,保护了主业务。
MQ 的定义
面向消息的中间件(message-oriented middleware)MOM 能够很好的解决以上问题。是指利用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储、流量削峰,异步通信,数据同步等功能。
大致的过程是这样的:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题 topic 中,在合适的时候,消息服务器回将消息转发给接受者。在这个过程中,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系;尤其在发布 pub/订阅 sub 模式下,也可以完成一对多的通信,即让一个消息有多个接受者。
MQ 的特点
采用异步处理模式
消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上;
消息接收者则订阅或者监听该爱通道。一条消息可能最终转发给一个或者多个消息接收者,这些消息接收者都无需对消息发送者做出同步回应。整个过程都是异步的。
案例:
也就是说,一个系统跟另一个系统之间进行通信的时候,假如系统 A 希望发送一个消息给系统 B,让他去处理。但是系统 A 不关注系统 B 到底怎么处理或者有没有处理好,所以系统 A 把消息发送给 MQ,然后就不管这条消息的“死活了”,接着系统 B 从 MQ 里面消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统 B 的事儿,与系统 A 无关。
应用系统之间解耦合
发送者和接受者不必了解对方,只需要确认消息。
发送者和接受者不必同时在线。
整体架构
MQ 的缺点
两个系统之间不能同步调用,不能实时回复,不能响应某个调用的回复。
ActiveMQ 安装和控制台
ActiveMQ 安装
官方下载
官网地址: http://activemq.apache.org/ ,点击下面,开始下载。
安装步骤
说明:activemq需要预先安装JDK,使用 rpm 方式安装即可。
下载地址:http://archive.apache.org/dist/activemq ,本次学习版本为:apache-activemq-5.14.3
① 创建目录
1 | cd /usr/local/ |
② 上传安装包
使用 rz
命令,或者使用 xftp 上传到 /usr/local/activemq
目录下
1 | rz |
③ 解压
1 | tar -xzvf apache-activemq-5.14.3-bin.tar.gz |
④ 进入解压后的 bin
目录增加执行命令
1 | cd /usr/local/activemq/apache-activemq-5.14.3/bin |
⑤ 访问 activemq 管理页面地址: http://IP:8161/ ,账户:admin,密码:admin
⑥ 其他
1 | # 查看 activemq 状态 |
⑦ 如果启动报 ActiveMQ 异常:java.net.URISyntaxException: Illegal character in hostname at index ,解决办法:
1 | # 首先翻译一下这个异常,就是:主机名中包含非法字符,那么非法字符是什么呢?是“_”下划线 |
⑧ 查看程序启动是否成功的3种方式(通用)
1 | # 方式1:查看进程 |
ActiveMQ 控制台
访问 activemq 管理页面地址: http://IP:8161/ ,账户:admin,密码:admin
记住两个端口:
- 采用 61616 端口提供 JMS 服务
- 采用 8161 端口提供管理控制台服务
入门案例、MQ 标准、API 详解
搭建工程
IDEA 新建 Maven 工程
新建空项目:activemq_study
新建 maven 模块: activemq_demo
POM 文件
1 |
|
JMS 编码总体规范
JMS 开发基本步骤
java 消息服务
Destination 简介
Destination 是目的地。下面拿 jvm 和 mq ,做个对比。目的地,我们可以理解为是数据存储的地方。
Destination 分为两种:队列和主题。下图介绍:
队列消息生产者的入门案例
1 | package com.itjing.activemq.queue; |
Number Of Pending Messages:
等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers:
消费者数量,消费者端的消费者数量。
Messages Enqueued:
进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued:
出队消息数,可以理解为是消费者消费掉的数量。
总结:
- 当有一个消息进入这个队列时,等待消费的消息是 1,进入队列的消息是 1。
- 当消息消费后,等待消费的消息是 0,进入队列的消息是 1,出队列的消息是 1。
- 当再来一条消息时,等待消费的消息是 1,进入队列的消息就是 2。
队列消息消费者的入门案例
1 | package com.itjing.activemq.queue; |
控制台显示:
异步监听式消费者(MessageListener)
1 | package com.itjing.activemq.queue; |
队列消息(Queue)总结
两种消费方式
同步阻塞方式(receive)
订阅者或接收者抵用 MessageConsumer 的 receive() 方法来接收消息,receive 方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式(监听器onMessage())
订阅者或接收者通过 MessageConsumer 的 setMessageListener(MessageListener listener) 注册一个消息监听器,当消息到达之后,系统会自动调用监听器 MessageListener 的 onMessage(Message message) 方法。
队列的特点
消息消费情况
情况 1:只启动消费者 1。
结果:消费者 1 会消费所有的数据。
情况 2:先启动消费者 1,再启动消费者 2。
结果:消费者 1 消费所有的数据。消费者 2 不会消费到消息。
情况 3:生产者发布 6 条消息,在此之前已经启动了消费者 1 和消费者 2。
结果:消费者1和消费者2平摊了消息。各自消费3条消息。
疑问:怎么去将消费者 1 和消费者 2 不平均分摊呢?而是按照各自的消费能力去消费
。
我觉得,现在 activemq 就是这样的机制。
Topic 介绍、入门案例、控制台
topic 介绍
在发布订阅消息传递域中,目的地被称为主题(topic)
发布/订阅消息传递域的特点如下:
(1)生产者将消息发布到 topic 中,每个消息可以有多个消费者,属于 1:N 的关系;
(2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息;
(3)生产者生产时,topic 不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
默认情况下如上所述,但是 JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。
生产者案例
只有第 4 步不一样
1 | package com.itjing.activemq.topic; |
消费者案例
1 | package com.itjing.activemq.topic; |
存在多个消费者,每个消费者都能收到,自从自己启动后所有生产的消息。
ActiveMQ 控制台
topic 有多个消费者时,消费消息的数量 ≈ 在线消费者数量 * 生产消息的数量,每个消费者都可以消费到生产的消息
。
下图展示了:我们先启动了 3 个消费者,再启动一个生产者,并生产了 3 条消息。
topic 和 queue 对比
JMS 规范
JavaEE
JMS 是什么
什么是 Java 消息服务?
Java 消息服务指的是两个应用程序之间进行异步通信的 API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持 Java 应用程序开发。
在 JavaEE 中,当两个应用程序使用 JMS 进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步/削峰
的效果。
消息头
JMS 的消息头有哪些属性:
JMSDestination:
消息目的地
JMSDeliveryMode:
消息持久化模式
JMSExpiration:
消息过期时间
JMSPriority:
消息的优先级
JMSMessageID:
消息的唯一标识符。后面我们会介绍如何解决幂等性。
说明: 消息的生产者可以 set 这些属性,消息的消费者可以 get 这些属性。这些属性在 send 方法里面也可以设置。
1 | package com.itjing.activemq.topic; |
消息体
5 种消息体格式:
下面我们演示 TextMessage 和 MapMessage 的用法。
消息生产者
1 | package com.itjing.activemq.topic; |
消息消费者
1 | package com.itjing.activemq.topic; |
消息属性
如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
下图是设置消息属性的 API:
消息生产者
1 | ...... |
消息消费者
1 | ...... |
消息的持久化
什么是持久化消息?
保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题,如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。
queue 消息非持久和持久
queue 非持久,当服务器宕机,消息不存在(消息丢失了)。
queue 持久化,当服务器宕机,消息依然存在。queue消息默认是持久化的。
持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
topic 消息持久化
topic默认就是非持久化的
,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。
topic 消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。
注意:
- 一定要先运行一次消费者,等于向 MQ 注册,类似我订阅了这个主题。
- 然后再运行生产者发送消息。
- 之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来。
持久化 topic 生产者代码:
1 | package com.itjing.activemq.topic; |
持久化 topic 消费者代码:
1 | package com.itjing.activemq.topic; |
控制台介绍:
topic 页面还是和之前的一样。另外在subscribers
页面也会显示。如下:
消息的事务性
生产者开启事务后,执行 commit 方法,这批消息才真正的被提交。不执行 commit 方法,这批消息不会提交。执行 rollback 方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。
消费者开启事务后,执行 commit 方法,这批消息才算真正的被消费。不执行 commit 方法,这些消息不会标记已消费,下次还会被消费。执行 rollback 方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。
问:消费者和生产者需要同时操作事务才行吗? 答:消费者和生产者的事务,完全没有关联,各自是各自的事务。
消息生产者代码:
1 | package com.itjing.activemq.tx; |
消息消费者代码:
1 | package com.itjing.activemq.tx; |
消费者的控制台输出信息。可以看出 commit 和 rollback 方法的作用。
1 | ***消费者接收到的消息: tx msg--0 |
消息的签收机制
签收的几种方式
① 自动签收
(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
② 手动签收
(Session.CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge()
,来签收消息。如果不签收消息,该消息会被我们反复消费,直到被签收。
③ 允许重复消息
(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
④ 事务下的签收
(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。
事务和签收的关系
① 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。
② 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
③ 生产者事务开启,只有 commit 后才能将全部消息变为已消费。
④ 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。
代码演示
下面我们演示,非事务下的消费者如何使用手动签收
的方式
消息生产者:
1 | package com.itjing.activemq.tx; |
消息消费者:
1 | package com.itjing.activemq.tx; |
JMS 的点对点总结
点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
如果在 Session 关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收。
队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。
JMS 的发布订阅总结
JMS 的发布订阅总结
JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作 topic。
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。
主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送。
非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和 MQ 保持连接状态才能收发到某个主题的消息。
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。
一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
持久订阅
客户端首先向 MQ 注册一个自己的身份 ID 识别号,当这个客户端处于离线时,生产者会为这个 ID 保存所有发送到主题的消息,当客户再次连接到 MQ 的时候,会根据消费者的 ID 得到所有当自己处于离线时发送到主题的消息。
非持久订阅状态下,不能恢复或重新派送一个未签收的消息。
持久订阅才能恢复或重新派送一个未签收的消息。
非持久和持久化订阅如何选择
当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。
ActiveMQ 的 broker
broker 是什么
相当于一个 ActiveMQ 服务器实例。
说白了,Broker 其实就是实现了用代码的形式启动 ActiveMQ 将 MQ 嵌入到 Java 代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。
这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。
启动 broker 时指定配置文件
启动 broker 时指定配置文件,可以帮助我们在一台服务器上启动多个 broker。实际工作中一般一台服务器只启动一个 broker。
1 | [root@iz2ze4lfnjztrjppyfbqo1z apache-activemq-5.14.3]# cd conf/ |
嵌入式的 broker 启动
用 ActiveMQ Broker 作为独立的消息服务器来构建 Java 应用。
ActiveMQ 也支持在 vm 中通信基于嵌入的 broker,能够无缝的集成其他 java 应用。
pom.xml 添加一个依赖:
1 | <dependency> |
嵌入式 broker 的启动类:
1 | public class EmbedBroker { |
发布时间: 2021-01-26
最后更新: 2024-06-24
本文标题: MQ消息中间件之ActiveMQ快速入门(一)
本文链接: https://blog-yilia.xiaojingge.com/posts/e6501e06.html
版权声明: 本作品采用 CC BY-NC-SA 4.0 许可协议进行许可。转载请注明出处!
