1. 简介

在 LinkedIn的Kafka的系统上,每天有超过 8000 亿条消息被发送,相当于超过 175 兆兆字节(terabytes)数据,另外,每天还会消耗掉 650 兆兆字节(terabytes)数据的消息,为什么Kafka有这样的能力去处理这么多产生的数据和消耗掉的数据? 下面我们就来分析一下Kafka的高性能之道。

2. 高性能之道

2.1 高效使用磁盘

首先kafka的消息是不断追加到文件中的,因此数据只增加不更新。也没有记录级别的数据删除,只会整个segment删除。上述这个特性使kafka可以充分利用磁盘的顺序读写性能,顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写。另外kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。

2.2 使用零拷贝

在Linux kernel2.2 之后出现了一种叫做”零拷贝(zero-copy)”系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”。传统模式下数据从文件传输到网络需要4次数据拷贝,4次上下文切换和2次系统调用,通过NIO的transferTo/transferFrom调用操作系统的sendfile实现了零拷贝。总共发生2次内核数据拷贝,2次上下文切换和1次系统调用,消除了CPU数据拷贝。这样系统上下文切换减少为2次,可以提升一倍的性能。

2.3 数据批处理和压缩

Producer和Consumer均支持批量处理数据,从而减少了网络传输的开销。比如每满100条消息才发送一次,或者每5秒发送一次。另外Producer可将数据压缩后发送给broker,从而减少网络传输代价。目前支持Snappy, Gzip和LZ4压缩。Producer压缩之后,在Consumer端需要解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU。

2.4 使用Partition技术

通过Partition实现了并行处理和水平扩展,Partition是Kafka(包括Kafka Stream)并行处理的最小单位,不同Partition可处于不同的Broker(节点),可以充分利用多机资源。同一Broker(节点)上的不同Partition可置于不同的Directory,如果节点上 有多个Disk Drive,可将不同的Drive对应不同的Directory,从而使Kafka充分利用多Disk Drive的磁盘优势。

2.5 使用ISR

ISR实现了可用性和一致性的动态平衡。ISR可容忍更多的节点失败,ISR如果要容忍f个节点失败,至少只需要f+1个节点。一旦Leader crash后,ISR中的任何replica皆可竞选成为Leader,如果所有replica都crash,可选择让第一个recover的replica或者第一个在ISR中的replica成为leader。

2.6 zerocopy实验

这里我们使用NIO的transferTo/transferFrom做文件数据传输性能测试,同时使用read/write方式做文件数据传输测试,并比较二者的差异。

  • 使用FileChannel.transferFrom()
  • 使用FileChannel.transferTo()
  • 使用非直接模式ByteBuffer的read/write
  • 使用直接模式 ByteBuffer的read/write
    测试文件大小:600+ MB
    2016-12-24_10-44-45

测试的缓冲区大小:4KB
机器配置:MacBook Pro i7 2.2GHz Mem 16GB SSD 256 GB

测试结果如下.
2016-12-24_10-53-32
结论:

  • transferFrom和transferTo 数据传输性能差不多。transferFrom性能稍优
  • 使用直接模式和非直接模式 read/write 数据传输性能差不多。直接模式性能稍优
  • transferFrom/To与read/write 性能高一倍以上。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package zerocopy;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;

public class ZeroCopyTest {

private final static Path copy_from = Paths.get("/tmp/test/from/Security.mp4");
private final static Path copy_to = Paths.get("/tmp/test/to/Security.mp4");
private static long startTime, elapsedTime;
private static int bufferSizeKB = 4;
private static int bufferSize = bufferSizeKB * 1024;

public static void main(String[] args) throws Exception {

transferfrom();
transferTo();
nonDirectBuffer();
directBuffer();

}

public static void transferfrom() {

try (FileChannel fileChannel_from = (FileChannel.open(copy_from,
EnumSet.of(StandardOpenOption.READ)));
FileChannel fileChannel_to = (FileChannel.open(copy_to,
EnumSet.of(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)))) {

startTime = System.currentTimeMillis();
fileChannel_to.transferFrom(fileChannel_from, 0L, (int) fileChannel_from.size());
elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("transferFrom Time is " + elapsedTime + " ms");
}catch (IOException ex) {
System.err.println(ex);
}
deleteCopied(copy_to);
}

public static void transferTo() throws Exception{

try (FileChannel fileChannel_from = (FileChannel.open(copy_from,
EnumSet.of(StandardOpenOption.READ)));
FileChannel fileChannel_to = (FileChannel.open(copy_to,
EnumSet.of(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)))) {

startTime = System.currentTimeMillis();

fileChannel_from.transferTo(0L, fileChannel_from.size(), fileChannel_to);

elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("transferTo Time is " + elapsedTime + " ms");
}catch (IOException ex) {
System.err.println(ex);
}
deleteCopied(copy_to);

}

public static void nonDirectBuffer(){

try (
FileChannel fileChannel_from = FileChannel.open(copy_from,
EnumSet.of(StandardOpenOption.READ));
FileChannel fileChannel_to = FileChannel.open(copy_to,
EnumSet.of(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE));){

startTime = System.currentTimeMillis();
ByteBuffer bytebuffer = ByteBuffer.allocate(bufferSize);
while ((fileChannel_from.read(bytebuffer)) > 0) {
bytebuffer.flip();
fileChannel_to.write(bytebuffer);
bytebuffer.clear();
}

elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("nonDirectBuffer read/write Time is " + elapsedTime + " ms");
}catch (IOException ex) {
System.err.println(ex);
}
deleteCopied(copy_to);
}

public static void directBuffer(){
try (
FileChannel fileChannel_to = FileChannel.open(copy_to,
EnumSet.of(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE));
FileChannel fileChannel_from = (FileChannel.open(copy_from,
EnumSet.of(StandardOpenOption.READ)));) {

startTime = System.currentTimeMillis();
ByteBuffer bytebuffer = ByteBuffer.allocateDirect(bufferSize);
while ((fileChannel_from.read(bytebuffer)) > 0) {
bytebuffer.flip();
fileChannel_to.write(bytebuffer);
bytebuffer.clear();
}

elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("directBuffer read/write Time is " + elapsedTime + " ms");
}catch (IOException ex) {
System.err.println(ex);
}
deleteCopied(copy_to);
}

public static void deleteCopied(Path path){
try {
Files.deleteIfExists(path);
}catch (IOException ex) {
System.err.println(ex);
}
}
}

参考:通过零拷贝实现有效数据传输