Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASSIGNOR] Implement CostAwareAssignor #1524

Open
wants to merge 67 commits into
base: main
Choose a base branch
from

Conversation

harryteng9527
Copy link
Collaborator

@harryteng9527 harryteng9527 commented Feb 27, 2023

此 PR 實作 CostAwareAssignor,基於使用者選擇的 cost function 量化 partition 的負載,再依照量化的負載分配 partition 給 consumers,達到負載平衡的目的

分配方式

此 PR 實作較直覺的 greedy 分配,將 tp 分配給負載最低的 consumer。在分配前,會先用 tp 篩選 consumer,以下為篩選流程:

  1. 將沒有訂閱此 topic 的 consumer 過濾掉
  2. 利用 tp 的 incompatibility 過濾不適合分配的 consumer,關於 incompatibility 可參考 [COST] Provide feedback on incompatible partitions for partition cost #1578[COST] Implement feedback for network ingress cost #1637
  3. 若有適合分配的 consumers,則從中挑選負載最低的 consumer 分配。否則,從有訂閱該 topic 的 consumer(s) 中挑選負載最低的 consumer

什麼是適合分配的 consumer

分配 tp 當下,consumer 當時被分配到的 partitions 的 incompatibility 中不存在 tp,即為適合的 consumer

以 NetworkIngressCost 為例,因為流入速度較慢的 partition 會拖慢同節點內其他 partitions 的消費速度,所以同個節點中,流入速度差異過大的 partition 會被視為不適合放在同一個 consumer 上 (詳細可看 #1475 )


先前 PR 內容

這隻 PR 先做出分配節點內流量相近的 partitions 給同一個 consumer,並在這隻 PR 上討論上面幾點,或是這隻 PR 先把第1版 NetworkIngressAssignor 實作完,之後再開其他 PR 優化

目前有以現在推上來的第1版做實驗,實驗環境與流程 #1475 相同

Throughput

0227-throughput

Latency

0227-latency

做完實驗後發現,NetworkIngress assignor 能夠將 consumer 處理不完的資料(partition)分配給其他 consumer,所以整體的吞吐量上升,也降低了延遲。
只不過目前的實驗情境(使用 throttle )是刻意製造的,希望之後改用 Key distribution 製造出比較能說服人的情境

@chia7712
Copy link
Contributor

做完實驗後發現,NetworkIngress assignor 能夠將 consumer 處理不完的資料(partition)分配給其他 consumer,所以整體的吞吐量上升,也降低了延遲。
只不過目前的實驗情境(使用 throttle )是刻意製造的,希望之後改用 Key distribution 製造出比較能說服人的情境

請問可否提供明確的數字?例如大概提升了幾趴

@harryteng9527
Copy link
Collaborator Author

請問可否提供明確的數字?例如大概提升了幾趴

吞吐量

  • Range 的平均吞吐量為 718MB/s
  • NetworkIngress 的平均吞吐量為 779 MB/s

NetworkIngress 的分配讓 consumers 的吞吐量大約提昇 8.5%

latency

consumer 處理每個 fetch request 的 latency 如下:

  • Range 處理 fetch request 的 latency 為 15.7 ms
  • NetworkIngress 處理 fetch request 的 latency 為 10.92 ms

NetworkIngress 的延遲相較 Range assignor 降低了 30 %

計算 latency 的方式如下:
image

  • Li 為 consumeri 的 average fetch latency
  • Ri 為 consumeri 平均發送的 fetch request 數量
  • n 為 consumer 的總數

@harryteng9527 harryteng9527 changed the title Implement NetworkIngress assignor [ASSIGNOR] Implement NetworkIngress assignor Mar 2, 2023
@chia7712
Copy link
Contributor

chia7712 commented Mar 8, 2023

麻煩rebase

@harryteng9527
Copy link
Collaborator Author

harryteng9527 commented May 2, 2023

這次的 commit 為 Assignor 新增 CombinatorShuffler 兩個元件,為了測試有沒有達到負載平衡的功能,所以沒有另外開 PR 實作這兩個元件,元件所做的工作大致如下:

Combinator

利用 greedy 的策略將 partition 分配給 cost 最低的 consumer,這邊不會考慮 incompatible 的情形

Shuffler

主要的工作是將 partitions 一直洗牌組合,最後在組合的解中找到一組相對好的解。但在洗牌前會先去判斷 greedy 的分配結果可不可行(主要是因為洗牌找解的過程中,可能會滿耗時的(實測大概 10 sec 左右))

  1. 先判斷有無 incompatibility,若無則直接回傳 combinator 所分配的結果
  2. 若有 incompatibility,先計算 consumer 有沒有被分配到 incompatible partition。若 consumer 拿到的 partitions 都是適合的,直接回傳 combinator 所分配的結果
  3. 若 consumer 拿到的 partitions 中有不適合的,開始做 shuffler 的洗牌組合
  4. 為每個隨機組合計算標準差,取標準差最低的 10%,從中取得最少擁有不適合 partition 的組合當作最終 assignment
洗牌組合流程

流程大概如下:

  1. 使用者可設定找解的時間,在時間內找出一大堆解(assignment 的組合)
  2. 用限制的條件(訂閱內容、incompatible)去 filter 掉解,找出符合限制的解
  3. 從符合限制的解中,找出較好的解將之當作assignment
計算隨機組合的標準差

計算標準差是為了看組合中 consumer 之間分配的 cost 差異有沒有很大

@chia7712
Copy link
Contributor

chia7712 commented May 2, 2023

@harryteng9527 感謝更新,可否先提供一下數字?例如這個方法的改善程度,以及計算出一個可用結果所需的時間(成本)

@harryteng9527
Copy link
Collaborator Author

harryteng9527 commented May 2, 2023

實驗環境

節點

總共使用 15 台節點做實驗:

  • 6 台節點當 Brokers
  • 3 台節點當 Producer 端,每台開啟 3 個 producers
  • 6 台節點當 Consumer 端,每台開啟 2 個 consumers
Broker Producer Consumer
Total 6 9 12

Topic / Partition 數量

叢集內有 10 個 topics,每個 topic 有 16 個 partitions,共 160 個 partitions

Partition 依照 Kafka 預設的擺放

Producer 發送的 record size / 分佈

  • key 使用 zipfian 分佈,exponent 設定為 1.25
  • record size 約為 1KiB

找解成本

以上面提到的實驗環境來評估找解的成本,主要會有下列幾項:

等待 bean 的時間 + 找一大堆解的時間(使用者可自訂,預設為 5 秒) + 從一大堆解中找到最終解的時間
=> 18 ms + 5000 ms + 1418 ms = 6436 ms

成本會跟使用者所設定的找解時間partition 數量有關係,這次實驗所隨機找到的解數量有:95657 個


整體差距

以下實驗是使用 Performance tool,分別選擇 Range assignor 與 CostAware assignor 量測 consumers 吞吐量,執行時間為 3 分鐘

image

上圖為執行 Performance tool 測試時,全部 Topic 的 ByteIn 與 ByteOut 圖表,綠色為 ByteIn黃色為 ByteOut

從這張圖可以看到使用 CostAware assignor 時, consumer 消費 topics 的速度跟得上 producer 送到 topics 的速度。而使用 Range assignor 會有大約 1GiB 的 Lag

平均值

0503-avg

  • Range 平均吞吐量:4.084615385 GiB
  • CostAware 平均吞吐量:4.852307692 GiB

CostAware assignor 的平均吞吐量提升了 18.79 %

最大值&最小值差異

這邊比較使用 Range 與 CostAware assignor 時,Consumer group 吞吐量最大值與最小值的差異

  • Range 最大值CostAware 最大值的差距:2 GiB
  • Range 最小值CostAware 最小值的差距:0.44 GiB

@chia7712
Copy link
Contributor

chia7712 commented May 2, 2023

從這張圖可以看到使用 CostAware assignor 時, consumer 消費 topics 的速度跟得上 producer 送到 topics 的速度

請問一下綠色 (ByteIn) 的圖看不太出差異,可否提供一下數字?平均和最大最小的寫入吞吐量

@harryteng9527
Copy link
Collaborator Author

平均和最大最小的寫入吞吐量

平均寫入吞吐量 (GiB) 最大寫入吞吐量 (GiB) 最小寫入吞吐量 (GiB)
Range 實驗 5.091538462 6.35 4.79
CostAware 實驗 5.112307692 6.34 4.45

@chia7712
Copy link
Contributor

chia7712 commented May 2, 2023

命名的部分要稍微思考一下,如果程式碼的實作和命名差很多,通常代表寫的時候腦袋有點混亂 ...

@harryteng9527
Copy link
Collaborator Author

harryteng9527 commented May 23, 2023

目前對 Cost-Aware assignor 做了多次實驗,都有通過查核點,使用的 Cost-Aware assignor 是使用這隻 PR 的程式碼來測試

以下是查核點實驗所執行的時間、查核項目:

  • 第一、三個查核點執行 3 分鐘的實驗,確認 group 的吞吐量與 consumer 最大、最小吞吐量差異,都有提昇 15 %
  • 第二個查核點執行約 15 分鐘的實驗,量測使用兩個不同 assignor 的 consumers 的 e2e latency

實驗環境

  • 6 台節點當 brokers,共有 160 個 partitions

  • 3 台節點當 producers,六台節點當 consumers

Client 端皆開啟 Performance tool,Producer 發送的 records 大小固定 1KiB,Key 的分佈為 ZipFian

實驗圖

第一、三查核點

此二查核點是比較 Cost-Aware assignor 與 Kafka default assignor 的吞吐量以及 consumer 最大、小吞吐量差異(吞吐量全距)

Grafana snapshot

實驗時間為三分鐘,兩個實驗可以一起跑,以下是實驗數據

Consumer group 吞吐量

在執行三分鐘的時間內,Producers 平均吞吐量如下表

Range 實驗 Cost-Aware 實驗
Producer 平均吞吐量 3014.46 MiB/s 2808.07 MiB/s

Consumer group 的吞吐量折線圖如下,因為有將負載(流入 partition 的流量)較平均的分配給 consumers,所以使用 Cost-Aware assignor 的 consumer group 吞吐量較高

Consumer group 吞吐量比較

Consumer group 的平均吞吐量提昇約 30%,計算與圖表如下:

  • 使用 Cost-Aware assignor 的 cnsumer group 平均吞吐量為 2427.16 MiB/s

  • 使用 Range assignor 的 consumer group 平均吞吐量為 1864.38 MiB/s

Consumer group 平均吞吐量比較

Consumer group 中最大與最小吞吐量差值

下面是用 consumer group 中 consumer 吞吐量全距(吞吐量最大的 consumer 減去吞吐量最小的 consumer) 所製成的折線圖

Consumer 吞吐量全距

全距平均值如下

  • 使用 Cost-Aware assignor 的 consumers 全距為 59.93 MiB
  • 使用 Range assignor 的 consumers 全距為 386.51 MiB

全距平均值

第二查核點

此查核點是與 Kafka default assignor 比較,可降低平均 e2e latency 15%

Grafana snapshot

實驗的時間約為 15 mins,e2e latency 的實驗數據是用 Performance tool 紀錄的,下面是平均端對端延遲的實驗圖表

平均端對端延遲

平均端對端延遲

此圖表所紀錄的是 Consumer group 中 consumer 每秒的端對端延遲平均值,計算方式如下表格

Time\Consumer name c1 c2 c3 平均端對端延遲 (ms)
1 sec 5 15 22 (5+15+22)/3 = 14
2 sec 15 7 4 8.67
3 sec 3 69 654 242

表格中的 Time 為圖表的 x 軸

y 軸的點代表表格內的 平均端對端延遲,計算方式為 consumer 的 e2e latency 平均

平均端對端延遲的平均值

  • Cost-Aware 的 e2e 平均值為 811.2 ms,為實驗時每秒端對端延遲的平均
  • Kafka default 的 e2e 平均值為 2214.28 ms

滿足查核點的 e2e latency 降低 15 %

平均端對端延遲的平均值

測試 15 分鐘的吞吐量圖表

因為在測試 e2e 情境時,也有觀察吞吐量,所以也放一下 15min 的實驗狀況。

Producer 每秒吞吐量如下表格

Range 實驗 Cost-Aware 實驗
Producer 平均吞吐量 2321.76 MiB/s 2430.45 MiB/s

以下分別是吞吐量以及平均吞吐量的圖表,目前計算下來平均吞吐量能提昇 33 %

  • 使用 Cost-Aware assignor 的平均吞吐量為 2236 MiB/s
  • 使用 Range assignor 的平均吞吐量為 1678 MiB/s

Consumer group 吞吐量

Consumer group 平均吞吐量

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants