2025年4月1日 星期二 乙巳(蛇)年 正月初二 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 服务器 > 万维网络 > 中间件

kafka安装使用教程

时间:06-11来源:作者:点击数:47

1.下载

下载地址:http://kafka.apache.org/downloads

以键头所指版本为例

2.安装

下载的版本已经编译,直接解压到想要的目录就算安装好了

  • tar -zxf kafka_2.11-0.11.0.1.tgz -C /usr/myapp

3.配置单节点

3.1 配置单节点zookeeper

我们使用kafka自带的zookeeper

  • cd /usr/myapp/kafka_2.11 #进入kafka主目录
  • mkdir -p zk/data #创建zookeeper数据存放目录
  • mkdir -p zk/logs #创建zookeeperl存放日志目录
  • cd config #进入配置文件所在目录
  • mv zookeeper.properties zookeeper.properties.bak #将原配置文件移走
  • cat > zookeeper.properties << EOF
  • tickTime=2000
  • dataDir=/usr/myapp/kafka_2.11/zk/data
  • dataLogDir=/usr/myapp/kafka_2.11/zk/logs
  • clientPort=2181
  • EOF

到具体情况时注意修昨dataDir和dataLogDir为自己的相应目录

3.2 配置单结点kafka
  • cd /usr/myapp/kafka_2.11 #进入kafka主目录
  • mkdir logs #创建logs目录用于存放日志
  • cd config #进入配置文件所在目录
  • mv server.properties server.properties.bak #将原配置文件移走
  • cat > server.properties << EOF
  • broker.id=1
  • listeners=PLAINTEXT://192.168.220.128:9092
  • num.network.threads=3
  • num.io.threads=8
  • socket.send.buffer.bytes=102400
  • socket.receive.buffer.bytes=102400
  • socket.request.max.bytes=104857600
  • log.dirs=/usr/myapp/kafka_2.11/logs
  • num.partitions=1
  • num.recovery.threads.per.data.dir=1
  • offsets.topic.replication.factor=1
  • transaction.state.log.replication.factor=1
  • transaction.state.log.min.isr=1
  • log.retention.hours=168
  • log.segment.bytes=1073741824
  • log.retention.check.interval.ms=300000
  • zookeeper.connect=192.168.220.128:2181
  • zookeeper.connection.timeout.ms=6000
  • group.initial.rebalance.delay.ms=0
  • EOF

上边配置的server.properties的内容基本都是原server.properties的默认配置,到自己安装时主要修改:

broker.id--broker的id;修改为任意自己想要的数值(和zookeeper中的id类似的)

listeners--监听址址;修改为kafka要监听的地址

log.dirs--日志文件存放目录;修改为自己要存放日志的目录

zookeeper.connect--zookeeper监听地址;修改为自己的zookeeper的监听地址,如果是集群所有地址全写上用逗号(半角)隔开即可

4.启动和停止

启动前要配置JAVA_HOME,不然无法启动(和tomcat一样虽然输出started了但并有有启,可查看kafkaServer.out)

启动:

  • ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties #启动zookeeper
  • ./kafka-server-start.sh -daemon ../config/server.properties #启动kafka

停止:

  • ./zookeeper-server-stop.sh #停止zookeeper
  • ./kafka-server-stop.sh #停止kafka,centos7上可能关不了用kill -9直接杀掉

查看是否有zookeeper和kafka进程:

  • jps

5.搭建集群

5.1 zookeeper改造
  • cd /usr/myapp/kafka_2.11/config #进入配置文件路径
  • cat >> zookeeper.properties << EOF
  • initLimit=5
  • syncLimit=2
  • server.1=192.168.220.128:2888:3888
  • server.2=192.168.220.129:2888:3888
  • server.3=192.168.220.130:2888:3888
  • EOF
  • echo '1' > ../zk/data/myid #配置zookeeper的myid文件
5.2 kafka改造

编缉server.properties文件,将所有的zookeeper集群地址追加到zookeeper.connect后,每个地址间用逗号(半角)隔开即可

5.3 集群搭建

将上边安装配置好的kafka打包传到其创机器上,然后解压;

对于zookeeper,各机修改myid文件的为不同的值,然后各机重启zookeeper即可。

对于kafka,各机修改server.properties中的broker.id为不同的值listeners修改为本各机IP,然后各机重启kafka即可。

6.测试是否可用

创建topics:

  • ./kafka-topics.sh --create --bootstrap-server 192.168.220.128:9092 --replication-factor 2 --partitions 1 --topic test_topics

查看已存在的topics:

  • ./kafka-topics.sh --list --bootstrap-server 192.168.220.128:9092

在一台上创建生产者:

  • ./kafka-console-producer.sh --bootstrap-server 192.168.220.128:9092 --topic test_topics

在另一台上创建消费者:

  • ./kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic test_topics --from-beginning

正常的话在生产者中输入的内容,会在消费者端输出。

7.python连接kafka【可选】

先安装python-kafka模块:

  • pip install kafka-python

生产者示例:

  • imoprt time
  • from kafka import KafkaProducer
  • # 单个server也可以直接bootstrap_servers='192.168.220.128:9092',但为了形式统一还是建议都写成列表
  • producer = KafkaProducer(bootstrap_servers=['192.168.220.128:9092'])
  • for index in range(100):
  • producer.send('test_topics', f'some_message_bytes_{index}'.encode())
  • # 消息的发送异步发送的(send返回的是future),为了避免发送线程还没发送主线程就退出导致整个进程退出,这里让主线程休眠一秒
  • # 特别是加上用户名密码后认证会send耗时增加,不加sleep,可能会出现“加上密码后consumer正常,但producer只有在debug模式才能生产消息”的假象
  • time.sleep(1)

消费者示例:

  • from kafka import KafkaConsumer
  • # 单个server也可以直接bootstrap_servers='192.168.220.128:9092',但为了形式统一还是建议都写成列表
  • consumer = KafkaConsumer('test_topics', bootstrap_servers=['192.168.220.128:9092'])
  • # msg是一个ConsumerRecord类,原始消息值是value属性
  • for msg in consumer:
  • print(msg.value.decode())
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门