Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STORM-3683] Check if JVM options used for launching worker are valid. #3319

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions storm-client/src/jvm/org/apache/storm/StormSubmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ private static void validateConfs(Map<String, Object> topoConf, StormTopology to
InvalidTopologyException, AuthorizationException {
ConfigValidation.validateTopoConf(topoConf);
Utils.validateTopologyBlobStoreMap(topoConf);
Utils.validateWorkerLaunchOptions(null, topoConf, null, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A validation here is not likely to prevent the issue since it doesn't use the supervisorConf here. But in BasicContainer, it will use supervisorConf, which might have some default values.

We can add the check inside Nimbus.submitTopologyWithOpts (server side), and if it fails the check, throws InvalidTopologyException. This can avoid code duplication and avoid exposing too many method on the client side.

But the problem is still it will use nimbusConf instead of supervisorConf, unless we can find a clean way to use the same source of truth.

}

/**
Expand Down
199 changes: 199 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.jar.JarFile;
Expand All @@ -74,6 +75,7 @@
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.NimbusBlobStore;
import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ComponentCommon;
Expand Down Expand Up @@ -1907,4 +1909,201 @@ private void readArchive(ZipFile zipFile) throws IOException {
}
}
}

/**
* Return path to the Java command "x", prefixing with ${JAVA_HOME}/bin/ if JAVA_HOME system property is defined.
* Otherwise return the supplied Java command unmodified.
*
* @param cmd Java command, e.g. "java", "jar" etc.
* @return command string to use.
*/
public static String getJavaCmd(String cmd) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this.


String ret = null;
String javaHome = System.getenv().get("JAVA_HOME");
if (StringUtils.isNotBlank(javaHome)) {
ret = javaHome + File.separator + "bin" + File.separator + cmd;
} else {
ret = cmd;
}
return ret;
}

/**
* Find the list of substitutable variable names in supplied string. Substitutable variable names are
* preceded and followed by a percent sign and composed of upper case characters, numbers, underscore and dash
* characters.
*
* @param str string with 0 or more substitutable variables.
* @return a set with 0 or more variable names.
*/
public static Set<String> findSubstitutableVarNames(String str) {
final String patternStr = "%([A-Z0-9_-]+)%";
final int matchedGroup = 1; // matched group is 1 because of the capturing parens in the patternStr

Set<String> ret = new TreeSet<>();
Pattern p = Pattern.compile(patternStr);
Matcher m = p.matcher(str);
int matchCnt = 0;
while (m.find()) {
matchCnt++;
try {
ret.add(str.substring(m.start(matchedGroup), m.end(matchedGroup)));
} catch (IllegalStateException ex) {
String err = String.format("Internal Error in findSubstitutableVarNames(\"%s\"), pattern \"%s\", matchCnt=%d",
str, patternStr, matchCnt, str);
LOG.error(err, ex);
}
}
return ret;
}

/**
* Perform variable substitutions using the supplied substitutions.
* Leave unchanged any placeholders for undefined variables.
*
* @param str original string with placeholders for replacements.
* @param substitutions map of substitution variables and values.
* @return final string with variable replaced with supplied substitutions.
*/
public static String substituteVarNames(String str, Map<String, Object> substitutions) {
Set<String> vars = findSubstitutableVarNames(str);
for (String var : vars) {
if (!substitutions.containsKey(var)) {
continue;
}
String orig = "%" + var + "%";
String substitution = String.valueOf(substitutions.get(var));
str = str.replace(orig, substitution);
}
return str;
}

public static List<String> substituteVarNamesInObject(Object value, Map<String, Object> substitutions) {
List<String> rets = new ArrayList<>();
if (value instanceof String) {
String string = substituteVarNames((String) value, substitutions);
if (StringUtils.isNotBlank(string)) {
rets.add(string);
}
} else if (value instanceof List) {
((List<String>) value).forEach(x -> {
x = substituteVarNames(x, substitutions);
if (StringUtils.isNotBlank(x)) {
rets.add(x);
}
});
}
return rets;
}

/**
* Enumeration of variables that can be substituted in Java command string.
*/
public enum WellKnownRuntimeSubstitutionVars {
ID("ID"),
WORKER_ID("WORKER-ID"),
TOPOLOGY_ID("TOPOLOGY-ID"),
WORKER_PORT("WORKER-PORT"),
HEAP_MEM("HEAP-MEM"),
OFF_HEAP_MEM("OFF-HEAP-MEM"),
LIMIT_MEM("LIMIT-MEM");

private String varName;

WellKnownRuntimeSubstitutionVars(String varName) {
this.varName = varName;
}

public String getVarName() {
return varName;
}

public static Map<String, Object> getDummySubstitutions() {
Map<String, Object> ret = new HashMap<>();
ret.put(ID.getVarName(), "dummyId");
ret.put(WORKER_ID.getVarName(), "dummy-worker-id");
ret.put(TOPOLOGY_ID.getVarName(), "dummy-topology-id");
ret.put(WORKER_PORT.getVarName(), 6700);
ret.put(HEAP_MEM.getVarName(), 1024);
ret.put(OFF_HEAP_MEM.getVarName(), 1024);
ret.put(LIMIT_MEM.getVarName(), 1024);
return ret;
}
}

/**
* Launch a validation java command (java -showversion java.util.prefs.Base64 1 1) with JVM options
* used in worker launch to validate JVM options.
*
* @param supervisorConf configuration for the supervisor. May be null.
* @param topoConf configuration for the topology. Must be provided.
* @param substitutions may be null in which case it is {@link WellKnownRuntimeSubstitutionVars#getDummySubstitutions()}
* @param throwExceptionOnFailure if true then an exception is thrown instead of returning false
* @return true if the option combination is valid, false otherwise (or throws InvalidTopologyException)
*/
public static boolean validateWorkerLaunchOptions(Map<String, Object> supervisorConf, Map<String, Object> topoConf,
Map<String, Object> substitutions, boolean throwExceptionOnFailure)
throws InvalidTopologyException {
// from storm-server/.../BasicContainer.mkLaunchCommand
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we commonize this, or grab them from a BasicContainer method? Hard to maintain this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BasicContainer is in storm-server package which depends on the storm-client package for compile.
Should be commonized somehow - otherwise this is a hidden forward dependency. Will look into this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that storm-server:BasicContainer.java is now using storm-client:Utils.WellKnownRuntimeSubstitutionVars. Is this sufficient?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like others to chime in here. I like the intent of this change, but the way we're replicating the code here so specifically I'm not so happy with. I understand the dependency issue causes problems.

Copy link
Contributor

@Ethanlm Ethanlm Sep 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the replication either. It adds a lot of maintenance overhead. The suggestion I can think of is to move the whole check into server side at Nimbus.submitTopologyWithOpts, which is better in a few ways. But it has its own problem to be taken care of (see my comments above).

Some thoughts on this:
This feature (check GC option conflicts) is nice to have. But it is not hard to find out that workers fail because of GC option conflicts. So I think we don't have to implement this feature if there is no good/clean way to implement it.

if (supervisorConf == null) {
supervisorConf = new HashMap<>();
}
if (substitutions == null) {
substitutions = WellKnownRuntimeSubstitutionVars.getDummySubstitutions();
}
List<String> commandList = new ArrayList<>();
commandList.add(getJavaCmd("java"));
commandList.add("-showversion");

Object logWriterChildOpts = topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
if (logWriterChildOpts != null) {
commandList.addAll(substituteVarNamesInObject(logWriterChildOpts, substitutions));
}

commandList.add("-server");
commandList.addAll(substituteVarNamesInObject(supervisorConf.get(Config.WORKER_CHILDOPTS), substitutions));
commandList.addAll(substituteVarNamesInObject(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), substitutions));
commandList.addAll(substituteVarNamesInObject(Utils.OR(
topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
supervisorConf.get(Config.WORKER_GC_CHILDOPTS)), substitutions));

// this config is only available in storm-server module DaemonConfig.WORKER_PROFILER_CHILDOPTS
Object workerProfileChildOpts = supervisorConf.get("worker.profiler.childopts");
if (workerProfileChildOpts != null) {
commandList.addAll(substituteVarNamesInObject(workerProfileChildOpts, substitutions));
}
commandList.add("-Dstorm.conf.file=" + ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")));
commandList.add("-Dstorm.options=" + ConfigUtils.concatIfNotNull(System.getProperty("storm.options")));
// use java.util.prefs.Base64 1 1 to test if JVM options are valid
commandList.add("java.util.prefs.Base64");
commandList.add("1");
commandList.add("1");

String cmd = String.join(" ", commandList);
try {
Process process = ClientSupervisorUtils.launchProcess(commandList, null, null, null, null);
int exitCode = process.waitFor();
switch (exitCode) {
case 0:
LOG.info("Success exitcode 0 when executing command: {}", cmd);
return true;

default:
LOG.error("Failure exitcode {} when executing command: {}", exitCode, cmd);
if (throwExceptionOnFailure) {
throw new InvalidTopologyException("Invalid topology JVM options tested via command \"" + cmd + "\"");
} else {
return false;
}
}
} catch (Exception ex) {
LOG.error("Exception executing command {}: {}", cmd, ex);
if (throwExceptionOnFailure) {
throw new InvalidTopologyException("Failed to test JVM options via command \"" + cmd + "\"");
} else {
return false;
}
}
}
}
78 changes: 72 additions & 6 deletions storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@
package org.apache.storm.utils;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.storm.Config;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.shade.com.google.common.collect.ImmutableList;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.shade.com.google.common.collect.ImmutableSet;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.thrift.transport.TTransportException;
import org.junit.Assert;
import org.junit.Test;

