1. 简介

Kafka入门之一:使用Docker安装Kafka和Zookeeper里我们已经演示使用docker安装单节点的Kafka,也就是一个broker。但实际上kafka是天生支持多broker的。在安装之前,我们先来看几个broker参数。 更多参数参见官网.







































































































































































































Property Default Description
broker.id   每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,并且它的存在使得broker无须混淆consumers就可以迁移到不同的host/port上。你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。
log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
port 9092 server接受客户端连接的端口
zookeeper.connect null ZooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;为了当某个host宕掉之后你能通过其他ZooKeeper节点进行连接,你可以按照一下方式制定多个hosts:hostname1:port1, hostname2:port2, hostname3:port3.ZooKeeper 允许你增加一个“chroot”路径,将集群中所有kafka数据存放在特定的路径下。当多个Kafka集群或者其他应用使用相同ZooKeeper集群时,可以使用这个方式设置数据存放路径。这种方式的实现可以通过这样设置连接字符串格式,如下所示:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path这样设置就将所有kafka集群数据存放在/chroot/path路径下。注意,在你启动broker之前,你必须创建这个路径,并且consumers必须使用相同的连接格式。
message.max.bytes 1000000 server可以接收的消息最大尺寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大。
num.network.threads 3 server用来处理网络请求的网络线程数目;一般你不需要更改这个属性。
num.io.threads 8 server用来处理请求的I/O线程的数目;这个线程数目至少要等于硬盘的个数。
background.threads 4 用于后台处理的线程数目,例如文件删除;你不需要更改这个属性。
queued.max.requests 500 在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数。
host.name null broker的hostname;如果hostname已经设置的话,broker将只会绑定到这个地址上;如果没有设置,它将绑定到所有接口,并发布一份到ZK
advertised.host.name null 如果设置,则就作为broker 的hostname发往producer、consumers以及其他brokers
advertised.port null 此端口将给与producers、consumers、以及其他brokers,它会在建立连接时用到; 它仅在实际端口和server需要绑定的端口不一样时才需要设置。
num.partitions 1 如果创建topic时没有给出划分partitions个数,这个数字将是topic下partitions数目的默认数值。
auto.create.topics.enable true 是否允许自动创建topic。如果是真的,则produce或者fetch 不存在的topic时,会自动创建这个topic。否则需要使用命令行创建topic
default.replication.factor 1 默认备份份数,仅指自动创建的topics
replica.lag.time.max.ms 10000 如果一个follower在这个时间内没有发送fetch请求,leader将从ISR重移除这个follower,并认为这个follower已经挂了
replica.lag.max.messages 4000 如果一个replica没有备份的条数超过这个数值,则leader将移除这个follower,并认为这个follower已经挂了
replica.socket.timeout.ms 301000 leader 备份数据时的socket网络请求的超时时间
replica.socket.receive.buffer.bytes 641024 备份时向leader发送网络请求时的socket receive buffer
replica.fetch.max.bytes 1024*1024 备份时每次fetch的最大值
replica.fetch.min.bytes 500 leader发出备份请求时,数据到达leader的最长等待时间
replica.fetch.min.bytes 1 备份时每次fetch之后回应的最小尺寸
num.replica.fetchers 1 从leader备份数据的线程数
replica.high.watermark.checkpoint.interval.ms 5000 每个replica检查是否将最高水位进行固化的频率
zookeeper.session.timeout.ms 6000 zookeeper会话超时时间。
zookeeper.connection.timeout.ms 6000 客户端等待和zookeeper建立连接的最大时间
zookeeper.sync.time.ms 2000 zk follower落后于zk leader的最长时间
auto.leader.rebalance.enable true 如果这是true,控制者将会自动平衡brokers对于partitions的leadership
leader.imbalance.per.broker.percentage 10 每个broker所允许的leader最大不平衡比率
leader.imbalance.check.interval.seconds 300 检查leader不平衡的频率
offset.metadata.max.bytes 4096 允许客户端保存他们offsets的最大个数
max.connections.per.ip Int.MaxValue 每个ip地址上每个broker可以被连接的最大数目
max.connections.per.ip.overrides   每个ip或者hostname默认的连接的最大覆盖
connections.max.idle.ms 600000 空连接的超时限制
log.roll.jitter.{ms,hours} 0 从logRollTimeMillis抽离的jitter最大数目
num.recovery.threads.per.data.dir 1 每个数据目录用来日志恢复的线程数目
unclean.leader.election.enable true 指明了是否能够使不在ISR中replicas设置用来作为leader
delete.topic.enable false 能够删除topic

这些参数在你还没有了解具体用途之前,你都可以默认,今天我们将会使用几个参数。

2. Kafka集群

2.1 Dockerfile

我们在dockeefile中加入一些broker的参数,即在server.properites文件中设置的参数。

FROM java:openjdk-8-jre-alpine
ARG MIRROR=http://mirrors.aliyun.com/
ARG SCALA_VERSION=2.11
ARG KAFKA_VERSION=0.8.2.2
LABEL name="kafka" version=$VERSION
RUN apk update && apk add ca-certificates && \
    apk add tzdata && \
    ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
    echo "Asia/Shanghai" > /etc/timezone
