diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java index f78227225ad..e54b166b3de 100644 --- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java +++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java @@ -528,6 +528,7 @@ private static void validateConfs(Map topoConf, StormTopology to InvalidTopologyException, AuthorizationException { ConfigValidation.validateTopoConf(topoConf); Utils.validateTopologyBlobStoreMap(topoConf); + Utils.validateWorkerLaunchOptions(null, topoConf, null, true); } /** diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index aca356a8b97..8753fcf9a42 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -60,6 +60,7 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.jar.JarFile; @@ -74,6 +75,7 @@ import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.ClientBlobStore; import org.apache.storm.blobstore.NimbusBlobStore; +import org.apache.storm.daemon.supervisor.ClientSupervisorUtils; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.ComponentCommon; @@ -1907,4 +1909,201 @@ private void readArchive(ZipFile zipFile) throws IOException { } } } + + /** + * Return path to the Java command "x", prefixing with ${JAVA_HOME}/bin/ if JAVA_HOME system property is defined. + * Otherwise return the supplied Java command unmodified. + * + * @param cmd Java command, e.g. "java", "jar" etc. + * @return command string to use. + */ + public static String getJavaCmd(String cmd) { + + String ret = null; + String javaHome = System.getenv().get("JAVA_HOME"); + if (StringUtils.isNotBlank(javaHome)) { + ret = javaHome + File.separator + "bin" + File.separator + cmd; + } else { + ret = cmd; + } + return ret; + } + + /** + * Find the list of substitutable variable names in supplied string. Substitutable variable names are + * preceded and followed by a percent sign and composed of upper case characters, numbers, underscore and dash + * characters. + * + * @param str string with 0 or more substitutable variables. + * @return a set with 0 or more variable names. + */ + public static Set findSubstitutableVarNames(String str) { + final String patternStr = "%([A-Z0-9_-]+)%"; + final int matchedGroup = 1; // matched group is 1 because of the capturing parens in the patternStr + + Set ret = new TreeSet<>(); + Pattern p = Pattern.compile(patternStr); + Matcher m = p.matcher(str); + int matchCnt = 0; + while (m.find()) { + matchCnt++; + try { + ret.add(str.substring(m.start(matchedGroup), m.end(matchedGroup))); + } catch (IllegalStateException ex) { + String err = String.format("Internal Error in findSubstitutableVarNames(\"%s\"), pattern \"%s\", matchCnt=%d", + str, patternStr, matchCnt, str); + LOG.error(err, ex); + } + } + return ret; + } + + /** + * Perform variable substitutions using the supplied substitutions. + * Leave unchanged any placeholders for undefined variables. + * + * @param str original string with placeholders for replacements. + * @param substitutions map of substitution variables and values. + * @return final string with variable replaced with supplied substitutions. + */ + public static String substituteVarNames(String str, Map substitutions) { + Set vars = findSubstitutableVarNames(str); + for (String var : vars) { + if (!substitutions.containsKey(var)) { + continue; + } + String orig = "%" + var + "%"; + String substitution = String.valueOf(substitutions.get(var)); + str = str.replace(orig, substitution); + } + return str; + } + + public static List substituteVarNamesInObject(Object value, Map substitutions) { + List rets = new ArrayList<>(); + if (value instanceof String) { + String string = substituteVarNames((String) value, substitutions); + if (StringUtils.isNotBlank(string)) { + rets.add(string); + } + } else if (value instanceof List) { + ((List) value).forEach(x -> { + x = substituteVarNames(x, substitutions); + if (StringUtils.isNotBlank(x)) { + rets.add(x); + } + }); + } + return rets; + } + + /** + * Enumeration of variables that can be substituted in Java command string. + */ + public enum WellKnownRuntimeSubstitutionVars { + ID("ID"), + WORKER_ID("WORKER-ID"), + TOPOLOGY_ID("TOPOLOGY-ID"), + WORKER_PORT("WORKER-PORT"), + HEAP_MEM("HEAP-MEM"), + OFF_HEAP_MEM("OFF-HEAP-MEM"), + LIMIT_MEM("LIMIT-MEM"); + + private String varName; + + WellKnownRuntimeSubstitutionVars(String varName) { + this.varName = varName; + } + + public String getVarName() { + return varName; + } + + public static Map getDummySubstitutions() { + Map ret = new HashMap<>(); + ret.put(ID.getVarName(), "dummyId"); + ret.put(WORKER_ID.getVarName(), "dummy-worker-id"); + ret.put(TOPOLOGY_ID.getVarName(), "dummy-topology-id"); + ret.put(WORKER_PORT.getVarName(), 6700); + ret.put(HEAP_MEM.getVarName(), 1024); + ret.put(OFF_HEAP_MEM.getVarName(), 1024); + ret.put(LIMIT_MEM.getVarName(), 1024); + return ret; + } + } + + /** + * Launch a validation java command (java -showversion java.util.prefs.Base64 1 1) with JVM options + * used in worker launch to validate JVM options. + * + * @param supervisorConf configuration for the supervisor. May be null. + * @param topoConf configuration for the topology. Must be provided. + * @param substitutions may be null in which case it is {@link WellKnownRuntimeSubstitutionVars#getDummySubstitutions()} + * @param throwExceptionOnFailure if true then an exception is thrown instead of returning false + * @return true if the option combination is valid, false otherwise (or throws InvalidTopologyException) + */ + public static boolean validateWorkerLaunchOptions(Map supervisorConf, Map topoConf, + Map substitutions, boolean throwExceptionOnFailure) + throws InvalidTopologyException { + // from storm-server/.../BasicContainer.mkLaunchCommand + if (supervisorConf == null) { + supervisorConf = new HashMap<>(); + } + if (substitutions == null) { + substitutions = WellKnownRuntimeSubstitutionVars.getDummySubstitutions(); + } + List commandList = new ArrayList<>(); + commandList.add(getJavaCmd("java")); + commandList.add("-showversion"); + + Object logWriterChildOpts = topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); + if (logWriterChildOpts != null) { + commandList.addAll(substituteVarNamesInObject(logWriterChildOpts, substitutions)); + } + + commandList.add("-server"); + commandList.addAll(substituteVarNamesInObject(supervisorConf.get(Config.WORKER_CHILDOPTS), substitutions)); + commandList.addAll(substituteVarNamesInObject(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), substitutions)); + commandList.addAll(substituteVarNamesInObject(Utils.OR( + topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), + supervisorConf.get(Config.WORKER_GC_CHILDOPTS)), substitutions)); + + // this config is only available in storm-server module DaemonConfig.WORKER_PROFILER_CHILDOPTS + Object workerProfileChildOpts = supervisorConf.get("worker.profiler.childopts"); + if (workerProfileChildOpts != null) { + commandList.addAll(substituteVarNamesInObject(workerProfileChildOpts, substitutions)); + } + commandList.add("-Dstorm.conf.file=" + ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"))); + commandList.add("-Dstorm.options=" + ConfigUtils.concatIfNotNull(System.getProperty("storm.options"))); + // use java.util.prefs.Base64 1 1 to test if JVM options are valid + commandList.add("java.util.prefs.Base64"); + commandList.add("1"); + commandList.add("1"); + + String cmd = String.join(" ", commandList); + try { + Process process = ClientSupervisorUtils.launchProcess(commandList, null, null, null, null); + int exitCode = process.waitFor(); + switch (exitCode) { + case 0: + LOG.info("Success exitcode 0 when executing command: {}", cmd); + return true; + + default: + LOG.error("Failure exitcode {} when executing command: {}", exitCode, cmd); + if (throwExceptionOnFailure) { + throw new InvalidTopologyException("Invalid topology JVM options tested via command \"" + cmd + "\""); + } else { + return false; + } + } + } catch (Exception ex) { + LOG.error("Exception executing command {}: {}", cmd, ex); + if (throwExceptionOnFailure) { + throw new InvalidTopologyException("Failed to test JVM options via command \"" + cmd + "\""); + } else { + return false; + } + } + } } diff --git a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java index d42a7b193eb..c846aa6311d 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java @@ -19,20 +19,28 @@ package org.apache.storm.utils; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.storm.Config; +import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.shade.com.google.common.collect.ImmutableList; import org.apache.storm.shade.com.google.common.collect.ImmutableMap; import org.apache.storm.shade.com.google.common.collect.ImmutableSet; +import org.apache.storm.shade.com.google.common.collect.Sets; import org.apache.storm.thrift.transport.TTransportException; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.*; - public class UtilsTest { @Test @@ -227,14 +235,72 @@ public void testIsValidConfEmptyNotEqual() { public void checkVersionInfo() { Map versions = new HashMap<>(); String key = VersionInfo.getVersion(); - assertNotEquals("Unknown", key, "Looks like we don't know what version of storm we are"); + Assert.assertNotEquals("Unknown", key, "Looks like we don't know what version of storm we are"); versions.put(key, System.getProperty("java.class.path")); Map conf = new HashMap<>(); conf.put(Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP, versions); NavigableMap alternativeVersions = Utils.getAlternativeVersionsMap(conf); - assertEquals(1, alternativeVersions.size()); + Assert.assertEquals(1, alternativeVersions.size()); IVersionInfo found = alternativeVersions.get(key); - assertNotNull(found); - assertEquals(key, found.getVersion()); + Assert.assertNotNull(found); + Assert.assertEquals(key, found.getVersion()); + } + @Test + public void testFindSubstitutableVarNames() { + Map> testCases = new LinkedHashMap<>(); + testCases.put("(1) this is a test %THIS_ISTEST-8% this is point %INValid-var% test %VALI_D-VAR% %%", + new TreeSet<>(Arrays.asList("THIS_ISTEST-8", "VALI_D-VAR"))); + testCases.put("(2) this is a test %T1HIS_ISTEST-8% this is point %INValid-var% test %__VALI_D-VAR% %% %_1%", + new TreeSet<>(Arrays.asList("T1HIS_ISTEST-8", "__VALI_D-VAR", "_1"))); + + testCases.forEach((key, expected) -> { + Set foundVars = Utils.findSubstitutableVarNames(key); + Set disjunction = Sets.symmetricDifference(foundVars, expected); + Assert.assertEquals(String.format("ERROR: In \"%s\" found != expected, differences=\"%s\"", key, disjunction), + expected, foundVars); + }); + } + + @Test + public void testGetDummySubstitutions() { + Map dummySubs = Utils.WellKnownRuntimeSubstitutionVars.getDummySubstitutions(); + int expectedSize = Utils.WellKnownRuntimeSubstitutionVars.values().length; + Set expectedVars = Stream.of(Utils.WellKnownRuntimeSubstitutionVars.values()) + .map(Utils.WellKnownRuntimeSubstitutionVars::getVarName) + .collect(Collectors.toSet()); + int foundSize = dummySubs.size(); + Set foundVars = new HashSet<>(dummySubs.keySet()); + Collection missingVars = Sets.difference(expectedVars, foundVars); + if (!missingVars.isEmpty()) { + String msg = String.format("Expected %d variables, found %d, missing values for \"%s\"", expectedSize, foundSize, missingVars); + Assert.fail(msg); + } + Collection extraVars = Sets.difference(foundVars, expectedVars); + if (!extraVars.isEmpty()) { + String msg = String.format("Expected %d variables, found %d, extra values for \"%s\"", expectedSize, foundSize, extraVars); + Assert.fail(msg); + } + } + + @Test + public void testValidateWorkerLaunchOptions() throws InvalidTopologyException { + Map supervisorConf = new HashMap<>(); + Map topoConf = new HashMap<>(); + + // both should be active + supervisorConf.put(Config.WORKER_CHILDOPTS, "-DchildOpts1=val1"); + topoConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-DchildOpts2=val2"); + + // topoConf will override + supervisorConf.put(Config.WORKER_GC_CHILDOPTS, "-DGcOpts=val1"); + topoConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-DGcOpts=val2"); + + supervisorConf.put("worker.profiler.childopts", "-DprofilerOpts=val1"); + topoConf.put(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Df1=f2"); + + Assert.assertTrue(Utils.validateWorkerLaunchOptions(supervisorConf, topoConf, null, false)); + + topoConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "--XX:+UseG1GC"); // invalid syntax + Assert.assertFalse(Utils.validateWorkerLaunchOptions(supervisorConf, topoConf, null, false)); } } diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java index 51fef1a0e1b..004023a4978 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java @@ -427,19 +427,21 @@ protected String getWorkerClassPath(String stormJar, List dependencyLoca private String substituteChildOptsInternal(String string, int memOnheap, int memOffheap) { if (StringUtils.isNotBlank(string)) { String p = String.valueOf(port); - string = string.replace("%ID%", p); - string = string.replace("%WORKER-ID%", workerId); - string = string.replace("%TOPOLOGY-ID%", topologyId); - string = string.replace("%WORKER-PORT%", p); + Map varSubstitutions = new HashMap<>(); + varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.ID.getVarName(), p); + varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.WORKER_ID.getVarName(), workerId); + varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.TOPOLOGY_ID.getVarName(), topologyId); + varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.WORKER_PORT.getVarName(), p); if (memOnheap > 0) { - string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.HEAP_MEM.getVarName(), String.valueOf(memOnheap)); } if (memOffheap > 0) { - string = string.replace("%OFF-HEAP-MEM%", String.valueOf(memOffheap)); + varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.OFF_HEAP_MEM.getVarName(), String.valueOf(memOffheap)); } if (memoryLimitMb > 0) { - string = string.replace("%LIMIT-MEM%", String.valueOf(memoryLimitMb)); + varSubstitutions.put(Utils.WellKnownRuntimeSubstitutionVars.LIMIT_MEM.getVarName(), String.valueOf(memoryLimitMb)); } + string = Utils.substituteVarNames(string, varSubstitutions); } return string; } @@ -568,14 +570,7 @@ private List getWorkerProfilerChildOpts(int memOnheap, int memOffheap) { } protected String javaCmd(String cmd) { - String ret = null; - String javaHome = System.getenv().get("JAVA_HOME"); - if (StringUtils.isNotBlank(javaHome)) { - ret = javaHome + File.separator + "bin" + File.separator + cmd; - } else { - ret = cmd; - } - return ret; + return Utils.getJavaCmd(cmd); } /**