Loosely organized; some fleeting thoughts, interesting or otherwise
Kafka First Touch
Distributed Event Streaming Platform
Core architectural components
Data flow
Scalability design
Performance optimization
Topic
Broker
Producer
Consumer
Practice
Topic to Partition and Partition to Broker are both many-to-many relationships: a topic can have multiple partitions, and a partition (through its replicas) can be placed on multiple brokers.
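These relationships can be sketched in plain Java. This is a minimal illustrative model (names like `assignReplicas` are hypothetical, not Kafka's API): each partition is mapped to a list of replica brokers, with the first replica acting as the leader.

```java
import java.util.*;

// Hypothetical sketch of Topic -> Partition -> Broker relationships:
// a topic owns several partitions, and each partition's replicas are
// spread across several brokers; the first replica acts as leader.
public class TopicLayout {
    // partition id -> replica broker ids (first entry is the leader)
    static Map<Integer, List<Integer>> assignReplicas(int partitions, int replicationFactor, int brokerCount) {
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((p + r) % brokerCount); // simple round-robin spread
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }

    static int leaderOf(Map<Integer, List<Integer>> assignment, int partition) {
        return assignment.get(partition).get(0);
    }

    public static void main(String[] args) {
        // 3 partitions, replication factor 2, 3 brokers
        System.out.println(assignReplicas(3, 2, 3)); // {0=[0, 1], 1=[1, 2], 2=[2, 0]}
    }
}
```

Note how every broker ends up hosting replicas of multiple partitions, and every partition spans multiple brokers.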
The Partition has a leader and multiple followers. The leader is responsible for handling read and write requests, while the followers replicate the data from the leader.
The Controller is responsible for managing the broker state and partition assignment. It is elected from the brokers in the cluster and maintains the metadata about the cluster.
The Producer sends messages to a specific topic, and the Consumer reads and processes messages from the topic. The Consumer Group mechanism is used to balance the load among consumers in the same group.
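The load-balancing idea behind Consumer Groups can be sketched as a simple partition assignment. This is an illustrative round-robin assignor, not Kafka's actual `ConsumerPartitionAssignor` implementations:

```java
import java.util.*;

// Sketch of consumer-group load balancing: partitions of a topic are
// divided among the consumers in the same group, so each partition is
// consumed by exactly one group member.
public class GroupAssignment {
    // returns consumerId -> list of partitions assigned to it
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumers.get(p % consumers.size()); // round-robin
            result.get(owner).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1", "c2"), 5)); // {c1=[0, 2, 4], c2=[1, 3]}
    }
}
```

Adding a consumer to the group triggers a rebalance: re-running the assignment with the new member list redistributes the partitions.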
ZooKeeper plays a central role in Kafka as a distributed coordination service:
The relationship between the Controller and ZooKeeper
Cluster coordination and management
Metadata management
High-availability guarantees
Starting with the Kafka 3.x line, KRaft mode removes the ZooKeeper dependency: the controller role is integrated with the brokers and metadata is stored inside the cluster itself. This greatly simplifies the architecture, improves performance, and reduces maintenance cost.
The flow of how a consumer consumes messages from Kafka:
The consumer subscribes to a topic and starts polling for messages.
The consumer sends a fetch request to the broker to fetch messages from the assigned partitions.
The broker responds with the messages available in the partition.
The consumer processes the messages and commits the offset to the broker.
The consumer continues to poll for new messages.
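The poll/commit loop above can be simulated in plain Java with no Kafka dependency. All names here (`fetch`, `pollOnce`, `committedOffset`) are illustrative stand-ins for the real client and broker behavior:

```java
import java.util.*;

// Plain-Java simulation of the consumer loop: fetch from the committed
// offset, process the records, commit the next offset "broker-side",
// then poll again.
public class PollLoop {
    static List<String> log = List.of("m0", "m1", "m2", "m3"); // the partition log
    static long committedOffset = 0; // stored broker-side per group/partition

    static List<String> fetch(long fromOffset, int maxRecords) {
        int from = (int) Math.min(fromOffset, log.size());
        int to = Math.min(from + maxRecords, log.size());
        return log.subList(from, to);
    }

    static List<String> pollOnce(int maxRecords) {
        List<String> records = fetch(committedOffset, maxRecords);
        committedOffset += records.size(); // commit after processing
        return records;
    }

    public static void main(String[] args) {
        System.out.println(pollOnce(2)); // [m0, m1]
        System.out.println(pollOnce(2)); // [m2, m3]
    }
}
```

Because the offset is committed only after processing, a crash between fetch and commit causes those records to be re-delivered on restart, which is the at-least-once delivery behavior of the real client's default flow.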
The data model and behavior of the Fetcher in the Kafka Consumer:
Sequence Diagram with detailed operations:
Details of prepareFetchRequests():
Key Features Explained:
Node Management:
Fetch Session Handling:
Network Operations:
Data Collection:
Key Method Descriptions:
Record Processing Flow:
Detailed Explanation:
// In Fetcher class
public Fetch<K, V> collectFetch() {
return fetchCollector.collectFetch(fetchBuffer);
}
// In FetchCollector class (simplified sketch; the real implementation differs)
public ConsumerRecords<K, V> collectFetch(FetchBuffer buffer) {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
for (TopicPartition partition : buffer.partitions()) {
// Get raw records for partition
Records rawRecords = buffer.getRecords(partition);
// Process each record
for (Record raw : rawRecords) {
// Deserialize key and value (the Deserializer API also takes the topic)
K key = keyDeserializer.deserialize(partition.topic(), raw.key());
V value = valueDeserializer.deserialize(partition.topic(), raw.value());
// Create ConsumerRecord with metadata
ConsumerRecord<K, V> record = new ConsumerRecord<>(
partition.topic(),
partition.partition(),
raw.offset(),
raw.timestamp(),
raw.timestampType(),
raw.keySize(),
raw.valueSize(),
key,
value,
raw.headers(),
Optional.of(raw.leaderEpoch())
);
records.computeIfAbsent(partition, p -> new ArrayList<>())
.add(record);
}
}
return new ConsumerRecords<>(records);
}
Key Aspects:
Node Management:
In Kafka, a Node represents a Broker node, not the Controller
Node is a basic abstraction representing any Broker in the cluster
The Controller is a special role taken on by one of the Brokers
The Consumer interacts only with ordinary Broker Nodes and does not communicate directly with the Controller
Controller-related coordination is handled internally by the Brokers
Detailed Explanation:
// In Fetcher class
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = new HashMap<>();
// Get list of nodes we need to fetch from
for (TopicPartition partition : subscriptions.fetchablePartitions()) {
Node node = metadata.currentLeader(partition);
if (node == null) {
// Handle missing leader
continue;
}
// Check node state
if (client.isUnavailable(node)) {
// Node is in backoff period after failure
continue;
}
if (!client.ready(node, time.milliseconds())) {
// Connection not ready, may need to initiate
client.tryConnect(node);
continue;
}
// Get or create fetch session for node
FetchSessionHandler session = sessions.computeIfAbsent(
node,
n -> new FetchSessionHandler(logContext, n)
);
// Build fetch request for this node
FetchSessionHandler.FetchRequestData data = session.build(
fetchConfig,
partition,
nextFetchOffset(partition)
);
fetchRequestMap.put(node, data);
}
return fetchRequestMap;
}
Key Aspects:
The difference between Node and Broker in the Kafka implementation:
Key Differences:
Node:
A lightweight representation of a network endpoint
Contains basic connection information:
public class Node {
private final int id;
private final String host;
private final int port;
private final String rack;
// Used for network connections
}
Used primarily by the client for network operations
Represents any network endpoint (could be broker or controller)
Broker:
A full Kafka broker instance
Contains complete broker configuration and state:
public class Broker {
private final int id;
private final Node node;
private final Map<String, Object> config;
private final List<String> endpoints;
private final Set<String> roles;
// Contains complete broker metadata
}
Includes additional metadata such as the endpoints, roles, and configuration shown in the fields above
Usage Example
// In Fetcher class
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = new HashMap<>();
for (TopicPartition partition : subscriptions.fetchablePartitions()) {
// Gets Node (network endpoint) for the broker leading this partition
Node node = metadata.currentLeader(partition);
if (node == null) {
// Handle missing leader
continue;
}
// Network operations use Node
if (client.isUnavailable(node)) {
continue;
}
// Fetch sessions are maintained per Node
FetchSessionHandler session = sessions.computeIfAbsent(
node,
n -> new FetchSessionHandler(logContext, n)
);
// Build fetch request for this node
fetchRequestMap.put(node, session.build());
}
return fetchRequestMap;
}
Interaction Flow:
Key Points:
Node is used for:
Broker is used for:
In Consumer Context:
The decision to get metadata from ZooKeeper vs. Controller is based on whether the Kafka cluster is running in legacy mode (ZooKeeper-based) or KRaft (ZooKeeper-less) mode. This isn't decided at runtime - it's determined by how the Kafka cluster is configured when it starts up.
The key configuration that determines this:
KRaft Mode (Controller-based):
controller.quorum.voters
Legacy Mode (ZooKeeper-based):
zookeeper.connect
The client code doesn't need to know or decide which mode is being used - it simply makes metadata requests to brokers, and the brokers handle getting the metadata from the appropriate source based on how they were configured.
This is part of Kafka's evolution to remove the ZooKeeper dependency (KIP-500), making the system simpler to deploy and maintain.
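A minimal configuration contrast between the two modes, using the real broker config keys named above (hosts and ids are placeholders):

```properties
# KRaft mode (ZooKeeper-less): the node points at the controller quorum.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093

# Legacy mode (ZooKeeper-based): the broker registers with ZooKeeper instead.
# zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
```

The two settings are mutually exclusive for a given cluster; clients connect to `bootstrap.servers` either way and never see the difference.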
How brokers decide which broker is the leader for a partition:
Initial Leader Selection:
Leader Election Process:
Leadership Eligibility:
Failure Handling:
Configuration Factors:
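The core election rule can be sketched as follows. This is a simplified illustration of the ISR-based rule (the names are hypothetical, not Kafka's internal API): the controller picks the first replica in the assigned-replica list that is still in the ISR and alive.

```java
import java.util.*;

// Sketch of partition leader election: scan the assigned replicas in
// preference order and elect the first one that is both in the ISR
// (in-sync replica set) and currently alive.
public class LeaderElection {
    static OptionalInt electLeader(List<Integer> assignedReplicas,
                                   Set<Integer> isr,
                                   Set<Integer> aliveBrokers) {
        for (int replica : assignedReplicas) { // preference order matters
            if (isr.contains(replica) && aliveBrokers.contains(replica)) {
                return OptionalInt.of(replica);
            }
        }
        // No eligible replica: the partition goes offline (unless unclean
        // leader election is enabled, which this sketch does not model).
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Broker 1 (the preferred leader) is down; broker 2 is in the ISR.
        System.out.println(electLeader(List.of(1, 2, 3), Set.of(2, 3), Set.of(2, 3))); // OptionalInt[2]
    }
}
```

The first replica in the assignment list is the "preferred leader"; when it recovers and rejoins the ISR, Kafka can move leadership back to it to rebalance load.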