1. Maven依赖图
使用 IDEA 可以看到 compile 级别的依赖图如下:

2. 解读
rocketmq-common
主要封装了一些通用的数据类型、枚举、常量等定义,以及一些维护命令的实现,因此我们逐个package理解一下。
2.1. common
class | 说明 |
---|---|
BrokerConfig | Broker的配置JavaBean,内存中的结构:
|
ConfigManager | 配置管理的抽象类,提供了3个方法:
|
DataVersion | 基于时间戳、原子计数器的数据版本 |
MixAll | 预置的系统常量及一些常用方法,常量列表:
|
MQVersion | 预置了MQ的研发版本号,当前版本号是:Version.V3_4_6 |
Pair | 将2个对象封装成一对,未使用 |
ServiceState | 生产者、消费者处理服务的运行状态枚举值:
|
ServiceThread | 服务线程的抽象类,内存中的结构:
|
SystemClock | 可以根据指定的 precision 自动更新的时钟JavaBean,内存中的结构:
|
ThreadFactoryImpl | ThreadFactory 实现类JavaBean,根据 threadIndex 和 threadNamePrefix 生成指定名称的线程,内存中的结构:
|
TopicConfig | Topic配置的JavaBean,内存中的结构:
|
TopicFilterType | Topic 过滤类型的枚举值:
|
UtilAll | 工具类,提供了时间、字符串、byte[]、堆栈等处理和转换的方法 |
BrokerConfig.rocketmqHome
- Broker启动时,加载
brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"
路径下的日志配置MixAll中的静态方法简介:
- getRetryTopic:获取指定
consumerGroup
的重试Topic –>RETRY_GROUP_TOPIC_PREFIX + consumerGroup
- isSysConsumerGroup:指定
consumerGroup
的是否系统内部消费者组 –>consumerGroup.startsWith(CID_RMQ_SYS_PREFIX)
- getDLQTopic:获取指定
consumerGroup
的死信队列 –>DLQ_GROUP_TOPIC_PREFIX + consumerGroup
- brokerVIPChannel:返回指定
brokerAddr
端口号小2的Broker地址;相关端口的问题后续在Broker的解读中会有进一步说明- getPID:通过
RuntimeMXBean
获取当前进程的进程号- string2File:使用
string2FileNotSafe:使用
将指定str
写入指定的fileName
;先将内容写入fileName + ".tmp"
文件,如果fileName
中的内容存在,将原来的内容写入fileName + ".bak"
,删除文件后,将fileName + ".tmp"
重命名到fileName
- string2FileNotSafe:使用
FileWriter
将指定str
写入指定的fileName
- file2String(java.lang.String):使用
file2String(java.io.File)
读取文件内容- file2String(java.net.URL):使用
URLConnection.getInputStream()
获取输入流,构造UTF-8
编码的字符串- file2String(java.io.File):使用
FileReader
读取文件内容,未使用缓冲区,直接填充char[]后转换字符串,大文件慎用- printObjectProperties(org.slf4j.Logger, java.lang.Object):调用
printObjectProperties(org.slf4j.Logger, java.lang.Object, boolean)
,第三个参数默认false
- printObjectProperties(org.slf4j.Logger, java.lang.Object, boolean):使用
Logger
(未指定时,输出到标准输出)输出指定对象的声明的字段,第三个参数控制是否只输出带有com.alibaba.rocketmq.common.annotation.ImportantField
注解的字段- properties2String:将
Properties
对象转成字符串,每行一组key=value
- string2Properties:将字符串加载进
Properties
对象- object2Properties:将指定对象的字段和值加载进
Properties
对象- properties2Object:遍历指定对象的所有set方法,截取set后的剩余字符串再首字母小写作为字段名,如果指定的
Properties
对象中存在该字段,调用该set方法设置Properties
对象对应的值- isPropertiesEqual:通过
Properties#equals
判断两个Properties
对象是否相等- getLocalInetAddress:遍历
NetworkInterface.getNetworkInterfaces()
中的所有地址- localhost:
InetAddress.getLocalHost().getHostAddress()
- compareAndIncreaseOnly:通过循环和CAS,保证
AtomicLong
的值设置成功,设置的值必须大于设置前的值才能返回true
2.2. common.admin
rocketmq-tools
管理工具用到的一些JavaBean
class | 说明 |
---|---|
ConsumeStats | 管理工具用到的消费状态 |
OffsetWrapper | 管理工具用到的偏移量包装 |
RollbackStats | 管理工具用到的回滚状态 |
TopicOffset | 管理工具用到的Topic偏移量 |
TopicStatsTable | 管理工具用到的Topic状态表 |
2.3. common.annotation
class | 说明 |
---|---|
ImportantField | 用来注解一些配置中的重要项,在维护工具中就可以直接动态输出这些内容 |
注解使用的地方:
com.alibaba.rocketmq.common.BrokerConfig
com.alibaba.rocketmq.filtersrv.FiltersrvConfig
com.alibaba.rocketmq.store.config.MessageStoreConfig
注解判断的地方:
com.alibaba.rocketmq.common.MixAll#printObjectProperties(org.slf4j.Logger, java.lang.Object, boolean)
2.4. common.conflict
class | 说明 |
---|---|
PackageConflictDetect | 用于 fastjson 的版本冲突检测,要求最低版本:1.2.3 |
fastjson 版本冲突检测的地方:
com.alibaba.rocketmq.broker.BrokerStartup
com.alibaba.rocketmq.filtersrv.FiltersrvStartup
com.alibaba.rocketmq.namesrv.NamesrvStartup
com.alibaba.rocketmq.client.impl.factory.MQClientInstance
com.alibaba.rocketmq.tools.command.MQAdminStartup
2.5. common.constant
class | 说明 |
---|---|
DBMsgConstants | 定义了最大消息包体长度 maxBodySize 长度为:64*1024*1204 ; //64KB |
LoggerName | 定义了常用的 slf4j 的 Logger 名字 |
PermName | 主要定义读、写、继承的权限及权限识别方法(位运算) |
LoggerName 的声明列表:
1234567891011121314 public static final String FiltersrvLoggerName = "RocketmqFiltersrv";public static final String NamesrvLoggerName = "RocketmqNamesrv";public static final String BrokerLoggerName = "RocketmqBroker";public static final String ClientLoggerName = "RocketmqClient";public static final String ToolsLoggerName = "RocketmqTools";public static final String CommonLoggerName = "RocketmqCommon";public static final String StoreLoggerName = "RocketmqStore";public static final String StoreErrorLoggerName = "RocketmqStoreError";public static final String TransactionLoggerName = "RocketmqTransaction";public static final String RebalanceLockLoggerName = "RocketmqRebalanceLock";public static final String RocketmqStatsLoggerName = "RocketmqStats";public static final String CommercialLoggerName = "RocketmqCommercial";public static final String FlowControlLoggerName = "RocketmqFlowControl";public static final String RocketmqAuthorizeLoggerName = "RocketmqAuthorize";题外话:
12345678910111213141516 public static String perm2String(final int perm) {final StringBuffer sb = new StringBuffer("---");if (isReadable(perm)) {sb.replace(0, 1, "R");}if (isWriteable(perm)) {sb.replace(1, 2, "W");}if (isInherited(perm)) {sb.replace(2, 3, "X");}return sb.toString();}因为不是核心代码,调用次数也不多,可以忽略,但是还是倾向用如下代码实现,有点强迫症的赶脚啊。
1234567 public static String perm2String(final int perm) {final StringBuilder sb = new StringBuilder(3);sb.append(isReadable(perm)?'R':'-');sb.append(isWriteable(perm)?'W':'-');sb.append(isInherited(perm)?'X':'-');return sb.toString();}
2.6. common.consumer
class | 说明 |
---|---|
ConsumeFromWhere | 定义了消费偏移量起始位置的枚举值 |
过期的枚举已经全部被
CONSUME_FROM_LAST_OFFSET
替代
123456789101112 public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET,CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,CONSUME_FROM_MIN_OFFSET,CONSUME_FROM_MAX_OFFSET,CONSUME_FROM_FIRST_OFFSET,CONSUME_FROM_TIMESTAMP,}
2.7. common.filter
class | 说明 | |
---|---|---|
FilterAPI | 主要提供了2个 static 方法:
|
|
FilterContext | 仅存储 consumerGroup 的JavaBean,在接口方法 com.alibaba.rocketmq.common.filter.MessageFilter#match 中使用 |
|
MessageFilter | 仅声明接口方法 boolean match(final MessageExt msg, final FilterContext context); |
subString中如果存在多个tag,由双竖线分隔:
tag1 || tag2 || tag3
2.8. common.filter.impl
class | 说明 |
---|---|
Op | 仅存储 symbol 的抽象类 |
Operand | Op 的子类,无特殊实现 |
Operator | Op 的子类,扩展了 priority 优先级、compareable 是否可比较2个属性;当 compareable 为 true 时,priority 值大返回1,相等返回0,否则返回-1;不提供构造方法,仅通过 com.alibaba.rocketmq.common.filter.impl.Operator#createOperator 根据入参字面量返回预置的4种静态常量:
|
PolishExpr | 逆波兰表达式输出。未使用 |
Type | 类型枚举,用于 PolishExpr 解析表达式:
|
Operator预置静态常量:
1234 public static final Operator LEFTPARENTHESIS = new Operator("(", 30, false);public static final Operator RIGHTPARENTHESIS = new Operator(")", 30, false);public static final Operator AND = new Operator("&&", 20, true);public static final Operator OR = new Operator("||", 15, true);Reverse Polish Notation:逆波兰表示法
- 逆波兰记法中,操作符置于操作数的后面。例如表达“三加四”时,写作“3 4 +”,而不是“3 + 4”。如果有多个操作符,操作符置于第二个操作数的后面,所以常规中缀记法的“3 - 4 + 5”在逆波兰记法中写作“3 4 - 5 +”:先3减去4,再加上5。使用逆波兰记法的一个好处是不需要使用括号。例如中缀记法中“3 - 4 5”与“(3 - 4)5”不相同,但后缀记法中前者写做“3 4 5 -”,无歧义地表示“3 (4 5 ) −”;后者写做“3 4 - 5 *”。
2.9. common.help
class | 说明 |
---|---|
FAQUrl | 预置了一些典型问题链接、2个 static 方法用于提示解决方案 |
典型问题列表:
2.10. common.hook
class | 说明 |
---|---|
FilterCheckHook | 应该是用作客户端Filter检查的钩子。Class ‘FilterCheckHook’ is never used |
2.11. common.message
class | 说明 |
---|---|
Message | 消息的JavaBean,内存中的结构包含4个部分:
封装了获取消息属性、内容的方法,部分消息属性的设置方法权限为 default 在 MessageAccessor 中提供了 static 操作方法,直接设置预置属性会抛出异常 |
MessageAccessor | 提供了 Message 属性的 static 操作方法 |
MessageConst | Message 中用到的属性静态常量及这些常量集 systemKeySet |
MessageDecoder | 提供了 MessageId 生成和解析方法、MessageExt 及消息属性的序列化和反序列化方法 |
MessageExt | Message 的子类,扩展了queueId、storeSize、queueOffset、sysFlag、bornTimestamp、storeTimestamp等等 |
MessageId | 消息ID的JavaBean,16字节。内存中的结构包含2个部分:
|
MessageQueue | 消息队列的JavaBean,内存结构包含3个部分:
重写了 hashcode 和 equals 方法,实现了 Comparable 接口 |
MessageQueueForC | 增加了offset的 MessageQueue ,com.alibaba.rocketmq.broker.client.net.Broker2Client#convertOffsetTable2OffsetList 中生成对象 |
2.12. common.namesrv
class | 说明 |
---|---|
NamesrvConfig | NameServer的配置类JavaBean,内存结构包含4个部分:
|
NamesrvUtil | 定义常量 public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG"; |
RegisterBrokerResult | Broker注册结果的JavaBean,内存结构包含3个部分:
|
TopAddressing | 查询NameServer信息的配置类JavaBean,内存结构包含3个部分:
提供了 #fetchNSAddr() 方法从 wsAddr 和 unitName 组成的url查询最新的NameServer地址 |
wsAddr
值参见:MixAll.WS_ADDR
2.13. common.protocol
class | 说明 |
---|---|
RequestCode | 请求码 |
ResponseCode | 应答码 |
2.14. common.protocol.body
class | 说明 |
---|---|
CMResult | 消费结果枚举值:
|
2.15. common.protocol.heartbeat
class | 说明 |
---|---|
ConsumeType | 消费类型枚举值:
|
MessageModel | 消息模式枚举值:
|
2.16. common.protocol.topic
class | 说明 |
---|---|
RequestCode | 定义了请求消息常量 |
ResponseCode | 定义了应答消息常量 |
包下其他类,多为JavaBean,不做赘述
2.17. common.running
class | 说明 |
---|---|
RunningStats | 运行状态枚举值:
|
2.18. common.stats
class | 说明 |
---|---|
StatsItem | 提供了生成状态快照 StatsSnapshot 的方法 |
StatsSnapshot | 状态快照JavaBean |
2.19. common.subscription
class | 说明 |
---|---|
SubscriptionGroupConfig | 订阅组配置JavaBean,内存结构包含8个部分:
|
2.20. common.sysflag
class | 说明 |
---|---|
MessageSysFlag | 位运算提供消息的压缩、多Tag、事务类型等标记的判断 |
PullSysFlag | 位运算提供拉取消息的确认偏移量、阻塞、订阅、过滤等标记的判断 |
TopicSysFlag | 位运算提供主题标记的判断 |
2.21. common.utils
class | 说明 |
---|---|
ChannelUtil | 仅提供获取远端IP的 static 方法 |
HttpTinyClient | 提供了 java.net.HttpURLConnection 封装的,httpGet、httpPost方法 |
IOTinyUtils | 提供了一些IO操作供 HttpTinyClient 使用 |
3. 其他
通过搜索,发现了 rocketmq-common
中使用的一些环境变量
源文件:BrokerConfig
rocketmq.home.dir
默认值 用途 操作系统环境变量 ROCKETMQ_HOME
设置Broker的工作目录,目前用来定位 /conf/logback_broker.xml
配置文件,和定位文件%s/bin/startfsrv.sh
rocketmq.namesrv.addr
默认值 用途 操作系统环境变量 NAMESRV_ADDR
设置NameServer的地址
源文件:MixAll
rocketmq.namesrv.domain
默认值 用途 jmenv.tbsite.net 指定在线NameServer查询服务的域名 rocketmq.namesrv.domain.subgroup
默认值 用途 nsaddr 指定在线NameServer查询服务的URL后缀
源文件:PackageConflictDetect
com.alibaba.rocketmq.packageConflictDetect.enable
默认值 用途 true 是否进行包冲突检测,这里仅检测fastjson必须大于 1.2.3
版本
源文件:NamesrvConfig
rocketmq.home.dir
默认值 用途 操作系统环境变量 ROCKETMQ_HOME
设置NameServer的工作目录,目前用来定位 /conf/logback_namesrv.xml
配置文件