【RocketMQ】Message Consumption

Previous article: 【RocketMQ】Message Pulling

Message Consumption

When RocketMQ consumes messages, the submitConsumeRequest method of ConsumeMessageConcurrentlyService submits the messages to a thread pool for consumption. The processing logic is as follows:

  1. If the number of messages in this batch is less than or equal to the batch consumption size consumeBatchSize, build a ConsumeRequest and submit it to the thread pool directly
  2. If the number of messages is greater than consumeBatchSize, the messages need to be submitted in batches: each iteration builds a ConsumeRequest with at most consumeBatchSize messages and submits it to the thread pool
  3. If a RejectedExecutionException is thrown, call submitConsumeRequestLater to submit the request again later

RocketMQ consumes messages in batches: if the number of messages in a batch is no more than the preconfigured batch size, a single consume request is built and submitted to the thread pool; otherwise the messages are split and submitted in several batches. A configuration sketch follows the source code below.

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        // If the number of messages is less than or equal to the batch consumption size
        if (msgs.size() <= consumeBatchSize) {
            // Build the consume request
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                // Submit it to the consume thread pool
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            // Traverse the messages
            for (int total = 0; total < msgs.size(); ) {
                // Create a message list of size consumeBatchSize for this batch
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        // Add the message to the batch
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                // Create the ConsumeRequest
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    // Submit it to the consume thread pool
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                    // If the submission is rejected, submit it later with a delay
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }
}
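
For context, here is a minimal consumer-setup sketch (not part of the source above; the group name, topic, and name server address are placeholder assumptions) showing where consumeBatchSize comes from: it is the consumer's consumeMessageBatchMaxSize setting, which defaults to 1.

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class BatchSizeExample {
    public static void main(String[] args) throws Exception {
        // Placeholder group/topic/address, for illustration only
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoTopic", "*");
        // The consumeBatchSize read in submitConsumeRequest comes from this setting (default is 1)
        consumer.setConsumeMessageBatchMaxSize(8);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs.size() is at most consumeMessageBatchMaxSize
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}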

Running the Consume Task

ConsumeRequest is an inner class of ConsumeMessageConcurrentlyService that implements the Runnable interface. Its run method handles the consume task as follows:

  1. Check whether the ProcessQueue the messages belong to has been dropped; if it has, do not process them

  2. Reset the retry topic of the messages

  3. If a consume-message hook is registered, execute the executeHookBefore hook

  4. Get the message listener and call its consumeMessage method to consume the messages; the returned consume result status is one of CONSUME_SUCCESS and RECONSUME_LATER

  5. Compute the consumption duration and check whether it exceeded the timeout

  6. If a consume-message hook is registered, execute the executeHookAfter hook

  7. Check again whether the ProcessQueue has been dropped; if it has not, call processConsumeResult to process the consume result

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;   // process queue
        private final MessageQueue messageQueue;   // message queue

        @Override
        public void run() {
            // If the process queue has been dropped
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }
            // Get the message listener
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            // Reset the retry topic of the messages
            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
            ConsumeMessageContext consumeMessageContext = null;
            // If a hook is registered
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                // ...
                // Execute the before hook
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }
            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        // Set the consume start timestamp
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                // Consume the messages through the listener's consumeMessage and get the result status
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue), e);
                hasException = true;
            }
            // Compute the consumption duration
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    // An exception occurred
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    // The listener returned null
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // check timeout
                returnType = ConsumeReturnType.TIME_OUT; // return type set to timed out
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // reconsume later
                returnType = ConsumeReturnType.FAILED; // return type set to failed
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // success
                returnType = ConsumeReturnType.SUCCESS; // return type set to success
            }
            // ...
            // If the consume status is null
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                // Fall back to RECONSUME_LATER
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // If a hook is registered
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                // Execute the after hook
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
            if (!processQueue.isDropped()) {
                // Process the consume result
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }
    }
}

// Reset the retry topic of the messages
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
        // Retry topic of the consumer group: %RETRY% + consumer group name
        final String groupTopic = MixAll.getRetryTopic(consumerGroup);
        for (MessageExt msg : msgs) {
            // Get the original topic saved in the RETRY_TOPIC property of the message
            String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
            // If the property is not null and the message's topic equals the consumer group's retry topic
            if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
                // Restore the original topic
                msg.setTopic(retryTopic);
            }
            if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
            }
        }
    }
}

// Consume result status
public enum ConsumeConcurrentlyStatus {
    /**
     * Consumption succeeded
     */
    CONSUME_SUCCESS,
    /**
     * Consumption failed, reconsume later
     */
    RECONSUME_LATER;
}

Processing the Consume Result

I. Setting ackIndex

The value of ackIndex is used to determine how many messages failed. In processConsumeResult, ackIndex is set according to the consume result status, which, as shown above, has two possible values (a small sketch of the resulting index ranges follows this list):

  • CONSUME_SUCCESS: consumption succeeded. ackIndex is set to the message count - 1, meaning every message in the batch was consumed successfully.
  • RECONSUME_LATER: consumption failed and the messages should be reconsumed later. ackIndex is set to -1, meaning every message in the batch failed.
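
As a quick illustration of these two cases, the toy sketch below (not RocketMQ source; the batch size of 4 is an arbitrary assumption) prints the range of indexes that the failure-handling loop described in the next section would cover:

public class AckIndexSketch {
    public static void main(String[] args) {
        int msgCount = 4; // assumed batch size, for illustration only

        // CONSUME_SUCCESS: ackIndex = msgCount - 1, so the failure range [ackIndex + 1, msgCount) is empty
        int ackIndexSuccess = msgCount - 1;
        System.out.println("success, failed range: [" + (ackIndexSuccess + 1) + ", " + msgCount + ")"); // [4, 4) -> none

        // RECONSUME_LATER: ackIndex = -1, so the failure range covers every message in the batch
        int ackIndexLater = -1;
        System.out.println("reconsume later, failed range: [" + (ackIndexLater + 1) + ", " + msgCount + ")"); // [0, 4) -> all
    }
}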

II. Handling Failed Messages

Broadcasting Mode

In broadcasting mode, if a message fails to be consumed, the failed message is only logged; no further processing is done.

Clustering Mode

A for loop starts at i = ackIndex + 1 and runs while i < consumeRequest.getMsgs().size(). As shown above, ackIndex has two possible values:

  • Consumption succeeded: ackIndex equals the message count - 1, so ackIndex + 1 equals the message count and the loop condition is never met; all messages succeeded and there are no failed messages to handle.
  • Reconsume later: ackIndex is -1, so ackIndex + 1 is 0 and the loop condition is met. Every message from the first to the last is traversed and sendMessageBack is called to send a CONSUMER_SEND_MSG_BACK request to the Broker. If the send succeeds, the Broker puts the message into a delay queue according to its delay level, and once the delay expires the consumer pulls it again; if the send fails, the message is added to the failed-message list and a consume request is resubmitted later.

III. Removing Messages and Updating the Pull Offset

After the steps above, removeMessage is called to remove the messages from the ProcessQueue and return the pull offset, and then updateOffset is called to update the pull offset.

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest) {
        // Get ackIndex
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())
            return;
        switch (status) {
            case CONSUME_SUCCESS: // consumption succeeded
                // If ackIndex is greater than or equal to the message count
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    // Set it to the message count - 1
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                // Number of successfully consumed messages
                int ok = ackIndex + 1;
                // Number of failed messages
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER: // reconsume later
                // Set ackIndex to -1
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }
        // Check the message model
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING: // broadcasting mode
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING: // clustering mode
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                // Traverse the failed messages
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    // Get the message
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // Send the message back to the Broker
                    boolean result = this.sendMessageBack(msg, context);
                    // If the send failed
                    if (!result) {
                        // Increment the reconsume times
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        // Add it to the failed-message list
                        msgBackFailed.add(msg);
                    }
                }
                // If the failed-message list is not empty
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    // Resubmit a consume request later
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
        // Remove the messages from the process queue
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            // Update the pull offset
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
}

Sending the CONSUMER_SEND_MSG_BACK Request

Delay Levels

The delay times corresponding to RocketMQ's delay levels are defined in the messageDelayLevel field of MessageStoreConfig:

public class MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
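
To get a feel for the mapping, here is a small sketch (not RocketMQ source) that parses the string above under the assumption that delay level N corresponds to the N-th entry, counting from 1, so level 3 means 10 seconds:

import java.util.HashMap;
import java.util.Map;

public class DelayLevelSketch {
    public static void main(String[] args) {
        String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        Map<Character, Long> unitMillis = new HashMap<>();
        unitMillis.put('s', 1000L);
        unitMillis.put('m', 60_000L);
        unitMillis.put('h', 3_600_000L);

        String[] levels = messageDelayLevel.split(" ");
        for (int level = 1; level <= levels.length; level++) {
            String value = levels[level - 1];
            char unit = value.charAt(value.length() - 1);
            long amount = Long.parseLong(value.substring(0, value.length() - 1));
            // Delay level `level` (1-based) maps to this duration, e.g. level 3 -> 10000 ms
            System.out.println("level " + level + " -> " + amount * unitMillis.get(unit) + " ms");
        }
    }
}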

In sendMessageBack, the delay level is first obtained from the context (as seen in ConsumeConcurrentlyContext, the default delay level is 0) and the namespace is prepended to the topic, then defaultMQPushConsumerImpl's sendMessageBack is called to send the message:

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
        // Get the delay level
        int delayLevel = context.getDelayLevelWhenNextConsume();
        // Prepend the namespace to the topic
        msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
        try {
            // Send the message back to the Broker
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
            return true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
        }
        return false;
    }
}

// Concurrent consumption context
public class ConsumeConcurrentlyContext {
    /**
     * -1: no retry, put the message into the DLQ directly
     *  0: the Broker controls the retry frequency
     * >0: the client controls the delay level
     */
    private int delayLevelWhenNextConsume = 0; // default is 0
}
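
Because delayLevelWhenNextConsume defaults to 0, retries are normally Broker-controlled, but a listener can override it before returning RECONSUME_LATER. The sketch below is a hypothetical listener (the handle method and the choice of level 4, which is 30s under the default delay table, are assumptions) using the public setDelayLevelWhenNextConsume API:

import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class RetryControlListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            boolean ok = handle(msg); // hypothetical business handler
            if (!ok) {
                // Ask the Broker to delay the retry with level 4 (30s under the default messageDelayLevel table)
                context.setDelayLevelWhenNextConsume(4);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private boolean handle(MessageExt msg) {
        // Placeholder: pretend every message is processed successfully
        return true;
    }
}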

DefaultMQPushConsumerImpl's sendMessageBack method in turn calls MQClientAPIImpl's consumerSendMessageBack method to send the request:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            // Get the Broker address
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            // Call consumerSendMessageBack to send the message back
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            // ...
        } finally {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }
}

In MQClientAPIImpl's consumerSendMessageBack method, the request code is set to CONSUMER_SEND_MSG_BACK, the message-related fields are filled into the request header, and the request is sent to the Broker:

public class MQClientAPIImpl {
    public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes) throws RemotingException, MQBrokerException, InterruptedException {
        // Create the request header
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        // Set the request code to CONSUMER_SEND_MSG_BACK
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
        // Set the consumer group
        requestHeader.setGroup(consumerGroup);
        requestHeader.setOriginTopic(msg.getTopic());
        // Set the physical offset of the message
        requestHeader.setOffset(msg.getCommitLogOffset());
        // Set the delay level
        requestHeader.setDelayLevel(delayLevel);
        // Set the original message ID
        requestHeader.setOriginMsgId(msg.getMsgId());
        // Set the maximum reconsume times
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
        // Send the request to the Broker
        RemotingCommand response = this.remotingClient.invokeSync(
            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }
}

How the Broker Handles the Request

The Broker handles CONSUMER_SEND_MSG_BACK requests in SendMessageProcessor. The logic is as follows:

  1. Look up the subscription group config by consumer group; if none is found, record the error and return
  2. Get the retry topic of the consumer group, randomly pick one of the retry queues, and create the TopicConfig for the retry topic (see the topic-naming sketch after this list)
  3. Look up the message from the commitlog by its physical offset
  4. Check whether the message's reconsume times is greater than or equal to the maximum reconsume times, or the delay level is less than 0
    • If the condition holds, the message must be put into the dead-letter queue (DLQ), so a DLQ queue ID is selected
    • If it does not hold and the delay level is 0, use 3 + the message's reconsume times as the new delay level
  5. Build a new MessageExtBrokerInner and copy the relevant fields over; this effectively creates a brand-new message (the original message ID is preserved) that is appended to the CommitLog again. Its topic is set in one of two ways:
    • If the DLQ condition is met, the topic is the DLQ topic (%DLQ% + consumer group name) and the message is put into the selected DLQ queue
    • If not, the topic is the retry topic (%RETRY% + consumer group name) and the message will be consumed again later
  6. Call asyncPutMessage to store the message; see the earlier article 【消息的存储】 (Message Storage) for the details
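
To make the topic naming in steps 2 and 5 concrete, the following sketch (the consumer group name is an assumption) prints the retry and DLQ topic names using the MixAll helpers referenced above:

import org.apache.rocketmq.common.MixAll;

public class RetryAndDlqTopicSketch {
    public static void main(String[] args) {
        String group = "demo_consumer_group"; // assumed group name, for illustration only
        // %RETRY%demo_consumer_group -- the topic the message is re-published to for retries
        System.out.println(MixAll.getRetryTopic(group));
        // %DLQ%demo_consumer_group -- the topic used once the maximum reconsume times is exceeded
        System.out.println(MixAll.getDLQTopic(group));
    }
}
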
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    // Handle the request
    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                // Handle the send-back request
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                // ...
        }
    }

    private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        // ...
        // Look up the subscription group config by consumer group
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        // If it is null, return directly
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return CompletableFuture.completedFuture(response);
        }
        // ...
        // Get the retry topic of the consumer group
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        // Randomly pick one of the retry queues
        int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        // Create the TopicConfig for the retry topic
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        // ...
        // Look up the message from the commitlog by its physical offset
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return CompletableFuture.completedFuture(response);
        }
        // Get the retry topic recorded in the message
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);
        // Get the delay level
        int delayLevel = requestHeader.getDelayLevel();
        // Get the maximum reconsume times
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            Integer times = requestHeader.getMaxReconsumeTimes();
            if (times != null) {
                maxReconsumeTimes = times;
            }
        }
        // Check whether the reconsume times reached the maximum, or the delay level is less than 0
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {
            // Get the DLQ topic
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            // Pick a DLQ queue
            queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
            // Create the TopicConfig for the DLQ topic
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0);
            // ...
        } else {
            // If the delay level is 0
            if (0 == delayLevel) {
                // Derive a new delay level from the reconsume times
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            // Set the delay level
            msgExt.setDelayTimeLevel(delayLevel);
        }
        // Build a new message
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic); // set the topic
        msgInner.setBody(msgExt.getBody()); // set the body
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // set the properties
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
        msgInner.setQueueId(queueIdInt); // set the queue ID
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); // increment the reconsume times
        // The original message ID
        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        // Preserve the original message ID
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        // Store the retry message
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return putMessageResult.thenApply((r) -> {
            if (r != null) {
                switch (r.getPutMessageStatus()) {
                    case PUT_OK:
                        // ...
                        return response;
                    default:
                        break;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(r.getPutMessageStatus().name());
                return response;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        });
    }
}

Handling the Delayed Message

As explained in the earlier article 【消息的存储】 (Message Storage), storing a message goes through asyncPutMessage. It first reads the transaction type; if the message is non-transactional or is a transaction commit, the delay level is checked, and if the delay level is greater than 0 the message needs delayed consumption, which is handled as follows:

  1. If the message's delay level exceeds the maximum delay level, cap it at the maximum delay level

  2. Get RMQ_SYS_SCHEDULE_TOPIC, a constant defined in TopicValidator whose value is SCHEDULE_TOPIC_XXXX:

    public class TopicValidator {
        // ...
        public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    }

  3. Select the queue corresponding to the delay level; messages with the same delay level are generally placed in the same queue (see the queue-mapping sketch after this list)

  4. Back up the original topic and queue ID of the message in its properties

  5. Change the message's topic to RMQ_SYS_SCHEDULE_TOPIC, so the topic of a delayed message is ultimately set to RMQ_SYS_SCHEDULE_TOPIC and the message is handled in the corresponding delay queue
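
Regarding step 3, the level-to-queue mapping is expected to be a simple offset; the sketch below shows the assumed behavior of ScheduleMessageService.delayLevel2QueueId (queue ID = delay level - 1) rather than the verbatim source:

public class DelayQueueMappingSketch {
    // Assumed behavior of ScheduleMessageService.delayLevel2QueueId: each delay level gets its own queue
    static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }

    public static void main(String[] args) {
        // Delay level 3 (10s with the default table) would be written to queue 2 of SCHEDULE_TOPIC_XXXX
        System.out.println(delayLevel2QueueId(3));
    }
}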

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // ...
        // Get the transaction type
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // If the message is non-transactional or is a transaction commit
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Check the delay level
            if (msg.getDelayTimeLevel() > 0) {
                // Cap it at the maximum delay level
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // Get RMQ_SYS_SCHEDULE_TOPIC
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // Select the queue corresponding to the delay level
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                // Back up the original topic and queue ID
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // Switch the topic to the schedule topic
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        // ...
    }
}

Persisting the Pull Progress

RocketMQ has two message models, broadcasting and clustering. In broadcasting mode the consumption progress is stored on each consumer; in clustering mode it is stored on the Broker.

Broadcasting Mode

Updating the Progress

LocalFileOffsetStore uses a ConcurrentMap field named offsetTable to store the pull offset of each message queue: the key is the message queue and the value is that queue's pull offset.

When the pull progress is updated, the current queue's offset is fetched from offsetTable. If it does not exist yet, a new entry is created and stored in offsetTable; otherwise the previously stored offset is updated. Note that this only updates the in-memory offsetTable and does not persist anything to disk; persistence is done in the persistAll method.

public class LocalFileOffsetStore implements OffsetStore {
    // offsetTable: the key is the message queue, the value is that queue's pull offset
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // Get the previously stored offset
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                // Create a new entry if none exists yet
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }
            // If an entry already existed
            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    // Overwrite the pull offset
                    offsetOld.set(offset);
                }
            }
        }
    }
}
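
When increaseOnly is true, the offset may only move forward. The following sketch illustrates the behavior MixAll.compareAndIncreaseOnly is expected to have, namely a CAS loop that rejects values not ahead of the stored offset; it is an illustration, not the verbatim RocketMQ source:

import java.util.concurrent.atomic.AtomicLong;

public class CompareAndIncreaseOnlySketch {
    // Assumed behavior: update target only if value is strictly greater than the current value
    static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true; // moved forward
            }
            prev = target.get(); // another thread won the race, re-check
        }
        return false; // value is not ahead of the stored offset, leave it unchanged
    }

    public static void main(String[] args) {
        AtomicLong offset = new AtomicLong(100);
        System.out.println(compareAndIncreaseOnly(offset, 120) + " -> " + offset.get()); // true -> 120
        System.out.println(compareAndIncreaseOnly(offset, 90) + " -> " + offset.get());  // false -> 120
    }
}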

Loading the Progress

Since the consumption progress is stored on the consumer side in broadcasting mode, the previously saved progress file needs to be loaded from the local disk.

LOCAL_OFFSET_STORE_DIR: the root path of the consumption progress file

public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty("rocketmq.client.localOffsetStoreDir",
    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

In LocalFileOffsetStore's constructor the path of the offset file is set to LOCAL_OFFSET_STORE_DIR + client ID + consumer group name + offsets.json; as the file name suggests, the consumption progress is stored in JSON format:

this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
    this.mQClientFactory.getClientId() + File.separator +
    this.groupName + File.separator +
    "offsets.json";

In the load method, the offsets.json file is first read from the local disk and deserialized into an OffsetSerializeWrapper object, and then the saved progress is put into offsetTable:

public class LocalFileOffsetStore implements OffsetStore {
    // Root path of the offset file
    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty("rocketmq.client.localOffsetStoreDir",
        System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    private final String storePath;
    // ...

    public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
        this.mQClientFactory = mQClientFactory;
        this.groupName = groupName;
        // Build the path of the offset file
        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
            this.mQClientFactory.getClientId() + File.separator +
            this.groupName + File.separator +
            "offsets.json";
    }

    @Override
    public void load() throws MQClientException {
        // Read the locally stored offsets
        OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
        if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
            // Put them into offsetTable
            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
            for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
                AtomicLong offset = mqEntry.getValue();
                log.info("load consumer's offset, {} {} {}",
                    this.groupName,
                    mqEntry.getKey(),
                    offset.get());
            }
        }
    }

    // Load the file from the local disk
    private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
        String content = null;
        try {
            // Read the file
            content = MixAll.file2String(this.storePath);
        } catch (IOException e) {
            log.warn("Load local offset store file exception", e);
        }
        if (null == content || content.length() == 0) {
            return this.readLocalOffsetBak();
        } else {
            OffsetSerializeWrapper offsetSerializeWrapper = null;
            try {
                // Deserialize the JSON
                offsetSerializeWrapper =
                    OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
            } catch (Exception e) {
                log.warn("readLocalOffset Exception, and try to correct", e);
                return this.readLocalOffsetBak();
            }
            return offsetSerializeWrapper;
        }
    }
}

OffsetSerializeWrapper

OffsetSerializeWrapper also uses a ConcurrentMap. After the content of offsets.json is read from disk, the JSON is converted into an OffsetSerializeWrapper object, so the previously saved offset of each message queue can be obtained from the wrapper's offsetTable and then put into LocalFileOffsetStore's offsetTable:

public class OffsetSerializeWrapper extends RemotingSerializable {
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
        return offsetTable;
    }

    public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
        this.offsetTable = offsetTable;
    }
}
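
To see what ends up in offsets.json, a small sketch (the topic name, broker name, queue ID, and offset are assumptions) builds a wrapper and prints the same pretty-printed JSON that persistAll later writes to disk:

import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.client.consumer.store.OffsetSerializeWrapper;
import org.apache.rocketmq.common.message.MessageQueue;

public class OffsetJsonSketch {
    public static void main(String[] args) {
        OffsetSerializeWrapper wrapper = new OffsetSerializeWrapper();
        // Hypothetical queue: topic "DemoTopic" on broker "broker-a", queue 0, offset 1024
        wrapper.getOffsetTable().put(new MessageQueue("DemoTopic", "broker-a", 0), new AtomicLong(1024));
        // The same pretty-printed JSON form that persistAll writes to offsets.json
        System.out.println(wrapper.toJson(true));
    }
}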

Persisting the Progress

updateOffset only changes the in-memory data and does not write anything to disk. Persistence is implemented in the persistAll method:

  1. Create an OffsetSerializeWrapper object
  2. Iterate over LocalFileOffsetStore's offsetTable and copy the entries into the wrapper's offsetTable
  3. Convert the OffsetSerializeWrapper to JSON
  4. Call string2File to write the JSON to the file on disk
public class LocalFileOffsetStore implements OffsetStore {
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
        // Create the OffsetSerializeWrapper
        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
        // Iterate over offsetTable
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            if (mqs.contains(entry.getKey())) {
                // Get the pull offset
                AtomicLong offset = entry.getValue();
                // Copy it into the wrapper's offsetTable
                offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
            }
        }
        // Convert the wrapper to JSON
        String jsonString = offsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                // Write the JSON to the file on disk
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                log.error("persistAll consumer offset Exception, " + this.storePath, e);
            }
        }
    }
}

Clustering Mode

In clustering mode, the consumption progress is stored on the Broker.

Updating the Progress

Updating the progress in clustering mode is similar to broadcasting mode: only the in-memory offsetTable is updated:

public class RemoteBrokerOffsetStore implements OffsetStore {
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // Get the queue's current offset
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                // Store the offset in offsetTable
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }
            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    // Overwrite the pull offset
                    offsetOld.set(offset);
                }
            }
        }
    }
}

Loading the Progress

In clustering mode the consumption progress is obtained from the Broker: when the consumer sends a pull request, the Broker computes the consume offset, so RemoteBrokerOffsetStore's load method is empty and does nothing:

public class RemoteBrokerOffsetStore implements OffsetStore {
    @Override
    public void load() {
    }
}

Persisting the Progress

Since the consumption progress is stored on the Broker in clustering mode, persistAll calls updateConsumeOffsetToBroker to send a request to the Broker to save the progress:

public class RemoteBrokerOffsetStore implements OffsetStore {
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        // Send a request to the Broker to update the offset
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
        // ...
    }
}

Triggering the Persistence

MQClientInstance registers a scheduled task in its startScheduledTask method that periodically calls persistAllConsumerOffset to persist the pull progress; persistAllConsumerOffset in turn calls MQConsumerInner's persistConsumerOffset method:

public class MQClientInstance {
    private void startScheduledTask() {
        // ...
        // Register a scheduled task that periodically persists the pull progress
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    // Persist the offsets
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // ...
    }

    private void persistAllConsumerOffset() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            // Call persistConsumerOffset to persist the offsets
            impl.persistConsumerOffset();
        }
    }
}
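
The interval between persist runs comes from the client configuration (persistConsumerOffsetInterval, 5000 ms by default). Below is a minimal sketch of adjusting it on the consumer, which inherits the setting from ClientConfig; the group name and the 10-second value are assumptions:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class OffsetPersistIntervalExample {
    public static void main(String[] args) {
        // Hypothetical consumer group, for illustration only
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        // Persist offsets every 10 seconds instead of the 5-second default
        consumer.setPersistConsumerOffsetInterval(10 * 1000);
    }
}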

DefaultMQPushConsumerImpl is one implementation of MQConsumerInner. Taking it as an example, its persistConsumerOffset method calls the offsetStore's persistAll method to perform the persistence:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);
            // Persist the pull progress
            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }
}

Summary

Concurrent consumption submits ConsumeRequest tasks to a thread pool, and the listener's return status drives processConsumeResult: in clustering mode, failed messages are sent back to the Broker with CONSUMER_SEND_MSG_BACK and re-published to the retry topic (or to the DLQ once the maximum reconsume times is exceeded), with the delay handled through SCHEDULE_TOPIC_XXXX. Consumption progress is kept in an in-memory offsetTable and persisted periodically, to a local offsets.json file in broadcasting mode and to the Broker in clustering mode.


RocketMQ version: 4.9.3

Link to this article: https://my.lmcjl.com/post/5415.html
