Kafka是一种消息队列,具有高吞吐量、消息持久化、高可靠性的优点。

前两者是因为kafka将数据存储到硬盘上,可以处理TB级的海量数据,支持永久保存,并且是顺序存取,速度上并不慢于内存的随机读写。至于可靠性是基于分布式技术,一台机器挂掉还有其他的。




一、kafka关键词

Zookeeper

Zookeeper其实是一个独立的应用,用来管理其他的集群,而kafka就要做集群。kafka依赖于Zookeeper。



Topic:

消息队列实现的方式一般有两种。

  • 点对点通信,比如BlockingQueue,生产者生产的每个数据只能被一个消费者消费;

  • 发布订阅 ,生产者把消息放到某个位置,可以有很多消费者订阅这个位置,同时或先后地读取这里的消息。

kafka采用的是第二种模式,它存放消息的那个位置就用Topic来标识,当然可以设置多个Topic空间来存放不同业务类型的消息。




二、使用命令行检验kafka与zookeeper的简单通信

下载地址:https://kafka.apache.org/downloads (选择 Binary downloads


1. 修改配置文件

  • 修改配置文件 config/zookeeper.propertiesdataDir 参数为自己的路径,用来存储主题数据

  • 修改配置文件 config/server.propertieslog.dirs 参数为自己的路径,用来存储日志数据

两个路径如果不存在,会自动生成。默认是linux的路径写法,如果是windows,则是D:\java\kafka这种。



2. 开启kafka和zookeeper服务

在根目录下开一个窗口,开启zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties   # windows


nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties & # linux


Ctrl+C,在根目录下再开一个窗口,开启kafka

bin\windows\kafka-server-start.bat config/server.properties   # windows

nohup ./bin/kafka-server-start.sh ./config/server.properties & # linux



3. 创建topic空间

Ctrl+C,在根目录下开一个窗口,创建topic空间

./bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1  --partitions 1  --topic test   # windows

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test # linux

replication-factor:副本个数

partitions:分区个数

test是主题名


查看已有的主题:

./bin/windows/kafka-topics.bat --list  --bootstrap-server localhost:9092   # windows

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # linux



4. 消息的发送与监听

在bin/windows目录下再开一个cmd窗口,使用生产者往主题里发送消息

kafka-console-producer.bat --broker-list localhost:9092 --topic test

出现输入符号,开始输入消息


在bin/windows目录下再开一个cmd窗口,使用消费者从主题里取消息

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test  --from-beginning

from-beginning:从头读取

回车就会出现刚刚输入的消息。可能会有延迟,因为阻塞。




三、spring整合kafka

1. 引入依赖

  • spring-kafka



2. 配置kafka

  • 配置server和consumer

    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=test-consumer-group
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=3000



3.使用kafka

  • 生产者:在业务逻辑中主动调用send函数发消息

    kafkaTemplate.send(topic, data);


  • 消费者:在handleMessage函数上面加@KafkaListener(topics={“test”}) 注解来被动的接收消息

    @KafkaListener(topics={"test"})
    public void handleMessage(ConsumerRecord record){
    System.out.println(record.value());
    }



4.可能的报错

Connection to node -1 could not be established. Broker may not be available.

解决方法:

首先,可能没有开启kafka和zookeeper服务,运行上面那两个命令。

其次,如果kafka在别的机器上,需要打开端口:修改server.properties文件

原来:
#listeners=PLAINTEXT://:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092

修改为:
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092



启动时报内存不足(error=‘Cannot allocate memory‘ (errno=12))

解决方法:

修改 kafka-server-start.sh/batzookeeper-server-start.sh/bat 中的参数,重新启动。(如果是linux,就是.sh文件)

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
fi




https://www.nowcoder.com/study/live/246/5/5
https://www.nowcoder.com/study/live/246/5/9