Skip to content

Commit

Permalink
Route tuples to the status updater bolt based on URLs, fixes #65
Browse files Browse the repository at this point in the history
Signed-off-by: Julien Nioche <[email protected]>
  • Loading branch information
jnioche committed Dec 13, 2023
1 parent 63dafc1 commit efd0d24
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
15 changes: 10 additions & 5 deletions conf/crawler.flux
Original file line number Diff line number Diff line change
Expand Up @@ -172,31 +172,36 @@ streams:
- from: "prefilter"
to: "status"
grouping:
type: LOCAL_OR_SHUFFLE
type: FIELDS
args: ["url"]
streamId: "status"

- from: "fetcher"
to: "status"
grouping:
type: LOCAL_OR_SHUFFLE
type: FIELDS
args: ["url"]
streamId: "status"

- from: "sitemap"
to: "status"
grouping:
type: LOCAL_OR_SHUFFLE
type: FIELDS
args: ["url"]
streamId: "status"

- from: "feed"
to: "status"
grouping:
type: LOCAL_OR_SHUFFLE
type: FIELDS
args: ["url"]
streamId: "status"

- from: "ssbolt"
to: "status"
grouping:
type: LOCAL_OR_SHUFFLE
type: FIELDS
args: ["url"]
streamId: "status"

# part of the topology used to inject seeds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,20 @@ protected int run(String[] args) {
// take it from feed default output so that the feed files themselves
// don't get included - unless we want them to of course!
builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed");

final Fields furl = new Fields("url");

BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
.localOrShuffleGrouping("fetch", Constants.StatusStreamName)
.localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
.localOrShuffleGrouping("feed", Constants.StatusStreamName)
.localOrShuffleGrouping("ssb", Constants.StatusStreamName)
.localOrShuffleGrouping("prefilter", Constants.StatusStreamName).setNumTasks(numShards);
.fieldsGrouping("fetch", Constants.StatusStreamName, furl)
.fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
.fieldsGrouping("feed", Constants.StatusStreamName, furl)
.fieldsGrouping("ssb", Constants.StatusStreamName, furl)
.fieldsGrouping("prefilter", Constants.StatusStreamName, furl);

if (args.length >= 2) {
statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping());
}
statusBolt.setNumTasks(numShards);

return submit(conf, builder);
}
Expand Down

0 comments on commit efd0d24

Please sign in to comment.