,大家好,我是 华仔, 又跟大家见面了。,原文完整版在星球里面,如果感兴趣可以扫文末二维码加入。,上篇主要带大家深度剖析了「号称承载 Kafka 客户端消息快递仓库 RecordAccmulator 的架构设计」,消息被暂存到累加器中,今天主要聊聊「发送网络 I/O 的 Sender 线程的架构设计」,深度剖析下消息是如何被发送出去的。,,通过「场景驱动」的方式,来看看消息是如何从客户端发送出去的。,在上篇中,我们知道了消息被暂存到 Deque<ProducerBatch> 的 batches 中,等「批次已满」或者「有新批次被创建」后,唤醒 Sender 子线程将消息批量发送给 Kafka Broker 端。,,接下来我们就来看看,「Sender 线程的架构实现以及发送处理流程」,为了方便大家理解,所有的源码只保留骨干。,在《图解Kafka生产者初始化核心流程》这篇中我们知道 KafkaProducer 会启动一个后台守护进程,其线程名称:kafka-producer-network-thread + “|” + clientId。,在 KafkaProducer.java 类有常量定义:NETWORK_THREAD_PREFIX,并启动 守护线程 KafkaThread 即 ioThread,如果不主动关闭 Sender 线程会一直执行下去。,github 源码地址如下:,从上面得出 Sender 类是一个线程类, 我们来看看 Sender 线程的重要字段和方法,并讲解其是如何发送消息和处理消息响应的。,从该类属性字段来看比较多,这里说几个关键字段:,Sender 类的方法也不少,这里针对关键方法逐一讲解下。,Sender 线程实现了 Runnable 接口,会不断的调用 runOnce(),这是一个典型的循环事件机制。,当 Sender 线程启动后会直接运行 run() 方法,该方法在 4种情况下会一直循环调用去发送消息到 Broker。,我们来看看这个执行业务处理的方法,关于事务的部分后续专门文章讲解。,该方法比较简单,主要做了3件事情:,该方法主要是获取已经准备好的节点上的批次数据并过滤过期的批次集合,最后暂存消息。,该方法主要做了12件事情,逐一说明下:,,从上面源码可以看出,SendProducerData 方法中调用到了 Sender 线程类中多个方法,这里就不一一讲解了。,通过前两部分的源码解读和剖析,我们可以得出 Sender 线程的处理流程可以分为两大部分:「发送请求」、「接收响应结果」。,从 runOnce 方法可以得出发送请求也分两步:「消息预发送」、「真正的网络发送」。,等 Sender 线程收到 Broker 端的响应结果后,会根据响应结果分情况进行处理。,原文完整版在星球里面,如果感兴趣可以扫文末二维码加入。,这里,我们一起来总结一下这篇文章的重点。,1、开篇总述消息被暂存到 Deque<ProducerBatch> 的 batches 中,等「批次已满」或者「有新批次被创建」后,唤醒 Sender 子线程将消息批量发送给 Kafka Broker 端,从而引出「Sender 线程」。,2、带你深度剖析了「Sender 线程」 的发送消息以及响应处理的相关方法。,3、最后带你串联了整个消息发送的流程,让你有个更好的整体认知。
文章版权声明
1 原创文章作者:cmcc,如若转载,请注明出处: https://www.52hwl.com/21509.html
2 温馨提示:软件侵权请联系469472785#qq.com(三天内删除相关链接)资源失效请留言反馈
3 下载提示:如遇蓝奏云无法访问,请修改lanzous(把s修改成x)
4 免责声明:本站为个人博客,所有软件信息均来自网络 修改版软件,加群广告提示为修改者自留,非本站信息,注意鉴别