Skip to content

Commit

Permalink
Merge branch 'main' into choosePartition
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang committed Aug 17, 2023
2 parents 4215c91 + d4c3d4b commit eb1bbe3
Show file tree
Hide file tree
Showing 264 changed files with 8,560 additions and 4,216 deletions.
3 changes: 2 additions & 1 deletion app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ dependencies {

implementation project(':common')
implementation project(':fs')
implementation project(':connector')
implementation libs["kafka-client"]
implementation libs["jcommander"]
// we don't use slf4j actually, and it is used by kafka so we swallow the log.
implementation libs["slf4j-nop"]
implementation libs["opencsv"]
implementation libs["commons-math3"]
implementation libs["jackson-datatype-jdk8"]
implementation libs["kafka-connect-api"]
}

application {
Expand Down
9 changes: 6 additions & 3 deletions app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.astraea.app.automation.Automation;
import org.astraea.app.benchmark.BalancerBenchmarkApp;
import org.astraea.app.performance.Performance;
import org.astraea.app.publisher.MetricPublisher;
import org.astraea.app.version.Version;
Expand All @@ -39,7 +40,9 @@ public class App {
"version",
Version.class,
"metric_publisher",
MetricPublisher.class);
MetricPublisher.class,
"balancer_benchmark",
BalancerBenchmarkApp.class);

static void execute(Map<String, Class<?>> mains, List<String> args) throws Throwable {

Expand Down Expand Up @@ -70,8 +73,8 @@ static void execute(Map<String, Class<?>> mains, List<String> args) throws Throw
method.invoke(null, (Object) args.subList(1, args.size()).toArray(String[]::new));
} catch (InvocationTargetException targetException) {
// Print out ParameterException, don't throw.
if (targetException.getTargetException() instanceof ParameterException) {
System.out.println(targetException.getTargetException().getMessage());
if (targetException.getTargetException() instanceof ParameterException exception) {
System.out.println(exception.getMessage());
} else {
throw targetException.getTargetException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package org.astraea.app.argument;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PositiveIntegerListField extends PositiveNumberListField<Integer> {
@Override
public List<Integer> convert(String value) {
return Stream.of(value.split(SEPARATOR)).map(Integer::valueOf).collect(Collectors.toList());
return Stream.of(value.split(SEPARATOR)).map(Integer::valueOf).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package org.astraea.app.argument;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PositiveShortListField extends PositiveNumberListField<Short> {
@Override
public List<Short> convert(String value) {
return Stream.of(value.split(SEPARATOR)).map(Short::valueOf).collect(Collectors.toList());
return Stream.of(value.split(SEPARATOR)).map(Short::valueOf).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package org.astraea.app.argument;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StringListField extends ListField<String> {
@Override
public List<String> convert(String value) {
return Stream.of(value.split(SEPARATOR)).collect(Collectors.toList());
return Stream.of(value.split(SEPARATOR)).toList();
}
}
48 changes: 48 additions & 0 deletions app/src/main/java/org/astraea/app/argument/URLListField.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.argument;

import com.beust.jcommander.ParameterException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.Utils;

public class URLListField extends ListField<URL> {
@Override
public List<URL> convert(String value) {
return Stream.of(value.split(SEPARATOR))
.map(string -> Utils.packException(() -> new URL(string)))
.collect(Collectors.toList());
}

@Override
protected void check(String name, String value) {
super.check(name, value);
Stream.of(value.split(SEPARATOR))
.forEach(
string -> {
try {
new URL(string);
} catch (MalformedURLException e) {
throw new ParameterException(string + " should be URL");
}
});
}
}
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/app/backup/Backup.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public void restoreDistribution(ClusterInfo clusterInfo, String bootstrapServers
.sorted(
Comparator.comparing(
replica -> !replica.isLeader()))
.map(replica -> replica.nodeInfo().id())
.collect(Collectors.toUnmodifiableList()))))))
.map(Replica::brokerId)
.toList())))))
.configs(topic.config().raw())
.run()
.toCompletableFuture()
Expand Down
Loading

0 comments on commit eb1bbe3

Please sign in to comment.