diff --git a/conf/crawler.flux b/conf/crawler.flux index c116f0a..1d221c2 100644 --- a/conf/crawler.flux +++ b/conf/crawler.flux @@ -172,31 +172,36 @@ streams: - from: "prefilter" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "fetcher" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "sitemap" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "feed" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "ssbolt" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" # part of the topology used to inject seeds diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java index 8b7d54c..bfba4b7 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java @@ -95,16 +95,20 @@ protected int run(String[] args) { // take it from feed default output so that the feed files themselves // don't get included - unless we want them too of course! builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed"); + + final Fields furl = new Fields("url"); BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) - .localOrShuffleGrouping("fetch", Constants.StatusStreamName) - .localOrShuffleGrouping("sitemap", Constants.StatusStreamName) - .localOrShuffleGrouping("feed", Constants.StatusStreamName) - .localOrShuffleGrouping("ssb", Constants.StatusStreamName) - .localOrShuffleGrouping("prefilter", Constants.StatusStreamName).setNumTasks(numShards); + .fieldsGrouping("fetch", Constants.StatusStreamName, furl) + .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) + .fieldsGrouping("feed", Constants.StatusStreamName, furl) + .fieldsGrouping("ssb", Constants.StatusStreamName, furl) + .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); + if (args.length >= 2) { statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); } + statusBolt.setNumTasks(numShards); return submit(conf, builder); }