文章目录
- 安装ZK
- 下载解压kafka
- 启动Zookeeper并提前启动好
- 修改kafka配置文件
- server1.properties 的配置:
- server2.properties 的配置:
- server3.properties 的配置:
- 分别启动3个服务
- 集群下创建Topic
- 集群下启动Consumer
- 集群下启动Producer
一、简介
Apache Kafka是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统,使用Scala与Java语言编写,能够将消息从一个端点传递到另一个端点,较之传统的消息中间件(例如ActiveMQ、RabbitMQ),Kafka具有高吞吐量、内置分区、支持消息副本和高容错的特性,非常适合大规模消息处理应用程序。
ip:172.22.40.107
安装路径:/usr/local/kafka
单机版集群,实际上就是在同一台机器上,运行多个kafka服务,只是端口不同。
基于kafka单机版安装流程,可参考:CentOS安装kafka 2.8.0单机版
所有的kafka节点连接到相同的ZK(或ZK集群),需要先安装一个ZK。ZK的安装可参考:
linux下安装zookeeper集群-3.6.3版本
在本例中ZK也安装在这台机器上。
注意:单机的kafka和集群的kafka不要混用一个zk,否则会出现数据混乱的问题。
安装ZK
咱们使用【linux下安装zookeeper集群-3.6.3版本】里面安装好的zk集群:
172.22.40.104
172.22.40.105
172.22.40.106
下载解压kafka
获取下载地址(点开具体版本,这里我们选择2.8.0版):http://kafka.apache.org/downloads
cd /usr/local/
wget https://mirror.bit.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
或
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzvf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka && cd kafka
注意:wget如果失效的话,请自行到官网下载。
启动Zookeeper并提前启动好
我们使用之前安装的zk集群。
修改kafka安装目录下的 config/server.properties 文件中 zookeeper.connect 指向zk地址:
zookeeper.connect=172.22.40.104:2181,172.22.40.105:2181,172.22.40.106:2181
修改kafka配置文件
复制3个配置文件
cd /usr/local/kafka/config
cp server.properties server1.properties
cp server.properties server2.properties
cp server.properties server3.properties
修改配置文件中的broker.id分别为1、2、3
listeners这一行取消注释,端口号分别为9093、9094、9095
log.dirs分别设置为kafka-logs1、kafka-logs2、kafka-logs3(先创建):
mkdir -p /tmp/kafka-logs1 /tmp/kafka-logs2 /tmp/kafka-logs3
server1.properties 的配置:
broker.id=1
listeners=PLAINTEXT://172.22.40.107:9093
log.dirs=/tmp/kafka-logs1
num.partitions=1
auto.create.topics.enable=true
delete.topic.enable=true
server2.properties 的配置:
broker.id=2
listeners=PLAINTEXT://172.22.40.107:9094
log.dirs=/tmp/kafka-logs2
num.partitions=1
auto.create.topics.enable=true
delete.topic.enable=true
server3.properties 的配置:
broker.id=3
listeners=PLAINTEXT://172.22.40.107:9095
log.dirs=/tmp/kafka-logs3
num.partitions=1
auto.create.topics.enable=true
delete.topic.enable=true
分别启动3个服务
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
./kafka-server-start.sh -daemon ../config/server3.properties
nohup ./kafka-server-start.sh ../config/server1.properties &
nohup ./kafka-server-start.sh ../config/server2.properties &
nohup ./kafka-server-start.sh ../config/server3.properties &
PS:如果遇到zk node exists的问题,先把brokers节点删掉(临时解决方案)。
集群下创建Topic
在bin目录下,创建一个名为“myschooling”的topic,只有一个副本,一个分区:
sh kafka-topics.sh --create --zookeeper 172.22.40.104:2181 --replication-factor 1 --partitions 1 --topic myschooling
查看已经创建的 topics:
sh kafka-topics.sh -list -zookeeper 172.22.40.104:2181
集群下启动Consumer
在一个新的远程窗口中:
./kafka-console-consumer.sh --bootstrap-server 172.22.40.107:9093,172.22.40.107:9094,172.22.40.107:9095 --topic myschooling
集群下启动Producer
打开一个新的窗口,在kafka解压目录下:
./kafka-console-producer.sh --broker-list 172.22.40.107:9093,172.22.40.107:9094,172.22.40.107:9095 --topic myschooling