Preface
It is recommended to first read up on message/task queues in general, to get the macro-level theory, concepts, and trade-offs of message queue middleware.
Overall, the book is a fairly literal walk-through of the source code and does not distill much; consider another book instead: Learning Apache Kafka.
Apache Kafka is a distributed streaming platform. What exactly does that mean? A streaming platform has three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
A few questions to keep in mind:
- Kafka stores messages on disk, and by design it does not fear disk operations because it reads and writes the disk sequentially. Where exactly does this show up?
- The many faces of offset: a message becomes visible to consumers only after it has been written to all (in-sync) replicas, i.e., once the message is committed. The newest offset held by a leader/follower is its LEO (log end offset); the offset that all replicas have replicated is the HW (high watermark). For example, if the leader's LEO is 10 and two followers' LEOs are 8 and 9, then HW = 8 and consumers can read messages up to offset 7.
- Which offset of a partition a consumer has consumed up to is tracked by the consumer itself (see the sketch below).
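To make the last point concrete, here is a minimal sketch, not from the book, of a consumer explicitly managing its own position with the new consumer API's seek and commitSync. The broker address, topic name, and group id are made-up example values.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example value
        props.put("group.id", "offset-demo");             // example value
        props.put("enable.auto.commit", "false");         // we manage offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));
        consumer.poll(0); // joins the group and triggers partition assignment
        for (TopicPartition tp : consumer.assignment()) {
            consumer.seek(tp, 0L); // the consumer, not the broker, decides where to start
        }
        ConsumerRecords<String, String> records = consumer.poll(1000);
        System.out.println("fetched " + records.count() + " records");
        consumer.commitSync(); // the consumer reports its position back at its own pace
        consumer.close();
    }
}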
The source code analyzed in the book is based on Kafka 0.10.0.1.
High-level concepts
Looking purely at the logical level:
Each topic consists of multiple partitions, and each partition has multiple replicas. From the producer's point of view, which partition a topic's message goes to is simply decided by hashing (the message key). From Learning Apache Kafka: every partition is mapped to a logical log file that is represented as a set of segment files of equal sizes; every partition is an ordered, immutable sequence of messages.
Kafka thus organizes messages in a three-level structure: topic - partition - message. Partitions are nothing but separate queues in Kafka to make it more scalable. When there is more than one partition, you are expected to run multiple consumers; ideally the number of consumers equals the number of partitions. Partitions effectively widen the road by adding more lanes. The hash-based routing is sketched below.
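A minimal sketch of that hash-based routing, as my own illustration rather than Kafka's actual DefaultPartitioner (which hashes the serialized key bytes with murmur2 and round-robins on null keys):

public class SimplePartitioner {
    // Illustrative partitioner: the same key always lands in the same partition,
    // which is what gives Kafka its per-key ordering guarantee.
    public static int partition(String key, int numPartitions) {
        // Mask off the sign bit so negative hash codes do not yield negative indices.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partition("user-42", 3));
        System.out.println(partition("user-42", 3)); // identical output, same partition
    }
}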
Overall architecture diagram
Refining it further, it looks like this:
Using the code
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.8.2</artifactId>
    <version>0.8.0</version>
</dependency>
Producer
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Configure properties
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
// Build the producer (this is the old Scala producer API that ships with 0.8)
Producer<String, String> producer = new Producer<String, String>(config);
// Build the message: (topic, key, payload); here the event number doubles as the key
String topic = "test-topic";   // example values, not from the original snippet
long nEvents = 0;
String msg = "hello kafka";
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, nEvents + "", msg);
// Send the message
producer.send(data);
// Close the producer when done
producer.close();
Consumer
See also: Kafka series (4) — the Kafka consumer: reading data from Kafka
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject; // from the org.json library

// Configure properties
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// Subscribe to the topic
consumer.subscribe(Collections.singletonList("customerCountries"));
// Poll loop: keep a running count of customers per country
Map<String, Integer> custCountryMap = new HashMap<String, Integer>();
try {
    while (true) { // an infinite poll loop is the normal shape of a consumer
        // Fetch the next batch of records, blocking for at most 100 ms
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);
            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4)); // pretty-print the running counts
        }
    }
} finally {
    consumer.close(); // always close, so the group coordinator can rebalance promptly
}
Background
Network communication
The kafka producer/consumer talk to ZooKeeper only to a limited extent; they mainly interact with the Kafka server, speaking a custom protocol over raw Java NIO driven by a single thread (on the Kafka server side a single thread would not be enough).
- the producer uses NetworkClient to talk to the Kafka server
- the consumer uses ConsumerNetworkClient (which wraps a NetworkClient) to talk to the Kafka server
- the protocol objects are shown in the figure below; org.apache.kafka.common.protocol.ApiKeys defines all Request/Response types, and the FetchXX classes are one concrete example
- NetworkClient sends requests in a rather roundabout way: send only buffers the request, and a later poll actually transmits it (see the sketch after this list). Quoting the javadoc:
  - send: "Send a new request. Note that the request is not actually transmitted on the network until one of the poll(long) variants is invoked. At this point the request will either be transmitted successfully or will fail. Use the returned future to obtain the result of the send."
  - poll: "Poll for any network IO."
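To illustrate the send-then-poll split, here is a toy model of my own, not Kafka's actual NetworkClient (which layers this pattern on top of a java.nio selector):

import java.util.ArrayDeque;
import java.util.Queue;

public class BufferingClient {
    private final Queue<String> pending = new ArrayDeque<String>();

    // "Send" a request: nothing touches the network here, we only buffer it.
    public void send(String request) {
        pending.add(request);
    }

    // Drive the actual I/O: everything buffered so far goes on the wire.
    public void poll() {
        String request;
        while ((request = pending.poll()) != null) {
            // In the real NetworkClient, this is where the NIO selector
            // writes the bytes and reads any responses that have arrived.
            System.out.println("transmitting: " + request);
        }
    }

    public static void main(String[] args) {
        BufferingClient client = new BufferingClient();
        client.send("FetchRequest");   // buffered, not yet on the network
        client.send("ProduceRequest"); // buffered, not yet on the network
        client.poll();                 // only now do both requests go out
    }
}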
Delivery guarantee semantics
Delivery guarantees come in three levels:
- At most once: messages may be lost, but are never redelivered
- At least once: messages are never lost, but may be redelivered
- Exactly once: each message is delivered exactly once
None of these levels comes from a single configuration switch; they are achieved by the producer and consumer cooperating. For example, to get "exactly once" you can give every message a unique id: the producer may send duplicates, and the consumer simply ignores messages it has already consumed (sketched below).
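A minimal sketch of that consumer-side deduplication idea, assuming every message carries a unique id; all names here are illustrative:

import java.util.HashSet;
import java.util.Set;

public class DedupingHandler {
    // Turns at-least-once delivery into effectively exactly-once processing
    // by remembering which message ids have already been handled. In
    // production this set would need to be persisted atomically together
    // with the processing results; an in-memory set is enough to show the idea.
    private final Set<String> processedIds = new HashSet<String>();

    public void handle(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return; // duplicate redelivery: already processed, skip it
        }
        System.out.println("processing " + messageId + ": " + payload);
    }

    public static void main(String[] args) {
        DedupingHandler handler = new DedupingHandler();
        handler.handle("msg-1", "hello");
        handler.handle("msg-1", "hello"); // resent by the producer; ignored
        handler.handle("msg-2", "world");
    }
}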
From Learning Apache Kafka:
- producers and consumers work on the traditional push-and-pull model, where producers push the message to a Kafka broker and consumers pull the message from the broker.
- Log compaction: for messages with the same key, only the latest value is retained.
- Message compression in Kafka: for the cases where network bandwidth is a bottleneck, Kafka provides a message group compression feature for efficient message delivery.
- Replication modes. Asynchronous replication: as soon as a lead replica writes the message to its local log, it sends the acknowledgement to the message client and does not wait for acknowledgements from follower replicas. Synchronous replication is the opposite: the leader acknowledges the client only after follower replicas have acknowledged the message. Compaction and compression are driven by configuration; see the sketch after this list.
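A hedged configuration sketch for the compaction and compression features above. The property names (compression.type on the producer, cleanup.policy on the topic) are standard Kafka configuration keys, but verify them against the documentation of the version you run:

import java.util.Properties;

public class FeatureConfigs {
    public static void main(String[] args) {
        // Producer side: compress message batches before they hit the network.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092"); // example value
        producerProps.put("compression.type", "gzip"); // also: snappy, lz4

        // Topic side: ask the broker to keep only the latest value per key.
        // This is a topic-level setting, typically passed at topic creation,
        // e.g. via the kafka-topics.sh tool: --config cleanup.policy=compact
        Properties topicConfig = new Properties();
        topicConfig.put("cleanup.policy", "compact");

        System.out.println(producerProps);
        System.out.println(topicConfig);
    }
}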