1. 解读
1.1. NamesrvStartup 处理流程
设置版本号
com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
com.alibaba.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer设置Socket缓冲区
如果没有设置 SEND 缓冲区:NettySystemConfig.SocketSndbufSize = 2048
如果没有设置 RECV 缓冲区:NettySystemConfig.SocketRcvbufSize = 2048
FastJson版本冲突检测
com.alibaba.rocketmq.common.conflict.PackageConflictDetect#detectFastjson
命令行参数解析
使用com.alibaba.rocketmq.srvutil.ServerUtil#parseCmdLine
解析命令行,如果解析失败,则程序退出配置文件加载
命令行中包含-c
选项,则加载指定的 properties 配置文件,并通过com.alibaba.rocketmq.common.MixAll#properties2Object
设置NamesrvConfig
、NettyServerConfig
。其中,如果配置中没有设置 ListenPort,默认为9876
输出配置参数列表
命令行中包含-p
选项,则通过com.alibaba.rocketmq.common.MixAll#printObjectProperties(org.slf4j.Logger, java.lang.Object)
输出NamesrvConfig
、NettyServerConfig
的声明字段,输出完成后程序退出配置 NamesrvConfig
通过com.alibaba.rocketmq.common.MixAll#properties2Object
将命令行参数加到NamesrvConfig
配置 logback
logback 配置文件固定:namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"
。
注意:此前会先判断 namesrvConfig.getRocketmqHome() 未设置时,会导致程序退出。不过这种情况仅发生在直接运行
com.alibaba.rocketmq.namesrv.NamesrvStartup#main
,如果是脚本启动,会自动将该值设置为mqnamesrv
脚本的上上层目录实例化并初始化
NamesrvController
使用 NamesrvConfig、NettyServerConfig 实例化NamesrvController
然后调用com.alibaba.rocketmq.namesrv.NamesrvController#initialize
初始化,初始化失败时程序退出
否则增加 ShutdownHook ,在程序退出时记录日志,并调用com.alibaba.rocketmq.namesrv.NamesrvController#shutdown
释放资源启动
NamesrvController
调用com.alibaba.rocketmq.namesrv.NamesrvController#start
启动线程工作
1.2. NamesrvController 初始化流程
加载 KvConfig
如果NamesrvConfig
中kvConfigPath
路径指定的文件内容不为空,则将文件存储的 json 内容加载进Map:HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>>
实例化
NettyRemotingServer
rocketmq-remoting
模块中的com.alibaba.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(com.alibaba.rocketmq.remoting.netty.NettyServerConfig, com.alibaba.rocketmq.remoting.ChannelEventListener)
构造 RemotingExecutorThread 线程池
使用java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)
构造固定数目线程池,数目由NettyServerConfig.getServerWorkerThreads
决定注册
NamesrvConfig
中clusterTest
为true
时,调用com.alibaba.rocketmq.remoting.RemotingServer#registerDefaultProcessor
将一个com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor
和 RemotingExecutorThread 线程池绑定起来;否则将一个com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor
和 RemotingExecutorThread 线程池绑定起来。绑定到NettyRemotingServer
的defaultRequestProcessor
上两者区别:
ClusterTestRequestProcessor
继承自DefaultRequestProcessor
并重写了getRouteInfoByTopic
方法,RouteInfoManager
中未能获取到TopicRouteData
时,去集群上查询一次启动定时任务
- 5s 后启动,每隔 10s 执行一次:清理失效的 Broker 信息
- 1m 后启动,每隔 10m 执行一次:在日志中 INFO 级别输出 KvConfig 的内容
1.3. NamesrvController 启动流程
实际上是启动 NamesrvController
的 NettyRemotingServer
构造
DefaultEventExecutorGroup
线程数目由NettyServerConfig.getServerWorkerThreads
决定,线程名:"NettyServerCodecThread_" + this.threadIndex.incrementAndGet()
构造 BOSS 线程
线程数固定为1,线程名:NettyBoss_1
构造 WORKER 线程
如果是 Linux,且NettyServerConfig
中useEpollNativeSelector
为true
,返回一个EpollEventLoopGroup
,线程数目由NettyServerConfig.getServerSelectorThreads
决定,线程名:String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())
;否则返回一个NioEventLoopGroup
,线程数目由NettyServerConfig.getServerSelectorThreads
决定,线程名:String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())
配置
ServerBootstrap
ChannelOption.SO_BACKLOG
:1024
ChannelOption.SO_REUSEADDR
:true
ChannelOption.SO_KEEPALIVE
:false
ChannelOption.SO_SNDBUF
:nettyServerConfig.getServerSocketSndBufSize()
ChannelOption.SO_RCVBUF
:nettyServerConfig.getServerSocketRcvBufSize()
ChannelOption.TCP_NODELAY
:true
设置的是childOptionlocalAddress
:new InetSocketAddress(this.nettyServerConfig.getListenPort())
childHandler
:- NettyEncoder:编码
- NettyDecoder:解码
- IdleStateHandler:空闲处理,空闲时间由
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()
决定 - NettyConnetManageHandler:连接管理,记录连接日志,并处理连接事件(空闲事件)
- NettyServerHandler:业务逻辑处理。根据
RemotingCommand
的type
调用com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
或com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand
NettyServerConfig
未将serverPooledByteBufAllocatorEnable
设置为false
,则ChannelOption.ALLOCATOR
设置为PooledByteBufAllocator.DEFAULT
,即默认开启 Netty 的池化 ByteBuf
启动
ServerBootstrap
调用io.netty.bootstrap.AbstractBootstrap#bind()
启动定时任务
3s 后启动,每隔 1s 执行一次:处理超时请求
###