Kafka First Touch
graph TB
    subgraph Kafka Cluster
        subgraph ZooKeeper Ensemble
            Z1[ZooKeeper 1]
            Z2[ZooKeeper 2]
            Z3[ZooKeeper 3]
        end
        subgraph Brokers
            B1[Broker 1]
            B2[Broker 2]
            B3[Broker 3]
            C[Controller - one type of Broker]
        end
        Z1 --- Z2
        Z2 --- Z3
        Z3 --- Z1
        Z1 --> B1
        Z2 --> B2
        Z3 --> B3
        Z1 --> C
        C --> B1
        C --> B2
        C --> B3
    end
    P1[Producer 1] --> B1
    P2[Producer 2] --> B2
    B1 --> Con1[Consumer 1]
    B2 --> Con2[Consumer 2]
    B3 --> Con3[Consumer 3]

    classDef zk fill:#e1d5e7,stroke:#9673a6
    classDef broker fill:#dae8fc,stroke:#6c8ebf
    classDef client fill:#d5e8d4,stroke:#82b366
    classDef controller fill:#fff2cc,stroke:#d6b656
    class Z1,Z2,Z3 zk
    class B1,B2,B3 broker
    class P1,P2,Con1,Con2,Con3 client
    class C controller
Distributed Event Streaming Platform
Core Architecture Components
Data Flow
Scalability Design
Performance Optimization
Topic
Broker
Producer
Consumer
Practice
Topic-to-Partition is one-to-many, while Partition-to-Broker is many-to-many. A topic can have multiple partitions; each partition is replicated across multiple brokers, and each broker hosts replicas of many partitions.
Each partition has one leader and multiple followers. The leader handles all read and write requests, while the followers replicate the data from the leader.
The Controller is responsible for managing the broker state and partition assignment. It is elected from the brokers in the cluster and maintains the metadata about the cluster.
The Producer sends messages to a specific topic, and the Consumer reads and processes messages from the topic. The Consumer Group mechanism is used to balance the load among consumers in the same group.
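As a concrete, hedged illustration of these relationships, the sketch below creates a topic with the Java AdminClient; the topic name, partition count, replication factor, and bootstrap address are all placeholder assumptions:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; any broker in the cluster works
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions spread across brokers; each partition replicated to 3 brokers
            // (one leader + two followers), matching the leader/follower model above
            NewTopic topic = new NewTopic("demo-topic", 6, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}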
ZooKeeper, as a distributed coordination service, plays a core role in Kafka:
The relationship between the Controller and ZooKeeper
Cluster coordination and management
Metadata management
High availability guarantees
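In a ZooKeeper-based cluster this state can be inspected directly with the zookeeper-shell tool that ships with Kafka. A minimal sketch, assuming ZooKeeper at localhost:2181; the znode paths shown are the standard ones Kafka registers:

$ bin/zookeeper-shell.sh localhost:2181
ls /brokers/ids      # IDs of the brokers currently registered, e.g. [1, 2, 3]
get /controller      # JSON identifying the broker currently acting as Controller
ls /brokers/topics   # topics whose metadata is kept in ZooKeeper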
Starting with Kafka 3.0, KRaft mode (KIP-500) removes ZooKeeper: the controller role is integrated with the brokers and cluster metadata is stored in the brokers themselves. This greatly simplifies the architecture, improves performance, and lowers maintenance cost.
The flow of how a consumer consumes messages from Kafka (a runnable sketch follows this list):
The consumer subscribes to a topic and starts polling for messages.
The consumer sends a fetch request to the broker to fetch messages from the assigned partitions.
The broker responds with the messages available in the partition.
The consumer processes the messages and commits the offset to the broker.
The consumer continues to poll for new messages.
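A minimal runnable sketch of this loop using the standard Java client; the broker address, group id, and topic name are placeholder assumptions, and offsets are committed manually to make step 4 explicit:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class PollLoopExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder addresses and names
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Commit offsets manually after processing (step 4 above)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));   // step 1: subscribe
            while (true) {
                // steps 2-3: poll() sends fetch requests and returns available records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync();                   // step 4: commit offsets
            }
        }
    }
}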
sequenceDiagram
    participant App
    participant KafkaConsumer
    participant ConsumerNetworkClient
    participant Fetcher
    participant Broker

    App->>KafkaConsumer: new KafkaConsumer(props)
    App->>KafkaConsumer: subscribe(topics)

    loop Poll Loop
        App->>KafkaConsumer: poll(Duration)
        activate KafkaConsumer
        KafkaConsumer->>Fetcher: sendFetches()
        activate Fetcher
        Fetcher->>ConsumerNetworkClient: send(Node, FetchRequest)
        activate ConsumerNetworkClient
        ConsumerNetworkClient->>Broker: FetchRequest
        Broker-->>ConsumerNetworkClient: FetchResponse
        ConsumerNetworkClient-->>Fetcher: RequestFuture
        deactivate ConsumerNetworkClient
        Fetcher->>Fetcher: handleFetchSuccess()
        Fetcher-->>KafkaConsumer: ConsumerRecords
        deactivate Fetcher
        KafkaConsumer-->>App: ConsumerRecords
        deactivate KafkaConsumer
        App->>App: process records
    end

    App->>KafkaConsumer: close()
The data model and behavior of the Fetcher in the Kafka Consumer:
classDiagram
    class Fetcher {
        -ConsumerNetworkClient client
        -FetchCollector fetchCollector
        -FetchBuffer fetchBuffer
        -Logger log
        -Time time
        -ApiVersions apiVersions
        +sendFetches() int
        +collectFetch() Fetch~K,V~
        +close(Timer timer)
        #closeInternal(Timer timer)
        -handleFetchSuccess(Node, FetchRequestData, ClientResponse)
        -handleFetchFailure(Node, FetchRequestData, RuntimeException)
        -sendFetchesInternal(Map~Node,FetchRequestData~ requests)
        -createFetchRequest(Node, FetchRequestData)
        +clearBufferedDataForUnassignedPartitions(Collection~TopicPartition~)
    }
    class Node {
        -int id
        -String host
        -int port
        -String rack
        +idString() String
        +hasRack() boolean
    }
    class FetchSessionHandler {
        -int sessionId
        -int epoch
        +FetchRequestData build()
        +handleResponse(FetchResponse)
    }
    class FetchCollector {
        -Deserializers~K,V~ deserializers
        -FetchMetricsManager metricsManager
        +collectFetch(FetchBuffer) Fetch~K,V~
        -parseRecord(ByteBuffer) ConsumerRecord~K,V~
    }
    class ConsumerNetworkClient {
        -KafkaClient client
        -Time time
        +send(Node, Request) RequestFuture
        +poll(Timer, PollCondition)
        -trySend(long now)
    }

    Fetcher --> Node : sends requests to
    Fetcher --> FetchSessionHandler : manages sessions
    Fetcher --> FetchCollector : collects records
    Fetcher --> ConsumerNetworkClient : network operations
Sequence Diagram with detailed operations:
sequenceDiagram
    participant Consumer as KafkaConsumer
    participant Fetcher
    participant Network as ConsumerNetworkClient
    participant Session as FetchSessionHandler
    participant Node
    participant Broker

    Note over Consumer,Broker: Fetch Operation Start
    Consumer->>Fetcher: sendFetches()
    activate Fetcher
    Note over Fetcher: Prepare fetch requests for each node
    Fetcher->>Fetcher: prepareFetchRequests()

    loop For each Node with pending fetches
        Fetcher->>Session: build()
        Note over Session: Creates fetch session data<br/>with epoch and session ID
        Session-->>Fetcher: FetchRequestData
        Fetcher->>Network: send(Node, FetchRequest)
        activate Network
        Note over Network: Attempts to send request<br/>if node is ready
        Network->>Node: checkReady()
        alt Node Ready
            Node-->>Network: true
            Network->>Broker: FetchRequest
            Note over Broker: Process fetch request<br/>with session handling
            Broker-->>Network: FetchResponse
            alt Success Response
                Network->>Fetcher: handleFetchSuccess(Node, Data, Response)
                Note over Fetcher: Updates fetch positions<br/>Processes received records
                Fetcher->>Session: handleResponse(FetchResponse)
                Note over Session: Updates session state<br/>Validates epoch
            else Failure Response
                Network->>Fetcher: handleFetchFailure(Node, Data, Exception)
                Note over Fetcher: Handles errors<br/>Updates metrics
            end
        else Node Not Ready
            Node-->>Network: false
            Note over Network: Request queued for retry
        end
        deactivate Network
    end
    deactivate Fetcher

    Consumer->>Fetcher: collectFetch()
    activate Fetcher
    Note over Fetcher: Collects accumulated records
    Fetcher->>Consumer: Fetch
    deactivate Fetcher
    Note over Consumer,Broker: Fetch Operation Complete
Details of prepareFetchRequests():
sequenceDiagram
    participant Client
    participant AbstractFetch
    participant MetricsManager
    participant SubscriptionState
    participant Metadata
    participant FetchSessionHandler

    Client->>AbstractFetch: prepareFetchRequests()
    AbstractFetch->>MetricsManager: maybeUpdateAssignment(subscriptions)
    AbstractFetch->>AbstractFetch: fetchablePartitions()

    loop For each fetchable partition
        AbstractFetch->>SubscriptionState: position(partition)
        AbstractFetch->>Metadata: fetch() for leader info
        AbstractFetch->>AbstractFetch: selectReadReplica(partition, leader, currentTimeMs)
        alt Node is available & no pending requests
            AbstractFetch->>FetchSessionHandler: newBuilder()
            AbstractFetch->>Metadata: topicIds()
            AbstractFetch->>FetchSessionHandler: builder.add(partition, partitionData)
        else Node unavailable or has pending requests
            Note over AbstractFetch: Skip fetch for this partition
        end
    end

    AbstractFetch-->>Client: Return Map
Key Features Explained:
Node Management:
Fetch Session Handling:
Network Operations:
Data Collection:
Key Method Descriptions:
Record Processing Flow:
sequenceDiagram
    participant Consumer as KafkaConsumer
    participant Fetcher
    participant FC as FetchCollector
    participant FB as FetchBuffer
    participant Deserializer

    Note over Consumer,Deserializer: Record Processing Flow
    Consumer->>Fetcher: poll(Duration)
    activate Fetcher
    Fetcher->>FB: addFetchedData(PartitionData)
    activate FB
    Note over FB: Stores raw records<br/>by TopicPartition
    FB-->>Fetcher: completed
    deactivate FB
    Fetcher->>FC: collectFetch(FetchBuffer)
    activate FC

    loop For each TopicPartition
        FC->>FB: getRecords(TopicPartition)
        FB-->>FC: raw records
        loop For each Record
            FC->>Deserializer: deserializeKey(byte[])
            Deserializer-->>FC: key object
            FC->>Deserializer: deserializeValue(byte[])
            Deserializer-->>FC: value object
            FC->>FC: createConsumerRecord(...)
            Note over FC: Creates record with:<br/>- topic, partition, offset<br/>- timestamp, headers<br/>- key, value
        end
    end

    FC-->>Fetcher: ConsumerRecords
    deactivate FC
    Fetcher-->>Consumer: ConsumerRecords
    deactivate Fetcher
Detailed Explanation:
// In Fetcher class
public Fetch<K, V> collectFetch() {
return fetchCollector.collectFetch(fetchBuffer);
}
// In FetchCollector class (simplified sketch; the real method returns Fetch<K, V>
// rather than building ConsumerRecords directly)
public ConsumerRecords<K, V> collectFetch(FetchBuffer buffer) {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
    for (TopicPartition partition : buffer.partitions()) {
        // Get raw records for partition
        Records rawRecords = buffer.getRecords(partition);
        // Process each record (Records exposes its entries via records())
        for (Record raw : rawRecords.records()) {
            // Deserialize key and value; Deserializer#deserialize takes the topic name
            K key = keyDeserializer.deserialize(partition.topic(), raw.key());
            V value = valueDeserializer.deserialize(partition.topic(), raw.value());
            // Create ConsumerRecord with metadata
            ConsumerRecord<K, V> record = new ConsumerRecord<>(
                partition.topic(),
                partition.partition(),
                raw.offset(),
                raw.timestamp(),
                raw.timestampType(),
                raw.keySize(),
                raw.valueSize(),
                key,
                value,
                raw.headers(),
                Optional.of(raw.leaderEpoch())
            );
            records.computeIfAbsent(partition, p -> new ArrayList<>())
                   .add(record);
        }
    }
    return new ConsumerRecords<>(records);
}
Key Aspects:
Node Management:
In Kafka, a Node represents a Broker node, not the Controller
Node is a basic abstraction that represents any Broker in the cluster
The Controller is a special role taken on by one of the Brokers
The Consumer only interacts with ordinary Broker Nodes and never talks to the Controller directly
Controller-related coordination work is handled internally by the Brokers
classDiagram
    class Node {
        -int id
        -String host
        -int port
        -String rack
        +idString() String
        +hasRack() boolean
        +isConnected() boolean
    }
    class Fetcher {
        -Map~Node, FetchSessionHandler~ sessions
        -ConsumerNetworkClient client
        +sendFetches() int
        -prepareFetchRequests() Map~Node, RequestData~
        -handleNodeState(Node)
    }
    class ConsumerNetworkClient {
        -Map~Node, ConnectionState~ connections
        +ready(Node, long) boolean
        +connectionFailed(Node) boolean
        +tryConnect(Node)
        -handleDisconnection(Node)
    }
    class FetchSessionHandler {
        -int sessionId
        -Node node
        -int epoch
        +build() FetchRequestData
        +handleResponse(FetchResponse)
    }

    Fetcher --> Node
    Fetcher --> ConsumerNetworkClient
    Fetcher --> FetchSessionHandler
Detailed Explanation:
// In Fetcher class (simplified sketch)
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = new HashMap<>();
// Get list of nodes we need to fetch from
for (TopicPartition partition : subscriptions.fetchablePartitions()) {
Node node = metadata.currentLeader(partition);
if (node == null) {
// Handle missing leader
continue;
}
// Check node state
if (client.isUnavailable(node)) {
// Node is in backoff period after failure
continue;
}
if (!client.ready(node, time.milliseconds())) {
// Connection not ready, may need to initiate
client.tryConnect(node);
continue;
}
// Get or create fetch session for node
FetchSessionHandler session = sessions.computeIfAbsent(
node,
n -> new FetchSessionHandler(logContext, n)
);
// Build fetch request for this node
FetchSessionHandler.FetchRequestData data = session.build(
fetchConfig,
partition,
nextFetchOffset(partition)
);
fetchRequestMap.put(node, data);
}
return fetchRequestMap;
}
Key Aspects:
The difference between Node and Broker in the Kafka implementation:
classDiagram
    class Node {
        -int id
        -String host
        -int port
        -String rack
        +idString() String
        +hasRack() boolean
        +isConnected() boolean
    }
    class Broker {
        -int id
        -Node node
        -Map~String,Object~ config
        -List~String~ endpoints
        -Set~String~ roles
        +rack() Optional~String~
        +isEmpty() boolean
        +hasMoved(Node) boolean
    }
    class KafkaCluster {
        -List~Broker~ brokers
        -Map~Integer,Node~ nodes
    }

    Broker --> Node : contains
    KafkaCluster --> Broker : manages
Key Differences:
Node:
A lightweight representation of a network endpoint
Contains basic connection information:
public class Node {
private final int id;
private final String host;
private final int port;
private final String rack;
// Used for network connections
}
Used primarily by the client for network operations
Represents any network endpoint (could be broker or controller)
Broker:
A full Kafka broker instance
Contains complete broker configuration and state:
public class Broker {
private final int id;
private final Node node;
private final Map<String, Object> config;
private final List<String> endpoints;
private final Set<String> roles;
// Contains complete broker metadata
}
Includes additional metadata such as its configuration, endpoints, and roles
Usage Example:
// In Fetcher class
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = new HashMap<>();
for (TopicPartition partition : subscriptions.fetchablePartitions()) {
// Gets Node (network endpoint) for the broker leading this partition
Node node = metadata.currentLeader(partition);
if (node == null) {
// Handle missing leader
continue;
}
// Network operations use Node
if (client.isUnavailable(node)) {
continue;
}
// Fetch sessions are maintained per Node
FetchSessionHandler session = sessions.computeIfAbsent(
node,
n -> new FetchSessionHandler(logContext, n)
);
// Build fetch request for this node
fetchRequestMap.put(node, session.build());
}
return fetchRequestMap;
}
Interaction Flow:
sequenceDiagram
    participant Client
    participant AbstractFetch
    participant LocalMetadataCache
    participant KafkaBroker
    participant ZK_or_Controller

    Note over ZK_or_Controller: Maintains authoritative<br/>cluster metadata
    Note over KafkaBroker: Gets metadata from<br/>ZK/Controller
    Note over LocalMetadataCache: Client's local cache<br/>of cluster metadata

    Client->>AbstractFetch: prepareFetchRequests()
    AbstractFetch->>LocalMetadataCache: fetch() for leader info
    alt Metadata is stale or missing
        LocalMetadataCache->>KafkaBroker: Request metadata update
        KafkaBroker->>ZK_or_Controller: Get latest metadata
        ZK_or_Controller-->>KafkaBroker: Return metadata
        KafkaBroker-->>LocalMetadataCache: Update metadata
    end
    LocalMetadataCache-->>AbstractFetch: Return cached metadata
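On the client side, the maximum staleness of this local cache is bounded by the metadata.max.age.ms setting; a short sketch (the 30-second value is illustrative):

// In the consumer's Properties: force a metadata refresh at least every 30 seconds,
// even if no errors have invalidated the cached leader information
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000");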
Key Points:
Node is used for:
Broker is used for:
In Consumer Context:
The decision to get metadata from ZooKeeper vs. Controller is based on whether the Kafka cluster is running in legacy mode (ZooKeeper-based) or KRaft (ZooKeeper-less) mode. This isn't decided at runtime - it's determined by how the Kafka cluster is configured when it starts up.
sequenceDiagram
    participant Client
    participant Broker
    participant Controller
    participant ZooKeeper

    alt KRaft Mode (KIP-500)
        Note over Broker,Controller: Cluster started with<br/>controller.quorum.voters set
        Client->>Broker: Metadata Request
        Broker->>Controller: Get metadata
        Controller-->>Broker: Return metadata
        Broker-->>Client: Metadata Response
    else Legacy Mode (ZooKeeper-based)
        Note over Broker,ZooKeeper: Cluster started with<br/>zookeeper.connect set
        Client->>Broker: Metadata Request
        Broker->>ZooKeeper: Get metadata
        ZooKeeper-->>Broker: Return metadata
        Broker-->>Client: Metadata Response
    end
The key configuration that determines this:
KRaft Mode (Controller-based):
controller.quorum.voters
Legacy Mode (ZooKeeper-based):
zookeeper.connect
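For illustration, minimal server.properties fragments for each mode; host names, ports, and IDs are placeholder assumptions:

# KRaft mode: this process acts as broker and controller, quorum defined explicitly
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093

# Legacy mode: brokers discover cluster state through ZooKeeper
broker.id=1
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181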
The client code doesn't need to know or decide which mode is being used - it simply makes metadata requests to brokers, and the brokers handle getting the metadata from the appropriate source based on how they were configured.
This is part of Kafka's evolution to remove the ZooKeeper dependency (KIP-500), making the system simpler to deploy and maintain.
How brokers decide which broker is the leader for a partition:
Initial Leader Selection:
Leader Election Process:
sequenceDiagram
    participant Controller
    participant ISR as In-Sync Replicas
    participant Broker

    Note over Controller: Detect leader failure or<br/>need for new leader
    Controller->>ISR: Get list of eligible replicas
    alt Has eligible ISR
        Controller->>Controller: Select first replica from ISR
        Controller->>Broker: Notify new leader
        Broker-->>Controller: Accept leadership
    else No eligible ISR
        alt unclean.leader.election.enable=true
            Controller->>Controller: Select from all replicas
        else unclean.leader.election.enable=false
            Controller->>Controller: Keep partition offline
        end
    end
    Controller->>Broker: Broadcast metadata update
Leadership Eligibility:
Failure Handling:
Configuration Factors:
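A hedged sketch of the broker/topic settings behind the diagram above (values are illustrative, not recommendations):

default.replication.factor=3                 # replicas per partition for auto-created topics
min.insync.replicas=2                        # acks=all writes require this many in-sync replicas
unclean.leader.election.enable=false         # never elect an out-of-sync replica as leader
leader.imbalance.check.interval.seconds=300  # how often the controller checks leader balance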