1. Maven依赖图

使用 IDEA 可以看到 compile 级别的依赖图如下:

2. 解读

rocketmq-common 主要封装了一些通用的数据类型、枚举、常量等定义,以及一些维护命令的实现,因此我们逐个package理解一下。

2.1. common

class 说明
BrokerConfig Broker的配置JavaBean,内存中的结构:
  1. rocketmqHome:默认值 System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,System.getenv(MixAll.ROCKETMQ_HOME_ENV));其中 ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir" ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"
  2. namesrvAddr:默认值 System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,System.getenv(MixAll.NAMESRV_ADDR_ENV));其中 NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr" NAMESRV_ADDR_ENV = "NAMESRV_ADDR"
  3. brokerIP1:默认值 RemotingUtil.getLocalAddress();取网卡所有IPV4地址不为 127.0192.168 开头的第一个,IPV4地址为空则取IPV6第一个地址,否则返回 InetAddress.getLocalHost()
  4. brokerIP2:默认值 RemotingUtil.getLocalAddress();同 brokerIP1
  5. brokerName:默认值 localHostName();不能获取 InetAddress.getLocalHost().getHostName() 时返回固定值 DEFAULT_BROKER
  6. brokerClusterName:默认值 "DefaultCluster"
  7. brokerId:默认值 MixAll.MASTER_ID;其中 MASTER_ID = 0L
  8. brokerPermission:默认值 PermName.PERM_READ|PermName.PERM_WRITE
  9. defaultTopicQueueNums:默认值 8
  10. autoCreateTopicEnable:默认值 true
  11. clusterTopicEnable:默认值 true
  12. brokerTopicEnable:默认值 true
  13. autoCreateSubscriptionGroup:默认值 true
  14. sendMessageThreadPoolNums:默认值 16 + Runtime.getRuntime().availableProcessors() * 4
  15. pullMessageThreadPoolNums:默认值 16 + Runtime.getRuntime().availableProcessors() * 2
  16. adminBrokerThreadPoolNums:默认值 16
  17. clientManageThreadPoolNums:默认值 16
  18. flushConsumerOffsetInterval:默认值 1000 * 5
  19. flushConsumerOffsetHistoryInterval:默认值 1000 * 60
  20. rejectTransactionMessage:默认值 false
  21. fetchNamesrvAddrByAddressServer:默认值 false
  22. sendThreadPoolQueueCapacity:默认值 100000
  23. pullThreadPoolQueueCapacity:默认值 100000
  24. filterServerNums:默认值 0
  25. longPollingEnable:默认值 true
  26. shortPollingTimeMills:默认值 1000
  27. notifyConsumerIdsChangedEnable:默认值 true
  28. highSpeedMode:默认值 false
  29. transferMsgByHeap:默认值 false
ConfigManager 配置管理的抽象类,提供了3个方法:
  1. load:从文件加载文件反序列化,出错时尝试加载 .bak 文件反序列化
  2. persist:同步方法,将配置序列化到文件
