-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[STORM-3683] Check if JVM options used for launching worker are valid. #3319
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,7 @@ | |
import java.util.NavigableMap; | ||
import java.util.Set; | ||
import java.util.TreeMap; | ||
import java.util.TreeSet; | ||
import java.util.UUID; | ||
import java.util.concurrent.Callable; | ||
import java.util.jar.JarFile; | ||
|
@@ -74,6 +75,7 @@ | |
import org.apache.storm.blobstore.BlobStore; | ||
import org.apache.storm.blobstore.ClientBlobStore; | ||
import org.apache.storm.blobstore.NimbusBlobStore; | ||
import org.apache.storm.daemon.supervisor.ClientSupervisorUtils; | ||
import org.apache.storm.generated.AuthorizationException; | ||
import org.apache.storm.generated.ClusterSummary; | ||
import org.apache.storm.generated.ComponentCommon; | ||
|
@@ -1907,4 +1909,201 @@ private void readArchive(ZipFile zipFile) throws IOException { | |
} | ||
} | ||
} | ||
|
||
/** | ||
* Return path to the Java command "x", prefixing with ${JAVA_HOME}/bin/ if JAVA_HOME system property is defined. | ||
* Otherwise return the supplied Java command unmodified. | ||
* | ||
* @param cmd Java command, e.g. "java", "jar" etc. | ||
* @return command string to use. | ||
*/ | ||
public static String getJavaCmd(String cmd) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for adding this. |
||
|
||
String ret = null; | ||
String javaHome = System.getenv().get("JAVA_HOME"); | ||
if (StringUtils.isNotBlank(javaHome)) { | ||
ret = javaHome + File.separator + "bin" + File.separator + cmd; | ||
} else { | ||
ret = cmd; | ||
} | ||
return ret; | ||
} | ||
|
||
/** | ||
* Find the list of substitutable variable names in supplied string. Substitutable variable names are | ||
* preceded and followed by a percent sign and composed of upper case characters, numbers, underscore and dash | ||
* characters. | ||
* | ||
* @param str string with 0 or more substitutable variables. | ||
* @return a set with 0 or more variable names. | ||
*/ | ||
public static Set<String> findSubstitutableVarNames(String str) { | ||
final String patternStr = "%([A-Z0-9_-]+)%"; | ||
final int matchedGroup = 1; // matched group is 1 because of the capturing parens in the patternStr | ||
|
||
Set<String> ret = new TreeSet<>(); | ||
Pattern p = Pattern.compile(patternStr); | ||
Matcher m = p.matcher(str); | ||
int matchCnt = 0; | ||
while (m.find()) { | ||
matchCnt++; | ||
try { | ||
ret.add(str.substring(m.start(matchedGroup), m.end(matchedGroup))); | ||
} catch (IllegalStateException ex) { | ||
String err = String.format("Internal Error in findSubstitutableVarNames(\"%s\"), pattern \"%s\", matchCnt=%d", | ||
str, patternStr, matchCnt, str); | ||
LOG.error(err, ex); | ||
} | ||
} | ||
return ret; | ||
} | ||
|
||
/** | ||
* Perform variable substitutions using the supplied substitutions. | ||
* Leave unchanged any placeholders for undefined variables. | ||
* | ||
* @param str original string with placeholders for replacements. | ||
* @param substitutions map of substitution variables and values. | ||
* @return final string with variable replaced with supplied substitutions. | ||
*/ | ||
public static String substituteVarNames(String str, Map<String, Object> substitutions) { | ||
Set<String> vars = findSubstitutableVarNames(str); | ||
for (String var : vars) { | ||
if (!substitutions.containsKey(var)) { | ||
continue; | ||
} | ||
String orig = "%" + var + "%"; | ||
String substitution = String.valueOf(substitutions.get(var)); | ||
str = str.replace(orig, substitution); | ||
} | ||
return str; | ||
} | ||
|
||
public static List<String> substituteVarNamesInObject(Object value, Map<String, Object> substitutions) { | ||
List<String> rets = new ArrayList<>(); | ||
if (value instanceof String) { | ||
String string = substituteVarNames((String) value, substitutions); | ||
if (StringUtils.isNotBlank(string)) { | ||
rets.add(string); | ||
} | ||
} else if (value instanceof List) { | ||
((List<String>) value).forEach(x -> { | ||
x = substituteVarNames(x, substitutions); | ||
if (StringUtils.isNotBlank(x)) { | ||
rets.add(x); | ||
} | ||
}); | ||
} | ||
return rets; | ||
} | ||
|
||
/** | ||
* Enumeration of variables that can be substituted in Java command string. | ||
*/ | ||
public enum WellKnownRuntimeSubstitutionVars { | ||
ID("ID"), | ||
WORKER_ID("WORKER-ID"), | ||
TOPOLOGY_ID("TOPOLOGY-ID"), | ||
WORKER_PORT("WORKER-PORT"), | ||
HEAP_MEM("HEAP-MEM"), | ||
OFF_HEAP_MEM("OFF-HEAP-MEM"), | ||
LIMIT_MEM("LIMIT-MEM"); | ||
|
||
private String varName; | ||
|
||
WellKnownRuntimeSubstitutionVars(String varName) { | ||
this.varName = varName; | ||
} | ||
|
||
public String getVarName() { | ||
return varName; | ||
} | ||
|
||
public static Map<String, Object> getDummySubstitutions() { | ||
Map<String, Object> ret = new HashMap<>(); | ||
ret.put(ID.getVarName(), "dummyId"); | ||
ret.put(WORKER_ID.getVarName(), "dummy-worker-id"); | ||
ret.put(TOPOLOGY_ID.getVarName(), "dummy-topology-id"); | ||
ret.put(WORKER_PORT.getVarName(), 6700); | ||
ret.put(HEAP_MEM.getVarName(), 1024); | ||
ret.put(OFF_HEAP_MEM.getVarName(), 1024); | ||
ret.put(LIMIT_MEM.getVarName(), 1024); | ||
return ret; | ||
} | ||
} | ||
|
||
/** | ||
* Launch a validation java command (java -showversion java.util.prefs.Base64 1 1) with JVM options | ||
* used in worker launch to validate JVM options. | ||
* | ||
* @param supervisorConf configuration for the supervisor. May be null. | ||
* @param topoConf configuration for the topology. Must be provided. | ||
* @param substitutions may be null in which case it is {@link WellKnownRuntimeSubstitutionVars#getDummySubstitutions()} | ||
* @param throwExceptionOnFailure if true then an exception is thrown instead of returning false | ||
* @return true if the option combination is valid, false otherwise (or throws InvalidTopologyException) | ||
*/ | ||
public static boolean validateWorkerLaunchOptions(Map<String, Object> supervisorConf, Map<String, Object> topoConf, | ||
Map<String, Object> substitutions, boolean throwExceptionOnFailure) | ||
throws InvalidTopologyException { | ||
// from storm-server/.../BasicContainer.mkLaunchCommand | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we commonize this, or grab them from a BasicContainer method? Hard to maintain this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BasicContainer is in storm-server package which depends on the storm-client package for compile. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that storm-server:BasicContainer.java is now using storm-client:Utils.WellKnownRuntimeSubstitutionVars. Is this sufficient? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like others to chime in here. I like the intent of this change, but the way we're replicating the code here so specifically I'm not so happy with. I understand the dependency issue causes problems. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like the replication either. It adds a lot of maintenance overhead. The suggestion I can think of is to move the whole check into server side at Some thoughts on this: |
||
if (supervisorConf == null) { | ||
supervisorConf = new HashMap<>(); | ||
} | ||
if (substitutions == null) { | ||
substitutions = WellKnownRuntimeSubstitutionVars.getDummySubstitutions(); | ||
} | ||
List<String> commandList = new ArrayList<>(); | ||
commandList.add(getJavaCmd("java")); | ||
commandList.add("-showversion"); | ||
|
||
Object logWriterChildOpts = topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); | ||
if (logWriterChildOpts != null) { | ||
commandList.addAll(substituteVarNamesInObject(logWriterChildOpts, substitutions)); | ||
} | ||
|
||
commandList.add("-server"); | ||
commandList.addAll(substituteVarNamesInObject(supervisorConf.get(Config.WORKER_CHILDOPTS), substitutions)); | ||
commandList.addAll(substituteVarNamesInObject(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), substitutions)); | ||
commandList.addAll(substituteVarNamesInObject(Utils.OR( | ||
topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), | ||
supervisorConf.get(Config.WORKER_GC_CHILDOPTS)), substitutions)); | ||
|
||
// this config is only available in storm-server module DaemonConfig.WORKER_PROFILER_CHILDOPTS | ||
Object workerProfileChildOpts = supervisorConf.get("worker.profiler.childopts"); | ||
if (workerProfileChildOpts != null) { | ||
commandList.addAll(substituteVarNamesInObject(workerProfileChildOpts, substitutions)); | ||
} | ||
commandList.add("-Dstorm.conf.file=" + ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"))); | ||
commandList.add("-Dstorm.options=" + ConfigUtils.concatIfNotNull(System.getProperty("storm.options"))); | ||
// use java.util.prefs.Base64 1 1 to test if JVM options are valid | ||
commandList.add("java.util.prefs.Base64"); | ||
commandList.add("1"); | ||
commandList.add("1"); | ||
|
||
String cmd = String.join(" ", commandList); | ||
try { | ||
Process process = ClientSupervisorUtils.launchProcess(commandList, null, null, null, null); | ||
int exitCode = process.waitFor(); | ||
switch (exitCode) { | ||
case 0: | ||
LOG.info("Success exitcode 0 when executing command: {}", cmd); | ||
return true; | ||
|
||
default: | ||
LOG.error("Failure exitcode {} when executing command: {}", exitCode, cmd); | ||
if (throwExceptionOnFailure) { | ||
throw new InvalidTopologyException("Invalid topology JVM options tested via command \"" + cmd + "\""); | ||
} else { | ||
return false; | ||
} | ||
} | ||
} catch (Exception ex) { | ||
LOG.error("Exception executing command {}: {}", cmd, ex); | ||
if (throwExceptionOnFailure) { | ||
throw new InvalidTopologyException("Failed to test JVM options via command \"" + cmd + "\""); | ||
} else { | ||
return false; | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -427,19 +427,21 @@ protected String getWorkerClassPath(String stormJar, List<String> dependencyLoca | |
private String substituteChildOptsInternal(String string, int memOnheap, int memOffheap) { | ||
if (StringUtils.isNotBlank(string)) { | ||
String p = String.valueOf(port); | ||
string = string.replace("%ID%", p); | ||
string = string.replace("%WORKER-ID%", workerId); | ||
string = string.replace("%TOPOLOGY-ID%", topologyId); | ||
string = string.replace("%WORKER-PORT%", p); | ||
Map<String, Object> varSubstitutions = new HashMap<>(); | ||
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.ID.getVarName(), p); | ||
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.WORKER_ID.getVarName(), workerId); | ||
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.TOPOLOGY_ID.getVarName(), topologyId); | ||
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.WORKER_PORT.getVarName(), p); | ||
if (memOnheap > 0) { | ||
string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); | ||
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.HEAP_MEM.getVarName(), String.valueOf(memOnheap)); | ||
} | ||
if (memOffheap > 0) { | ||
string = string.replace("%OFF-HEAP-MEM%", String.valueOf(memOffheap)); | ||
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.OFF_HEAP_MEM.getVarName(), String.valueOf(memOffheap)); | ||
} | ||
if (memoryLimitMb > 0) { | ||
string = string.replace("%LIMIT-MEM%", String.valueOf(memoryLimitMb)); | ||
varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.LIMIT_MEM.getVarName(), String.valueOf(memoryLimitMb)); | ||
} | ||
string = Utils.substituteVarNames(string, varSubstitutions); | ||
} | ||
return string; | ||
} | ||
|
@@ -568,14 +570,7 @@ private List<String> getWorkerProfilerChildOpts(int memOnheap, int memOffheap) { | |
} | ||
|
||
protected String javaCmd(String cmd) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any need to keep this method now that it is available in Utils? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I took this out, a bunch of test classes started failing. They were using a Mock class to override this class method to return just "java". And in the test, they check the returned array. In order to avoid changing a whole bunch of other classes, I left this signature unchanged. But this definitely looks ugly and the tests should be fixed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. I'm fine keeping this for now. |
||
String ret = null; | ||
String javaHome = System.getenv().get("JAVA_HOME"); | ||
if (StringUtils.isNotBlank(javaHome)) { | ||
ret = javaHome + File.separator + "bin" + File.separator + cmd; | ||
} else { | ||
ret = cmd; | ||
} | ||
return ret; | ||
return Utils.getJavaCmd(cmd); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A validation here is not likely to prevent the issue since it doesn't use the supervisorConf here. But in BasicContainer, it will use supervisorConf, which might have some default values.
We can add the check inside
Nimbus.submitTopologyWithOpts
(server side), and if it fails the check, throws InvalidTopologyException. This can avoid code duplication and avoid exposing too many method on the client side.But the problem is still it will use nimbusConf instead of supervisorConf, unless we can find a clean way to use the same source of truth.