Storm与kafka集成(上)
1. 简介
我们知道storm的作用主要是进行流式实时计算,对于均匀的数据流storm处理是非常有效的,但是现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用storm做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用kafka作为消息队列是非常合适的选择,kafka可以将不均匀的数据转换成均匀的消息流,从而和storm比较完善的结合,这样才可以实现稳定的流式计算,storm和kafka结合,实质上无非是把Kafka的数据消费,是由Storm去消费,通过KafkaSpout将数据输送到Storm,然后让Storm安装业务需求对接受的数据做实时处理,最后将处理后的数据输出或者保存到文件、数据库、分布式存储等等。
2. 搭建storm和kafka集群
我们要搭建storm和kafka集群,这里使用docker镜像。所以首先使用docker file建立镜像。
2.1 storm的docker file
FROM openjdk:8-jre-alpine
ARG MIRROR=http://mirrors.aliyun.com
ARG BIN_VERSION=apache-storm-1.0.3
# Install required packages
RUN apk add --no-cache \
bash \
python \
su-exec
RUN wget -q -O - ${MIRROR}/apache/storm/${BIN_VERSION}/${BIN_VERSION}.tar.gz | tar -xzf - -C /usr/share \
&& mv /usr/share/${BIN_VERSION} /usr/share/storm \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
WORKDIR /usr/share/storm
# add startup script
ADD entrypoint.sh entrypoint.sh
ADD cluster.xml log4j2/cluster.xml
ADD worker.xml log4j2/worker.xml
RUN chmod +x entrypoint.sh
# supervisor: worker ports
EXPOSE 6700 6701 6702 6703
# logviewer
EXPOSE 8000
# DRPC and remote deployment
EXPOSE 6627 3772 3773
ENTRYPOINT ["/usr/share/storm/entrypoint.sh"]
2.2 zookeeper和kafak的docker file
这里略,参见本博客关于kafka的博文。
2.3 使用docker-compose 启动集群
version: '2.0'
services:
zookeeper0:
image: alex/zookeeper_cluster:3.4.6
container_name: zookeeper0
hostname: zookeeper0
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
expose:
- 2181
- 2888
- 3888
environment:
ZOOKEEPER_PORT: 2181
ZOOKEEPER_ID: 0
ZOOKEEPER_SERVERS: server.0=zookeeper0:2888:3888 server.1=zookeeper1:28881:38881 server.2=zookeeper2:28882:38882
zookeeper1:
image: alex/zookeeper_cluster:3.4.6
container_name: zookeeper1
hostname: zookeeper1
ports:
- "2182:2182"
- "28881:28881"
- "38881:38881"
expose:
- 2182
- 2888
- 3888
environment:
ZOOKEEPER_PORT: 2182
ZOOKEEPER_ID: 1
ZOOKEEPER_SERVERS: server.0=zookeeper0:2888:3888 server.1=zookeeper1:28881:38881 server.2=zookeeper2:28882:38882
zookeeper2:
image: alex/zookeeper_cluster:3.4.6
container_name: zookeeper2
hostname: zookeeper2
ports:
- "2183:2183"
- "28882:28882"
- "38882:38882"
expose:
- 2183
- 2888
- 3888
environment:
ZOOKEEPER_PORT: 2183
ZOOKEEPER_ID: 2
ZOOKEEPER_SERVERS: server.0=zookeeper0:2888:3888 server.1=zookeeper1:28881:38881 server.2=zookeeper2:28882:38882
kafka0:
image: alex/kafka_cluster:0.8.2.2
container_name: kafka0
hostname: kafka0
ports:
- "9092:9092"
environment:
ZOOKEEPER_CONNECT: zookeeper0:2181,zookeeper1:2182,zookeeper2:2183
BROKER_ID: 0
BROKER_PORT: 9092
ADVERTISED_HOST_NAME: kafka0
HOST_NAME: kafka0
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper0
- zookeeper1
- zookeeper2
expose:
- 9092
kafka1:
image: alex/kafka_cluster:0.8.2.2
container_name: kafka1
hostname: kafka1
ports:
- "9093:9093"
environment:
ZOOKEEPER_CONNECT: zookeeper0:2181,zookeeper1:2182,zookeeper2:2183
BROKER_ID: 1
BROKER_PORT: 9093
ADVERTISED_HOST_NAME: kafka1
HOST_NAME: kafka1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper0
- zookeeper1
- zookeeper2
expose:
- 9093
kafka2:
image: alex/kafka_cluster:0.8.2.2
container_name: kafka2
hostname: kafka2
ports:
- "9094:9094"
environment:
ZOOKEEPER_CONNECT: zookeeper0:2181,zookeeper1:2182,zookeeper2:2183
BROKER_ID: 2
BROKER_PORT: 9094
ADVERTISED_HOST_NAME: kafka2
HOST_NAME: kafka2
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper0
- zookeeper1
- zookeeper2
expose:
- 9094
nimbus:
image: alex/storm:1.0.3
container_name: nimbus
command: nimbus -c nimbus.host=nimbus
environment:
- STORM_ZOOKEEPER_SERVERS=zookeeper0,zookeeper1,zookeeper2
hostname: nimbus
ports:
- "6627:6627"
ui:
image: alex/storm:1.0.3
container_name: ui
command: ui -c nimbus.host=nimbus
environment:
- STORM_ZOOKEEPER_SERVERS=zookeeper0,zookeeper1,zookeeper2
hostname: ui
ports:
- "8080:8080"
depends_on:
- nimbus
supervisor1:
image: alex/storm:1.0.3
container_name: supervisor1
command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=[6700,6701,6702,6703]
environment:
- STORM_ZOOKEEPER_SERVERS=zookeeper0,zookeeper1,zookeeper2
hostname: supervisor1
ports:
- "8000:8000"
depends_on:
- nimbus