DataVersion 基于时间戳、原子计数器的数据版本
MixAll 预置的系统常量及一些常用方法,常量列表:
  1. ROCKETMQ_HOME_ENV:默认值 "ROCKETMQ_HOME"
  2. ROCKETMQ_HOME_PROPERTY:默认值 "rocketmq.home.dir"
  3. NAMESRV_ADDR_ENV:默认值 "NAMESRV_ADDR"
  4. NAMESRV_ADDR_PROPERTY:默认值 "rocketmq.namesrv.addr"
  5. MESSAGE_COMPRESS_LEVEL:默认值 "rocketmq.message.compressLevel"
  6. WS_DOMAIN_NAME:默认值 System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net")
  7. WS_DOMAIN_SUBGROUP:默认值 System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr")
  8. WS_ADDR:当启动Broker时,没有设置NameServer会从这个地址自动获取,因此除了Ali自己用,一般都要在Broker的配置文件中配置NameServer,否则Broker起来了也不会如预期一样工作,默认值 "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP
  9. DEFAULT_TOPIC:默认值 "TBW102"
  10. BENCHMARK_TOPIC:默认值 "BenchmarkTest"
  11. DEFAULT_PRODUCER_GROUP:默认值 "DEFAULT_PRODUCER"
  12. DEFAULT_CONSUMER_GROUP:默认值 "DEFAULT_CONSUMER"
  13. TOOLS_CONSUMER_GROUP:默认值 "TOOLS_CONSUMER"
  14. FILTERSRV_CONSUMER_GROUP:默认值 "FILTERSRV_CONSUMER"
  15. MONITOR_CONSUMER_GROUP:默认值 "__MONITOR_CONSUMER"
  16. CLIENT_INNER_PRODUCER_GROUP:默认值 "CLIENT_INNER_PRODUCER"
  17. SELF_TEST_PRODUCER_GROUP:默认值 "SELF_TEST_P_GROUP"
  18. SELF_TEST_CONSUMER_GROUP:默认值 "SELF_TEST_C_GROUP"
  19. SELF_TEST_TOPIC:默认值 "SELF_TEST_TOPIC"
  20. OFFSET_MOVED_EVENT:默认值 "OFFSET_MOVED_EVENT"
  21. ONS_HTTP_PROXY_GROUP:默认值 "CID_ONS-HTTP-PROXY"
  22. CID_ONSAPI_PERMISSION_GROUP:默认值 "CID_ONSAPI_PERMISSION"
  23. CID_ONSAPI_OWNER_GROUP:默认值 "CID_ONSAPI_OWNER"
  24. CID_ONSAPI_PULL_GROUP:默认值 "CID_ONSAPI_PULL"
  25. CID_RMQ_SYS_PREFIX:默认值 "CID_RMQ_SYS_"
  26. LocalInetAddrs:获取所有网卡的地址,未使用
  27. Localhost:未使用,默认值 InetAddress.getLocalHost().getHostAddress()
  28. DEFAULT_CHARSET:默认值 "UTF-8"
  29. MASTER_ID:默认值 0L
  30. CURRENT_JVM_PID:通过 RuntimeMXBean 获取当前进程的进程号,未使用
  31. RETRY_GROUP_TOPIC_PREFIX:重试Topic的前缀,默认值 "%RETRY%"
  32. DLQ_GROUP_TOPIC_PREFIX:DLQ是 Dead Letter Queue 的缩写,死信队列的前缀,默认值 "%DLQ%"
  33. SYSTEM_TOPIC_PREFIX:系统Topic的前缀,未使用,默认值 "rmq_sys_"
MQVersion 预置了MQ的研发版本号,当前版本号是:Version.V3_4_6
Pair 将2个对象封装成一对,未使用
ServiceState 生产者、消费者处理服务的运行状态枚举值:
  1. CREATE_JUST:刚创建未启动
  2. RUNNING:运行中
  3. SHUTDOWN_ALREADY
  4. START_FAILED
ServiceThread 服务线程的抽象类,内存中的结构:
  1. thread:内部封装起来的线程,线程名为实现类返回的服务名
  2. JoinTime:服务在 shutdown 的时候线程join的时间,默认值 90 * 1000
  3. hasNotified:默认值 false
  4. stoped:默认值 false
SystemClock 可以根据指定的 precision 自动更新的时钟JavaBean,内存中的结构:
  1. precision:自动更新的周期及修正精度
  2. now:当前时钟,更新时设置为 System.currentTimeMillis()
ThreadFactoryImpl ThreadFactory 实现类JavaBean,根据 threadIndexthreadNamePrefix 生成指定名称的线程,内存中的结构:
  1. threadIndex:默认值 new AtomicLong(0)
  2. threadNamePrefix:
TopicConfig Topic配置的JavaBean,内存中的结构:
  1. DefaultReadQueueNums:默认值 16
  2. DefaultWriteQueueNums:默认值 16
  3. SEPARATOR:静态变量分隔符,未使用,默认值 " "
  4. topicName:需要设置
  5. readQueueNums:默认值 DefaultReadQueueNums
  6. writeQueueNums:默认值 DefaultWriteQueueNums
  7. perm:默认值 PermName.PERM_READ|PermName.PERM_WRITE
  8. topicFilterType:默认值 TopicFilterType.SINGLE_TAG
  9. topicSysFlag:默认值 0
  10. order:默认值 false
TopicFilterType Topic 过滤类型的枚举值:
  1. SINGLE_TAG
  2. MULTI_TAG
UtilAll 工具类,提供了时间、字符串、byte[]、堆栈等处理和转换的方法

BrokerConfig.rocketmqHome

  • Broker启动时,加载 brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml" 路径下的日志配置

MixAll中的静态方法简介:

  • getRetryTopic:获取指定 consumerGroup 的重试Topic –> RETRY_GROUP_TOPIC_PREFIX + consumerGroup
  • isSysConsumerGroup:指定 consumerGroup 的是否系统内部消费者组 –> consumerGroup.startsWith(CID_RMQ_SYS_PREFIX)
  • getDLQTopic:获取指定 consumerGroup 的死信队列 –> DLQ_GROUP_TOPIC_PREFIX + consumerGroup
  • brokerVIPChannel:返回指定 brokerAddr 端口号小2的Broker地址;相关端口的问题后续在Broker的解读中会有进一步说明
  • getPID:通过 RuntimeMXBean 获取当前进程的进程号
  • string2File:使用 string2FileNotSafe:使用 将指定 str 写入指定的 fileName;先将内容写入 fileName + ".tmp" 文件,如果 fileName 中的内容存在,将原来的内容写入 fileName + ".bak",删除文件后,将 fileName + ".tmp" 重命名到 fileName
  • string2FileNotSafe:使用 FileWriter 将指定 str 写入指定的 fileName
  • file2String(java.lang.String):使用 file2String(java.io.File) 读取文件内容
  • file2String(java.net.URL):使用 URLConnection.getInputStream() 获取输入流,构造 UTF-8 编码的字符串
  • file2String(java.io.File):使用 FileReader 读取文件内容,未使用缓冲区,直接填充char[]后转换字符串,大文件慎用
  • printObjectProperties(org.slf4j.Logger, java.lang.Object):调用 printObjectProperties(org.slf4j.Logger, java.lang.Object, boolean),第三个参数默认 false
  • printObjectProperties(org.slf4j.Logger, java.lang.Object, boolean):使用 Logger (未指定时,输出到标准输出)输出指定对象的声明的字段,第三个参数控制是否只输出带有 com.alibaba.rocketmq.common.annotation.ImportantField 注解的字段
  • properties2String:将 Properties 对象转成字符串,每行一组 key=value
  • string2Properties:将字符串加载进 Properties 对象
  • object2Properties:将指定对象的字段和值加载进 Properties 对象
  • properties2Object:遍历指定对象的所有set方法,截取set后的剩余字符串再首字母小写作为字段名,如果指定的 Properties 对象中存在该字段,调用该set方法设置 Properties 对象对应的值
  • isPropertiesEqual:通过 Properties#equals 判断两个 Properties 对象是否相等
  • getLocalInetAddress:遍历 NetworkInterface.getNetworkInterfaces() 中的所有地址
  • localhost:InetAddress.getLocalHost().getHostAddress()
  • compareAndIncreaseOnly:通过循环和CAS,保证 AtomicLong 的值设置成功,设置的值必须大于设置前的值才能返回 true

2.2. common.admin

rocketmq-tools 管理工具用到的一些JavaBean

class 说明
ConsumeStats 管理工具用到的消费状态
OffsetWrapper 管理工具用到的偏移量包装
RollbackStats 管理工具用到的回滚状态
TopicOffset 管理工具用到的Topic偏移量
TopicStatsTable 管理工具用到的Topic状态表

2.3. common.annotation

class 说明
ImportantField 用来注解一些配置中的重要项,在维护工具中就可以直接动态输出这些内容

注解使用的地方:

  1. com.alibaba.rocketmq.common.BrokerConfig
  2. com.alibaba.rocketmq.filtersrv.FiltersrvConfig
  3. com.alibaba.rocketmq.store.config.MessageStoreConfig

注解判断的地方:

  1. com.alibaba.rocketmq.common.MixAll#printObjectProperties(org.slf4j.Logger, java.lang.Object, boolean)

2.4. common.conflict

class 说明
PackageConflictDetect 用于 fastjson 的版本冲突检测,要求最低版本:1.2.3

fastjson 版本冲突检测的地方:

  1. com.alibaba.rocketmq.broker.BrokerStartup
  2. com.alibaba.rocketmq.filtersrv.FiltersrvStartup
  3. com.alibaba.rocketmq.namesrv.NamesrvStartup
  4. com.alibaba.rocketmq.client.impl.factory.MQClientInstance
  5. com.alibaba.rocketmq.tools.command.MQAdminStartup

2.5. common.constant

class 说明
DBMsgConstants 定义了最大消息包体长度 maxBodySize 长度为:64*1024*1204; //64KB
LoggerName 定义了常用的 slf4j 的 Logger 名字
PermName 主要定义读、写、继承的权限及权限识别方法(位运算)

LoggerName 的声明列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static final String FiltersrvLoggerName = "RocketmqFiltersrv";
public static final String NamesrvLoggerName = "RocketmqNamesrv";
public static final String BrokerLoggerName = "RocketmqBroker";
public static final String ClientLoggerName = "RocketmqClient";
public static final String ToolsLoggerName = "RocketmqTools";
public static final String CommonLoggerName = "RocketmqCommon";
public static final String StoreLoggerName = "RocketmqStore";
public static final String StoreErrorLoggerName = "RocketmqStoreError";
public static final String TransactionLoggerName = "RocketmqTransaction";
public static final String RebalanceLockLoggerName = "RocketmqRebalanceLock";
public static final String RocketmqStatsLoggerName = "RocketmqStats";
public static final String CommercialLoggerName = "RocketmqCommercial";
public static final String FlowControlLoggerName = "RocketmqFlowControl";
public static final String RocketmqAuthorizeLoggerName = "RocketmqAuthorize";

题外话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static String perm2String(final int perm) {
final StringBuffer sb = new StringBuffer("---");
if (isReadable(perm)) {
sb.replace(0, 1, "R");
}
if (isWriteable(perm)) {
sb.replace(1, 2, "W");
}
if (isInherited(perm)) {
sb.replace(2, 3, "X");
}
return sb.toString();
}

因为不是核心代码,调用次数也不多,可以忽略,但是还是倾向用如下代码实现,有点强迫症的赶脚啊。

1
2
3
4
5
6
7
public static String perm2String(final int perm) {
final StringBuilder sb = new StringBuilder(3);
sb.append(isReadable(perm)?'R':'-');
sb.append(isWriteable(perm)?'W':'-');
sb.append(isInherited(perm)?'X':'-');
return sb.toString();
}

2.6. common.consumer

class 说明
ConsumeFromWhere 定义了消费偏移量起始位置的枚举值

过期的枚举已经全部被 CONSUME_FROM_LAST_OFFSET 替代

1
2
3
4
5
6
7
8
9
10
11
12
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET,
@Deprecated
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
@Deprecated
CONSUME_FROM_MIN_OFFSET,
@Deprecated
CONSUME_FROM_MAX_OFFSET,
CONSUME_FROM_FIRST_OFFSET,
CONSUME_FROM_TIMESTAMP,
}

2.7. common.filter

class 说明
FilterAPI 主要提供了2个 static 方法:
  1. simpleClassName:截取字符串的方式在完成类名中获取简短类名
  2. buildSubscriptionData:根据consumerGroup、topic、subString构造一个 SubscriptionData
FilterContext 仅存储 consumerGroup 的JavaBean,在接口方法 com.alibaba.rocketmq.common.filter.MessageFilter#match 中使用
MessageFilter 仅声明接口方法 boolean match(final MessageExt msg, final FilterContext context);

subString中如果存在多个tag,由双竖线分隔:tag1 || tag2 || tag3

2.8. common.filter.impl

