Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BALANCER] 比較網路的優化效果 #1823

Open
garyparrot opened this issue Jun 22, 2023 · 3 comments
Open

[BALANCER] 比較網路的優化效果 #1823

garyparrot opened this issue Jun 22, 2023 · 3 comments

Comments

@garyparrot
Copy link
Collaborator

比較 Balancer 的優化效果

@garyparrot
Copy link
Collaborator Author

這邊敘述建立情境的方法

建立叢集

下載下面這個檔案,解壓縮後裡面有一個序列化後的 ClusterInfo
exp2-cluster-info-before-greedy.bin.zip

用 Astraea 專案內的 library 可以反序列化裡面的資訊,之後要將裡面描述的叢集套用到一個 6 節點的 Kafka Cluster,其中每個節點 ID 必須要是 0,1,2,3,4,5。

下面是一個套用的範例程式碼

  @Test
  void restoreClusterInfo() throws IOException {
    var file = "/path/to/exp2-cluster-info-before-greedy.bin";
    try (
        var admin = Admin.of(bootstrap);
        var stream = Files.newInputStream(Path.of(file))) {
      var cluster = ByteUtils.readClusterInfo(stream.readAllBytes());

      System.out.println("Delete Topics");
      admin.topicNames(false)
          .thenApply(x -> x.stream()
              .filter(xx -> !xx.startsWith("__"))
              .collect(Collectors.toSet()))
          .thenApply(x -> {
            System.out.println("Delete: " + x);
            return x;
          })
          .thenCompose(admin::deleteTopics)
          .toCompletableFuture()
          .join();

      Utils.sleep(Duration.ofSeconds(10));

      System.out.println("Recreate Topics");
      cluster.replicas().stream()
          .collect(Collectors.groupingBy(Replica::topic, Collectors.mapping(Replica::topicPartition, Collectors.counting())))
          .entrySet()
          .stream()
          .map(x -> admin.creator()
              .topic(x.getKey())
              .numberOfPartitions(x.getValue().intValue())
              .numberOfReplicas((short)1)
              .run()
              .toCompletableFuture())
          .toList()
          .forEach(CompletableFuture::join);

      System.out.println("Relocate Replicas");
      admin.moveToBrokers(cluster.topicPartitions()
          .stream()
          .collect(Collectors.toUnmodifiableMap(
              tp -> tp,
              tp -> cluster.replicas(tp).stream()
                  .sorted(Comparator.comparing(x -> !x.isPreferredLeader()))
                  .map(Replica::brokerId)
                  .collect(Collectors.toList())
          )))
          .toCompletableFuture()
          .join();
      Utils.sleep(Duration.ofSeconds(5));

      System.out.println("Relocate to Folders");
      admin.moveToFolders(cluster.replicas()
          .stream()
          .collect(Collectors.toUnmodifiableMap(Replica::topicPartitionReplica, Replica::path)))
          .toCompletableFuture()
          .join();
      Utils.sleep(Duration.ofSeconds(5));

      System.out.println("Leader election");
      admin.preferredLeaderElection(cluster.topicPartitions());
      Utils.sleep(Duration.ofSeconds(5));

      admin.topicNames(true)
          .thenCompose(admin::clusterInfo)
          .toCompletableFuture()
          .join()
          .replicas()
          .stream()
          .filter(x -> x.isFuture() || x.isAdding() || x.isRemoving())
          .forEach(System.out::println);
    }
  }

建立 Producer & Consumer

exp2-ansible.txt
上述文字檔案裡面描述 Producer & Consumer 的設定,總共會需要 7 台獨立的設備,每一台執行一個 Astraea Performance Tool

image

每個欄位會對應到 Performance Tool 的參數

          - "./bin/app"
          - "performance"
          - "--bootstrap.servers {{ bootstrap_servers }}"
          - "--run.until 3h"
          - "--topics {{ topics }}"
          - "{{ throttle if throttle_enable else '' }}"
          - "{{ throughput if throughput_enable else '' }}"
          - "--record.key.table.seed {{ key_table_seed }}"
          - "--key.distribution {{ key_distribution }}"
          - "--key.distribution.config '{{ key_distribution_config | default('k=v') }}'"
          - "--read.idle 15s"
          - "--producers {{ 0 if (no_producer=='true') else 4 }}"
          - "--consumers {{ 0 if (no_consumer=='true') else 12 }}"
          - "--configs receive.buffer.bytes=1002400,buffer.memory=500000000,linger.ms=1,batch.size=1000000,fetch.max.bytes=50000000,fetch.min.bytes=20000000,max.partition.fetch.bytes=20000000,fetch.max.wait.ms=5,max.poll.records=9999999,check.crcs=false,acks=0,metrics.sample.window.ms=5000"

流量的趨勢

完成上述步驟後叢集會有這樣的流量趨勢:

節點的輸入

image

節點的輸出

image

如果趨勢不符合的話代表有地方重現失敗

@brandboat
Copy link
Member

想請教一下具體要比較的話,我們打算怎麼處理呢?

@garyparrot
Copy link
Collaborator Author

想請教一下具體要比較的話,我們打算怎麼處理呢?

Hi @brandboat,

我平常比較的方法是,以比較 A 方法和 B 方法的好壞為例:

  1. 設定一個比較的目標,那個目標需要可以被量化,比如比較節點的網路使用量使用程度可以去比較叢集內每個節點的流量之 Max-Min 或標準差。
  2. 設計一個 Kafka 使用情境,情境有 topic/partition/replica 數量、和每個 topic/partition 的寫入和讀取流量,情境的長相不同會大幅度影響問題的難度,簡單的情境人或演算法進來做大概都能做很好,為了有鑑別度,我通常是直接設計一個非常不好優化的情境(或至少對人來說沒有那麼直覺),比如 Partition 輸入輸出流量會很不一樣,然後每個 topic 的 replication factor 很不一樣等。
  3. 對一個實際的叢集套用上面的 Kafka 情境,套用後要確定 2 的應用端的行為符合預期,如果 Producer 本來預期寫入 x 流量但實際上只有 y 流量則代表有地方不正確,要檢查哪邊出了問題,Consumer 的部分也一樣要做檢查。
  4. 啟動 A 工具去優化上述的叢集
  5. 測量 A 的優化成果(使用1)
  6. 對另外一個實際的叢集套用上面的 Kafka 情境,這個情境要和 3 一模一樣
  7. 啟動 B 工具去優化上述的叢集
  8. 測量 B 的優化成果(使用1)

用 5 和 8 得到的數據去寫出比較的結論:在 xyz 硬體和 2 的情境下,方法 A 和 方法 B 有著 ... 結論。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants