1. Introduction

To achieve load balancing while preserving message ordering, a Kafka producer can route each message to a specific partition according to a partitioning strategy. Kafka guarantees that messages within a single partition are ordered.

  • A partition lives on a single broker (ignoring replication)
  • A partition corresponds to one directory on disk
  • A partition consists of multiple segments (segments are transparent to users)
  • A segment corresponds to one file
  • A segment is made up of immutable records
  • Records are only appended to a segment, never individually deleted or modified
  • When expired logs are purged, one or more whole segments are deleted at once

Which partition a message is routed to is decided by the producer client, for example by a random, hash, or round-robin scheme. When a topic has multiple partitions, balancing messages across them on the producer side is essential; the producer selects the routing logic by setting the partitioner.class property.
The sections below demonstrate these producer partitioning strategies in practice.

2. Preparation

2.1 Start ZooKeeper and Kafka

sudo docker start zookeeper
sudo docker start kafka

See the previous post for the Docker setup.

2.2 Configuration change

Change advertised.host.name in the Kafka container's server.properties to the Docker host's IP address so that remote clients can connect:

advertised.host.name=192.168.199.122


3. Start Eclipse (on the local Windows machine)

3.1 Create a project

Create a new Maven project (see other posts on this blog for the Eclipse setup).

Edit pom.xml and add the following dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
</dependency>

After saving, Eclipse fetches the Kafka jar files automatically.

3.2 DemoConsumer

package com.alexwu211.kafka.kafa080demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class DemoConsumer {

    public static void main(String[] args) {
        // Hard-coded for this demo; remove this line to pass real arguments.
        args = new String[] {"192.168.199.122:2181", "topic1", "group1", "consumer1"};
        if (args == null || args.length != 4) {
            System.err.print(
                "Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}");
            System.exit(1);
        }
        String zk = args[0];
        String topic = args[1];
        String groupid = args[2];
        String consumerid = args[3];

        Properties props = new Properties();
        props.put("zookeeper.connect", zk);
        props.put("group.id", groupid);
        props.put("auto.offset.reset", "largest");  // 0.8 name; was the 0.7-era "autooffset.reset"
        props.put("auto.commit.enable", "true");    // 0.8 name; was "autocommit.enable"
        props.put("client.id", "test");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // Request a single stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it1 = stream1.iterator();
        while (it1.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMetadata = it1.next();
            String message = String.format(
                "Consumer ID:%s, Topic:%s, GroupID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                consumerid, messageAndMetadata.topic(), groupid, messageAndMetadata.partition(),
                messageAndMetadata.offset(),
                new String(messageAndMetadata.key()), new String(messageAndMetadata.message()));
            System.out.println(message);
        }
    }
}

3.3 ProducerDemo

Passing RandomPartitioner.class.getName() (and so on) as partitioner.class ensures that each producer instance constructs its own Partitioner instance. The example below uses two producer instances.

package com.alexwu211.kafka.kafa080demo;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class ProducerDemo {

    static private final String TOPIC = "topic1";
    static private final String ZOOKEEPER = "192.168.199.122:2181";
    static private final String BROKER_LIST = "192.168.199.122:9092";
    // static private final int PARTITIONS = TopicAdmin.partitionNum(ZOOKEEPER, TOPIC);
    static private final int PARTITIONS = 3;

    public static void main(String[] args) throws Exception {
        String pt = "RoundRobinShare2";
        Producer<String, String> producer = initProducer(pt);
        Producer<String, String> producer1 = initProducer(pt);
        sendOne(producer, producer1, TOPIC, pt);
    }

    private static Producer<String, String> initProducer(String pt) {
        Properties props = new Properties();
        props.put("metadata.broker.list", BROKER_LIST);
        // props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("serializer.class", StringEncoder.class.getName());
        // Compare strings with equals(), not == (== compares references)
        if ("Random".equals(pt)) {
            props.put("partitioner.class", RandomPartitioner.class.getName());
        }
        if ("Hash".equals(pt)) {
            props.put("partitioner.class", HashPartitioner.class.getName());
        }
        if ("RoundRobin".equals(pt)) {
            props.put("partitioner.class", RoundRobinPartitioner.class.getName());
        }

        // props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        // props.put("compression.codec", "0");
        props.put("producer.type", "sync");
        props.put("batch.num.messages", "3");
        props.put("queue.buffering.max.ms", "10000000");  // was misspelled "queue.buffer.max.ms"
        props.put("queue.buffering.max.messages", "1000000");
        props.put("queue.enqueue.timeout.ms", "20000000");

        ProducerConfig config = new ProducerConfig(props);
        return new Producer<String, String>(config);
    }

    public static void sendOne(Producer<String, String> producer, Producer<String, String> producer1,
            String topic, String pt) throws InterruptedException {
        // Alternate between the two producer instances; every message shares key "31"
        for (int i = 31; i <= 35; i++) {
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>(topic, "31", pt + " test " + i);
            if (i % 2 == 1) {
                producer.send(message);
            } else {
                producer1.send(message);
            }
            Thread.sleep(5000);
        }
        producer.close();
        producer1.close();
    }
}

3.4 Hash strategy

package com.alexwu211.kafka.kafa080demo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class HashPartitioner implements Partitioner {

    public HashPartitioner(VerifiableProperties verifiableProperties) {}

    public int partition(Object key, int numPartitions) {
        if (key instanceof Integer) {
            return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
        }
        // Same key -> same hash -> same partition
        return Math.abs(key.hashCode() % numPartitions);
    }
}

With multiple producer instances, key 31 is always sent to the partition chosen by the hash (PartitionID:1 here):

Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:16, Message Key:31, Message Payload: Hash test 31
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:17, Message Key:31, Message Payload: Hash test 32
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:18, Message Key:31, Message Payload: Hash test 33
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:19, Message Key:31, Message Payload: Hash test 34
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:20, Message Key:31, Message Payload: Hash test 35
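The hash routing can be checked without a broker. Below is a minimal standalone sketch of the same partition logic (no Kafka dependency; the class name HashPartitionDemo is ours), showing that a fixed key always lands on the same partition:

```java
// Standalone sketch of HashPartitioner's routing logic, without Kafka.
public class HashPartitionDemo {

    // Mirrors HashPartitioner.partition(Object, int)
    static int partition(Object key, int numPartitions) {
        if (key instanceof Integer) {
            return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
        }
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        // The String key "31" hashes to the same partition on every call
        for (int n = 0; n < 5; n++) {
            System.out.println("key=31 -> partition " + partition("31", 3));
        }
    }
}
```

Because the producer sends the key as the String "31" rather than an Integer, the hashCode branch is taken; "31".hashCode() is 1630, and 1630 % 3 = 1, matching the PartitionID:1 in the output above.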

3.5 Random strategy

package com.alexwu211.kafka.kafa080demo;

import java.util.Random;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RandomPartitioner implements Partitioner {

    // Reuse one Random per partitioner instead of allocating one per call
    private final Random random = new Random();

    public RandomPartitioner(VerifiableProperties verifiableProperties) {}

    public int partition(Object key, int numPartitions) {
        // The key is ignored; every message picks a random partition
        return random.nextInt(numPartitions);
    }
}

Because the strategy is random, key 31 is sent to an arbitrary partition each time:

Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:21, Message Key:31, Message Payload: Random test 31
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:2, Offset:15, Message Key:31, Message Payload: Random test 32
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:2, Offset:16, Message Key:31, Message Payload: Random test 33
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:2, Offset:17, Message Key:31, Message Payload: Random test 34
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:22, Message Key:31, Message Payload: Random test 35
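The same logic can be exercised standalone; a minimal sketch (the class name RandomPartitionDemo is ours, no Kafka dependency) showing that the key plays no role and only the partition count bounds the result:

```java
import java.util.Random;

// Standalone sketch of RandomPartitioner's routing logic, without Kafka.
public class RandomPartitionDemo {

    private static final Random RANDOM = new Random();

    // Mirrors RandomPartitioner.partition(Object, int): the key is ignored
    static int partition(Object key, int numPartitions) {
        return RANDOM.nextInt(numPartitions);
    }

    public static void main(String[] args) {
        for (int n = 0; n < 5; n++) {
            System.out.println("key=31 -> partition " + partition("31", 3));
        }
    }
}
```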

3.6 RoundRobin strategy

package com.alexwu211.kafka.kafa080demo;

import java.util.concurrent.atomic.AtomicLong;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RoundRobinPartitioner implements Partitioner {

    // static + atomic: one counter shared by all partitioner instances
    private static AtomicLong next = new AtomicLong();

    public RoundRobinPartitioner(VerifiableProperties verifiableProperties) {}

    public int partition(Object key, int numPartitions) {
        long nextIndex = next.incrementAndGet();
        return (int) (nextIndex % numPartitions);
    }
}

Note the use of AtomicLong to keep the shared counter thread-safe across instances; for background on the Atomic classes, see the earlier article on using AtomicInteger in real projects. With multiple producer instances, key 31 is sent to the available partitions of topic1 in turn:

Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:0, Offset:14, Message Key:31, Message Payload: RoundRobin test 31
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:29, Message Key:31, Message Payload: RoundRobin test 32
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:2, Offset:21, Message Key:31, Message Payload: RoundRobin test 33
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:0, Offset:15, Message Key:31, Message Payload: RoundRobin test 34
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:30, Message Key:31, Message Payload: RoundRobin test 35
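A standalone sketch of the same rotation (the class name RoundRobinDemo is ours): because the counter is static and atomic, any number of callers share one strict rotation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of RoundRobinPartitioner's routing logic, without Kafka.
public class RoundRobinDemo {

    // static + atomic: one counter shared by all instances and threads
    private static final AtomicLong NEXT = new AtomicLong();

    static int partition(Object key, int numPartitions) {
        return (int) (NEXT.incrementAndGet() % numPartitions);
    }

    public static void main(String[] args) {
        // With 3 partitions the counter starts at 1, so the sequence cycles 1, 2, 0, 1, 2, 0
        for (int n = 0; n < 6; n++) {
            System.out.println("message " + n + " -> partition " + partition("31", 3));
        }
    }
}
```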

4. Verifying how many Partitioner instances exist and whether they must be thread-safe

The strategy experiments above show that once a custom Partitioner is configured, the producer uses it for every routing decision (i.e. deciding which partition on which broker a message goes to). If one instance ends up shared, thread safety must be considered; above, AtomicLong kept the shared counter safe under concurrent access. What happens without AtomicLong?
We add a RoundRobin2 strategy to compare a shared instance with and without AtomicLong.

package com.alexwu211.kafka.kafa080demo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RoundRobinPartitioner2 implements Partitioner {

    // Plain, non-static int counter: i++ is a non-atomic read-modify-write
    private int i = 0;

    public RoundRobinPartitioner2(VerifiableProperties verifiableProperties) {}

    public int partition(Object key, int numPartitions) {
        long nextIndex = i++;
        return (int) (nextIndex % numPartitions);
    }
}

Correspondingly, add the following to ProducerDemo: RoundRobinShare uses the AtomicLong version, while RoundRobinShare2 does not.

if ("RoundRobinShare".equals(pt)) {
    props.put("partitioner.class", "com.alexwu211.kafka.kafa080demo.RoundRobinPartitioner");
}
if ("RoundRobinShare2".equals(pt)) {
    props.put("partitioner.class", "com.alexwu211.kafka.kafa080demo.RoundRobinPartitioner2");
}

With AtomicLong, the round-robin rotation works correctly:

Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:61, Message Key:31, Message Payload: RoundRobinShare test 31
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:2, Offset:47, Message Key:31, Message Payload: RoundRobinShare test 32
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:0, Offset:36, Message Key:31, Message Payload: RoundRobinShare test 33
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:62, Message Key:31, Message Payload: RoundRobinShare test 34
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:2, Offset:48, Message Key:31, Message Payload: RoundRobinShare test 35

Without AtomicLong, the rotation breaks down:

Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:0, Offset:37, Message Key:31, Message Payload: RoundRobinShare2 test 31
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:0, Offset:38, Message Key:31, Message Payload: RoundRobinShare2 test 32
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:63, Message Key:31, Message Payload: RoundRobinShare2 test 33
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:1, Offset:64, Message Key:31, Message Payload: RoundRobinShare2 test 34
Consumer ID:consumer1, Topic:topic1, GroupID:group1, PartitionID:2, Offset:49, Message Key:31, Message Payload: RoundRobinShare2 test 35

Conclusion: when an instance is shared, some mechanism is needed to guarantee thread safety.
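The underlying failure can be reproduced without Kafka at all. The sketch below (the class name LostUpdateDemo is ours) hammers a plain int counter, like the i++ in RoundRobinPartitioner2, from two threads; because i++ is a non-atomic read-modify-write, increments can be lost and the rotation drifts:

```java
// Demonstrates the lost-update problem behind a shared, non-atomic i++.
public class LostUpdateDemo {

    static int plainCounter;

    // Runs `perThread` increments on each of two threads and returns the total.
    // With an AtomicLong counter this would always be exactly 2 * perThread.
    static int raceIncrements(final int perThread) {
        plainCounter = 0;
        Runnable work = new Runnable() {
            public void run() {
                for (int n = 0; n < perThread; n++) {
                    plainCounter++; // non-atomic: read, add, write
                }
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return plainCounter;
    }

    public static void main(String[] args) {
        // Typically prints a value below 200000: some increments were lost
        System.out.println("expected 200000, got " + raceIncrements(100000));
    }
}
```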

5. Sync and async parameters

All of the examples above send synchronously, i.e. each message is sent immediately. When sending involves slow I/O and the program does not need to wait for the result, asynchronous sending is the better choice: it frees resources sooner and improves overall throughput.
The producer.type parameter selects synchronous or asynchronous sending (sync/async); the default is sync.
Other related parameters:

  • batch.num.messages: number of messages sent per batch in async mode
  • queue.buffering.max.ms: how long (in milliseconds) to buffer messages before an async send
  • queue.buffering.max.messages: maximum number of messages held in the send queue
  • queue.enqueue.timeout.ms: 0 means enqueue if there is room and drop immediately when the queue is full; -1 means block indefinitely and never drop
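Putting these parameters together, the sketch below builds an async configuration for the 0.8 producer. The broker address and the numeric values are illustrative assumptions, not tuned recommendations:

```java
import java.util.Properties;

// Illustrative async settings for the Kafka 0.8 producer.
public class AsyncProducerConfigDemo {

    static Properties asyncProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.199.122:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");                // batch in the background
        props.put("batch.num.messages", "200");             // messages per batch
        props.put("queue.buffering.max.ms", "5000");        // flush at least every 5 s
        props.put("queue.buffering.max.messages", "10000"); // send-queue capacity
        props.put("queue.enqueue.timeout.ms", "-1");        // block when the queue is full
        return props;
    }

    public static void main(String[] args) {
        Properties props = asyncProps();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```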