1. 解读

1.1. NamesrvStartup 处理流程

  1. 设置版本号
    com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
    com.alibaba.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer

  2. 设置Socket缓冲区
    如果没有设置 SEND 缓冲区:NettySystemConfig.SocketSndbufSize = 2048
    如果没有设置 RECV 缓冲区:NettySystemConfig.SocketRcvbufSize = 2048

  3. FastJson版本冲突检测
    com.alibaba.rocketmq.common.conflict.PackageConflictDetect#detectFastjson

  4. 命令行参数解析
    使用 com.alibaba.rocketmq.srvutil.ServerUtil#parseCmdLine 解析命令行,如果解析失败,则程序退出

  5. 配置文件加载
    命令行中包含 -c 选项,则加载指定的 properties 配置文件,并通过 com.alibaba.rocketmq.common.MixAll#properties2Object 设置 NamesrvConfigNettyServerConfig其中,如果配置中没有设置 ListenPort,默认为 9876

  6. 输出配置参数列表
    命令行中包含 -p 选项,则通过 com.alibaba.rocketmq.common.MixAll#printObjectProperties(org.slf4j.Logger, java.lang.Object) 输出 NamesrvConfigNettyServerConfig 的声明字段,输出完成后程序退出

  7. 配置 NamesrvConfig
    通过 com.alibaba.rocketmq.common.MixAll#properties2Object 将命令行参数加到 NamesrvConfig

  8. 配置 logback
    logback 配置文件固定:namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"
    注意:此前会先判断 namesrvConfig.getRocketmqHome() 未设置时,会导致程序退出。不过这种情况仅发生在直接运行
    com.alibaba.rocketmq.namesrv.NamesrvStartup#main,如果是脚本启动,会自动将该值设置为 mqnamesrv 脚本的上上层目录

  9. 实例化并初始化 NamesrvController
    使用 NamesrvConfig、NettyServerConfig 实例化 NamesrvController 然后调用 com.alibaba.rocketmq.namesrv.NamesrvController#initialize 初始化,初始化失败时程序退出
    否则增加 ShutdownHook ,在程序退出时记录日志,并调用 com.alibaba.rocketmq.namesrv.NamesrvController#shutdown 释放资源

  10. 启动 NamesrvController
    调用 com.alibaba.rocketmq.namesrv.NamesrvController#start 启动线程工作

1.2. NamesrvController 初始化流程

  1. 加载 KvConfig
    如果 NamesrvConfigkvConfigPath 路径指定的文件内容不为空,则将文件存储的 json 内容加载进Map:HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>>

  2. 实例化 NettyRemotingServer
    rocketmq-remoting 模块中的 com.alibaba.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(com.alibaba.rocketmq.remoting.netty.NettyServerConfig, com.alibaba.rocketmq.remoting.ChannelEventListener)

  3. 构造 RemotingExecutorThread 线程池
    使用 java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory) 构造固定数目线程池,数目由 NettyServerConfig.getServerWorkerThreads 决定

  4. 注册
    NamesrvConfigclusterTesttrue 时,调用 com.alibaba.rocketmq.remoting.RemotingServer#registerDefaultProcessor 将一个 com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor 和 RemotingExecutorThread 线程池绑定起来;否则将一个 com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor 和 RemotingExecutorThread 线程池绑定起来。绑定到 NettyRemotingServerdefaultRequestProcessor

    两者区别: ClusterTestRequestProcessor 继承自 DefaultRequestProcessor 并重写了 getRouteInfoByTopic 方法,RouteInfoManager 中未能获取到 TopicRouteData 时,去集群上查询一次

  5. 启动定时任务

    1. 5s 后启动,每隔 10s 执行一次:清理失效的 Broker 信息
    2. 1m 后启动,每隔 10m 执行一次:在日志中 INFO 级别输出 KvConfig 的内容

1.3. NamesrvController 启动流程

实际上是启动 NamesrvControllerNettyRemotingServer

  1. 构造 DefaultEventExecutorGroup
    线程数目由 NettyServerConfig.getServerWorkerThreads 决定,线程名:"NettyServerCodecThread_" + this.threadIndex.incrementAndGet()

  2. 构造 BOSS 线程
    线程数固定为1,线程名:NettyBoss_1

  3. 构造 WORKER 线程
    如果是 Linux,且 NettyServerConfiguseEpollNativeSelectortrue,返回一个 EpollEventLoopGroup,线程数目由 NettyServerConfig.getServerSelectorThreads 决定,线程名:String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet());否则返回一个 NioEventLoopGroup,线程数目由 NettyServerConfig.getServerSelectorThreads 决定,线程名:String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())

  4. 配置 ServerBootstrap

    • ChannelOption.SO_BACKLOG1024
    • ChannelOption.SO_REUSEADDRtrue
    • ChannelOption.SO_KEEPALIVEfalse
    • ChannelOption.SO_SNDBUFnettyServerConfig.getServerSocketSndBufSize()
    • ChannelOption.SO_RCVBUFnettyServerConfig.getServerSocketRcvBufSize()
    • ChannelOption.TCP_NODELAYtrue 设置的是childOption
    • localAddressnew InetSocketAddress(this.nettyServerConfig.getListenPort())
    • childHandler
      1. NettyEncoder:编码
      2. NettyDecoder:解码
      3. IdleStateHandler:空闲处理,空闲时间由 nettyServerConfig.getServerChannelMaxIdleTimeSeconds() 决定
      4. NettyConnetManageHandler:连接管理,记录连接日志,并处理连接事件(空闲事件)
      5. NettyServerHandler:业务逻辑处理。根据 RemotingCommandtype 调用 com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommandcom.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand
    • NettyServerConfig 未将 serverPooledByteBufAllocatorEnable 设置为 false,则 ChannelOption.ALLOCATOR 设置为 PooledByteBufAllocator.DEFAULT,即默认开启 Netty 的池化 ByteBuf
    1. 启动 ServerBootstrap
      调用 io.netty.bootstrap.AbstractBootstrap#bind()

    2. 启动定时任务
      3s 后启动,每隔 1s 执行一次:处理超时请求

    ###