RUN apk add --no-cache wget bash \
    && mkdir /opt \
    && wget -q -O - $MIRROR/apache/kafka/$KAFKA_VERSION/kafka_$SCALA_VERSION-$KAFKA_VERSION.tgz | tar -xzf - -C /opt \
    && mv /opt/kafka_$SCALA_VERSION-$KAFKA_VERSION /opt/kafka \
    && sed -i 's/num.partitions.*$/num.partitions=3/g' /opt/kafka/config/server.properties

RUN echo "cd /opt/kafka" > /opt/kafka/start.sh &&\
    echo "sed -i 's%zookeeper.connect=.*$%zookeeper.connect=zookeeper:2181%g'  /opt/kafka/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "[ ! -z $""BROKER_ID"" ] && sed -i 's%broker.id=.*$%broker.id='$""BROKER_ID'""%g'  /opt/kafka/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "[ ! -z $""BROKER_PORT"" ] && sed -i 's%port=.*$%port='$""BROKER_PORT'""%g'  /opt/kafka/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "sed -i 's%#advertised.host.name=.*$%advertised.host.name='$""(hostname -i)'""%g'  /opt/kafka/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "[ ! -z $""ADVERTISED_HOST_NAME"" ] && sed -i 's%.*advertised.host.name=.*$%advertised.host.name='$""ADVERTISED_HOST_NAME'""%g'  /opt/kafka/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "sed -i 's%#host.name=.*$%host.name='$""(hostname -i)'""%g'  /opt/kafka/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "[ ! -z $""HOST_NAME"" ] && sed -i 's%.*host.name=.*$%host.name='$""HOST_NAME'""%g'  /opt/kafka/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties &&\
    echo "bin/kafka-server-start.sh config/server.properties" >> /opt/kafka/start.sh &&\
    chmod a+x /opt/kafka/start.sh

EXPOSE 9092
WORKDIR /opt/kafka
ENTRYPOINT ["sh", "/opt/kafka/start.sh"]

2.2 启动zookeeper

为简单起见,zookeeper我们暂时使用单个容器。

sudo docker build -f zookeeper.Dockerfile -t alex/zookeeper:3.4.6 .

2.3 启动Kafka集群

使用下面命令启动三个Kafka容器,分别是kafka0,kafka1,kafka2,我们看到启动容器时传入了BROKER_ID,BROKER_PORT等变量。这就可以很方便的启动多个容器。

docker run -itd --name kafka0 -h kafka0 -p9092:9092 -e BROKER_ID=0 -e BROKER_PORT=9092 --link zookeeper alex/kafka_cluster:0.8.2.2
docker run -itd --name kafka1 -h kafka1 -p9093:9092 -e BROKER_ID=1 -e BROKER_PORT=9092 --link zookeeper alex/kafka_cluster:0.8.2.2
docker run -itd --name kafka2 -h kafka2 -p9094:9092 -e BROKER_ID=2 -e BROKER_PORT=9092 --link zookeeper alex/kafka_cluster:0.8.2.2

3. 验证是否集群正常

进入kafka0容器内, 创建topic1

docker exec -it kafka0 bash
bin/kafka-topics.sh --zookeeper zookeeper:2181 --topic topic1 --create --replication-factor 2 --partitions 3
bin/kafka-topics.sh --zookeeper zookeeper:2181 --topic topic1 --describe

2016-10-22_21-17-05
启动consumer

bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic topic1

进入kafka2容器内, 启动producer

bin/kafka-console-producer.sh --broker-list kafka2:9092 --topic topic1`</pre>

2016-10-22_21-16-06

发送一些消息,我看到consumer端正常接收。
2016-10-22_21-16-26

这就说明集群配置成功。

4. 验证min.insync.replicas和request.required.acks的作用。

我们创建topic2,并将min.insync.replicas设为2,partitions为1。

bin/kafka-topics.sh --zookeeper zookeeper:2181 --topic topic2 --create --config min.insync.replicas=2 --replication-factor 2 --partitions 1
bin/kafka-topics.sh --zookeeper zookeeper:2181 --topic topic2 --describe

2016-10-22_23-26-17

我们看到topic2分配在了kafka2上,并且kafka0是备份,ISR是0和2,现在我们将kafka1和kafka2的broker关闭。我们看到leader已自动转为broker0。然后启动consumer

bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic topic

在另一个终端上启动producer,并将request-required-acks设置为-1

bin/kafka-console-producer.sh --broker-list kafka0:9092 --topic topic2 --request-required-acks -1

2016-10-22_23-35-05

我们看到此时发送消息,kafka出现错误。原因是我们在producer发送时设置了检查replicas是否都收到数据的确认,但是因为我们已经关闭broker2, 所以尝试发送3次未果后,producer返回错误。
显然,我们将producer以默认设置发送消息时,consumer马上收到消息了。

2016-10-22_22-02-59