博客
关于我
RocketMQ 源码分析 —— Message 拉取与消费(下)
阅读量:799 次
发布时间:2023-03-22

本文共 3074 字,大约阅读时间需要 10 分钟。

RocketMQ PushConsumer 消息队列源码分析

RocketMQ的PushConsumer是处理消息的一种核心组件,主要负责拉取消息并进行消费。其工作流程涉及多个关键模块,包括订阅、消息队列分配、拉取消息以及消息消费等。以下将从源码层面对PushConsumer的实现机制进行详细分析。


1. 消息队列订阅

PushConsumer的订阅过程主要集中在DefaultMQPushConsumerImpl#subscribe方法中。该方法负责根据提供的主题和订阅表达式创建订阅数据,并将其注册到RebalanceService中。同时,它还会通过心跳机制向Broker同步Consumer的信息。

public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

关键点

  • 订阅数据创建:通过FilterAPI.buildSubscriptionData方法构建订阅数据,包括主题和订阅表达式。
  • 注册订阅:将订阅数据存储到RebalanceService中,以便后续进行消息队列分配。
  • 心跳机制:定期发送心跳信息,确保Consumer的信息在Broker端得到更新。

  • 2. 消息队列分配

    消息队列的分配主要由RebalanceService负责。它通过定期调用doRebalance方法,根据消费者的负载情况和消息队列的分布情况,动态地分配消息队列给不同的Consumer。

    public void doRebalance() {
    if (!this.pause) {
    this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
    }

    关键点

  • 定期执行RebalanceService作为一个ServiceThread,会以固定的时间间隔运行,检查是否需要进行消息队列的重新分配。
  • 分配策略:根据AllocateMessageQueueStrategy的不同实现(如平均分配、机器室分配等),分配消息队列给Consumer。
  • 消费顺序:支持有序消费和非顺序消费两种模式,分别根据isConsumeOrderly()的返回值决定。

  • 3. 拉取消息

    PushConsumer负责从Broker拉取消息,主要通过PullMessageService实现。executePullRequest方法会根据配置的拉取消息频率,向Broker发送拉取消息请求。

    public void executePullRequest() {
    this.pullMessageService.executePullRequestLater(new PullRequest());
    }

    关键点

  • 异步拉取消息PullMessageService负责异步执行拉取消息请求,避免阻塞消费者线程。
  • 延迟拉取消息:根据配置的拉取消息频率,决定是否立即拉取消息或延迟到特定时间。
  • 重试机制:如果拉取消息失败,会进行重试,直到成功为止。

  • 4. 消息消费

    消息消费由ConsumeMessageConcurrentlyService负责,支持并发消费和顺序消费两种模式。ConsumeMessageConcurrentlyService会根据配置的批量消费大小,分批次消费消息。

    public void submitConsumeRequest() {
    if (this.consumeExecutor != null) {
    this.consumeExecutor.submit(new ConsumeRequest());
    }
    }

    关键点

  • 并发消费:通过线程池实现并发消费,提升消费效率。
  • 顺序消费:支持按顺序消费,确保消息的消费顺序。
  • 消息确认:消费完成后,通过Ack机制向Broker确认已消费的消息。

  • 5. 消费进度管理

    消费进度的管理由OffsetStore负责,包括读取和持久化消费进度。RemoteBrokerOffsetStore用于集群模式下的进度管理,而LocalFileOffsetStore则用于广播模式下的本地存储。

    public void load() {
    this.offsetTable.putAll(this.readLocalOffset());
    }

    关键点

  • 读取消费进度:根据配置的读取策略(如从内存读取或从存储读取)获取消费进度。
  • 持久化进度:定期将消费进度写入存储,确保消费状态的持久化。
  • 进度同步:消费完成后,将最新的消费进度同步到Broker端,保证消息的可重放。

  • 6. 消费失败处理

    在消息消费过程中,如果遇到失败,消费失败的消息会被发回Broker进行重试。sendMessageBack方法负责将失败的消息重新发送到Broker。

    public void sendMessageBack(MessageModel messageModel) {
    try {
    this.mQClientFactory.consumerSendMessageBack(messageModel);
    } catch (Exception e) {
    // 处理发送失败的情况
    }
    }

    关键点

  • 失败消息发回:将消费失败的消息重新发送到Broker,准备重试。
  • 重试机制:允许消息在一定次数之后重新被消费,避免消息丢失。
  • 幂等性消费:确保消息的消费过程是幂等的,即使多次重试也不会导致消息重复消费。

  • 7. 性能优化

    为了保证PushConsumer的高效运行, RocketMQ采用了多种优化策略:

    • 消息队列持有机制:允许消息队列在消费者未消费时持有一定数量的消息,减少网络开销。
    • 拉取消息策略:支持多种拉取消息策略,如长轮询和短轮询,根据场景选择最优策略。
    • 消费进度同步:通过定期同步消费进度,减少消息的重复消费。

    8. 结尾

    以上是对RocketMQ PushConsumer源码的全面分析,涵盖了从订阅到消息消费的全流程。希望这篇文章能为开发者提供有价值的参考。如果对某些部分有疑问或需要更深入的解析,欢迎在技术社区分享和讨论。

    转载地址:http://miqfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现基于 LIFO的堆栈算法(附完整源码)
    查看>>
    Objective-C实现基于 LinkedList 的添加两个数字的解决方案算法(附完整源码)
    查看>>
    Objective-C实现基于opencv的抖动算法(附完整源码)
    查看>>
    Objective-C实现基于事件对象实现线程同步(附完整源码)
    查看>>
    Objective-C实现基于文件流拷贝文件(附完整源码)
    查看>>
    Objective-C实现基于模板的双向链表(附完整源码)
    查看>>
    Objective-C实现基于模板的顺序表(附完整源码)
    查看>>
    Objective-C实现基本二叉树算法(附完整源码)
    查看>>
    Objective-C实现堆排序(附完整源码)
    查看>>
    Objective-C实现声音录制播放程序(附完整源码)
    查看>>
    Objective-C实现备忘录模式(附完整源码)
    查看>>
    Objective-C实现复制粘贴文本功能(附完整源码)
    查看>>
    Objective-C实现复数类+-x%(附完整源码)
    查看>>
    Objective-C实现外观模式(附完整源码)
    查看>>
    Objective-C实现多尺度MSR算法(附完整源码)
    查看>>
    Objective-C实现多种方法求解定积分(附完整源码)
    查看>>
    Objective-C实现多组输入(附完整源码)
    查看>>
    Objective-C实现多项式函数在某个点的评估算法(附完整源码)
    查看>>
    Objective-C实现多项式哈希算法(附完整源码)
    查看>>
    Objective-C实现大位数乘法(附完整源码)
    查看>>