Setting up ZooKeeper and Kafka clusters
1. Install the JDK and configure environment variables (installation steps omitted; all 3 machines)
export JAVA_HOME=/usr/local/java
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
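After appending these lines to /etc/profile, reload it and confirm the JDK is visible (a quick sanity check; the exact version output depends on your JDK build):
source /etc/profile
java -version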
2. After unpacking the ZooKeeper binary package, edit the configuration file (all 3 machines)
Operations on node1
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz -C /usr/local/src/
ln -s /usr/local/src/apache-zookeeper-3.6.0-bin/ /usr/local/zookeeper1
cd /usr/local/zookeeper1/conf
cp zoo_sample.cfg zoo.cfg
Edit the zoo.cfg configuration file:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper1/data
dataLogDir=/usr/local/zookeeper1/logs
clientPort=2181
server.1=192.168.7.114:2888:3888
server.2=192.168.7.115:2888:3888
server.3=192.168.7.116:2888:3888
Save and exit!
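zoo.cfg points dataDir and dataLogDir at directories that do not exist yet, so create them before writing the myid file:
mkdir -p /usr/local/zookeeper1/data /usr/local/zookeeper1/logs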
cd /usr/local/zookeeper1/data
echo "1" > myid (在第一台机器操作)
Operations on node2
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz -C /usr/local/src/
ln -s /usr/local/src/apache-zookeeper-3.6.0-bin/ /usr/local/zookeeper2
cd /usr/local/zookeeper2/conf
cp zoo_sample.cfg zoo.cfg
Edit the zoo.cfg configuration file:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper2/data
dataLogDir=/usr/local/zookeeper2/logs
clientPort=2181
server.1=192.168.7.114:2888:3888
server.2=192.168.7.115:2888:3888
server.3=192.168.7.116:2888:3888
Save and exit!
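As on node1, create the data and log directories first:
mkdir -p /usr/local/zookeeper2/data /usr/local/zookeeper2/logs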
cd /usr/local/zookeeper2/data
echo "2" > myid    (run on the second machine)
Operations on node3
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz -C /usr/local/src/
ln -s /usr/local/src/apache-zookeeper-3.6.0-bin/ /usr/local/zookeeper3
cd /usr/local/zookeeper3/conf
cp zoo_sample.cfg zoo.cfg
Edit the zoo.cfg configuration file:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper3/data
dataLogDir=/usr/local/zookeeper3/logs
clientPort=2181
server.1=192.168.7.114:2888:3888
server.2=192.168.7.115:2888:3888
server.3=192.168.7.116:2888:3888
Save and exit!
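As on the other nodes, create the data and log directories first:
mkdir -p /usr/local/zookeeper3/data /usr/local/zookeeper3/logs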
cd /usr/local/zookeeper3/data
echo "3" > myid    (run on the third machine)
3. Add the environment variables on all three machines
vim /etc/profile
Add the following lines (on node2 and node3, point ZOOKEEPER_HOME at /usr/local/zookeeper2 or /usr/local/zookeeper3 respectively):
# zookeeper
export ZOOKEEPER_HOME=/usr/local/zookeeper1
export PATH=$ZOOKEEPER_HOME/bin:$PATH
Save, then reload the profile:
source /etc/profile
4. Start the ZooKeeper service (all three machines), as shown below
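With $ZOOKEEPER_HOME/bin on the PATH, the bundled control script can be called directly. Start each node, then check its role; one node should report Mode: leader and the other two Mode: follower:
zkServer.sh start
zkServer.sh status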
The ZooKeeper cluster has been built successfully.
Kafka cluster
1. Unpack the Kafka binary package (all 3 machines)
tar -zxvf kafka_2.12-2.6.0.tgz -C /usr/local/src/
ln -s /usr/local/src/kafka_2.12-2.6.0/ /usr/local/kafka1    (link to /usr/local/kafka2 on node2 and /usr/local/kafka3 on node3, matching the log.dirs paths below)
2. Edit the configuration files
Operations on node1
vim /usr/local/kafka1/config/server.properties
The configuration file is as follows:
broker.id=1
listeners=PLAINTEXT://192.168.7.114:9092
advertised.listeners=PLAINTEXT://192.168.7.114:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
delete.topic.enable=true
log.dirs=/usr/local/kafka1/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.114:2181,192.168.7.115:2181,192.168.7.116:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
Edit the consumer.properties configuration file:
bootstrap.servers=192.168.7.114:9092
group.id=test-consumer-group
Edit the producer.properties configuration file:
bootstrap.servers=192.168.7.114:9092
compression.type=none
Operations on node2
vim /usr/local/kafka2/config/server.properties
The configuration file is as follows:
broker.id=2
listeners=PLAINTEXT://192.168.7.115:9092
advertised.listeners=PLAINTEXT://192.168.7.115:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
delete.topic.enable=true
log.dirs=/usr/local/kafka2/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.114:2181,192.168.7.115:2181,192.168.7.116:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
Edit the consumer.properties configuration file:
bootstrap.servers=192.168.7.115:9092
group.id=test-consumer-group
Edit the producer.properties configuration file:
bootstrap.servers=192.168.7.115:9092
compression.type=none
Operations on node3
vim /usr/local/kafka3/config/server.properties
The configuration file is as follows:
broker.id=3
listeners=PLAINTEXT://192.168.7.116:9092
advertised.listeners=PLAINTEXT://192.168.7.116:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
delete.topic.enable=true
log.dirs=/usr/local/kafka3/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.114:2181,192.168.7.115:2181,192.168.7.116:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
Edit the consumer.properties configuration file:
bootstrap.servers=192.168.7.116:9092
group.id=test-consumer-group
Edit the producer.properties configuration file:
bootstrap.servers=192.168.7.116:9092
compression.type=none
3. Start the Kafka service on all three machines
cd /usr/local/kafka1/bin    (kafka2/kafka3 on the other nodes)
./kafka-server-start.sh -daemon ../config/server.properties
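To confirm each broker came up, check for the Kafka process with jps (shipped with the JDK), and optionally verify that all three broker IDs registered in ZooKeeper:
jps | grep Kafka
zkCli.sh -server 192.168.7.114:2181
ls /brokers/ids    (inside the zkCli shell; should print [1, 2, 3])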
To check that everything is configured correctly, create a topic on any one node and verify that the other two nodes see it, as shown below.
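For example, assuming the per-node paths above:
# on node1
cd /usr/local/kafka1/bin
./kafka-topics.sh --create --zookeeper 192.168.7.114:2181 --replication-factor 3 --partitions 3 --topic test
# on node2 (and again on node3)
cd /usr/local/kafka2/bin
./kafka-topics.sh --list --zookeeper 192.168.7.115:2181    (the new topic "test" should appear)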
Common commands
Start Kafka:
./kafka-server-start.sh ../config/server.properties
Stop Kafka:
./kafka-server-stop.sh
Create a topic:
./kafka-topics.sh --create --zookeeper 192.168.7.114:2181 --replication-factor 3 --partitions 3 --topic test
List all topics:
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
Show detailed partition and replica information for a topic:
./kafka-topics.sh --describe --zookeeper 192.168.7.114:2181 --topic test
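Two more commands worth keeping at hand are the console producer and consumer, handy for a quick end-to-end test (any broker address works; node1's is used here):
./kafka-console-producer.sh --broker-list 192.168.7.114:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.7.114:9092 --topic test --from-beginning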
The position of a message in Kafka is determined by three factors: topic, partition, and offset.
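As a concrete illustration, the console consumer can address messages by exactly these coordinates (partition 0 and offset 5 are arbitrary example values):
./kafka-console-consumer.sh --bootstrap-server 192.168.7.114:9092 --topic test --partition 0 --offset 5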
Kafka's two consumption modes
Point-to-point mode: a message in the queue is consumed by exactly one dedicated consumer; once the consumer receives and acknowledges the message, the queue deletes it, preventing repeated consumption.
Publish/subscribe mode:
The producer pushes data into the queue, and the same message is consumed by all subscribers. Consumption happens in one of two ways:
the consumer actively pulls messages, or
the queue pushes messages to the consumer.
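In Kafka both modes map onto consumer groups: consumers that share a group.id split a topic's partitions between them (point-to-point semantics), while consumers in different groups each receive every message (publish/subscribe). This can be observed with the console consumer (the group names groupA and groupB are arbitrary examples):
# run in two terminals with different groups: both print every message (publish/subscribe)
./kafka-console-consumer.sh --bootstrap-server 192.168.7.114:9092 --topic test --group groupA
./kafka-console-consumer.sh --bootstrap-server 192.168.7.114:9092 --topic test --group groupB
# run in two terminals with the same group: messages are split between them (point-to-point)
./kafka-console-consumer.sh --bootstrap-server 192.168.7.114:9092 --topic test --group groupA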