From 9ec852dd03296d46f38659a6769036b9bf460839 Mon Sep 17 00:00:00 2001
From: Jacob Perkins
Date: Thu, 7 Nov 2013 12:00:57 -0600
Subject: [PATCH 1/2] Multiple output streams for trident

---
 storm-core/src/jvm/storm/trident/Stream.java       |  64 +++++++++++
 .../trident/fluent/StreamCollection.java           |  27 +++++
 .../jvm/storm/trident/fluent/StreamMap.java        |  54 +++++++++
 .../trident/operation/MultiFunction.java           |   8 ++
 .../planner/processor/MultiEachProcessor.java      | 104 ++++++++++++++++++
 .../planner/processor/MultiOutputMapping.java      |  44 ++++++++
 .../processor/MultiStreamCollector.java            |  50 +++++++++
 .../trident/testing/DebugMultiStream.java          |  37 +++++++
 .../trident/tuple/MultiOutputFactory.java          |  69 ++++++++++++
 9 files changed, 457 insertions(+)
 create mode 100644 storm-core/src/jvm/storm/trident/fluent/StreamCollection.java
 create mode 100644 storm-core/src/jvm/storm/trident/fluent/StreamMap.java
 create mode 100644 storm-core/src/jvm/storm/trident/operation/MultiFunction.java
 create mode 100644 storm-core/src/jvm/storm/trident/planner/processor/MultiEachProcessor.java
 create mode 100644 storm-core/src/jvm/storm/trident/planner/processor/MultiOutputMapping.java
 create mode 100644 storm-core/src/jvm/storm/trident/planner/processor/MultiStreamCollector.java
 create mode 100644 storm-core/src/jvm/storm/trident/testing/DebugMultiStream.java
 create mode 100644 storm-core/src/jvm/storm/trident/tuple/MultiOutputFactory.java

diff --git a/storm-core/src/jvm/storm/trident/Stream.java b/storm-core/src/jvm/storm/trident/Stream.java
index e30289bc5..ea8b712e1 100644
--- a/storm-core/src/jvm/storm/trident/Stream.java
+++ b/storm-core/src/jvm/storm/trident/Stream.java
@@ -1,11 +1,15 @@
 package storm.trident;
 
+import java.util.Map;
+import java.util.HashMap;
 import backtype.storm.generated.Grouping;
 import backtype.storm.generated.NullStruct;
 import storm.trident.fluent.ChainedAggregatorDeclarer;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+import storm.trident.fluent.StreamMap;
+import storm.trident.fluent.StreamCollection;
 import storm.trident.fluent.GlobalAggregationScheme;
 import storm.trident.fluent.GroupedStream;
 import storm.trident.fluent.IAggregatableStream;
@@ -14,6 +18,7 @@
 import storm.trident.operation.CombinerAggregator;
 import storm.trident.operation.Filter;
 import storm.trident.operation.Function;
+import storm.trident.operation.MultiFunction;
 import storm.trident.operation.ReducerAggregator;
 import storm.trident.operation.impl.CombinerAggStateUpdater;
 import storm.trident.operation.impl.FilterExecutor;
@@ -29,6 +34,9 @@
 import storm.trident.planner.NodeStateInfo;
 import storm.trident.planner.PartitionNode;
 import storm.trident.planner.ProcessorNode;
+import storm.trident.planner.TupleReceiver;
+import storm.trident.planner.processor.MultiEachProcessor;
+import storm.trident.planner.processor.MultiOutputMapping;
 import storm.trident.planner.processor.AggregateProcessor;
 import storm.trident.planner.processor.EachProcessor;
 import storm.trident.planner.processor.PartitionPersistProcessor;
@@ -126,6 +134,62 @@ public Stream each(Fields inputFields, Function function, Fields functionFields)
                 new EachProcessor(inputFields, function)));
     }
 
+    /**
+     * Special kind of each that allows the user to route tuples to multiple output streams.
+     * The output streams, one per key in the StreamMap, are available in the returned
+     * StreamCollection object.
+     */
+    public StreamCollection each(Fields inputFields, MultiFunction function, StreamMap map) {
+        projectionValidation(inputFields);
+
+        //
+        // Most important object here. It maintains the mapping (graph) from stream names
+        // to fields and processors. It gets serialized and passed to the backend. This work
+        // should ultimately happen in the TridentContext. Unfortunately, the TridentContext
+        // class is currently tied strongly to the notion of one output stream.
+        //
+        MultiOutputMapping streamOutputMap = new MultiOutputMapping();
+        Map<String, Node> outputNodeMap = new HashMap<String, Node>();
+
+        //
+        // Walk the simplistic output mapping (StreamMap) created by the user and
+        // create identity processor nodes for each output stream.
+        //
+        for (String streamName : map.getStreamNames()) {
+            Fields streamFields = map.getFields(streamName);
+
+            Node outputNode = new ProcessorNode(
+                    _topology.getUniqueStreamId(),
+                    _name,
+                    TridentUtils.fieldsConcat(getOutputFields(), streamFields),
+                    new Fields(), // no output fields; pass input fields to the appropriate output factory
+                    new MultiEachProcessor.MultiEachProcessorChild(TridentUtils.fieldsConcat(getOutputFields(), streamFields), streamName));
+            outputNodeMap.put(streamName, outputNode);
+            streamOutputMap.addStream(streamName, streamFields, ((ProcessorNode)outputNode).processor);
+        }
+
+        //
+        // This processor node, labeled workNode, does all the actual work.
+        //
+        Node workNode = new ProcessorNode(_topology.getUniqueStreamId(),
+                _name,
+                TridentUtils.fieldsConcat(getOutputFields(), map.getScopedOutputFields()),
+                map.getScopedOutputFields(), // each set of fields is scoped by its corresponding stream name
+                new MultiEachProcessor(inputFields, function, streamOutputMap));
+
+        Stream intermediate = _topology.addSourcedNode(this, workNode);
+
+        //
+        // Link the intermediate stream to each outputNode.
+        //
+        StreamCollection result = new StreamCollection();
+        for (Map.Entry<String, Node> link : outputNodeMap.entrySet()) {
+            result.addStream(link.getKey(), _topology.addSourcedNode(intermediate, link.getValue()));
+        }
+
+        return result;
+    }
+
     //creates brand new tuples with brand new fields
     @Override
     public Stream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
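From user code, the new overload reads like the existing each. A minimal usage sketch (RouteByPrefix and the stream/field names here are hypothetical, not part of this patch; it assumes a MultiFunction that calls collector.emitTo with one of the two declared names):

    Map<String, Fields> mapping = new HashMap<String, Fields>();
    mapping.put("streamA", new Fields("aValue"));
    mapping.put("streamB", new Fields("bValue"));

    // Returns a StreamCollection rather than a single Stream
    StreamCollection branches = stream.each(new Fields("args"), new RouteByPrefix(), new StreamMap(mapping));

    // Each named branch is an ordinary Stream that can be transformed independently
    Stream a = branches.getStream("streamA");
    Stream b = branches.getStream("streamB");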
diff --git a/storm-core/src/jvm/storm/trident/fluent/StreamCollection.java b/storm-core/src/jvm/storm/trident/fluent/StreamCollection.java
new file mode 100644
index 000000000..98c7fca9a
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/fluent/StreamCollection.java
@@ -0,0 +1,27 @@
+package storm.trident.fluent;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import storm.trident.Stream;
+
+public class StreamCollection {
+
+    protected Map<String, Stream> streams;
+
+    public StreamCollection() {
+        streams = new HashMap<String, Stream>();
+    }
+
+    public void addStream(String name, Stream s) {
+        streams.put(name, s);
+    }
+
+    public Stream getStream(String name) {
+        return streams.get(name);
+    }
+
+    public boolean hasStream(String name) {
+        return streams.containsKey(name);
+    }
+}
diff --git a/storm-core/src/jvm/storm/trident/fluent/StreamMap.java b/storm-core/src/jvm/storm/trident/fluent/StreamMap.java
new file mode 100644
index 000000000..0c3e8f776
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/fluent/StreamMap.java
@@ -0,0 +1,54 @@
+package storm.trident.fluent;
+
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.io.Serializable;
+import backtype.storm.tuple.Fields;
+
+public class StreamMap implements Serializable {
+
+    protected Map<String, Fields> _map; // Logical map from stream names to fields
+    protected Fields _allOutputFields;
+
+    public StreamMap(Map<String, Fields> map) {
+        _map = map;
+    }
+
+    public Set<String> getStreamNames() {
+        return _map.keySet();
+    }
+
+    public Fields getFields(String streamName) {
+        Fields f = _map.get(streamName);
+        //return new Fields(namespaceFields(streamName, f));
+        return f;
+    }
+
+    /**
+     * Returns the output fields of all the declared streams,
+     * scoped by their respective stream names.
+     */
+    public Fields getScopedOutputFields() {
+        if (_allOutputFields != null)
+            return _allOutputFields;
+
+        List<String> resultFields = new ArrayList<String>();
+        for (Map.Entry<String, Fields> streamDeclaration : _map.entrySet()) {
+            String streamName = streamDeclaration.getKey();
+            Fields outputFields = streamDeclaration.getValue();
+            resultFields.addAll(namespaceFields(streamName, outputFields));
+        }
+        _allOutputFields = new Fields(resultFields);
+        return _allOutputFields;
+    }
+
+    private List<String> namespaceFields(String nameSpace, Fields fields) {
+        List<String> namespaced = new ArrayList<String>(fields.size());
+        for (String field : fields) {
+            namespaced.add(nameSpace + "::" + field);
+        }
+        return namespaced;
+    }
+}
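A small worked example of the scoping (stream and field names illustrative): a StreamMap of {"streamA" -> ["word"], "streamB" -> ["token"]} yields the scoped output fields ["streamA::word", "streamB::token"], in map-iteration order:

    Map<String, Fields> m = new HashMap<String, Fields>();
    m.put("streamA", new Fields("word"));
    m.put("streamB", new Fields("token"));

    // Prefixes each declared field with its stream name, e.g. "streamA::word"
    Fields scoped = new StreamMap(m).getScopedOutputFields();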
diff --git a/storm-core/src/jvm/storm/trident/operation/MultiFunction.java b/storm-core/src/jvm/storm/trident/operation/MultiFunction.java
new file mode 100644
index 000000000..54932c120
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/operation/MultiFunction.java
@@ -0,0 +1,8 @@
+package storm.trident.operation;
+
+import storm.trident.tuple.TridentTuple;
+import storm.trident.planner.processor.MultiStreamCollector;
+
+public interface MultiFunction extends EachOperation {
+    void execute(TridentTuple tuple, MultiStreamCollector collector);
+}
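An implementation inherits the prepare/cleanup lifecycle via EachOperation and routes each input tuple from execute. A minimal sketch (ParityRouter and its stream names are hypothetical, with imports elided; DebugMultiStream later in this patch is the in-tree example):

    public class ParityRouter implements MultiFunction {
        @Override
        public void execute(TridentTuple tuple, MultiStreamCollector collector) {
            // Route even values to "evens" and odd values to "odds"
            String target = (tuple.getLong(0) % 2 == 0) ? "evens" : "odds";
            collector.emitTo(target, new Values(tuple.getLong(0)));
        }

        @Override
        public void prepare(Map conf, TridentOperationContext context) { }

        @Override
        public void cleanup() { }
    }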
diff --git a/storm-core/src/jvm/storm/trident/planner/processor/MultiEachProcessor.java b/storm-core/src/jvm/storm/trident/planner/processor/MultiEachProcessor.java
new file mode 100644
index 000000000..3d92b09ed
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/planner/processor/MultiEachProcessor.java
@@ -0,0 +1,104 @@
+package storm.trident.planner.processor;
+
+import java.util.Map;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import backtype.storm.tuple.Fields;
+import backtype.storm.task.TopologyContext;
+import storm.trident.planner.TupleReceiver;
+import storm.trident.planner.ProcessorContext;
+import storm.trident.planner.TridentProcessor;
+import storm.trident.operation.MultiFunction;
+import storm.trident.operation.TridentOperationContext;
+import storm.trident.tuple.TridentTuple;
+import storm.trident.tuple.MultiOutputFactory;
+import storm.trident.tuple.TridentTuple.Factory;
+import storm.trident.tuple.TridentTupleView.ProjectionFactory;
+import storm.trident.tuple.TridentTupleView.OperationOutputFactory;
+
+/**
+ * For working with multiple output streams.
+ */
+public class MultiEachProcessor implements TridentProcessor {
+
+    public static Logger LOG = LoggerFactory.getLogger(MultiEachProcessor.class);
+
+    MultiFunction _function;
+    TridentContext _context;
+    MultiStreamCollector _collector;
+    Fields _inputFields;
+    ProjectionFactory _projection;
+    MultiOutputMapping _outputMap;
+    MultiOutputFactory _outputFactory;
+
+    public MultiEachProcessor(Fields inputFields, MultiFunction function, MultiOutputMapping outputMap) {
+        _function = function;
+        _inputFields = inputFields;
+        _outputMap = outputMap;
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
+        List<Factory> parents = tridentContext.getParentTupleFactories();
+        if(parents.size() != 1) {
+            throw new RuntimeException("MultiEach operation can only have one parent");
+        }
+        _context = tridentContext;
+        _collector = new MultiStreamCollector(tridentContext, _outputMap);
+        _projection = new ProjectionFactory(parents.get(0), _inputFields);
+        _function.prepare(conf, new TridentOperationContext(context, _projection));
+        _outputFactory = new MultiOutputFactory(parents.get(0), _outputMap);
+    }
+
+    @Override
+    public void cleanup() {
+        _function.cleanup();
+    }
+
+    @Override
+    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
+        _collector.setContext(processorContext, tuple);
+        _function.execute(_projection.create(tuple), _collector);
+    }
+
+    @Override
+    public void startBatch(ProcessorContext processorContext) {
+    }
+
+    @Override
+    public void finishBatch(ProcessorContext processorContext) {
+    }
+
+    @Override
+    public Factory getOutputFactory() {
+        return _outputFactory;
+    }
+
+    /**
+     * Not intended for use outside of the multi-each context. Pulls the
+     * parent output factory that corresponds to its input stream.
+     */
+    public static class MultiEachProcessorChild extends ProjectedProcessor {
+
+        String _streamName;
+
+        public MultiEachProcessorChild(Fields projectFields, String streamName) {
+            super(projectFields);
+            _streamName = streamName;
+        }
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
+            if(tridentContext.getParentTupleFactories().size() != 1) {
+                throw new RuntimeException("MultiEachProcessorChild processor can only have one parent");
+            }
+            _context = tridentContext;
+
+            Factory parentFactory = tridentContext.getParentTupleFactories().get(0);
+            OperationOutputFactory realParent = ((MultiOutputFactory)parentFactory).getFactory(_streamName);
+
+            _factory = new ProjectionFactory(realParent, _projectFields);
+        }
+    }
+}
diff --git a/storm-core/src/jvm/storm/trident/planner/processor/MultiOutputMapping.java b/storm-core/src/jvm/storm/trident/planner/processor/MultiOutputMapping.java
new file mode 100644
index 000000000..878727b71
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/planner/processor/MultiOutputMapping.java
@@ -0,0 +1,44 @@
+package storm.trident.planner.processor;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashMap;
+import backtype.storm.tuple.Fields;
+import storm.trident.planner.TupleReceiver;
+
+/**
+ * Maintains, naively, a mapping from stream names to fields and tuple
+ * receivers. Each stream should only ever (in the current impl of a
+ * multi-each) have one receiver.
+ *
+ * FIXME: Does much of the work that the TridentContext is supposed to do.
+ */
+public class MultiOutputMapping implements Serializable {
+
+    protected Map<String, TupleReceiver> _map;
+    protected Map<String, Fields> _fields;
+
+    public MultiOutputMapping() {
+        _map = new HashMap<String, TupleReceiver>();
+        _fields = new HashMap<String, Fields>();
+    }
+
+    public void addStream(String name, Fields fields, TupleReceiver rcvr) {
+        _map.put(name, rcvr);
+        _fields.put(name, fields);
+    }
+
+    public TupleReceiver getReceiver(String name) {
+        return _map.get(name);
+    }
+
+    public Set<String> getStreamNames() {
+        return _map.keySet();
+    }
+
+    public Fields getFields(String name) {
+        return _fields.get(name);
+    }
+}
diff --git a/storm-core/src/jvm/storm/trident/planner/processor/MultiStreamCollector.java b/storm-core/src/jvm/storm/trident/planner/processor/MultiStreamCollector.java
new file mode 100644
index 000000000..1685c5f8f
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/planner/processor/MultiStreamCollector.java
@@ -0,0 +1,50 @@
+package storm.trident.planner.processor;
+
+import java.util.List;
+import storm.trident.planner.ProcessorContext;
+import storm.trident.planner.TupleReceiver;
+import storm.trident.tuple.TridentTuple;
+import storm.trident.tuple.TridentTuple.Factory;
+import storm.trident.tuple.MultiOutputFactory;
+import storm.trident.tuple.TridentTupleView;
+import storm.trident.tuple.TridentTupleView.OperationOutputFactory;
+
+/**
+ * A collector for MultiFunction. Intentionally does not implement
+ * the trident collector interface. The user MUST specify a stream
+ * to route output to.
+ */
+public class MultiStreamCollector {
+
+    MultiOutputFactory _factory;
+    MultiOutputMapping _outputMap;
+    TridentContext _triContext;
+    TridentTuple tuple;
+    ProcessorContext context;
+
+    public MultiStreamCollector(TridentContext context, MultiOutputMapping outputMap) {
+        _triContext = context;
+        _factory = new MultiOutputFactory(context.getParentTupleFactories().get(0), outputMap);
+        _outputMap = outputMap;
+    }
+
+    public void setContext(ProcessorContext pc, TridentTuple t) {
+        this.context = pc;
+        this.tuple = t;
+    }
+
+    /**
+     * User-facing interface; routes values to the appropriate stream.
+     */
+    public void emitTo(String name, List<Object> values) {
+        OperationOutputFactory f = _factory.getFactory(name);
+        TridentTuple toEmit = f.create((TridentTupleView) tuple, values);
+        _outputMap.getReceiver(name).execute(context, _triContext.getOutStreamId(), toEmit);
+    }
+
+    public void reportError(Throwable t) {
+        _triContext.getDelegateCollector().reportError(t);
+    }
+}
diff --git a/storm-core/src/jvm/storm/trident/testing/DebugMultiStream.java b/storm-core/src/jvm/storm/trident/testing/DebugMultiStream.java
new file mode 100644
index 000000000..aeb1ba679
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/testing/DebugMultiStream.java
@@ -0,0 +1,37 @@
+package storm.trident.testing;
+
+import java.util.Map;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.MultiFunction;
+import storm.trident.planner.processor.MultiStreamCollector;
+import storm.trident.operation.TridentOperationContext;
+import storm.trident.tuple.TridentTuple;
+
+public class DebugMultiStream implements MultiFunction {
+
+    protected String _streamA;
+    protected String _streamB;
+
+    public DebugMultiStream(String streamA, String streamB) {
+        _streamA = streamA;
+        _streamB = streamB;
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, MultiStreamCollector collector) {
+        if (tuple.getString(0).startsWith("A")) {
+            collector.emitTo(_streamA, new Values("A"));
+        } else {
+            collector.emitTo(_streamB, new Values("B"));
+        }
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+    }
+
+    @Override
+    public void cleanup() {
+    }
+}
diff --git a/storm-core/src/jvm/storm/trident/tuple/MultiOutputFactory.java b/storm-core/src/jvm/storm/trident/tuple/MultiOutputFactory.java
new file mode 100644
index 000000000..3c54f583d
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/tuple/MultiOutputFactory.java
@@ -0,0 +1,69 @@
+package storm.trident.tuple;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import clojure.lang.IPersistentVector;
+import clojure.lang.PersistentVector;
+import clojure.lang.RT;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import storm.trident.tuple.TridentTupleView.OperationOutputFactory;
+import storm.trident.planner.processor.MultiOutputMapping;
+
+/**
+ * Maintains several output factories, one per output stream,
+ * for use with MultiEachProcessor.
+ */
+public class MultiOutputFactory implements TridentTuple.Factory {
+
+    Map<String, OperationOutputFactory> _factories;
+
+    public MultiOutputFactory(TridentTuple.Factory parent, MultiOutputMapping map) {
+        _factories = new HashMap<String, OperationOutputFactory>();
+        for (String streamName : map.getStreamNames()) {
+            _factories.put(streamName, new OperationOutputFactory(parent, map.getFields(streamName)));
+        }
+    }
+
+    /**
+     * Main way of interacting with MultiOutputFactory; returns
+     * the factory that corresponds with the input stream name.
+     */
+    public OperationOutputFactory getFactory(String name) {
+        return _factories.get(name);
+    }
+
+    /** TO SATISFY FACTORY INTERFACE ONLY; UNSAFE SINCE STATE MUST BE MANAGED EXTERNALLY **/
+
+    String _currentStream;
+
+    public void setStream(String streamName) {
+        _currentStream = streamName;
+    }
+
+    public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {
+        return _factories.get(_currentStream).create(parent, selfVals);
+    }
+
+    @Override
+    public Map<String, ValuePointer> getFieldIndex() {
+        return _factories.get(_currentStream).getFieldIndex();
+    }
+
+    @Override
+    public int numDelegates() {
+        return _factories.get(_currentStream).numDelegates();
+    }
+
+    @Override
+    public List<String> getOutputFields() {
+        return _factories.get(_currentStream).getOutputFields();
+    }
+}
From a898616a964574ca96399515b4019ee29613fdc6 Mon Sep 17 00:00:00 2001
From: Jacob Perkins
Date: Tue, 12 Nov 2013 10:57:35 -0600
Subject: [PATCH 2/2] added tests for new multi output stream functionality

---
 .../test/clj/storm/trident/integration_test.clj    | 56 ++++++++++++++++++-
 1 file changed, 54 insertions(+), 2 deletions(-)

diff --git a/storm-core/test/clj/storm/trident/integration_test.clj b/storm-core/test/clj/storm/trident/integration_test.clj
index f617c94bb..2aafd98dc 100644
--- a/storm-core/test/clj/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/storm/trident/integration_test.clj
@@ -2,7 +2,8 @@
   (:use [clojure test])
   (:require [backtype.storm [testing :as t]])
   (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
-            MemoryMapState$Factory])
+            MemoryMapState$Factory DebugMultiStream])
+  (:import [storm.trident.fluent StreamMap])
   (:import [storm.trident.state StateSpec])
   (:import [storm.trident.operation.impl CombinerAggStateUpdater])
   (:use [storm.trident testing])
@@ -126,7 +127,37 @@
       (is (= [[0]] (exec-drpc drpc "numwords" "")))
       (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
       )))))
-
+
+(deftest test-multi-each
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+       (bind topo (TridentTopology.))
+       (bind multi-stream
+             (-> topo
+                 (.newDRPCStream "multioutput" drpc)
+                 (.each (fields "args") (DebugMultiStream. "streamA" "streamB")
+                        (StreamMap. {"streamA" (fields "aValue") "streamB" (fields "bValue")}))))
+       (bind s1
+             (-> multi-stream
+                 (.getStream "streamA")
+                 (.each (fields "aValue") (Split.) (fields "word"))))
+       ;; Rearrange fields here to illustrate they're from different streams
+       (bind s2
+             (-> multi-stream
+                 (.getStream "streamB")
+                 (.each (fields "bValue") (Split.) (fields "word"))
+                 (.project (fields "word" "bValue" "args"))))
+
+       (.merge topo [s1 s2])
+       (with-topology [cluster topo]
+         (is (t/ms= [["A: This starts with A" "A" "A"]] (exec-drpc drpc "multioutput" "A: This starts with A")))
+         (is (t/ms= [["B" "B" "B: This starts with B"]] (exec-drpc drpc "multioutput" "B: This starts with B")))
+         )))))
+
 (deftest test-split-merge
   (t/with-local-cluster [cluster]
    (with-drpc [drpc]

        ))
       (bind stream (-> topo
                        (.newStream "tester" feeder)))
+      (bind multi-stream (-> stream
+                             (.each (fields "sentence") (DebugMultiStream. "streamA" "streamB")
+                                    (StreamMap. {"streamA" (fields "aValue") "streamB" (fields "bValue")}))))
+      (bind s1
+            (-> multi-stream
+                (.getStream "streamA")))
+      (bind s2
+            (-> multi-stream
+                (.getStream "streamB")))
+
+      ;; test each; can't select fields from a different stream
+      (is (thrown? IllegalArgumentException
+                   (-> s1
+                       (.each (fields "bValue") (Split.) (fields "word")))))
+
+      (is (thrown? IllegalArgumentException
+                   (-> s2
+                       (.each (fields "aValue") (Split.) (fields "word")))))
+
       ;; test .each
       (is (thrown? IllegalArgumentException
                    (-> stream

             (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
   )))
+
+
 ;; (deftest test-split-merge
 ;;   (t/with-local-cluster [cluster]
 ;;     (with-drpc [drpc]
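For reference, the Java equivalent of the test-multi-each topology above is roughly the following sketch (illustrative only; it assumes a LocalDRPC instance named drpc, with imports elided):

    TridentTopology topo = new TridentTopology();

    Map<String, Fields> m = new HashMap<String, Fields>();
    m.put("streamA", new Fields("aValue"));
    m.put("streamB", new Fields("bValue"));

    StreamCollection branches = topo.newDRPCStream("multioutput", drpc)
            .each(new Fields("args"), new DebugMultiStream("streamA", "streamB"), new StreamMap(m));

    Stream s1 = branches.getStream("streamA")
            .each(new Fields("aValue"), new Split(), new Fields("word"));

    // Project in a different order to show the fields come from the other branch
    Stream s2 = branches.getStream("streamB")
            .each(new Fields("bValue"), new Split(), new Fields("word"))
            .project(new Fields("word", "bValue", "args"));

    topo.merge(Arrays.asList(s1, s2));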