奈学P7大数据架构师五期2022【完结】\正式课程资料\06—图解Kafka源码
第一章 Kafka概念
了解Kafka的特性
• 揭秘Kafka Producer高性能架构设计方案
• 深度剖析Kafka服务端高并发 ,高性能 ,高可用的架构设计
• 深度剖析Kafka提升Consumer的稳定性设计
• 让大家深入了解Kafka的设计 ,合理对Kafka进行调优
为什么会有消息系统
第二章 Kafka高性能架构鉴赏
服务端设计-服务端请求是如何处理的-高性能-高并发
在现有系统中,客户端的功能已确认无误。关于是否存在一个名为“accept”的线程的问题,同样已验证其正确性。有学生提出疑问,认为本系统仅使用单一线程进行处理可能会导致性能瓶颈。请同学们放心,该环节的设计较为简单,并不会成为系统的性能瓶颈。
系统启动后,将执行一个简单的任务,即在端口上注册一个用于接受连接的操作(OP Accept)。这一操作旨在持续监测是否有新的连接请求到达。一旦检测到有客户端尝试建立连接,系统将会创建一个新的套接字(socket)通道,从而建立起此次连接。
具体而言,系统启动后,会在指定端口注册一个用于接收连接请求的事件(OPC,即操作代码接受连接)。通过注册此事件,后台会启动一个进程来持续监听是否有新的连接请求。当有客户端发起连接时,系统将创建一个套接字通道以完成连接的建立。
现在应该能理解为何此处不存在性能瓶颈了。它的任务十分简单,仅需在客户端发出请求时,为这些请求创建相应的连接即可。这一过程并不涉及复杂的处理步骤,因此并不会成为系统性能的限制因素。
这个线程创建出来商客的牵头这一个连接是吧?创建出来这个连接以后,那接下来他干嘛呢?他就把这个链接给我们的这个process这个线程好跟process线程队列。
那接下来他又可以干一个什么事呢?很简单,接下来他肯定要起一个后台的现场,这个线程要干嘛?这个线程肯定要去读取这个队列里面的这些连接。好,读取到这个队列里面的连接以后,他要干嘛?他肯定要往我们这些selector上面的去注册什么OPwrite这样的一些事件,或者OPwrite这样的事件。
他要注册OP right这个事件的目的是什么呢?肯定是要干嘛?肯定用来读取我们客户端发送过来的求。好,一旦我们这个注册了OP注册了什么?
OP red在的事件以后,那接下来我们又有现成干嘛?我们又有现成的去不断的去接听我们这个slack上面到底有没有什么到底有没有一些连接,发送过来。好,如果有连接,发送过来或者说我们客户端有取消过来是吧?客户端有取消过来以后就干嘛?客户端取消过来以后,那说白了它不就是接收到什么?接收到一些接收到一些取消是吧?接收到一些取消,里面这些细节你不用看是吧?因为这个时候,这些名词你肯定不知道是啥意思,你不需要干你只需要知道大概的思路是什么。
我们的这个process现场?到现场里面,我们通过一个后台的一个VR循环的这样的一个线程,让它把里面的这个连接读取到了,读取到了以后,接下来就往slack上面注册了OP right再来事件注册了这些事件以后,我们就可以获取到客户端发送过来的请求。而后客户端发送过来的请求以后,假设假设我们认为我们发送过来的是一个色彩者的一个请求是吧?色彩者的一个请求。好,我们的这一个process线程它要干嘛?他对我们获取到的这些请求是吧,获取到的这些请求进行解析。
因为什么?因为我们客户端发过的请求是二进制的那二进制的你肯定不好看,所以我们这个pro set线程就会把它解析成为什么?解析成为一个对象?我们能能能操作的这样的一个对象叫request。好,他把他这个请求请求搞成,就他把这个请求解析成为一个request的对象以后,我说按照我们正常的思路,是不是就发给我们的这个呃最后这个线程池进行处理就可以了。
但是人家没有是吧?人家没有。因为这样做的话,这样做的话,那万一这个第三层处理不过来怎么办呢?处理不过来怎么办?所以人家有第二层,第二层的话就类似于一个缓存层是吧?类似于一个缓存层,或者说类似于一个仓库,或者说类似于一个消息系统里面的一个消息系统。
好,这里就有一个request的channel在的一个。在哪一个层?这一层好,那这一层干什么呢?我们前面的这几个processor这个线程,就会把我们的这些快速的存到的这个队列里面是吧?存到这个队列里面。
好,存到这个队列里面以后,接下来我们这有第三个限制,第三个层次。第三层说白了就是一个什么?第三层说白了就是一个线程池,第三层说白了就是一个线程池。好,这是这个线程池里面会有几个线程呢?我告诉大家,默认的就是八个线程是吧?八个线程。好,那这个线程池启动了以后,很简单,这个线程就是我们这个队列里面去读取我们的这些起球。好,读取到起球以后对这些起球进行处理,最终的干嘛?
最终通过一些工具类也好,什么样也好,会把如果是我们的一个市场涨这么一个请求的话,他是不是就会把这些数据写到什么?写到我们这个磁盘里面。好,各位,那你想如果他把数据写磁盘写成功以后,他是也要返回一些响应返回一些响应。所以我们这个工具类里面的如果写成功以后,它会封装一些响应,封装一些响应以后,他会把这个响应也垂到什么?也垂到第二层,垂到这的一些对队列里面是吧?一些队列里面。
此时此刻有同学就会看到,老师你知道的这个队列为什么是有三个呢?好,原因很简单,因为什么?因为你这里有三个process现场,所以你这也会有三个响应。因为什么?因为你的这你获取的这个请求,对应的这个响应还是由你来发送回去是吧?还是由你发送回去。所以这那那能有多少个是like就processor现场,这就会有多少个队列是吧
队列好,里面的响应跟我们的这个连接是一一对应的是吧?一一对应的好,他就把这些响应传到什么?传到我们的这个队列里面,传到队列里面以后,我们这其实我们这的不不就是我们这的这个模块里面肯定也有,对吗?是去读取什么?去读取它对应队列里面的这些响应,读取到这些响应以后,往上面是不是注册OP right这样的事件,注册op right这个世界以后,让大家接听到这样的世界。
我们select接听到这样的世界以后就会干嘛?就会把我们的这个响应就返回给我们的客户端,就会把这个响应就返回给我们客户端。这样子的话就完成了我们的整个的一个请求的处理的这样一个流程。好,各位大家千万不要小看这个流程,千万不要小看这个流程。就是因为这样的一个流程,才保证了我们这个卡夫卡能支持高并发,能支持高性能。
我们先来看看Producer端的设计
看看Consumer端的设计
生产者发送消息流程回顾
6.2 从demo 入手
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
/**
* 构建方法,初始化生产者对象
*
* @param topic
* @param isAsync
*/
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
// 用户拉取kafka的元数据
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "DemoProducer");
//设置序列化的类。
//二进制的格式
props.put("key.serializer",
"org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//消费者,消费数据的时候,就需要进行反序列化。
// 初始化kafkaProducer
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
public void run() {
int messageNo = 1;
// 一直会往kafka发送数据
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
//isAsync , kafka发送数据的时候,有两种方式
//1: 异步发送
//2: 同步发送
//isAsync: true的时候是异步发送,false就是同步发送
if (isAsync) { // Send asynchronously
//异步发送
//这样的方式,性能比较好,我们生产代码用的就是这种方式。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo,
messageStr));
} else { // Send synchronously
try {
//同步发送
//发送一条消息,等这条消息所有的后续工作都完成以后才继续下一条消息的发
送。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " +
messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (exception != null) {
System.out.println("有异常");
//一般我们生产里面 还会有其它的备用的链路。
} else {
System.out.println("说明没有异常信息,成功的!!");
}
if (metadata != null) {
System.out.println("message(" + key + ", " + message + ") sent to partition(" +
metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + "
ms");
} else {
exception.printStackTrace();
}
}
}
6.3 元数据信息关系
6.4 Producer核心流程深度剖析剖析
6.5 元数据加载流程剖析
KafkaPrducer拉取元数据的流程
6.6 RecordAccumulator原理
6.7 内存池设计原理
七 今日总结
01 源码阅读准备之基础知识准备
02 源码阅读准备之源码环境准备
03 源码阅读准备之源码剖析思路介绍
04 源码阅读准备之从一个demo入手
05 生产者源码之Producer核心流程介绍
06 生产者源码之Producer初始化
07 生产者源码之Producer端元数据管理
08 生产者源码之Producer源码核心流程初探
09 生产者源码之Producer加载元数据
10 生产者源码之分区选择
11 生产者源码之RecordAccumulator封装消息程初探
12 生产者源码之CopyOnWriteMap数据结构使用
13 生产者源码之把数据写入对应批次(分段加锁)
14 生产者源码之内存池设计
图解Kafka源码-第三天
一 、课前准备
1. 掌握上次课内容
二 、课堂主题
讲解Kafka的网络设计
三 、课程目标
1. 掌握内存池设计
2. 掌握生产者消息发送流程
3. 掌握Kafka网络设计
4. 掌握生产者处理响应消息的粘包/拆包技
4.1 内存池设计
4.2 筛选可以发送消息的broker
4.3 Kafka的网络设计
4.4 发送网络请求
4.5 响应消息的流转
五 、总结(5分钟)
1. 内存池的设计
2. Sender线程运行流程
3. 一个batchs什么条件下可以发送?
4. 筛选可以发送请求的broker
5. Kafka的网络设计
6. 如何没有网络建立会发送消息吗?
7. 生产者终于可以发送消息了
8. Producer如何处理响应消息的粘包/拆包问题?9. 如何处理暂存状态的响应消息
图解Kafka源码-第四次课
一 、课前准备
1. 掌握上次课内容
二 、课堂主题
讲解响应消息处理流程
三 、课程目标
1. 了解异常信息处理
2. 内存回收
3. 超时消息处理
4. 服务端网络架构设计
四 、知识点
4.1 响应消息的流转?
4.2 内存池设计
4.3 服务端网络架构设计
5. 课程总结
1. 如何处理响应消息
2. 消息发送完了以后内存如何处理?
3. 消息异常如何处理?
4. 如何处理超时的批次?
5. 如何处理长时间没有收到响应的消息
6. 客户端源码精华总结
7. 观察Kafka源码的包
8. Acceptor线程是如何启动的?
9. Processor线程是如何启动的?
10. Processor线程是如何接收请求的?
11. Processor线程是如何处理StateReceiver的请求的?