diff --git a/docs/rocketmq/rocketmq-producer-start.md b/docs/rocketmq/rocketmq-producer-start.md index 1df09c3..65c9258 100644 --- a/docs/rocketmq/rocketmq-producer-start.md +++ b/docs/rocketmq/rocketmq-producer-start.md @@ -1,8 +1,10 @@ +# RocketMQ 生产者启动流程 + 入口: org.apache.rocketmq.client.producer.DefaultMQProducer#start -``` +```java @Override public void start() throws MQClientException { this.setProducerGroup(withNamespace(this.producerGroup)); @@ -17,9 +19,9 @@ org.apache.rocketmq.client.producer.DefaultMQProducer#start } ``` -第一步、检查producerGroup +第一步、检查 producerGroup -``` +```java private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); @@ -33,9 +35,9 @@ private void checkConfig() throws MQClientException { } ``` -第二步、设置instanceName +第二步、设置 instanceName -``` +```java public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); @@ -43,9 +45,9 @@ public void changeInstanceNameToPID() { } ``` -第三步、创建mqClientInstance,它是与nameserver和broker通信的中介 +第三步、创建 mqClientInstance,它是与 nameserver 和 broker 通信的中介 -```jsx +```java public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); @@ -66,9 +68,9 @@ public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientCon } ``` -第四步、将生产者加入mqClientInstance管理 +第四步、将生产者加入 mqClientInstance 管理 -``` +```java public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; @@ -84,11 +86,11 @@ public synchronized boolean registerProducer(final String group, final DefaultMQ } ``` -第五步、启动MQClientInstance(有一些关于消费者的任务 会在消费者启动流程中讲解) +第五步、启动 MQClientInstance(有一些关于消费者的任务 会在消费者启动流程中讲解) - 1> 启动netty客户端 ,创建与nameserver、broker通信的channel +1. 启动 netty 客户端 ,创建与 nameserver、broker 通信的 channel -``` +```java public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), @@ -158,11 +160,11 @@ public void start() { } ``` -2> 启动一些周期性的任务: +2. 启动一些周期性的任务: - 更新nameserver地址的任务: +更新 nameserver 地址的任务: -``` +```java if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -178,9 +180,9 @@ if (null == this.clientConfig.getNamesrvAddr()) { } ``` - 更新topic路由信息的任务: +更新 topic 路由信息的任务: -``` +```java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -194,9 +196,9 @@ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); ``` - 更新broker的任务: +更新 broker 的任务: -``` +```java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -211,11 +213,11 @@ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); ``` - 启动拉取消息线程: +启动拉取消息线程: - `this.pullMessageService.start();` +`this.pullMessageService.start();` -``` +```java public void run() { log.info(this.getServiceName() + " service started"); @@ -231,4 +233,4 @@ public void run() { log.info(this.getServiceName() + " service end"); } -``` \ No newline at end of file +```