diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 2dbba24c9f6..e0e0660e69f 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -40,6 +40,10 @@ storm.messaging.transport: "backtype.storm.messaging.netty.Context"
### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
+nimbus.bittorrent.port: 6969
+nimbus.bittorrent.bind.address: 0.0.0.0
+nimbus.bittorrent.max.upload.rate: 0.0
+nimbus.bittorrent.max.download.rate: 0.0
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
@@ -90,6 +94,10 @@ supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
+supervisor.bittorrent.max.upload.rate: 0.0
+supervisor.bittorrent.max.download.rate: 0.0
+supervisor.bittorrent.seed.duration: 0
+
### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1
diff --git a/pom.xml b/pom.xml
index eee717e5e64..24379f6cb90 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,7 @@
1.4.0
1.1.3
0.3.6
- 1.4
+ 2.4
2.5
1.1
0.4.1
@@ -412,6 +412,17 @@
netty
${netty.version}
+
+ com.turn
+ ttorrent
+ 1.4
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
org.clojure
tools.nrepl
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index c5baad0ed41..cb860dcae20 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -78,6 +78,10 @@
+
+ com.turn
+ ttorrent
+
commons-io
commons-io
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 955fdfed920..7d8edf3f559 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -134,6 +134,9 @@
([conf storm-id]
(str (master-stormdist-root conf) file-path-separator storm-id)))
+(defn master-stormtorrent-path [stormroot storm-id]
+ (str stormroot file-path-separator storm-id ".torrent"))
+
(defn master-stormjar-path [stormroot]
(str stormroot file-path-separator "stormjar.jar"))
@@ -165,8 +168,11 @@
([conf storm-id]
(str (supervisor-stormdist-root conf) file-path-separator (java.net.URLEncoder/encode storm-id))))
-(defn supervisor-stormjar-path [stormroot]
- (str stormroot file-path-separator "stormjar.jar"))
+(defn supervisor-stormtorrent-path [stormroot storm-id]
+ (str stormroot file-path-separator storm-id ".torrent"))
+
+(defn supervisor-stormjar-path [stormroot storm-id]
+ (str stormroot file-path-separator storm-id file-path-separator "stormjar.jar"))
(defn supervisor-stormcode-path [stormroot]
(str stormroot file-path-separator "stormcode.ser"))
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index eaef6c12c38..43cc0200c41 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -24,6 +24,7 @@
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
+ (:import [backtype.storm.torrent NimbusTracker])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.config :only [validate-configs-with-schemas]])
(:use [backtype.storm.daemon common])
@@ -59,6 +60,8 @@
scheduler
))
+(defmulti mk-bt-tracker cluster-mode)
+
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
@@ -76,6 +79,7 @@
(halt-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
+ :bt-tracker (mk-bt-tracker conf)
}))
(defn inbox [nimbus]
@@ -306,13 +310,14 @@
;; need to somehow maintain stream/component ids inside tuples
topology)
-(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
+(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology]
(let [stormroot (master-stormdist-root conf storm-id)]
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
+ (if (:bt-tracker nimbus) (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id))
))
(defn- read-storm-topology [conf storm-id]
@@ -830,6 +835,7 @@
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
(log-message "Cleaning up " id)
+ (if (:bt-tracker nimbus) (.stop (:bt-tracker nimbus) id))
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
(rmr (master-stormdist-root conf id))
@@ -955,7 +961,7 @@
;; lock protects against multiple topologies being submitted at once and
;; cleanup thread killing topology in b/w assignment and starting the topology
(locking (:submit-lock nimbus)
- (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
+ (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
TopologyInitialStatus/ACTIVE :active}]
@@ -1177,12 +1183,21 @@
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
))
+(defmethod mk-bt-tracker :distributed [conf]
+ (NimbusTracker. conf)
+ )
+
+
;; local implementation
(defmethod setup-jar :local [conf & args]
nil
)
+(defmethod mk-bt-tracker :local [conf]
+ nil
+ )
+
(defn -launch [nimbus]
(launch-server! (read-storm-config) nimbus))
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 43cb6fe8b81..67e70fd7d27 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -15,6 +15,7 @@
;; limitations under the License.
(ns backtype.storm.daemon.supervisor
(:import [backtype.storm.scheduler ISupervisor])
+ (:import [backtype.storm.torrent SupervisorPeer])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]])
@@ -24,6 +25,7 @@
(bootstrap)
(defmulti download-storm-code cluster-mode)
+(defmulti mk-bt-tracker cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
;; used as part of a map from port to this
@@ -199,6 +201,7 @@
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
))
+ :bt-tracker (mk-bt-tracker conf)
})
(defn sync-processes [supervisor]
@@ -237,6 +240,9 @@
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
+ (if (:bt-tracker supervisor)
+ (.stop (:bt-tracker supervisor) (:storm-id heartbeat))
+ )
(shutdown-worker supervisor id)
))
(doseq [id (vals new-worker-ids)]
@@ -322,7 +328,7 @@
storm-id
" from "
master-code-dir)
- (download-storm-code conf storm-id master-code-dir)
+ (download-storm-code conf storm-id master-code-dir supervisor)
(log-message "Finished downloading code for storm id "
storm-id
" from "
@@ -425,26 +431,27 @@
;; distributed implementation
(defmethod download-storm-code
- :distributed [conf storm-id master-code-dir]
+ :distributed [conf storm-id master-code-dir supervisor]
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
stormroot (supervisor-stormdist-root conf storm-id)]
- (FileUtils/forceMkdir (File. tmproot))
-
- (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
- (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
- (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
- (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
- (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+ (FileUtils/forceMkdir (File. (supervisor-stormdist-root conf)))
+ (Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir storm-id) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id))
+ (.download (:bt-tracker supervisor) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id) storm-id)
+ (extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR stormroot)
))
+(defmethod mk-bt-tracker
+ :distributed [conf]
+ (SupervisorPeer. conf)
+ )
(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
storm-home (System/getProperty "storm.home")
stormroot (supervisor-stormdist-root conf storm-id)
- stormjar (supervisor-stormjar-path stormroot)
+ stormjar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id)
storm-conf (read-supervisor-storm-conf conf storm-id)
classpath (add-to-classpath (current-classpath) [stormjar])
childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
@@ -475,7 +482,7 @@
first ))
(defmethod download-storm-code
- :local [conf storm-id master-code-dir]
+ :local [conf storm-id master-code-dir supervisor]
(let [stormroot (supervisor-stormdist-root conf storm-id)]
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
(let [classloader (.getContextClassLoader (Thread/currentThread))
@@ -494,6 +501,11 @@
))
)))
+(defmethod mk-bt-tracker
+ :local [conf]
+ nil
+ )
+
(defmethod launch-worker
:local [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 281ae525df8..84fe4131c24 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -221,6 +221,31 @@ public class Config extends HashMap {
public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;
+ /**
+ * Which port the Nimbus BitTorrent tracker should bind to.
+ */
+ public static final String NIMBUS_BITTORRENT_PORT = "nimbus.bittorrent.port";
+ public static final Object NIMBUS_BITTORRENT_PORT_SCHEMA = Number.class;
+
+ /**
+ * Which network interface the BitTorrent tracker should listen on. The
+ * default of 0.0.0.0 will listen on all interfaces. This can be an IP
+ * address or a hostname.
+ */
+ public static final String NIMBUS_BITTORRENT_BIND_ADDRESS = "nimbus.bittorrent.bind.address";
+ public static final Object NIMBUS_BITTORRENT_BIND_ADDRESS_SCHEMA = String.class;
+
+ /**
+ * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
+ */
+ public static final String NIMBUS_BITTORRENT_MAX_UPLOAD_RATE = "nimbus.bittorrent.max.upload.rate";
+ public static final Object NIMBUS_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class;
+
+ /**
+ * Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
+ */
+ public static final String NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE = "nimbus.bittorrent.max.download.rate";
+ public static final Object NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class;
/**
* The maximum buffer size thrift should use when reading messages.
*/
@@ -453,6 +478,27 @@ public class Config extends HashMap {
*/
public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
+
+ /**
+ * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
+ */
+ public static final String SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE = "supervisor.bittorrent.max.upload.rate";
+ public static final Object SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class;
+
+ /**
+ * Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
+ */
+ public static final String SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE = "supervisor.bittorrent.max.download.rate";
+ public static final Object SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class;
+
+ /**
+ * Time in seconds that a supervisor should seed after completing a topology torrent download.
+ * A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor
+ * should seed indefinitely (until the topology is killed).
+ */
+ public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration";
+ public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class;
+
/**
* The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
diff --git a/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java
new file mode 100644
index 00000000000..9b939128168
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java
@@ -0,0 +1,34 @@
+package backtype.storm.torrent;
+
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.turn.ttorrent.client.Client;
+
+public abstract class BasePeer {
+ private static final Logger LOG = LoggerFactory.getLogger(BasePeer.class);
+
+ protected HashMap clients = new HashMap();
+ protected Double maxDownload;
+ protected Double maxUpload;
+
+ protected synchronized void rebalanceRates(){
+ int clientCount = this.clients.size();
+ if(clientCount > 0){
+ double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
+ double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
+ LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
+ LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
+ for(Client client : this.clients.values()) {
+ client.setMaxDownloadRate(maxDl);
+ client.setMaxUploadRate(maxUl);
+ }
+ }
+ }
+
+ protected static String format(double val){
+ return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
+ }
+}
diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java
new file mode 100644
index 00000000000..0283dea285a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java
@@ -0,0 +1,86 @@
+package backtype.storm.torrent;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+import com.turn.ttorrent.client.Client;
+import com.turn.ttorrent.client.SharedTorrent;
+import com.turn.ttorrent.common.Torrent;
+import com.turn.ttorrent.tracker.TrackedTorrent;
+import com.turn.ttorrent.tracker.Tracker;
+
+public class NimbusTracker extends BasePeer {
+ private static final Logger LOG = LoggerFactory.getLogger(NimbusTracker.class);
+ private Tracker tracker;
+ private InetAddress nimbusHost;
+ private String hostName;
+ private Integer port;
+
+ public NimbusTracker (Map conf) throws IOException{
+ this.hostName = (String)conf.get(Config.NIMBUS_HOST);
+ this.port = (Integer)conf.get(Config.NIMBUS_BITTORRENT_PORT);
+ this.maxDownload = (Double)conf.get(Config.NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE);
+ this.maxUpload = (Double)conf.get(Config.NIMBUS_BITTORRENT_MAX_UPLOAD_RATE);
+ LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload));
+
+ String bindAddress = (String)conf.get(Config.NIMBUS_BITTORRENT_BIND_ADDRESS);
+
+ LOG.info("Starting bt tracker bound to interface '{}'", bindAddress);
+ this.nimbusHost = InetAddress.getByName(this.hostName);
+ InetSocketAddress socketAddr = new InetSocketAddress(bindAddress, port);
+ this.tracker = new Tracker(socketAddr);
+ LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
+ this.tracker.start();
+ }
+
+ public void stop(String topologyId){
+ LOG.info("Stop seeding/tracking for topology {}", topologyId);
+ Client client = this.clients.remove(topologyId);
+ if(client != null){
+ Torrent torrent = client.getTorrent();
+ client.stop();
+ this.tracker.remove(torrent);
+ }
+ rebalanceRates();
+ }
+
+ public void trackAndSeed(String dir, String topologyId) throws IOException, NoSuchAlgorithmException, InterruptedException, URISyntaxException{
+
+ File destDir = new File(dir);
+ LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath());
+
+ URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
+ LOG.info("Creating torrent with announce URL: {}", uri);
+ ArrayList files = new ArrayList();
+ files.add(new File(destDir, "stormjar.jar"));
+ files.add(new File(destDir, "stormconf.ser"));
+ files.add(new File(destDir, "stormcode.ser"));
+
+ Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
+ File torrentFile = new File(destDir, topologyId + ".torrent");
+ torrent.save(new FileOutputStream(torrentFile));
+ LOG.info("Saved torrent: {}" + torrentFile.getAbsolutePath());
+ this.tracker.announce(new TrackedTorrent(torrent));
+ LOG.info("Torrent announced to tracker.");
+ Client client = new Client(this.nimbusHost, new SharedTorrent(torrent, destDir.getParentFile(), true));
+ this.clients.put(topologyId, client);
+ rebalanceRates();
+ client.share();
+ LOG.info("Seeding torrent...");
+ }
+
+
+}
diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java
new file mode 100644
index 00000000000..6c5820ac58c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java
@@ -0,0 +1,67 @@
+package backtype.storm.torrent;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+import com.turn.ttorrent.client.Client;
+import com.turn.ttorrent.client.Client.ClientState;
+import com.turn.ttorrent.client.SharedTorrent;
+
+public class SupervisorPeer extends BasePeer{
+ private static final Logger LOG = LoggerFactory.getLogger(SupervisorPeer.class);
+
+ private Integer seedDuration;
+
+ public SupervisorPeer(Map conf){
+ LOG.info("Creating supervisor bt tracker.");
+ this.maxDownload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE);
+ this.maxUpload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE);
+ this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
+ LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload));
+ }
+
+ public void stop(String topologyId){
+ LOG.info("Stopping bt client for topology {}", topologyId);
+ Client client = this.clients.remove(topologyId);
+ if(client != null){
+ client.stop();
+ }
+ rebalanceRates();
+ }
+
+ public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{
+ LOG.info("Initiating BitTorrent download.");
+ InetAddress netAddr = InetAddress.getLocalHost();
+ File torrentFile = new File(torrentPath);
+ File destDir = torrentFile.getParentFile();
+ LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
+ LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
+ SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
+
+ Client client = new Client(netAddr, st);
+ this.clients.put(topologyId, client);
+ rebalanceRates();
+ client.share(this.seedDuration);
+ if(this.seedDuration == 0){
+ client.waitForCompletion();
+ } else {
+ LOG.info("Waiting for seeding to begin...");
+ while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ LOG.info("BitTorrent download complete.");
+ }
+
+}