Update rocketmq-producer-start.md

pull/109/head
Yang Libin 3 years ago committed by GitHub
parent 521b97b3ac
commit c4a2083959
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,8 +1,10 @@
# RocketMQ 生产者启动流程
入口: 入口:
org.apache.rocketmq.client.producer.DefaultMQProducer#start org.apache.rocketmq.client.producer.DefaultMQProducer#start
``` ```java
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup)); this.setProducerGroup(withNamespace(this.producerGroup));
@ -17,9 +19,9 @@ org.apache.rocketmq.client.producer.DefaultMQProducer#start
} }
``` ```
第一步、检查producerGroup 第一步、检查 producerGroup
``` ```java
private void checkConfig() throws MQClientException { private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
@ -33,9 +35,9 @@ private void checkConfig() throws MQClientException {
} }
``` ```
第二步、设置instanceName 第二步、设置 instanceName
``` ```java
public void changeInstanceNameToPID() { public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) { if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
@ -43,9 +45,9 @@ public void changeInstanceNameToPID() {
} }
``` ```
第三步、创建mqClientInstance,它是与nameserver和broker通信的中介 第三步、创建 mqClientInstance,它是与 nameserver broker 通信的中介
```jsx ```java
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId(); String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId); MQClientInstance instance = this.factoryTable.get(clientId);
@ -66,9 +68,9 @@ public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientCon
} }
``` ```
第四步、将生产者加入mqClientInstance管理 第四步、将生产者加入 mqClientInstance 管理
``` ```java
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) { if (null == group || null == producer) {
return false; return false;
@ -84,11 +86,11 @@ public synchronized boolean registerProducer(final String group, final DefaultMQ
} }
``` ```
第五步、启动MQClientInstance有一些关于消费者的任务 会在消费者启动流程中讲解) 第五步、启动 MQClientInstance有一些关于消费者的任务 会在消费者启动流程中讲解)
1> 启动netty客户端 创建与nameserver、broker通信的channel 1. 启动 netty 客户端 ,创建与 nameserver、broker 通信的 channel
``` ```java
public void start() { public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
@ -158,11 +160,11 @@ public void start() {
} }
``` ```
2> 启动一些周期性的任务: 2. 启动一些周期性的任务:
更新nameserver地址的任务 更新 nameserver 地址的任务:
``` ```java
if (null == this.clientConfig.getNamesrvAddr()) { if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@ -178,9 +180,9 @@ if (null == this.clientConfig.getNamesrvAddr()) {
} }
``` ```
更新topic路由信息的任务 更新 topic 路由信息的任务:
``` ```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
@ -194,9 +196,9 @@ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
``` ```
更新broker的任务 更新 broker 的任务:
``` ```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
@ -211,11 +213,11 @@ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
``` ```
启动拉取消息线程: 启动拉取消息线程:
`this.pullMessageService.start();` `this.pullMessageService.start();`
``` ```java
public void run() { public void run() {
log.info(this.getServiceName() + " service started"); log.info(this.getServiceName() + " service started");
@ -231,4 +233,4 @@ public void run() {
log.info(this.getServiceName() + " service end"); log.info(this.getServiceName() + " service end");
} }
``` ```

Loading…
Cancel
Save