Kafka是一种消息队列,具有高吞吐量、消息持久化、高可靠性的优点。
前两者是因为kafka将数据存储到硬盘上,可以处理TB级的海量数据,支持永久保存,并且是顺序存取,速度上并不慢于内存的随机读写。至于可靠性是基于分布式技术,一台机器挂掉还有其他的。
一、kafka关键词
Zookeeper
Zookeeper其实是一个独立的应用,用来管理其他的集群,而kafka就要做集群。kafka依赖于Zookeeper。
Topic:
消息队列实现的方式一般有两种。
点对点通信,比如BlockingQueue,生产者生产的每个数据只能被一个消费者消费;
发布订阅 ,生产者把消息放到某个位置,可以有很多消费者订阅这个位置,同时或先后地读取这里的消息。
kafka采用的是第二种模式,它存放消息的那个位置就用Topic来标识,当然可以设置多个Topic空间来存放不同业务类型的消息。
二、使用命令行检验kafka与zookeeper的简单通信
下载地址:https://kafka.apache.org/downloads (选择 Binary downloads
)
1. 修改配置文件
修改配置文件 config/
zookeeper.properties
的dataDir
参数为自己的路径,用来存储主题数据修改配置文件 config/
server.properties
的log.dirs
参数为自己的路径,用来存储日志数据
两个路径如果不存在,会自动生成。默认是linux的路径写法,如果是windows,则是D:\java\kafka
这种。
2. 开启kafka和zookeeper服务
在根目录下开一个窗口,开启zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties # windows |
Ctrl+C
,在根目录下再开一个窗口,开启kafka
bin\windows\kafka-server-start.bat config/server.properties # windows |
3. 创建topic空间
Ctrl+C
,在根目录下开一个窗口,创建topic空间
./bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test # windows |
replication-factor:副本个数
partitions:分区个数
test是主题名
查看已有的主题:
./bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092 # windows |
4. 消息的发送与监听
在bin/windows目录下再开一个cmd窗口,使用生产者往主题里发送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test |
出现输入符号,开始输入消息
在bin/windows目录下再开一个cmd窗口,使用消费者从主题里取消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning |
from-beginning:从头读取
回车就会出现刚刚输入的消息。可能会有延迟,因为阻塞。
三、spring整合kafka
1. 引入依赖
spring-kafka
2. 配置kafka
配置server和consumer
localhost:9092 =
test-consumer-group =
true =
3000 =
3.使用kafka
生产者:在业务逻辑中主动调用send函数发消息
kafkaTemplate.send(topic, data);
消费者:在handleMessage函数上面加@KafkaListener(topics={“test”}) 注解来被动的接收消息
"test"}) (topics={
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
4.可能的报错
Connection to node -1 could not be established. Broker may not be available.
解决方法:
首先,可能没有开启kafka和zookeeper服务,运行上面那两个命令。
其次,如果kafka在别的机器上,需要打开端口:修改server.properties
文件
原来: |
启动时报内存不足(error=‘Cannot allocate memory‘ (errno=12))
解决方法:
修改 kafka-server-start.sh/bat
和 zookeeper-server-start.sh/bat
中的参数,重新启动。(如果是linux,就是.sh
文件)
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then |
https://www.nowcoder.com/study/live/246/5/5
https://www.nowcoder.com/study/live/246/5/9