class 说明
Op 仅存储 symbol 的抽象类
Operand Op 的子类,无特殊实现
Operator Op 的子类,扩展了 priority 优先级、compareable 是否可比较2个属性;
compareabletrue 时,priority 值大返回1,相等返回0,否则返回-1;
不提供构造方法,仅通过 com.alibaba.rocketmq.common.filter.impl.Operator#createOperator 根据入参字面量返回预置的4种静态常量:
  1. LEFTPARENTHESIS
  2. RIGHTPARENTHESIS
  3. AND
  4. OR
PolishExpr 逆波兰表达式输出。未使用
Type 类型枚举,用于 PolishExpr 解析表达式:
  1. NULL
  2. OPERAND
  3. OPERATOR
  4. PARENTHESIS
  5. SEPAERATOR

Operator预置静态常量:

1
2
3
4
public static final Operator LEFTPARENTHESIS = new Operator("(", 30, false);
public static final Operator RIGHTPARENTHESIS = new Operator(")", 30, false);
public static final Operator AND = new Operator("&&", 20, true);
public static final Operator OR = new Operator("||", 15, true);

Reverse Polish Notation:逆波兰表示法

  • 逆波兰记法中,操作符置于操作数的后面。例如表达“三加四”时,写作“3 4 +”,而不是“3 + 4”。如果有多个操作符,操作符置于第二个操作数的后面,所以常规中缀记法的“3 - 4 + 5”在逆波兰记法中写作“3 4 - 5 +”:先3减去4,再加上5。使用逆波兰记法的一个好处是不需要使用括号。例如中缀记法中“3 - 4 5”与“(3 - 4)5”不相同,但后缀记法中前者写做“3 4 5 -”,无歧义地表示“3 (4 5 ) −”;后者写做“3 4 - 5 *”。

2.9. common.help

class 说明
FAQUrl 预置了一些典型问题链接、2个 static 方法用于提示解决方案

典型问题列表:

2.10. common.hook

class 说明
FilterCheckHook 应该是用作客户端Filter检查的钩子。Class ‘FilterCheckHook’ is never used

2.11. common.message

class 说明
Message 消息的JavaBean,内存中的结构包含4个部分:
  1. topic:主题名
  2. flag:未找到说明
  3. properties:Map 结构存储消息属性
  4. body:byte[] 结构存储消息内容

封装了获取消息属性、内容的方法,部分消息属性的设置方法权限为 defaultMessageAccessor 中提供了 static 操作方法,直接设置预置属性会抛出异常
MessageAccessor 提供了 Message 属性的 static 操作方法
MessageConst Message 中用到的属性静态常量及这些常量集 systemKeySet
MessageDecoder 提供了 MessageId 生成和解析方法、MessageExt 及消息属性的序列化和反序列化方法
MessageExt Message 的子类,扩展了queueId、storeSize、queueOffset、sysFlag、bornTimestamp、storeTimestamp等等
MessageId 消息ID的JavaBean,16字节。内存中的结构包含2个部分:
  1. address:SocketAddress类型的broker监听地址
  2. offset:long类型的偏移量
MessageQueue 消息队列的JavaBean,内存结构包含3个部分:
  1. topic
  2. brokerName
  3. queueId

重写了 hashcodeequals 方法,实现了 Comparable 接口
MessageQueueForC 增加了offset的 MessageQueuecom.alibaba.rocketmq.broker.client.net.Broker2Client#convertOffsetTable2OffsetList 中生成对象

2.12. common.namesrv

class 说明
NamesrvConfig NameServer的配置类JavaBean,内存结构包含4个部分:
  1. rocketmqHome:先取JVM变量rocketmq.home.dir,未设置时取系统环境变量ROCKETMQ_HOME
  2. kvConfigPath:用户目录下/namesrv/kvConfig.json
  3. productEnvName:默认”center”
  4. clusterTest:默认false
NamesrvUtil 定义常量 public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG";
RegisterBrokerResult Broker注册结果的JavaBean,内存结构包含3个部分:
  1. haServerAddr:
  2. masterAddr:主节点地址
  3. kvTable:KVTable类型的其他值
TopAddressing 查询NameServer信息的配置类JavaBean,内存结构包含3个部分:
  1. nsAddr
  2. wsAddr
  3. unitName:不为空时,在 wsAddr 后追加 -${unitName}?nofix=1

