Loosely organized notes: fleeting thoughts, some interesting, some not
Kafka First Touch
graph TB
subgraph Kafka Cluster
subgraph ZooKeeper Ensemble
Z1[ZooKeeper 1]
Z2[ZooKeeper 2]
Z3[ZooKeeper 3]
end
subgraph Brokers
B1[Broker 1]
B2[Broker 2]
B3[Broker 3]
C[Controller - a role held by one Broker]
end
Z1 --- Z2
Z2 --- Z3
Z3 --- Z1
Z1 --> B1
Z2 --> B2
Z3 --> B3
Z1 --> C
C --> B1
C --> B2
C --> B3
end
P1[Producer 1] --> B1
P2[Producer 2] --> B2
B1 --> Con1[Consumer 1]
B2 --> Con2[Consumer 2]
B3 --> Con3[Consumer 3]
classDef zk fill:#e1d5e7,stroke:#9673a6
classDef broker fill:#dae8fc,stroke:#6c8ebf
classDef client fill:#d5e8d4,stroke:#82b366
classDef controller fill:#fff2cc,stroke:#d6b656
class Z1,Z2,Z3 zk
class B1,B2,B3 broker
class P1,P2,Con1,Con2,Con3 client
class C controller
Distributed Event Streaming Platform
Core architecture components
Data flow
Scalability design
Performance optimization
Topic
Broker
Producer
Consumer
Practice
Topics, partitions, and brokers are related as follows: a topic is split into one or more partitions (one-to-many), and each partition is replicated onto one or more brokers, while each broker hosts replicas of many partitions (many-to-many).
Each partition has one leader replica and zero or more follower replicas. By default the leader handles all read and write requests, while the followers replicate the data from the leader.
The Controller manages broker state and partition assignment. In a ZooKeeper-based cluster it is elected from among the brokers, and it maintains the cluster metadata.
A Producer sends messages to a specific topic, and a Consumer reads and processes messages from it. Within a Consumer Group, each partition is assigned to exactly one consumer, which balances the load across the group's members.
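The consumer-group load balancing described above boils down to giving each partition to exactly one member of the group. A minimal sketch of that idea, assuming a plain round-robin spread; `GroupAssignmentSketch` and `assign` are hypothetical names, not part of the Kafka client API, and Kafka's built-in assignors are more involved:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not a Kafka class.
final class GroupAssignmentSketch {

    // Spread partition ids 0..partitionCount-1 over the group's consumers, round-robin.
    // Every partition ends up with exactly one owner.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // prints {c1=[0, 2], c2=[1]}
        System.out.println(assign(List.of("c1", "c2"), 3));
        // With more consumers than partitions, the extra consumer stays idle:
        // prints {c1=[0], c2=[1], c3=[2], c4=[]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
    }
}
```

The second call shows why adding consumers beyond the partition count does not increase parallelism.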
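ZooKeeper, as a distributed coordination service, plays a central role in ZooKeeper-based Kafka clusters:
Relationship between the Controller and ZooKeeper
Cluster coordination and management
Metadata management
High-availability guarantees
Kafka is removing its ZooKeeper dependency (KRaft mode, KIP-500): a quorum of controllers, which can run on dedicated nodes or share a process with a broker, stores the cluster metadata in an internal replicated log instead of in ZooKeeper. KRaft shipped alongside ZooKeeper mode throughout the 3.x releases, and ZooKeeper mode is removed in Kafka 4.0. This greatly simplifies the architecture and is intended to improve performance and reduce maintenance cost.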
How a consumer reads messages from Kafka:
The consumer subscribes to a topic and starts polling for messages.
The consumer sends a fetch request to the broker that leads each of its assigned partitions.
The broker responds with the messages available in those partitions.
The consumer processes the messages and commits its offsets to the group coordinator, which is one of the brokers.
The consumer continues to poll for new messages.
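The fetch, process, commit cycle above can be illustrated without a running cluster. The sketch below is an in-memory stand-in for one partition and one consumer; `PollLoopSketch` and all of its members are hypothetical and only mimic the roles of the position and the committed offset:

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for one partition and one consumer; not a Kafka class.
final class PollLoopSketch {
    private final List<String> log = new ArrayList<>(); // the partition's append-only log
    private int position = 0;   // next offset the consumer will fetch
    private int committed = 0;  // next offset to resume from after a restart

    void append(String message) {
        log.add(message);
    }

    // "Fetch": return up to maxRecords messages starting at the current position.
    List<String> poll(int maxRecords) {
        int from = Math.min(position, log.size());
        int to = Math.min(from + maxRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(from, to));
        position = to; // advance the in-memory position
        return batch;
    }

    // "Commit": remember that everything before the current position was processed.
    void commit() {
        committed = position;
    }

    // Simulate a crash and restart: fetching resumes from the committed offset.
    void restart() {
        position = committed;
    }

    int committed() {
        return committed;
    }

    public static void main(String[] args) {
        PollLoopSketch consumer = new PollLoopSketch();
        consumer.append("a");
        consumer.append("b");
        consumer.append("c");
        System.out.println(consumer.poll(2)); // first batch
        consumer.commit();
        System.out.println(consumer.poll(2)); // second batch, not committed
        consumer.restart();
        System.out.println(consumer.poll(2)); // uncommitted batch is delivered again
    }
}
```

Because the second batch was never committed, it is delivered again after the restart, which is why processing that follows this pattern should tolerate duplicates.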
sequenceDiagram
participant App
participant KafkaConsumer
participant ConsumerNetworkClient
participant Fetcher
participant Broker
App->>KafkaConsumer: new KafkaConsumer(props)
App->>KafkaConsumer: subscribe(topics)
loop Poll Loop
App->>KafkaConsumer: poll(Duration)
activate KafkaConsumer
KafkaConsumer->>Fetcher: sendFetches()
activate Fetcher
Fetcher->>ConsumerNetworkClient: send(Node, FetchRequest)
activate ConsumerNetworkClient
ConsumerNetworkClient->>Broker: FetchRequest
Broker-->>ConsumerNetworkClient: FetchResponse
ConsumerNetworkClient-->>Fetcher: RequestFuture
deactivate ConsumerNetworkClient
Fetcher->>Fetcher: handleFetchSuccess()
Fetcher-->>KafkaConsumer: ConsumerRecords
deactivate Fetcher
KafkaConsumer-->>App: ConsumerRecords
deactivate KafkaConsumer
App->>App: process records
end
App->>KafkaConsumer: close()
The data model and behavior of Fetcher in the Kafka consumer:
classDiagram
class Fetcher {
-ConsumerNetworkClient client
-FetchCollector fetchCollector
-FetchBuffer fetchBuffer
-Logger log
-Time time
-ApiVersions apiVersions
+sendFetches() int
+collectFetch() Fetch~K,V~
+close(Timer timer)
#closeInternal(Timer timer)
-handleFetchSuccess(Node, FetchRequestData, ClientResponse)
-handleFetchFailure(Node, FetchRequestData, RuntimeException)
-sendFetchesInternal(Map~Node,FetchRequestData~ requests)
-createFetchRequest(Node, FetchRequestData)
+clearBufferedDataForUnassignedPartitions(Collection~TopicPartition~)
}
class Node {
-int id
-String host
-int port
-String rack
+idString() String
+hasRack() boolean
}
class FetchSessionHandler {
-int sessionId
-int epoch
+FetchRequestData build()
+handleResponse(FetchResponse)
}
class FetchCollector {
-Deserializers~K,V~ deserializers
-FetchMetricsManager metricsManager
+collectFetch(FetchBuffer) Fetch~K,V~
-parseRecord(ByteBuffer) ConsumerRecord~K,V~
}
class ConsumerNetworkClient {
-KafkaClient client
-Time time
+send(Node, Request) RequestFuture
+poll(Timer, PollCondition)
-trySend(long now)
}
Fetcher --> Node : sends requests to
Fetcher --> FetchSessionHandler : manages sessions
Fetcher --> FetchCollector : collects records
Fetcher --> ConsumerNetworkClient : network operations
Sequence diagram with detailed operations:
sequenceDiagram
participant Consumer as KafkaConsumer
participant Fetcher
participant Network as ConsumerNetworkClient
participant Session as FetchSessionHandler
participant Node
participant Broker
Note over Consumer,Broker: Fetch Operation Start
Consumer->>Fetcher: sendFetches()
activate Fetcher
Note over Fetcher: Prepare fetch requests for each node
Fetcher->>Fetcher: prepareFetchRequests()
loop For each Node with pending fetches
Fetcher->>Session: build()
Note over Session: Creates fetch session data<br/>with epoch and session ID
Session-->>Fetcher: FetchRequestData
Fetcher->>Network: send(Node, FetchRequest)
activate Network
Note over Network: Attempts to send request<br/>if node is ready
Network->>Node: checkReady()
alt Node Ready
Node-->>Network: true
Network->>Broker: FetchRequest
Note over Broker: Process fetch request<br/>with session handling
Broker-->>Network: FetchResponse
alt Success Response
Network->>Fetcher: handleFetchSuccess(Node, Data, Response)
Note over Fetcher: Updates fetch positions<br/>Processes received records
Fetcher->>Session: handleResponse(FetchResponse)
Note over Session: Updates session state<br/>Validates epoch
else Failure Response
Network->>Fetcher: handleFetchFailure(Node, Data, Exception)
Note over Fetcher: Handles errors<br/>Updates metrics
end
else Node Not Ready
Node-->>Network: false
Note over Network: Request queued for retry
end
deactivate Network
end
Consumer->>Fetcher: collectFetch()
activate Fetcher
Note over Fetcher: Collects accumulated records
Fetcher->>Consumer: Fetch
deactivate Fetcher
Note over Consumer,Broker: Fetch Operation Complete
Details of prepareFetchRequests():
sequenceDiagram
participant Client
participant AbstractFetch
participant MetricsManager
participant SubscriptionState
participant Metadata
participant FetchSessionHandler
Client->>AbstractFetch: prepareFetchRequests()
AbstractFetch->>MetricsManager: maybeUpdateAssignment(subscriptions)
AbstractFetch->>AbstractFetch: fetchablePartitions()
loop For each fetchable partition
AbstractFetch->>SubscriptionState: position(partition)
AbstractFetch->>Metadata: fetch() for leader info
AbstractFetch->>AbstractFetch: selectReadReplica(partition, leader, currentTimeMs)
alt Node is available & no pending requests
AbstractFetch->>FetchSessionHandler: newBuilder()
AbstractFetch->>Metadata: topicIds()
AbstractFetch->>FetchSessionHandler: builder.add(partition, partitionData)
else Node unavailable or has pending requests
Note over AbstractFetch: Skip fetch for this partition
end
end
AbstractFetch-->>Client: Return Map
Key Features Explained:
Node Management:
Fetch Session Handling:
Network Operations:
Data Collection:
Key Method Descriptions:
Record Processing Flow:
sequenceDiagram
participant Consumer as KafkaConsumer
participant Fetcher
participant FC as FetchCollector
participant FB as FetchBuffer
participant Deserializer
Note over Consumer,Deserializer: Record Processing Flow
Consumer->>Fetcher: poll(Duration)
activate Fetcher
Fetcher->>FB: addFetchedData(PartitionData)
activate FB
Note over FB: Stores raw records<br/>by TopicPartition
FB-->>Fetcher: completed
deactivate FB
Fetcher->>FC: collectFetch(FetchBuffer)
activate FC
loop For each TopicPartition
FC->>FB: getRecords(TopicPartition)
FB-->>FC: raw records
loop For each Record
FC->>Deserializer: deserializeKey(byte[])
Deserializer-->>FC: key object
FC->>Deserializer: deserializeValue(byte[])
Deserializer-->>FC: value object
FC->>FC: createConsumerRecord(...)
Note over FC: Creates record with:<br/>- topic, partition, offset<br/>- timestamp, headers<br/>- key, value
end
end
FC-->>Fetcher: ConsumerRecords
deactivate FC
Fetcher-->>Consumer: ConsumerRecords
deactivate Fetcher
Detailed Explanation:
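// Simplified sketch, not the actual Kafka source: the real collectFetch() methods
// return an internal Fetch<K, V>, which KafkaConsumer later wraps in ConsumerRecords.
// In Fetcher class
public ConsumerRecords<K, V> collectFetch() {
return fetchCollector.collectFetch(fetchBuffer);
}
// In FetchCollector class
public ConsumerRecords<K, V> collectFetch(FetchBuffer buffer) {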
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
for (TopicPartition partition : buffer.partitions()) {
// Get raw records for partition
Records rawRecords = buffer.getRecords(partition);
// Process each record
for (Record raw : rawRecords) {
// Deserialize key and value
K key = keyDeserializer.deserialize(raw.key());
V value = valueDeserializer.deserialize(raw.value());
// Create ConsumerRecord with metadata
ConsumerRecord<K, V> record = new ConsumerRecord<>(
partition.topic(),
partition.partition(),
raw.offset(),
raw.timestamp(),
raw.timestampType(),
raw.keySize(),
raw.valueSize(),
key,
value,
raw.headers(),
Optional.of(raw.leaderEpoch())
);
records.computeIfAbsent(partition, p -> new ArrayList<>())
.add(record);
}
}
return new ConsumerRecords<>(records);
}
Key Aspects:
Node Management:
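In Kafka, a Node represents a broker node, not the Controller
Node is a basic abstraction that stands for any broker in the cluster
The Controller is a special role taken on by one of the brokers
A consumer only talks to ordinary broker Nodes and does not communicate with the Controller directly
Controller-related coordination is handled internally by the brokers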
classDiagram
class Node {
-int id
-String host
-int port
-String rack
+idString() String
+hasRack() boolean
+isConnected() boolean
}
class Fetcher {
-Map~Node, FetchSessionHandler~ sessions
-ConsumerNetworkClient client
+sendFetches() int
-prepareFetchRequests() Map~Node, RequestData~
-handleNodeState(Node)
}
class ConsumerNetworkClient {
-Map~Node, ConnectionState~ connections
+ready(Node, long) boolean
+connectionFailed(Node) boolean
+tryConnect(Node)
-handleDisconnection(Node)
}
class FetchSessionHandler {
-int sessionId
-Node node
-int epoch
+build() FetchRequestData
+handleResponse(FetchResponse)
}
Fetcher --> Node
Fetcher --> ConsumerNetworkClient
Fetcher --> FetchSessionHandler
Detailed Explanation:
// In Fetcher class (simplified sketch, not the actual Kafka source)
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
    // One request builder per node, so all partitions led by a node share one request
    Map<Node, FetchSessionHandler.Builder> builders = new HashMap<>();
    for (TopicPartition partition : subscriptions.fetchablePartitions()) {
        Node node = metadata.currentLeader(partition);
        if (node == null) {
            // Leader unknown: skip this partition until metadata is refreshed
            continue;
        }
        // Check node state
        if (client.isUnavailable(node)) {
            // Node is in backoff period after failure
            continue;
        }
        if (!client.ready(node, time.milliseconds())) {
            // Connection not ready, may need to initiate
            client.tryConnect(node);
            continue;
        }
        // Get or create the fetch session for this node
        FetchSessionHandler session = sessions.computeIfAbsent(
            node,
            n -> new FetchSessionHandler(logContext, n)
        );
        // Add this partition to the node's pending request instead of overwriting it
        FetchSessionHandler.Builder builder =
            builders.computeIfAbsent(node, n -> session.newBuilder());
        // partitionData(...) is a hypothetical helper that wraps the next fetch offset
        builder.add(partition, partitionData(partition, nextFetchOffset(partition)));
    }
    // Build one FetchRequestData per node
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = new HashMap<>();
    builders.forEach((node, builder) -> fetchRequestMap.put(node, builder.build()));
    return fetchRequestMap;
}
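The key idea in prepareFetchRequests() is that partitions are grouped by their leader node, so that each node receives a single request covering all of its partitions. A minimal, self-contained sketch of just that grouping step follows. It uses plain strings and integers in place of Kafka's TopicPartition and Node types, and `FetchGroupingSketch` and `groupByLeader` are hypothetical names, not Kafka APIs:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the grouping step; not a Kafka class.
final class FetchGroupingSketch {

    // leaderByPartition maps a partition name such as "orders-0" to its leader's
    // node id, or to null when the leader is currently unknown.
    // nodesWithPendingFetch holds node ids that already have a request in flight.
    static Map<Integer, List<String>> groupByLeader(
            Map<String, Integer> leaderByPartition,
            Set<Integer> nodesWithPendingFetch) {
        Map<Integer, List<String>> requests = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : leaderByPartition.entrySet()) {
            Integer leader = entry.getValue();
            if (leader == null || nodesWithPendingFetch.contains(leader)) {
                continue; // skip this partition for now; a later poll retries it
            }
            requests.computeIfAbsent(leader, id -> new ArrayList<>()).add(entry.getKey());
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("orders-0", 1);
        leaders.put("orders-1", 2);
        leaders.put("orders-2", 1);
        leaders.put("orders-3", null); // leader unknown
        System.out.println(groupByLeader(leaders, Set.of(2)));
    }
}
```

Skipped partitions are not lost: they are simply picked up by a later call once the leader is known or the pending request has completed.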
Key Aspects:
The difference between Node and Broker in the Kafka implementation:
classDiagram
class Node {
-int id
-String host
-int port
-String rack
+idString() String
+hasRack() boolean
+isConnected() boolean
}
class Broker {
-int id
-Node node
-Map~String,Object~ config
-List~String~ endpoints
-Set~String~ roles
+rack() Optional~String~
+isEmpty() boolean
+hasMoved(Node) boolean
}
class KafkaCluster {
-List~Broker~ brokers
-Map~Integer,Node~ nodes
}
Broker --> Node : contains
KafkaCluster --> Broker : manages
Key Differences:
Node:
A lightweight representation of network endpoint
Contains basic connection information:
public class Node {
private final int id;
private final String host;
private final int port;
private final String rack;
// Used for network connections
}
Used primarily by the client for network operations
Represents any network endpoint (could be broker or controller)
Broker:
A full Kafka broker instance
Contains complete broker configuration and state:
public class Broker {
private final int id;
private final Node node;
private final Map<String, Object> config;
private final List<String> endpoints;
private final Set<String> roles;
// Holds broker metadata; fields shown are illustrative, the real server-side classes differ in detail
}
Includes additional metadata like:
Usage Example
// In Fetcher class
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = new HashMap<>();
for (TopicPartition partition : subscriptions.fetchablePartitions()) {
// Gets Node (network endpoint) for the broker leading this partition
Node node = metadata.currentLeader(partition);
if (node == null) {
// Handle missing leader
continue;
}
// Network operations use Node
if (client.isUnavailable(node)) {
continue;
}
// Fetch sessions are maintained per Node
FetchSessionHandler session = sessions.computeIfAbsent(
node,
n -> new FetchSessionHandler(logContext, n)
);
// Build fetch request for this node
fetchRequestMap.put(node, session.build());
}
return fetchRequestMap;
}
Interaction Flow:
sequenceDiagram
participant Client
participant AbstractFetch
participant LocalMetadataCache
participant KafkaBroker
participant ZK_or_Controller
Note over ZK_or_Controller: Maintains authoritative<br/>cluster metadata
Note over KafkaBroker: Gets metadata from<br/>ZK/Controller
Note over LocalMetadataCache: Client's local cache<br/>of cluster metadata
Client->>AbstractFetch: prepareFetchRequests()
AbstractFetch->>LocalMetadataCache: fetch() for leader info
alt Metadata is stale or missing
LocalMetadataCache->>KafkaBroker: Request metadata update
KafkaBroker->>ZK_or_Controller: Get latest metadata
ZK_or_Controller-->>KafkaBroker: Return metadata
KafkaBroker-->>LocalMetadataCache: Update metadata
end
LocalMetadataCache-->>AbstractFetch: Return cached metadata
Key Points:
Node is used for:
Broker is used for:
In Consumer Context:
The decision to get metadata from ZooKeeper vs. Controller is based on whether the Kafka cluster is running in legacy mode (ZooKeeper-based) or KRaft (ZooKeeper-less) mode. This isn't decided at runtime - it's determined by how the Kafka cluster is configured when it starts up.
sequenceDiagram
participant Client
participant Broker
participant Controller
participant ZooKeeper
alt KRaft Mode (KIP-500)
Note over Broker,Controller: Cluster started with<br/>controller.quorum.voters set
Client->>Broker: Metadata Request
Broker->>Controller: Get metadata
Controller-->>Broker: Return metadata
Broker-->>Client: Metadata Response
else Legacy Mode (ZooKeeper-based)
Note over Broker,ZooKeeper: Cluster started with<br/>zookeeper.connect set
Client->>Broker: Metadata Request
Broker->>ZooKeeper: Get metadata
ZooKeeper-->>Broker: Return metadata
Broker-->>Client: Metadata Response
end
The key configuration that determines this:
KRaft Mode (Controller-based):
controller.quorum.voters
Legacy Mode (ZooKeeper-based):
zookeeper.connect
The client code doesn't need to know or decide which mode is being used. It simply sends metadata requests to brokers, and each broker answers from its own metadata cache, which is kept up to date from the appropriate source depending on how the cluster was configured.
This is part of Kafka's evolution to remove the ZooKeeper dependency (KIP-500), making the system simpler to deploy and maintain.
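As a rough illustration of the startup-time decision described above, the following sketch inspects a broker configuration for the two keys named in this section. `MetadataModeSketch` and `detect` are hypothetical names, and the check is deliberately simplified: real brokers consult further settings (for example `process.roles`), and during a ZooKeeper-to-KRaft migration both keys can be present.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not how the broker itself decides.
final class MetadataModeSketch {

    enum Mode { KRAFT, ZOOKEEPER, UNKNOWN }

    // Classify a broker configuration by which of the two keys it sets.
    static Mode detect(Properties brokerConfig) {
        if (brokerConfig.getProperty("controller.quorum.voters") != null) {
            return Mode.KRAFT;
        }
        if (brokerConfig.getProperty("zookeeper.connect") != null) {
            return Mode.ZOOKEEPER;
        }
        return Mode.UNKNOWN;
    }

    public static void main(String[] args) {
        Properties kraft = new Properties();
        kraft.setProperty("controller.quorum.voters", "1@localhost:9093");
        System.out.println(detect(kraft));

        Properties legacy = new Properties();
        legacy.setProperty("zookeeper.connect", "localhost:2181");
        System.out.println(detect(legacy));
    }
}
```

The host names and ports are placeholder values chosen for the example.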
How the leader for a partition is chosen (the controller makes the decision, not the brokers among themselves):
Initial Leader Selection:
Leader Election Process:
sequenceDiagram
participant Controller
participant ISR as In-Sync Replicas
participant Broker
Note over Controller: Detect leader failure or<br/>need for new leader
Controller->>ISR: Get list of eligible replicas
alt Has eligible ISR
Controller->>Controller: Select first replica from ISR
Controller->>Broker: Notify new leader
Broker-->>Controller: Accept leadership
else No eligible ISR
alt unclean.leader.election.enable=true
Controller->>Controller: Select from all replicas
else unclean.leader.election.enable=false
Controller->>Controller: Keep partition offline
end
end
Controller->>Broker: Broadcast metadata update
Leadership Eligibility:
Failure Handling:
Configuration Factors:
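The election flow in the diagram above can be sketched as a small function. This is a simplified illustration, not the controller's actual code: `LeaderElectionSketch` and `electLeader` are hypothetical names, broker ids are plain integers, and only the ISR check and the `unclean.leader.election.enable` flag from the diagram are modeled.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical illustration of the election rules; not a Kafka class.
final class LeaderElectionSketch {

    // replicas: the partition's assigned replicas, in preference order
    // isr: replicas currently in sync with the previous leader
    // liveBrokers: brokers that are currently alive
    // uncleanEnabled: stands in for unclean.leader.election.enable
    static OptionalInt electLeader(List<Integer> replicas, Set<Integer> isr,
                                   Set<Integer> liveBrokers, boolean uncleanEnabled) {
        // Clean election: first assigned replica that is alive and in the ISR.
        for (int replica : replicas) {
            if (liveBrokers.contains(replica) && isr.contains(replica)) {
                return OptionalInt.of(replica);
            }
        }
        // Unclean election: accept any live replica, at the risk of losing data.
        if (uncleanEnabled) {
            for (int replica : replicas) {
                if (liveBrokers.contains(replica)) {
                    return OptionalInt.of(replica);
                }
            }
        }
        // No eligible replica: the partition stays offline.
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Broker 1 has failed; brokers 2 and 3 are alive and in sync.
        System.out.println(electLeader(List.of(1, 2, 3), Set.of(2, 3), Set.of(2, 3), false));
        // Only the failed broker was in sync and unclean election is disabled.
        System.out.println(electLeader(List.of(1, 2, 3), Set.of(1), Set.of(2, 3), false));
    }
}
```

The trade-off behind the flag is visible in the last two branches: enabling unclean election favors availability, while leaving it disabled favors durability by keeping the partition offline until an in-sync replica returns.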