diff --git a/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java index 3065b23d6..53752f6af 100644 --- a/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java +++ b/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java @@ -140,8 +140,35 @@ public Map> getTargets(String componentId) { public String toJSONString() { Map obj = new HashMap(); obj.put("task->component", _taskToComponent); - // TODO: jsonify StormTopology - // at the minimum should send source info + Map bolts = _topology.get_bolts(); + Map spouts = _topology.get_spouts(); + for (String compId : _taskToComponent.values()) { + Map compMap = new HashMap(); + if (bolts.containsKey(compId)) { + List jsonSources = new ArrayList(); + Map sources = getSources(compId); + for (GlobalStreamId sourceId : sources.keySet()) { + jsonSources.add(sourceId.get_componentId()); + } + compMap.put("sources", jsonSources); + } + + if (spouts.containsKey(compId) || bolts.containsKey(compId)) { + List jsonTargets = new ArrayList(); + for (Map targets : getTargets(compId).values()) { + for (String target : targets.keySet()) { + jsonTargets.add(target); + } + } + if (!jsonTargets.isEmpty()) { + compMap.put("targets", jsonTargets); + } + } + + if (!compMap.isEmpty()) { + obj.put(compId, compMap); + } + } return JSONValue.toJSONString(obj); } diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java index 49c428ae3..309a8c69e 100644 --- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java @@ -40,6 +40,7 @@ public Number launch(Map conf, TopologyContext context) throws IOException { setupInfo.put("pidDir", context.getPIDDir()); setupInfo.put("conf", conf); setupInfo.put("context", context); + setupInfo.put("componentId", context.getThisComponentId()); writeMessage(setupInfo); return (Number)readMessage().get("pid"); diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py index f42e42694..d5292de1c 100755 --- a/storm-core/src/multilang/py/storm.py +++ b/storm-core/src/multilang/py/storm.py @@ -121,7 +121,9 @@ def log(msg): def initComponent(): setupInfo = readMsg() sendpid(setupInfo['pidDir']) - return [setupInfo['conf'], setupInfo['context']] + context = setupInfo['context'] + context['componentId'] = setupInfo['conponentId'] + return setupInfo['conf'], context class Tuple(object): def __init__(self, id, component, stream, task, values):