Multiple output streams for trident #731

Open · wants to merge 2 commits into master
64 changes: 64 additions & 0 deletions storm-core/src/jvm/storm/trident/Stream.java
@@ -1,11 +1,15 @@
package storm.trident;

import java.util.Map;
import java.util.HashMap;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.NullStruct;
import storm.trident.fluent.ChainedAggregatorDeclarer;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.fluent.StreamMap;
import storm.trident.fluent.StreamCollection;
import storm.trident.fluent.GlobalAggregationScheme;
import storm.trident.fluent.GroupedStream;
import storm.trident.fluent.IAggregatableStream;
@@ -14,6 +18,7 @@
import storm.trident.operation.CombinerAggregator;
import storm.trident.operation.Filter;
import storm.trident.operation.Function;
import storm.trident.operation.MultiFunction;
import storm.trident.operation.ReducerAggregator;
import storm.trident.operation.impl.CombinerAggStateUpdater;
import storm.trident.operation.impl.FilterExecutor;
@@ -29,6 +34,9 @@
import storm.trident.planner.NodeStateInfo;
import storm.trident.planner.PartitionNode;
import storm.trident.planner.ProcessorNode;
import storm.trident.planner.TupleReceiver;
import storm.trident.planner.processor.MultiEachProcessor;
import storm.trident.planner.processor.MultiOutputMapping;
import storm.trident.planner.processor.AggregateProcessor;
import storm.trident.planner.processor.EachProcessor;
import storm.trident.planner.processor.PartitionPersistProcessor;
@@ -126,6 +134,62 @@ public Stream each(Fields inputFields, Function function, Fields functionFields)
new EachProcessor(inputFields, function)));
}

/**
Special kind of each that allows the user to route tuples to multiple output
streams. The output streams, one per key in the StreamMap, are available in
the returned StreamCollection object.
*/
public StreamCollection each(Fields inputFields, MultiFunction function, StreamMap map) {
projectionValidation(inputFields);

//
// Most important object here. It maintains the mapping (graph) from stream names
// to fields and processors. It gets serialized and passed to the backend. This work
// should ultimately happen in the TridentContext. Unfortunately, the TridentContext
// class is currently tied strongly to the notion of one output stream.
//
MultiOutputMapping streamOutputMap = new MultiOutputMapping();
Map<String, Node> outputNodeMap = new HashMap<String,Node>();

//
// Walk the simplistic output mapping (StreamMap) created by the user and
// create an identity processor node for each output stream.
//
for (String streamName : map.getStreamNames()) {
Fields streamFields = map.getFields(streamName);

Node outputNode = new ProcessorNode(
_topology.getUniqueStreamId(),
_name,
TridentUtils.fieldsConcat(getOutputFields(), streamFields),
new Fields(), // no output fields; pass input fields to appropriate output factory
new MultiEachProcessor.MultiEachProcessorChild(TridentUtils.fieldsConcat(getOutputFields(), streamFields), streamName));
outputNodeMap.put(streamName, outputNode);
streamOutputMap.addStream(streamName, streamFields, ((ProcessorNode)outputNode).processor);
}

//
// This processor node, labeled workNode, does all the actual work
//
Node workNode = new ProcessorNode(_topology.getUniqueStreamId(),
_name,
TridentUtils.fieldsConcat(getOutputFields(), map.getScopedOutputFields()),
map.getScopedOutputFields(), // each set of fields is scoped by its corresponding stream name
new MultiEachProcessor(inputFields, function, streamOutputMap));

Stream intermediate = _topology.addSourcedNode(this, workNode);

//
// Link the intermediate stream to each outputNode
//
StreamCollection result = new StreamCollection();
for (Map.Entry<String,Node> link : outputNodeMap.entrySet()) {
result.addStream(link.getKey(), _topology.addSourcedNode(intermediate, link.getValue()));
}

return result;
}

//creates brand new tuples with brand new fields
@Override
public Stream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
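For context, a minimal usage sketch of the new overload. The stream variable, stream names, and SplitterFunction are illustrative assumptions, not part of this change (SplitterFunction is sketched under MultiFunction below):

// Hypothetical usage of the multi-output each.
Map<String, Fields> outputs = new HashMap<String, Fields>();
outputs.put("valid", new Fields("cleaned"));
outputs.put("errors", new Fields("reason"));

StreamCollection branches = stream.each(
        new Fields("raw"),
        new SplitterFunction(),
        new StreamMap(outputs));

Stream valid = branches.getStream("valid");
Stream errors = branches.getStream("errors");
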
27 changes: 27 additions & 0 deletions storm-core/src/jvm/storm/trident/fluent/StreamCollection.java
@@ -0,0 +1,27 @@
package storm.trident.fluent;

import java.util.Map;
import java.util.HashMap;

import storm.trident.Stream;

public class StreamCollection {

protected Map<String,Stream> streams;

public StreamCollection() {
streams = new HashMap<String,Stream>();
}

public void addStream(String name, Stream s) {
streams.put(name,s);
}

public Stream getStream(String name) {
return streams.get(name);
}

public boolean hasStream(String name) {
return streams.containsKey(name);
}
}
54 changes: 54 additions & 0 deletions storm-core/src/jvm/storm/trident/fluent/StreamMap.java
@@ -0,0 +1,54 @@
package storm.trident.fluent;

import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.io.Serializable;
import backtype.storm.tuple.Fields;

public class StreamMap implements Serializable {
protected Map<String, Fields> _map; // Logical map from stream names to fields
protected Fields _allOutputFields;


public StreamMap(Map<String,Fields> map) {
_map = map;
}

public Set<String> getStreamNames() {
return _map.keySet();
}

public Fields getFields(String streamName) {
return _map.get(streamName);
}

/**
Returns the output fields of all the declared streams, each scoped
by its respective stream name.
*/
public Fields getScopedOutputFields() {
if (_allOutputFields != null)
return _allOutputFields;

List<String> resultFields = new ArrayList<String>();
for (Map.Entry<String, Fields> streamDeclaration : _map.entrySet()) {
String streamName = streamDeclaration.getKey();
Fields outputFields = streamDeclaration.getValue();
resultFields.addAll(namespaceFields(streamName, outputFields));
}
_allOutputFields = new Fields(resultFields);
return _allOutputFields;
}

private List<String> namespaceFields(String nameSpace, Fields fields) {
List<String> namespaced = new ArrayList<String>(fields.size());
for (String field : fields) {
namespaced.add(nameSpace + "::" + field);
}
return namespaced;
}
}
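To make the scoping concrete, a small sketch of what getScopedOutputFields produces (the stream and field names here are made up):

// Illustrative only.
Map<String, Fields> m = new HashMap<String, Fields>();
m.put("valid", new Fields("word", "count"));
m.put("errors", new Fields("reason"));

StreamMap sm = new StreamMap(m);
// scoped contains "valid::word", "valid::count", "errors::reason"
// (ordering follows the backing HashMap's iteration order).
Fields scoped = sm.getScopedOutputFields();
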
8 changes: 8 additions & 0 deletions storm-core/src/jvm/storm/trident/operation/MultiFunction.java
@@ -0,0 +1,8 @@
package storm.trident.operation;

import storm.trident.tuple.TridentTuple;
import storm.trident.planner.processor.MultiStreamCollector;

public interface MultiFunction extends EachOperation {
void execute(TridentTuple tuple, MultiStreamCollector collector);
}
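A sketch of what a user-supplied implementation might look like. The class name, stream names, and routing logic are illustrative assumptions; prepare and cleanup come from the inherited Operation interface:

import java.util.Map;
import backtype.storm.tuple.Values;
import storm.trident.operation.MultiFunction;
import storm.trident.operation.TridentOperationContext;
import storm.trident.planner.processor.MultiStreamCollector;
import storm.trident.tuple.TridentTuple;

// Illustrative: routes each tuple to the "valid" or "errors" stream.
public class SplitterFunction implements MultiFunction {

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void execute(TridentTuple tuple, MultiStreamCollector collector) {
        String raw = (String) tuple.get(0); // TridentTuple extends List<Object>
        if (raw != null && raw.trim().length() > 0) {
            collector.emitTo("valid", new Values(raw.trim()));
        } else {
            collector.emitTo("errors", new Values("empty input"));
        }
    }
}
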
104 changes: 104 additions & 0 deletions storm-core/src/jvm/storm/trident/planner/processor/MultiEachProcessor.java
@@ -0,0 +1,104 @@
package storm.trident.planner.processor;

import java.util.Map;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.tuple.Fields;
import backtype.storm.task.TopologyContext;
import storm.trident.planner.TupleReceiver;
import storm.trident.planner.ProcessorContext;
import storm.trident.planner.TridentProcessor;
import storm.trident.operation.MultiFunction;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.MultiOutputFactory;
import storm.trident.tuple.TridentTuple.Factory;
import storm.trident.tuple.TridentTupleView.ProjectionFactory;
import storm.trident.tuple.TridentTupleView.OperationOutputFactory;

/**
For working with multiple output streams.
*/
public class MultiEachProcessor implements TridentProcessor {

public static Logger LOG = LoggerFactory.getLogger(MultiEachProcessor.class);

MultiFunction _function;
TridentContext _context;
MultiStreamCollector _collector;
Fields _inputFields;
ProjectionFactory _projection;
MultiOutputMapping _outputMap;
MultiOutputFactory _outputFactory;

public MultiEachProcessor(Fields inputFields, MultiFunction function, MultiOutputMapping outputMap) {
_function = function;
_inputFields = inputFields;
_outputMap = outputMap;
}

@Override
public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
List<Factory> parents = tridentContext.getParentTupleFactories();
if(parents.size()!=1) {
throw new RuntimeException("MultiEach operation can only have one parent");
}
_context = tridentContext;
_collector = new MultiStreamCollector(tridentContext, _outputMap);
_projection = new ProjectionFactory(parents.get(0), _inputFields);
_function.prepare(conf, new TridentOperationContext(context, _projection));
_outputFactory = new MultiOutputFactory(parents.get(0), _outputMap);
}

@Override
public void cleanup() {
_function.cleanup();
}

@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
_collector.setContext(processorContext, tuple);
_function.execute(_projection.create(tuple), _collector);
}

@Override
public void startBatch(ProcessorContext processorContext) {
}

@Override
public void finishBatch(ProcessorContext processorContext) {
}

@Override
public Factory getOutputFactory() {
return _outputFactory;
}

/**
Not intended for use outside of the multi-each context. Pulls the
parent output factory that corresponds to its input stream.
*/
public static class MultiEachProcessorChild extends ProjectedProcessor {

String _streamName;

public MultiEachProcessorChild(Fields projectFields, String streamName) {
super(projectFields);
_streamName = streamName;
}

@Override
public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
if(tridentContext.getParentTupleFactories().size()!=1) {
throw new RuntimeException("MultiEachProcessorChild processor can only have one parent");
}
_context = tridentContext;

Factory parentFactory = tridentContext.getParentTupleFactories().get(0);
OperationOutputFactory realParent = ((MultiOutputFactory)parentFactory).getFactory(_streamName);

_factory = new ProjectionFactory(realParent, _projectFields);
}
}
}
44 changes: 44 additions & 0 deletions storm-core/src/jvm/storm/trident/planner/processor/MultiOutputMapping.java
@@ -0,0 +1,44 @@
package storm.trident.planner.processor;

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import backtype.storm.tuple.Fields;
import storm.trident.planner.TupleReceiver;

/**
Naively maintains a mapping from stream names to fields and tuple
receivers. In the current multi-each implementation, each stream should
only ever have one receiver.
<p>
FIXME: Does much of the work that the TridentContext
is supposed to do.
*/
public class MultiOutputMapping implements Serializable {

protected Map<String,TupleReceiver> _map;
protected Map<String,Fields> _fields;

public MultiOutputMapping() {
_map = new HashMap<String,TupleReceiver>();
_fields = new HashMap<String,Fields>();
}

public void addStream(String name, Fields fields, TupleReceiver rcvr) {
_map.put(name, rcvr);
_fields.put(name, fields);
}

public TupleReceiver getReceiver(String name) {
return _map.get(name);
}

public Set<String> getStreamNames() {
return _map.keySet();
}

public Fields getFields(String name) {
return _fields.get(name);
}
}
50 changes: 50 additions & 0 deletions storm-core/src/jvm/storm/trident/planner/processor/MultiStreamCollector.java
@@ -0,0 +1,50 @@
package storm.trident.planner.processor;


import java.util.List;
import storm.trident.planner.ProcessorContext;
import storm.trident.planner.TupleReceiver;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTuple.Factory;
import storm.trident.tuple.MultiOutputFactory;
import storm.trident.tuple.TridentTupleView;
import storm.trident.tuple.TridentTupleView.OperationOutputFactory;

/**
A collector for MultiFunction. Intentionally does not implement the
Trident collector interface: the user MUST specify a stream to route
output to.
*/
public class MultiStreamCollector {

MultiOutputFactory _factory;
MultiOutputMapping _outputMap;
TridentContext _triContext;
TridentTuple tuple;
ProcessorContext context;

public MultiStreamCollector(TridentContext context, MultiOutputMapping outputMap) {
_triContext = context;
_outputMap = outputMap;
_factory = new MultiOutputFactory(context.getParentTupleFactories().get(0), outputMap);
}

public void setContext(ProcessorContext pc, TridentTuple t) {
this.context = pc;
this.tuple = t;
}

/**
User-facing entry point. Routes values to the appropriate stream.
*/
public void emitTo(String name, List<Object> values) {
OperationOutputFactory f = _factory.getFactory(name);
TridentTuple toEmit = f.create((TridentTupleView)tuple, values);
_outputMap.getReceiver(name).execute(context, _triContext.getOutStreamId(), toEmit);
}

public void reportError(Throwable t) {
_triContext.getDelegateCollector().reportError(t);
}
}