提供了 #fetchNSAddr() 方法从 wsAddrunitName 组成的url查询最新的NameServer地址

wsAddr 值参见:MixAll.WS_ADDR

2.13. common.protocol

class 说明
RequestCode 请求码
ResponseCode 应答码

2.14. common.protocol.body

class 说明
CMResult 消费结果枚举值:
  1. CR_SUCCESS
  2. CR_LATER
  3. CR_ROLLBACK
  4. CR_COMMIT
  5. CR_THROW_EXCEPTION
  6. CR_RETURN_NULL

2.15. common.protocol.heartbeat

class 说明
ConsumeType 消费类型枚举值:
  1. CONSUME_ACTIVELY:主动型
  2. CONSUME_PASSIVELY:被动型
MessageModel 消息模式枚举值:
  1. BROADCASTING:广播,每个消费者拥有自己的队列
  2. CLUSTERING:集群,同集群中的消费者作为一组从一个队列消费

2.16. common.protocol.topic

class 说明
RequestCode 定义了请求消息常量
ResponseCode 定义了应答消息常量

包下其他类,多为JavaBean,不做赘述

2.17. common.running

class 说明
RunningStats 运行状态枚举值:
  1. commitLogMaxOffset
  2. commitLogMinOffset
  3. commitLogDiskRatio
  4. consumeQueueDiskRatio
  5. scheduleMessageOffset

2.18. common.stats

class 说明
StatsItem 提供了生成状态快照 StatsSnapshot 的方法
StatsSnapshot 状态快照JavaBean

2.19. common.subscription

class 说明
SubscriptionGroupConfig 订阅组配置JavaBean,内存结构包含8个部分:
  1. groupName:组名,需要设置
  2. consumeEnable:是否允许消费,默认值 true
  3. consumeFromMinEnable:未使用
  4. consumeBroadcastEnable:是否开启广播模式消费,默认值 true
  5. retryQueueNums:最大Topic队列数,默认值 1
  6. retryMaxTimes:重试最大次数,默认值 16
  7. brokerId:当前Broker的id,默认值 MixAll.MASTER_ID=0
  8. whichBrokerWhenConsumeSlowly:当有慢消费时推荐的Broker的id,默认值 1

2.20. common.sysflag

class 说明
MessageSysFlag 位运算提供消息的压缩、多Tag、事务类型等标记的判断
PullSysFlag 位运算提供拉取消息的确认偏移量、阻塞、订阅、过滤等标记的判断
TopicSysFlag 位运算提供主题标记的判断

2.21. common.utils

class 说明
ChannelUtil 仅提供获取远端IP的 static 方法
HttpTinyClient 提供了 java.net.HttpURLConnection 封装的,httpGet、httpPost方法
IOTinyUtils 提供了一些IO操作供 HttpTinyClient 使用

3. 其他

通过搜索,发现了 rocketmq-common 中使用的一些环境变量

源文件:BrokerConfig

rocketmq.home.dir

默认值 用途
操作系统环境变量 ROCKETMQ_HOME 设置Broker的工作目录,目前用来定位 /conf/logback_broker.xml 配置文件,和定位文件 %s/bin/startfsrv.sh

rocketmq.namesrv.addr

默认值 用途
操作系统环境变量 NAMESRV_ADDR 设置NameServer的地址

源文件:MixAll

rocketmq.namesrv.domain

默认值 用途
jmenv.tbsite.net 指定在线NameServer查询服务的域名

rocketmq.namesrv.domain.subgroup

默认值 用途
nsaddr 指定在线NameServer查询服务的URL后缀

源文件:PackageConflictDetect

com.alibaba.rocketmq.packageConflictDetect.enable

默认值 用途
true 是否进行包冲突检测,这里仅检测fastjson必须大于 1.2.3 版本

源文件:NamesrvConfig

rocketmq.home.dir

默认值 用途
操作系统环境变量 ROCKETMQ_HOME 设置NameServer的工作目录,目前用来定位 /conf/logback_namesrv.xml 配置文件