Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

查核點 - consumer 吞吐量討論 #1475

Open
harryteng9527 opened this issue Feb 2, 2023 · 9 comments
Open

查核點 - consumer 吞吐量討論 #1475

harryteng9527 opened this issue Feb 2, 2023 · 9 comments

Comments

@harryteng9527
Copy link
Collaborator

related #1437 (comment)

此 issue 的目的是想討論: partition 的分配與 consumer 吞吐量的關係

實驗目的

希望能藉由此次實驗,測試依據 partition 流入資料量當作負載的 assignor 所分配出來的 assignment 會讓 consumers 的吞吐量與延遲能優化到什麼程度

而查核點要使用 Kafka 的預設 assignor - Range assignor 與 NetworkIngress assignor 來比較,下面為這兩個 assignor 的吞吐量表現以及實驗環境

實驗環境

  • 測試時共使用六台節點,3 台節點當作 broker、3 台節點當作 client 端
    • 3 台 brokers 是小台 12 代的電腦
    • 3 個 client 端分別是 12 代、11代、13代電腦
  • 每個 client 節點各執行 1 個 performance tool ,每個 performance tool 開啟 3 個 producers 、 1 個 consumer
    • 共 9 個 producers
    • 3 個 consumers
  • Partition 無 replica
  • Producer 所 send 的 record 大小為 1 KiB,且 value distribution 為 fixed

此次實驗使用的 performance tool 有開啟 throttle 功能,而會使用 throttle 功能是希望營造出流入每個 partition 的流量不同的情境,營造部份 partition 的資料量 skew 的情境,並且使用 Range assignor 與 NetworkIngress assignor 來比較 consumer group 整體的吞吐量

每個 partition 所承受的流量如下:

Topic\partition id 0 1 2 3 4 5 6 7 8
Topic - A 50 MB/s 50 MB/s 50 MB/s 30 MB/s 30 MB/s 30 MB/s 10 MB/s 10 MB/s 10 MB/s
Topic - B 50 MB/s 50 MB/s 50 MB/s 30 MB/s 30 MB/s 30 MB/s 10 MB/s 10 MB/s 10 MB/s
Topic - C 50 MB/s 50 MB/s 50 MB/s 30 MB/s 30 MB/s 30 MB/s 10 MB/s 10 MB/s 10 MB/s

Range assignor

使用 Range assignor 的 assignment 如下:

assignment
Consumer 1 Topic A - 0,1,2 、Topic B - 0,1,2 、Topic C - 0,1,2
Consumer 2 Topic A - 3,4,5 、Topic B - 3,4,5 、Topic C - 3,4,5
Consumer 3 Topic A - 6,7,8 、Topic B - 6,7,8 、Topic C - 6,7,8

Range 的分配讓每個 consumer 需要處理的資料量不同,這樣的 assignment 使得每個 consumer 需處理的資料量如下:

需處理的資料量
Consumer 1 450 MB/s
Consumer 2 270 MB/s
Consumer 3 90 MB/s

實際的吞吐量

與上面表格所想相同,實驗出來的吞吐量跟需處理的流量相近
0202-range-with-producer

Grafana snapshot

NetworkIngress assignor

此 assignor 是利用 NetworkIngress CostFunction 來取得每個 partition 的流入流量,使用的 metrics 是 ServerMetrics 的 BYTES_IN_PER_SEC 並且蒐集 15 分鐘的流入量(fifteenMinuteRate) 當作負載進行分配

此次實驗的 assignment 如下:

assignment
Consumer 1 Topic A - 0,5,8 、Topic B - 0,3,6 、Topic C - 2,5,6
Consumer 2 Topic A - 2,3,7 、Topic B - 2,5,8 、Topic C - 1,4,8
Consumer 3 Topic A - 1,4,6 、Topic B - 1,4,7 、Topic C - 0,3,7

根據此 assignment,每個 consumer 需要處理的資料量如下:

需處理的資料量
Consumer 1 270 MB/s
Consumer 2 270 MB/s
Consumer 3 270 MB/s

實際的吞吐量

0202-network-with-producer-consumer-throughput

Grafana snapshot

實驗討論

為了找出 consumer 吞吐量起起伏伏的原因,目前還在看 source code 且了解 consumer 參數的意義,並做一些小實驗驗證有無正確理解參數的意義,藉此找吞吐量起起伏伏的原因

不過目前可以先排除 producer 打資料不夠快,導致 consumers 需要等待資料的原因