import static org.junit.Assert.*;

public class UtilsTest {

@Test
Expand Down Expand Up @@ -227,14 +235,72 @@ public void testIsValidConfEmptyNotEqual() {
public void checkVersionInfo() {
Map<String, String> versions = new HashMap<>();
String key = VersionInfo.getVersion();
assertNotEquals("Unknown", key, "Looks like we don't know what version of storm we are");
Assert.assertNotEquals("Unknown", key, "Looks like we don't know what version of storm we are");
versions.put(key, System.getProperty("java.class.path"));
Map<String, Object> conf = new HashMap<>();
conf.put(Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP, versions);
NavigableMap<String, IVersionInfo> alternativeVersions = Utils.getAlternativeVersionsMap(conf);
assertEquals(1, alternativeVersions.size());
Assert.assertEquals(1, alternativeVersions.size());
IVersionInfo found = alternativeVersions.get(key);
assertNotNull(found);
assertEquals(key, found.getVersion());
Assert.assertNotNull(found);
Assert.assertEquals(key, found.getVersion());
}
@Test
public void testFindSubstitutableVarNames() {
Map<String, Set<String>> testCases = new LinkedHashMap<>();
testCases.put("(1) this is a test %THIS_ISTEST-8% this is point %INValid-var% test %VALI_D-VAR% %%",
new TreeSet<>(Arrays.asList("THIS_ISTEST-8", "VALI_D-VAR")));
testCases.put("(2) this is a test %T1HIS_ISTEST-8% this is point %INValid-var% test %__VALI_D-VAR% %% %_1%",
new TreeSet<>(Arrays.asList("T1HIS_ISTEST-8", "__VALI_D-VAR", "_1")));

testCases.forEach((key, expected) -> {
Set<String> foundVars = Utils.findSubstitutableVarNames(key);
Set<String> disjunction = Sets.symmetricDifference(foundVars, expected);
Assert.assertEquals(String.format("ERROR: In \"%s\" found != expected, differences=\"%s\"", key, disjunction),
expected, foundVars);
});
}

@Test
public void testGetDummySubstitutions() {
Map<String, Object> dummySubs = Utils.WellKnownRuntimeSubstitutionVars.getDummySubstitutions();
int expectedSize = Utils.WellKnownRuntimeSubstitutionVars.values().length;
Set<String> expectedVars = Stream.of(Utils.WellKnownRuntimeSubstitutionVars.values())
.map(Utils.WellKnownRuntimeSubstitutionVars::getVarName)
.collect(Collectors.toSet());
int foundSize = dummySubs.size();
Set<String> foundVars = new HashSet<>(dummySubs.keySet());
Collection<String> missingVars = Sets.difference(expectedVars, foundVars);
if (!missingVars.isEmpty()) {
String msg = String.format("Expected %d variables, found %d, missing values for \"%s\"", expectedSize, foundSize, missingVars);
Assert.fail(msg);
}
Collection<String> extraVars = Sets.difference(foundVars, expectedVars);
if (!extraVars.isEmpty()) {
String msg = String.format("Expected %d variables, found %d, extra values for \"%s\"", expectedSize, foundSize, extraVars);
Assert.fail(msg);
}
}

@Test
public void testValidateWorkerLaunchOptions() throws InvalidTopologyException {
Map<String, Object> supervisorConf = new HashMap<>();
Map<String, Object> topoConf = new HashMap<>();

// both should be active
supervisorConf.put(Config.WORKER_CHILDOPTS, "-DchildOpts1=val1");
topoConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-DchildOpts2=val2");

// topoConf will override
supervisorConf.put(Config.WORKER_GC_CHILDOPTS, "-DGcOpts=val1");
topoConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-DGcOpts=val2");

supervisorConf.put("worker.profiler.childopts", "-DprofilerOpts=val1");
topoConf.put(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Df1=f2");

Assert.assertTrue(Utils.validateWorkerLaunchOptions(supervisorConf, topoConf, null, false));

topoConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "--XX:+UseG1GC"); // invalid syntax
Assert.assertFalse(Utils.validateWorkerLaunchOptions(supervisorConf, topoConf, null, false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,19 +427,21 @@ protected String getWorkerClassPath(String stormJar, List<String> dependencyLoca
private String substituteChildOptsInternal(String string, int memOnheap, int memOffheap) {
if (StringUtils.isNotBlank(string)) {
String p = String.valueOf(port);
string = string.replace("%ID%", p);
string = string.replace("%WORKER-ID%", workerId);
string = string.replace("%TOPOLOGY-ID%", topologyId);
string = string.replace("%WORKER-PORT%", p);
Map<String, Object> varSubstitutions = new HashMap<>();
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.ID.getVarName(), p);
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.WORKER_ID.getVarName(), workerId);
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.TOPOLOGY_ID.getVarName(), topologyId);
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.WORKER_PORT.getVarName(), p);
if (memOnheap > 0) {
string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.HEAP_MEM.getVarName(), String.valueOf(memOnheap));
}
if (memOffheap > 0) {
string = string.replace("%OFF-HEAP-MEM%", String.valueOf(memOffheap));
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.OFF_HEAP_MEM.getVarName(), String.valueOf(memOffheap));
}
if (memoryLimitMb > 0) {
string = string.replace("%LIMIT-MEM%", String.valueOf(memoryLimitMb));
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.LIMIT_MEM.getVarName(), String.valueOf(memoryLimitMb));
}
string = Utils.substituteVarNames(string, varSubstitutions);
}
return string;
}
Expand Down Expand Up @@ -568,14 +570,7 @@ private List<String> getWorkerProfilerChildOpts(int memOnheap, int memOffheap) {
}

protected String javaCmd(String cmd) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any need to keep this method now that it is available in Utils?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I took this out, a bunch of test classes started failing. They were using a Mock class to override this class method to return just "java". And in the test, they check the returned array. In order to avoid changing a whole bunch of other classes, I left this signature unchanged. But this definitely looks ugly and the tests should be fixed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I'm fine keeping this for now.

String ret = null;
String javaHome = System.getenv().get("JAVA_HOME");
if (StringUtils.isNotBlank(javaHome)) {
ret = javaHome + File.separator + "bin" + File.separator + cmd;
} else {
ret = cmd;
}
return ret;
return Utils.getJavaCmd(cmd);
}

/**
Expand Down