I. 介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,KafKa是一款由Apache软件基金会开源,由Scala编写的一个分布式发布订阅消息系统,Kafka最初是由LinkedIn开发,并于2011年初开源,KafKa它最初的目的是为了解决统一、高效低延时、高通量(同时能传输的数据量)并且高可用一个消息平台.
- 官方网站: https://kafka.apache.org/
- Git地址: https://github.com/apache/kafka
- 中文文档: http://kafka.apachecn.org/
- Client:
- Composer: kafka-php
- PECL: php-rdkafka
- Library: enqueue-dev
- 示例:
- 参考阅读:
- 服务
i. 架构
Kafka的架构
消息发送的流程
ii. 角色
- Broker: 对于KafKa集群来说,每一个KafKa实例都被称为一个broker
- Topic(主题): 在KafKa中每一条消息都所属一个Topic下,Topic之间是完全物理隔离的
- Partitions(分区): 一个Topic下面可以拥有一个到多个Partition, Partition也是物理层面的隔离
- Producers(生产者): 向kafka的Topic发布消息
- Consumers(消费者): 向Topic注册,并且接收发布到这些Topic的消息
iii. 特性
- Kafka接收到的消息最终会以文件的形式存在本地保证了,只要消息接受成功理论上就不会丢失
- KafKa通过append来实现消息的追加,保证消息都是有序的有先来后到的顺序
- KafKa集群有良好的容灾机制,比如有N台服务器,可以承受N-1台服务器故障是保证提交的消息不会丢失
- KafKa会更具Topic以及partition来进行消息在本地的物理划分
- KafKa依赖zookeeper实现了offset,你不用关心到你获取了那些消息KafKa会知道并且在你下次获取时接着给你
- 你可以获取任意一个offset的记录
- 消息可以在KafKa内保存很长的时间也可以很短,KafKa基于文件系统能存储消息的容量取决于硬盘空间
- KafKa的性能不会受到消息的数量影响
iii. 使用场景
消息队列(MQ)
KafKa可以代替传统的消息队列软件(阿里的队列软件RocketMQ就是基于KafKa实现的),在队列软件的选择上KafKa已经成了不二之选,使用KafKa来实现队列有如下优点
- KafKa的append来实现消息的追加,保证消息都是有序的有先来后到的顺序,
- 稳定性强队列在使用中最怕丢失数据,KafKa能做到理论上的写成功不丢失
- 分布式容灾好
- 容量大相对于内存队列,KafKa的容量受硬盘影响
- 数据量不会影响到KafKa的速度
分布式日志系统(Log)
在很多时候我们需要对一些庞大的数据进行存留,一些业务型公司可能用不上应为基本可以依靠数据库解决日志的问题,但是服务型公司比如jpush,云监控此类服务,日志存储这块会遇到巨大的问题,日志不能丢,日志存文件不好找,定位一条消息成本高(遍历当天日志文件),实时显示给用户难,这几类问题KafKa都能游刃有余
- KafKa的集群备份机制能做到n/2的可用,当n/2以下的机器宕机时存储的日志不会丢失
- KafKa可以对消息进行分组分片,并且通过offset可以做到获取中间某一条消息(通过算法很容易的到莫个时段的日志)
- KafKa非常容易做到实时日志查询,可以从日志尾部获取需要显示给用户查询的资料即可
数据通道(Messaging)
kafka特有的offset机制能够保证消息至少被获取一次,当程序在获取途中死亡这条消息会被认定为未被消费,下次会继续消费这条消息,此特性使得kafka可以作为一个保障数据传输的通道来使用,但是kafka并没有提供JMS中的”事务性””消息传输担保(消息确认机制)””消息分组”等企业级特性;所以kafka只能使用作为”常规”的消息系统
II. 安装
- https://github.com/alibaba/canal/wiki/Kafka-QuickStart
- http://kafka.apachecn.org/documentation.html#gettingStarted
- https://kafka.apache.org/downloads
- https://github.com/huayang0704/workspace/wiki/KafkaInstall
- https://github.com/huayang0704/workspace/wiki/Kafka
下载安装包
mkdir -p /usr/local/kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz
cp kafka_2.11-2.1.1.tgz /usr/local/kafka
cd /usr/local/kafka/
tar zxvf kafka_2.11-2.1.1.tgz
修改配置文件
vim config/server.properties
zookeeper.connect=127.0.0.1:2181
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092 #本机ip
启动server
Kafka 使用 ZooKeeper 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器
$ ./bin/kafka-server-start.sh -daemon config/server.properties &
# 查看进程(jps: 显示当前所有java进程pid的命令)
$ jps
#28241 CanalLauncher
#22865 QuorumPeerMain
#14753 SimpleCanalClientTest
#867 Application
#25047 Kafka
#4238 Jps
基础操作
### 创建一个 topic
# 让我们创建一个名为“test”的topic,它有一个分区和一个副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 运行list(列表)命令来查看这个topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
### 发送一些消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
### 启动一个 consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
异常
错误: 找不到或无法加载主类 kafka.Kafka
下载的是源码包,需要编译。可以下载Binary downloads