為何可以排除 producer 資料打不夠快

我覺得可以排除的原因有下列兩點:

  1. 使用兩個 assignor 的實驗的 producer 參數都是相同的,若有打不夠快的問題,使用 Range assignor 的實驗應該也會受到影響
  2. 有使用 Grafana 監控 partition log size 的增長速率、 consumer 的消費速率來觀察是不是 producer 資料送得不夠快
Grafana 監控的數據

Range assignor 的數據如下:

  1. 以 consumed rate 與 partition 增長的速率的角度來看
    0202-range-01

此圖所表示的是 Consumer 所消費的吞吐量(圖表的 Legend 為 Consumer#)與 partition 內資料的增長速率(可看作 producer 送資料的速率,圖表的 Legend 為 log size#)

上圖給我的感覺為幾乎 send 多少資料就消費多少資料,除了太多資料量而消費不來的 Consumer#175 會有一點 Lag 外

  1. 以 log size 與 consumed byte 總量的角度來看

這邊與 rate 的差別是直接看 consumer 消費的總 bytes 數、partitions 資料量的增長
0202-range-log-consumed-total

從這張圖來看也是沒有 producer 影響到 consumer 的情形。因為 Consumer 消費的資料量都會 ≤ partition 內儲存的資料量


NetworkIngress assignor 的數據如下:

  1. 以 consumed rate 與 partition 增長的速率的角度來看

分別以各個 consumer 來觀察
0202-network-02

以上圖來看,Consumer#174 表示一個 consumer 所消費的 total bytes 再換算成 rate 去看消費的速率,而 assigned by 174 表達的是 Consumer#174 所被分配到的 partitions 資料的增長速率,PQL 如下

# example
rate(kafka_consumer_consumer_fetch_manager_metrics_bytes_consumed_total{topic=""}[3s]) # Consumer#174
sum(rate(kafka_log_log_size)) # assigned by 174

大概介紹完此圖表使用的 query 、物理意義後,下面再貼上其他的 consumer 圖表
0202-network-03

0202-network-04

這幾張圖帶給我的感覺是 consumer 沒辦法好好的消化送進 partition 的資料。如 174 與 176, consumer 的消費速率與 partition 的資料差了一半

所以不會是 producer 資料打不夠快造成 consumer 需要等待資料,進而影響到吞吐量

  1. 以 log size 與 consumed byte 總量的角度來看

0202-network-log-consumed-total

從這張圖,也能得知 producer 沒有影響到 consumer。因為 Consumer 總共消費的資料量是低於總資料量的,圖中的 Consumer# 為消費的 total bytes、log size# 為 consumer# 被分配到的 partitions 的資料量

@chia7712
Copy link
Contributor

chia7712 commented Feb 3, 2023

@harryteng9527 感謝測試,這次實驗做得完整很多,讚讚

3 台 brokers 是小台 12 代的電腦

可否描述一下 partitions 在這三台設備的分佈?

@chia7712
Copy link
Contributor

chia7712 commented Feb 3, 2023

NetworkIngress assignor
此 assignor 是利用 NetworkIngress CostFunction 來取得每個 partition 的流入流量,使用的 metrics 是 ServerMetrics 的 BYTES_IN_PER_SEC 並且蒐集 15 分鐘的流入量(fifteenMinuteRate) 當作負載進行分配
此次實驗的 assignment 如下:

另外可否針對這個不穩的狀況再做一個實驗,在既有叢集下,把 producers 關掉,然後只開 consumers 拉資料(用一樣的partitions分佈)看看是否一樣有資料流不穩的狀態,這個實驗不需要 assignor,全部都手動指定 partitions

@harryteng9527
Copy link
Collaborator Author

可否描述一下 partitions 在這三台設備的分佈?

partition 分佈如下表:

Partition
Broker - 1001 Topic A - 0,3,6 、Topic B - 2,5,8 、 Topic C - 1,4,7
Broker - 1002 Topic A - 1,4,7 、Topic B - 0,3,6 、 Topic C - 2,5,8
Broker - 1003 Topic A - 2,5,8 、Topic B - 1,4,7 、 Topic C - 0,3,6

創建 partition 的指令如下:

./kafka-topics.sh --bootstrap-server 192.168.103.171:9092 --create --topic topicA --replica-assignment 1001,1002,1003,1001,1002,1003,1001,1002,1003
./kafka-topics.sh --bootstrap-server 192.168.103.171:9092 --create --topic topicB --replica-assignment 1002,1003,1001,1002,1003,1001,1002,1003,1001
./kafka-topics.sh --bootstrap-server 192.168.103.171:9092 --create --topic topicC --replica-assignment 1003,1001,1002,1003,1001,1002,1003,1001,1002

會想這樣分佈是因為想要讓每個 broker 都承受相同負載,像上面提到的 partition 流量分佈

@harryteng9527
Copy link
Collaborator Author

目前發現了 Consumer 讀取每個節點的吞吐量會受到各節點 partition 剩餘資料量的多寡影響,並且有去閱讀相關 source code

  1. Kafka Consumer Client 端發送 Fetch request 的邏輯
  2. Kafka Broker 處理 Fetch request 的邏輯

這次實驗最後得出了一個結論, assignor 必須感知在分配相同 broker 的 partitions 時,要避免分配流量差異過大的 partition 給 consumer,否則會造成情境一實驗的問題 - 吞吐量低落

以下利用實驗驗證當 Consumer 被分配到不同資料規模的 partition 時吞吐量的變化

情境一、 Partitions 配置在同一台 broker 上

Topic 與 Partition 數量

創建 1 個 topic: test,該 topic 有 2 個 partitions,分別送不同規模的資料給 test 中的 partitions,每個 partition 的資料量如下:

Topic-Partition 編號 資料量
test-0 0 KiB
test-1 200 GiB

叢集拓樸

                                  switch(10G)
    ┌───────────┬───────────┬─────────┴──────────┐
 Broker1     Broker2     Broker3              Client1  

資料量分配

資料分佈

測試方式

  1. 灌資料到 partition 內存放:Performance tool 先開啟 producer,將資料寫到 partition 中,producer 發送的每筆 record 皆為 1KiB ,資料量如上圖所示。

  2. 使用 Performance tool 測試 consumer 的吞吐量

    • 開啟 1 個 consumer

      • consumer 訂閱方式:使用 assign 的方式指定讀取 test-0test-1

      • offset 設定:earliest

      • 其餘參數設置:default

實驗數據

以下是根據閱讀 source code 後,先自己用紙筆計算在實驗環境執行後會有什麼樣的結果,再根據實驗來驗證是否思考的正確

Fetch Request

Consumer 使用的參數都是預設,所以 fetch.max.wait.ms 為 500ms。

這實驗讓 broker 接收到的 fetch request 大都在 4 req/s 附近,是因為目前 consumer fetch 的邏輯所導致,以下簡單自己計算 fetch request 應該為多少,再以實驗佐證

自己紙筆計算
  1. Consumer 發送 fetch request 給 test-0, test-1 所在的節點 Broker
  2. Broker 根據接收到的 fetch request 讀取 partition(s) 的資料
  3. Broker 讀取 test-0, test-1 的資料後,立刻 response 資料給 consumer
  4. Consumer 接收到 response 後開始處理 partition 的資料,處理的流程如下:
    • 將 partition 的資料處理成一批一批的
    • 檢查有沒有要發送的 fetch request,以目前的例子,test-0 會被包進 fetch request 中,因為 consumer 不需要處理 test-0 的資料
    • 將一批資料回傳給 consumer
  5. Broker 收到來自 consumer 的 fetch request 後,讀取 test-0 的資料,而因為 test-0 中沒有資料,所以這個 fetch request 會在 broker 中等待一段時間再 response
    • 這邊會影響到 consumer 的吞吐量,因為 consumer 目前的實作不會對正在處理 自己的fetch request 的 broker 再次發送 fetch request,所以 consumer 需要等待 broker response 後才能再次發送 fetch request 給 broker

所以在此實驗中, broker 每秒會收到的 fetch request 數量會是 4

  • 第一次 fetch 發送 test-0, test-1
  • 第二次 fetch 發送 test-0,因為 test-1 還未處理完,不會對 test-1 發送 fetch request (因為 test-0 沒有資料,broker 會等待 500ms)
  • 第三次 fetch 發送 test-0, test-1
  • 第四次 fetch 發送 test-0,因為 test-1 還未處理完,不會對 test-1 發送 fetch request (因為 test-0 沒有資料,broker 會等待 500ms)
實驗佐證

如自己思考相同,Broker 每秒接收到的 fetch request 為 4

broker side fetch

Delay expire:

這個 metric 是 Kafka broker 紀錄 未完成的delay operation 用的,例如 test-0 沒有資料好 fetch,所以 broker 會等待直到 expire

如上面紙筆計算時會有 2 次 fetch request 會等待直到 expire,因為 test-0 沒有資料。實驗後也發現確實如此

delay expire

Throughput

因為上面分析出來發送 fetch request 的數量很低,所以吞吐量自然也會受到影響

為什麼是 2MiB 這個是其他參數可以設置的,本次實驗不討論

  • Consumer 的消費速率如下圖,差不多為 2MiB

test-1_throughput-0

情境二、 Partitions 配置在不同台 broker 上

Topic 與 Partition 數量

創建 1 個 topic: test,該 topic 有 2 個 partitions,分別送不同規模的資料給 test 中的 partitions,每個 partition 的資料量如下:

Topic-Partition 編號 資料量
test-0 0 KiB
test-1 200 GiB

叢集拓樸

                                  switch(10G)
    ┌───────────┬───────────┬─────────┴──────────┐
 Broker1     Broker2     Broker3              Client1  

資料量分配

資料分佈2

測試方式

  1. 灌資料到 partition 內存放:Performance tool 先開啟 producer,將資料寫到 partition 中,producer 發送的每筆 record 皆為 1KiB ,資料量如上圖所示。

  2. 使用 Performance tool 測試 consumer 的吞吐量

    • 開啟 1 個 consumer

      • consumer 訂閱方式:使用 assign 的方式指定讀取 test-0test-1

      • offset 設定:earliest

      • 其餘參數設置:default

實驗數據

也是先以紙筆的方式先計算實驗預期會如何

情境二的實驗,因為有資料與沒資料的 partition 放在不同 broker 上,所以 consumer 的吞吐量不會被沒有資料的 partition 影響,吞吐量會比情境一的 2MiB 還要高得多

Throughput

throughput-2

@chia7712
Copy link
Contributor

這次實驗最後得出了一個結論, assignor 必須感知在分配相同 broker 的 partitions 時,要避免分配流量差異過大的 partition 給 consumer,否則會造成情境一實驗的問題 - 吞吐量低落

麻煩也把 workaround 寫上去,並且說明原因

@harryteng9527
Copy link
Collaborator Author

Workaround

  • 分配 assignment 時,需要分配同一個節點內負載相近的 partition(s) 給 同一個consumer,要避免分配負載差異太大的 partition 給同個 consumer

原因

  • 若分配同一節點內負載差異很大的 partition 給 consumer ,會遇到此次實驗發現的問題:

    1. broker 發現讀不到 partition 的資料,故多等待一段時間 (fetch request 還在 broker 上,未 response)

    2. consumer 不會發送 fetch request 給還在處理該 consumer fetch request 的節點

    3. 不常發送 fetch request 拉取資料,造成 consumer 吞吐量下降

TODO

  • 還需要做個實驗去得到上面提到的 負載相近 數字

@chia7712
Copy link
Contributor

可否也說明一下如果使用者不是用我們的assignor,但也遇到這個問題的話,他們可以調整哪個參數來處理這個議題?然後為何那個參數可以修復該問題

@harryteng9527
Copy link
Collaborator Author

這個問題

若這個問題是指:consumer 遇到同個節點 partition 剩餘讀取資料量差異過大的情境,那麼使用者可以將 fetch.max.wait.ms 調小

Why

會發生這個問題主要是因為 broker 第一次讀取 partition 資料時讀不到資料,而這會牽扯到

  1. consumer 讀取資料的邏輯
    • consumer 不會對還未 response 的節點發送 fetch request
  2. broker 讀取資料後回應 consumer 的處理方式(不論有沒有讀到)
    • 處理方式取決於 consumer 的兩個參數 fetch.max.wait.msfetch.min.bytes

下面舉個例子說明:

  • 為什麼調整此參數會優化吞吐量

Example

Kafka consumer 中 fetch.max.wait.msfetch.min.bytes 的預設值為 500ms 與 1byte

如下圖,紅色線表示的是 fetch.max.wait.ms 的大小

若 broker 沒有讀取到任何資料,broker 會等待一段時間(fetch.max.wait.ms) 後再讀取一次
delay

所以把 fetch.max.wait.ms 調小,broker 等待的時間就會縮短,就會比較快 response consumer ,讓 consumer 能夠繼續發送 fetch request 拉取資料

delay-2

@chia7712
Copy link
Contributor

@harryteng9527 解釋得很好,麻煩之後用同樣的水平把這個問題和解法放到 QA 文件裡面

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants