1、kafka架构图及下载

https://archive.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

此外,java安装并配置好环境变量。

2、修改kafka配置文件

vim ./config/server.properties
broker.id=0 #每个broker的唯一标识,不可重复
delete.topic.enable=true #允许删除topic
log.dirs=/root/kafka/logs #出于安全考虑,修改默认log位置。kafka的消息队列数据和日志都存储在logs文件夹下面。
(logs文件夹在kafka根目录下创建)

#在zookeeper的数据机构中,每个子目录项如 NameService 都被称作为 znode(目录节点),和文件系统一样,我们能够自由的增加、删除#znode,在一个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的
zookeeper.connect=s4:2181,s5:2181,s6:2181  #指定zookeeper集群

3、kafka和ZK集群配置文件同步

xync 分发同步 kafka和zk目录 (会经常用到同步工具xsync)

或使用mobaxterm的multi-exec功能同时修改多台机器的配置。

4、zookeeper的分布式安装配置

4.1安装

kafka依赖于zookeeper,先下载安装好zookeeper。(三台机器都要安装配置,可以使用同步脚本xsync,或者mobaxterm的的multi-execution功能) 安装好后将zookeeper配置文件模板改为配置文件

mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
dataDir=/tmp/zookeeper #修改默认路径,指定路径为/root/zookeeper/zkData

4.2修改配置文件

在zkData目录下创建myid文件,myid文件内容(整型数字:1,2,3)对应三个集群节点的编号。 并在zoo.cfg文件末尾增加如下配置:

server.1=s4:2888:3888
server.2=s5:2888:3888
server.3=s6:2888:3888

数字123对应myid文件的内容,指定了节点的编号。 s4,s5,s6是主机名或IP;2888是follower与leader服务器通讯传递副本(replicator)的端口;3888是leader挂掉后集群重新选举时通信的端口。

4.3启动验证

配置完毕启动zookeeper。查看状态,显示Mode为leader或follower即表示集群启动成功。

设置开机启动,使用admin用户启动zookeeper:

$ sudo su #切换到root用户
$ vim /etc/rc.d/rc.local
新增配置
su admin -c "/usr/local/zookeeper/startBase.sh"  #组件脚本全路径

startBase.sh脚本内容:

#!/bin/bash
# chkconfig:   2345 60 20
# description:  zookeeper start
APP_HOME=/usr/local/zookeeper/bin
cd $APP_HOME
echo $PWD
source /etc/profile
./zkServer.sh start ../conf/zoo.cfg

5、启动kafka服务并测试

  110  cd kafka/bin/
  106  sh bin/kafka-server-start.sh config/server.properties
  113  ./kafka-topics.sh --create --zookeeper s4:2181 --partitions 2 --replication-factor 2 --topic topic01
    117  ./kafka-topics.sh --list --zookeeper s4:2181
  118  ./kafka-topics.sh --list --zookeeper s5:2181
  119  ./kafka-topics.sh --list --zookeeper s6:2181

#描述topic信息
  ./kafka-topics.sh --describe --zookeeper s5:2181 --topic topic01
# 启动 kafka,指定配置文件
# 创建topic,指定zk服务器,分区数、副本数、topic名字

6、 生成、消费数据

s4生产数据:

kafka-console-producer.sh --broker-list s4:9092 --topic topic01

hi there i am richard

s6或者s5 消费数据:

kafka-console-consumer.sh --zookeeper s5:2181 --topic topic01 #消费最新数据

消费者消费消息,指定消费组名

./kafka-console-consumer.sh --bootstrap-server s4:9092,s5:9092,s6:9092 --new-consumer --consumer-property group.id=consumer_group01 --topic topic01

#这里可看到所有消费的消息
kjh
iouioy
khkhjkl
lllllllllllllllllll

查看正在运行的消费组

./kafka-consumer-groups.sh --bootstrap-server s4:9092 --list --new-consumer
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

consumer_group01

消费堆积情况查看

./kafka-consumer-groups.sh --bootstrap-server s4:9092 --describe --group consumer_group01
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
topic01                        0          1               1               0          consumer-1-dd3fc747-7a23-4057-a5e3-e6268f2ee802   /10.3.3.5                      consumer-1
topic01                        1          1               1               0          consumer-1-dd3fc747-7a23-4057-a5e3-e6268f2ee802   /10.3.3.5                      consumer-1

消费所有数据,从头开始

bin/kafka-console-consumer.sh --zookeeper s5:2181 --topic topic01 --from beginning  
bin/kafka-console-consumer.sh --bootstrap-server s5:9092 --topic topic01 --from-beginning
#offset数据不经过zk,而是通过kafka集群bootstrap-server
#offset数据保存在kafka集群的topic里

查看topic信息

bin/kafka-topics.sh --zookeeper s5:2181 --describe topic topic01
Topic:topic01   PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: topic01  Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1
        Topic: topic01  Partition: 1    Leader: 1       Replicas: 2,1   Isr: 1

删除topic

bin/kafka-topics.sh --zookeeper s5:2181 --delete --topic topic01