diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 2dbba24c9f6..e0e0660e69f 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -40,6 +40,10 @@ storm.messaging.transport: "backtype.storm.messaging.netty.Context" ### nimbus.* configs are for the master nimbus.host: "localhost" nimbus.thrift.port: 6627 +nimbus.bittorrent.port: 6969 +nimbus.bittorrent.bind.address: 0.0.0.0 +nimbus.bittorrent.max.upload.rate: 0.0 +nimbus.bittorrent.max.download.rate: 0.0 nimbus.thrift.max_buffer_size: 1048576 nimbus.childopts: "-Xmx1024m" nimbus.task.timeout.secs: 30 @@ -90,6 +94,10 @@ supervisor.monitor.frequency.secs: 3 supervisor.heartbeat.frequency.secs: 5 supervisor.enable: true +supervisor.bittorrent.max.upload.rate: 0.0 +supervisor.bittorrent.max.download.rate: 0.0 +supervisor.bittorrent.seed.duration: 0 + ### worker.* configs are for task workers worker.childopts: "-Xmx768m" worker.heartbeat.frequency.secs: 1 diff --git a/pom.xml b/pom.xml index eee717e5e64..24379f6cb90 100644 --- a/pom.xml +++ b/pom.xml @@ -152,7 +152,7 @@ 1.4.0 1.1.3 0.3.6 - 1.4 + 2.4 2.5 1.1 0.4.1 @@ -412,6 +412,17 @@ netty ${netty.version} + + com.turn + ttorrent + 1.4 + + + org.slf4j + slf4j-log4j12 + + + org.clojure tools.nrepl diff --git a/storm-core/pom.xml b/storm-core/pom.xml index c5baad0ed41..cb860dcae20 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -78,6 +78,10 @@ + + com.turn + ttorrent + commons-io commons-io diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index 955fdfed920..7d8edf3f559 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -134,6 +134,9 @@ ([conf storm-id] (str (master-stormdist-root conf) file-path-separator storm-id))) +(defn master-stormtorrent-path [stormroot storm-id] + (str stormroot file-path-separator storm-id ".torrent")) + (defn master-stormjar-path [stormroot] (str stormroot file-path-separator "stormjar.jar")) @@ -165,8 
+168,11 @@ ([conf storm-id] (str (supervisor-stormdist-root conf) file-path-separator (java.net.URLEncoder/encode storm-id)))) -(defn supervisor-stormjar-path [stormroot] - (str stormroot file-path-separator "stormjar.jar")) +(defn supervisor-stormtorrent-path [stormroot storm-id] + (str stormroot file-path-separator storm-id ".torrent")) + +(defn supervisor-stormjar-path [stormroot storm-id] + (str stormroot file-path-separator storm-id file-path-separator "stormjar.jar")) (defn supervisor-stormcode-path [stormroot] (str stormroot file-path-separator "stormcode.ser")) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index eaef6c12c38..43cc0200c41 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -24,6 +24,7 @@ (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) + (:import [backtype.storm.torrent NimbusTracker]) (:use [backtype.storm bootstrap util]) (:use [backtype.storm.config :only [validate-configs-with-schemas]]) (:use [backtype.storm.daemon common]) @@ -59,6 +60,8 @@ scheduler )) +(defmulti mk-bt-tracker cluster-mode) + (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf @@ -76,6 +79,7 @@ (halt-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) + :bt-tracker (mk-bt-tracker conf) })) (defn inbox [nimbus] @@ -306,13 +310,14 @@ ;; need to somehow maintain stream/component ids inside tuples topology) -(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology] +(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology] (let [stormroot (master-stormdist-root conf storm-id)] (FileUtils/forceMkdir (File. 
stormroot)) (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) + (if (:bt-tracker nimbus) (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id)) )) (defn- read-storm-topology [conf storm-id] @@ -830,6 +835,7 @@ (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) + (if (:bt-tracker nimbus) (.stop (:bt-tracker nimbus) id)) (.teardown-heartbeats! storm-cluster-state id) (.teardown-topology-errors! storm-cluster-state id) (rmr (master-stormdist-root conf id)) @@ -955,7 +961,7 @@ ;; lock protects against multiple topologies being submitted at once and ;; cleanup thread killing topology in b/w assignment and starting the topology (locking (:submit-lock nimbus) - (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) + (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology) (.setup-heartbeats! storm-cluster-state storm-id) (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] @@ -1177,12 +1183,21 @@ (FileUtils/copyFile src-file (File. (master-stormjar-path stormroot))) )) +(defmethod mk-bt-tracker :distributed [conf] + (NimbusTracker. conf) + ) + + ;; local implementation (defmethod setup-jar :local [conf & args] nil ) +(defmethod mk-bt-tracker :local [conf] + nil + ) + (defn -launch [nimbus] (launch-server! (read-storm-config) nimbus)) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 43cb6fe8b81..67e70fd7d27 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -15,6 +15,7 @@ ;; limitations under the License. 
(ns backtype.storm.daemon.supervisor (:import [backtype.storm.scheduler ISupervisor]) + (:import [backtype.storm.torrent SupervisorPeer]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) (:require [backtype.storm.daemon [worker :as worker]]) @@ -24,6 +25,7 @@ (bootstrap) (defmulti download-storm-code cluster-mode) +(defmulti mk-bt-tracker cluster-mode) (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) ;; used as part of a map from port to this @@ -199,6 +201,7 @@ (log-error t "Error when processing event") (halt-process! 20 "Error when processing an event") )) + :bt-tracker (mk-bt-tracker conf) }) (defn sync-processes [supervisor] @@ -237,6 +240,9 @@ ". Current supervisor time: " now ". State: " state ", Heartbeat: " (pr-str heartbeat)) + (if (:bt-tracker supervisor) + (.stop (:bt-tracker supervisor) (:storm-id heartbeat)) + ) (shutdown-worker supervisor id) )) (doseq [id (vals new-worker-ids)] @@ -322,7 +328,7 @@ storm-id " from " master-code-dir) - (download-storm-code conf storm-id master-code-dir) + (download-storm-code conf storm-id master-code-dir supervisor) (log-message "Finished downloading code for storm id " storm-id " from " @@ -425,26 +431,27 @@ ;; distributed implementation (defmethod download-storm-code - :distributed [conf storm-id master-code-dir] + :distributed [conf storm-id master-code-dir supervisor] ;; Downloading to permanent location is atomic (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid)) stormroot (supervisor-stormdist-root conf storm-id)] - (FileUtils/forceMkdir (File. 
tmproot)) - - (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot)) - (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot)) - (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot)) - (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) - (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) + (FileUtils/forceMkdir (File. (supervisor-stormdist-root conf))) + (Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir storm-id) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id)) + (.download (:bt-tracker supervisor) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id) storm-id) + (extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR stormroot) )) +(defmethod mk-bt-tracker + :distributed [conf] + (SupervisorPeer. conf) + ) (defmethod launch-worker :distributed [supervisor storm-id port worker-id] (let [conf (:conf supervisor) storm-home (System/getProperty "storm.home") stormroot (supervisor-stormdist-root conf storm-id) - stormjar (supervisor-stormjar-path stormroot) + stormjar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) storm-conf (read-supervisor-storm-conf conf storm-id) classpath (add-to-classpath (current-classpath) [stormjar]) childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) @@ -475,7 +482,7 @@ first )) (defmethod download-storm-code - :local [conf storm-id master-code-dir] + :local [conf storm-id master-code-dir supervisor] (let [stormroot (supervisor-stormdist-root conf storm-id)] (FileUtils/copyDirectory (File. master-code-dir) (File. 
stormroot)) (let [classloader (.getContextClassLoader (Thread/currentThread)) @@ -494,6 +501,11 @@ )) ))) +(defmethod mk-bt-tracker + :local [conf] + nil + ) + (defmethod launch-worker :local [supervisor storm-id port worker-id] (let [conf (:conf supervisor) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 281ae525df8..84fe4131c24 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -221,6 +221,31 @@ public class Config extends HashMap { public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port"; public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class; + /** + * Which port the Nimbus BitTorrent tracker should bind to. + */ + public static final String NIMBUS_BITTORRENT_PORT = "nimbus.bittorrent.port"; + public static final Object NIMBUS_BITTORRENT_PORT_SCHEMA = Number.class; + + /** + * Which network interface the BitTorrent tracker should listen on. The + * default of 0.0.0.0 will listen on all interfaces. This can be an IP + * address or a hostname. + */ + public static final String NIMBUS_BITTORRENT_BIND_ADDRESS = "nimbus.bittorrent.bind.address"; + public static final Object NIMBUS_BITTORRENT_BIND_ADDRESS_SCHEMA = String.class; + + /** + * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String NIMBUS_BITTORRENT_MAX_UPLOAD_RATE = "nimbus.bittorrent.max.upload.rate"; + public static final Object NIMBUS_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class; + + /** + * Max download rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE = "nimbus.bittorrent.max.download.rate"; + public static final Object NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class; /** * The maximum buffer size thrift should use when reading messages. 
*/ @@ -453,6 +478,27 @@ public class Config extends HashMap { */ public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs"; public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class; + + /** + * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE = "supervisor.bittorrent.max.upload.rate"; + public static final Object SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class; + + /** + * Max download rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE = "supervisor.bittorrent.max.download.rate"; + public static final Object SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class; + + /** + * Time in seconds that a supervisor should seed after completing a topology torrent download. + * A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor + * should seed indefinitely (until the topology is killed). + */ + public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration"; + public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class; + /** * The jvm opts provided to workers launched by this supervisor. 
All "%ID%" substrings are replaced
diff --git a/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java
new file mode 100644
index 00000000000..9b939128168
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java
@@ -0,0 +1,84 @@
+package backtype.storm.torrent;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.turn.ttorrent.client.Client;
+
+/**
+ * Base class for the BitTorrent participants used to distribute topology code.
+ * Holds one ttorrent {@link Client} per topology id and splits the configured
+ * aggregate bandwidth limits evenly between the registered clients.
+ */
+public abstract class BasePeer {
+    private static final Logger LOG = LoggerFactory.getLogger(BasePeer.class);
+
+    /** Torrent clients keyed by topology id. Mutate only via register/unregister. */
+    protected HashMap<String, Client> clients = new HashMap<String, Client>();
+    /** Aggregate download limit in kB/sec; 0.0 or less means unlimited. */
+    protected Double maxDownload;
+    /** Aggregate upload limit in kB/sec; 0.0 or less means unlimited. */
+    protected Double maxUpload;
+
+    /**
+     * Reads a numeric setting without assuming its boxed type. The config schemas
+     * only require {@code Number}, so a YAML value of {@code 0} (Integer) must be
+     * accepted just like {@code 0.0} (Double).
+     *
+     * @param conf the storm configuration map
+     * @param key  the config key to read
+     * @return the configured value
+     * @throws IllegalArgumentException if the key is missing or not numeric
+     */
+    protected static Number requireNumber(Map conf, String key){
+        Object value = conf.get(key);
+        if(!(value instanceof Number)){
+            throw new IllegalArgumentException("Config '" + key + "' must be a number but was: " + value);
+        }
+        return (Number)value;
+    }
+
+    /**
+     * Starts tracking a client. Takes the same monitor as {@link #rebalanceRates()}
+     * so the map is never modified while the rates are being redistributed.
+     */
+    protected synchronized void register(String topologyId, Client client){
+        this.clients.put(topologyId, client);
+    }
+
+    /**
+     * Stops tracking the client of a topology.
+     *
+     * @return the removed client, or null if none was registered for the topology
+     */
+    protected synchronized Client unregister(String topologyId){
+        return this.clients.remove(topologyId);
+    }
+
+    /**
+     * Divides the aggregate limits evenly across all registered clients. A limit
+     * of 0.0 or less is passed through unchanged rather than divided.
+     * Does nothing when no client is registered.
+     */
+    protected synchronized void rebalanceRates(){
+        int clientCount = this.clients.size();
+        if(clientCount > 0){
+            double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
+            double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
+            LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
+            LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
+            for(Client client : this.clients.values()) {
+                client.setMaxDownloadRate(maxDl);
+                client.setMaxUploadRate(maxUl);
+            }
+        }
+    }
+
+    /** Renders a rate for logging: "UNLIMITED" for 0.0 or less, else two decimals. */
+    protected static String format(double val){
+        return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
+    }
+}
diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java
new file mode 100644
index 00000000000..0283dea285a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java
@@ -0,0 +1,116 @@
+package backtype.storm.torrent;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+import com.turn.ttorrent.client.Client;
+import com.turn.ttorrent.client.SharedTorrent;
+import com.turn.ttorrent.common.Torrent;
+import com.turn.ttorrent.tracker.TrackedTorrent;
+import com.turn.ttorrent.tracker.Tracker;
+
+/**
+ * Nimbus-side BitTorrent endpoint: runs the tracker and seeds one torrent per
+ * topology so the topology files can be fetched through BitTorrent.
+ */
+public class NimbusTracker extends BasePeer {
+    private static final Logger LOG = LoggerFactory.getLogger(NimbusTracker.class);
+    private Tracker tracker;
+    private InetAddress nimbusHost;
+    private String hostName;
+    private Integer port;
+
+    /**
+     * Reads the nimbus BitTorrent settings, then creates and starts the tracker.
+     *
+     * @param conf the storm configuration map
+     * @throws IOException if resolving the nimbus host or creating the tracker fails
+     * @throws IllegalArgumentException if a numeric setting is missing or not a number
+     */
+    public NimbusTracker (Map conf) throws IOException{
+        this.hostName = (String)conf.get(Config.NIMBUS_HOST);
+        // Read through Number: the schemas are Number.class, so Integer/Long/Double are all legal.
+        this.port = requireNumber(conf, Config.NIMBUS_BITTORRENT_PORT).intValue();
+        this.maxDownload = requireNumber(conf, Config.NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE).doubleValue();
+        this.maxUpload = requireNumber(conf, Config.NIMBUS_BITTORRENT_MAX_UPLOAD_RATE).doubleValue();
+        // Arguments follow the [U/D] label: upload first, then download.
+        LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
+
+        String bindAddress = (String)conf.get(Config.NIMBUS_BITTORRENT_BIND_ADDRESS);
+
+        LOG.info("Starting bt tracker bound to interface '{}'", bindAddress);
+        this.nimbusHost = InetAddress.getByName(this.hostName);
+        InetSocketAddress socketAddr = new InetSocketAddress(bindAddress, port);
+        this.tracker = new Tracker(socketAddr);
+        LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
+        this.tracker.start();
+    }
+
+    /**
+     * Stops seeding a topology and removes its torrent from the tracker, then
+     * redistributes bandwidth among the remaining torrents. Unknown ids are ignored.
+     *
+     * @param topologyId id of the topology to stop seeding and tracking
+     */
+    public void stop(String topologyId){
+        LOG.info("Stop seeding/tracking for topology {}", topologyId);
+        Client client = unregister(topologyId);
+        if(client != null){
+            Torrent torrent = client.getTorrent();
+            client.stop();
+            this.tracker.remove(torrent);
+        }
+        rebalanceRates();
+    }
+
+    /**
+     * Builds a torrent for the topology's jar, conf and code files, writes it to
+     * {@code <dir>/<topologyId>.torrent}, announces it to the tracker and starts seeding.
+     *
+     * @param dir        directory holding stormjar.jar, stormconf.ser and stormcode.ser
+     * @param topologyId id used to name the torrent file and to key the seeding client
+     */
+    public void trackAndSeed(String dir, String topologyId) throws IOException, NoSuchAlgorithmException, InterruptedException, URISyntaxException{
+
+        File destDir = new File(dir);
+        LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath());
+
+        URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
+        LOG.info("Creating torrent with announce URL: {}", uri);
+        ArrayList<File> files = new ArrayList<File>();
+        files.add(new File(destDir, "stormjar.jar"));
+        files.add(new File(destDir, "stormconf.ser"));
+        files.add(new File(destDir, "stormcode.ser"));
+
+        Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
+        File torrentFile = new File(destDir, topologyId + ".torrent");
+        // Close the stream explicitly so the file handle is not leaked on every call.
+        FileOutputStream out = new FileOutputStream(torrentFile);
+        try {
+            torrent.save(out);
+        } finally {
+            out.close();
+        }
+        LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
+        this.tracker.announce(new TrackedTorrent(torrent));
+        LOG.info("Torrent announced to tracker.");
+        Client client = new Client(this.nimbusHost, new SharedTorrent(torrent, destDir.getParentFile(), true));
+        register(topologyId, client);
+        rebalanceRates();
+        client.share();
+        LOG.info("Seeding torrent...");
+    }
+}
diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java
new file mode 100644
index 00000000000..6c5820ac58c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java
@@ -0,0 +1,103 @@
+package backtype.storm.torrent;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+import com.turn.ttorrent.client.Client;
+import com.turn.ttorrent.client.Client.ClientState;
+import com.turn.ttorrent.client.SharedTorrent;
+
+/**
+ * Supervisor-side BitTorrent peer: downloads the topology files described by a
+ * torrent file and, depending on the seed duration, keeps sharing them.
+ */
+public class SupervisorPeer extends BasePeer{
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorPeer.class);
+
+    private Integer seedDuration;
+
+    /**
+     * @param conf the storm configuration map
+     * @throws IllegalArgumentException if a numeric setting is missing or not a number
+     */
+    public SupervisorPeer(Map conf){
+        LOG.info("Creating supervisor bt tracker.");
+        // Read through Number: the schemas are Number.class, so Integer/Long/Double are all legal.
+        this.maxDownload = requireNumber(conf, Config.SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE).doubleValue();
+        this.maxUpload = requireNumber(conf, Config.SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE).doubleValue();
+        this.seedDuration = requireNumber(conf, Config.SUPERVISOR_BITTORRENT_SEED_DURATION).intValue();
+        // Arguments follow the [U/D] label: upload first, then download.
+        LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
+    }
+
+    /**
+     * Stops the client of a topology, if any, and redistributes bandwidth
+     * among the remaining clients.
+     *
+     * @param topologyId id of the topology whose client should be stopped
+     */
+    public void stop(String topologyId){
+        LOG.info("Stopping bt client for topology {}", topologyId);
+        Client client = unregister(topologyId);
+        if(client != null){
+            client.stop();
+        }
+        rebalanceRates();
+    }
+
+    /**
+     * Downloads the files of a torrent into the directory containing the torrent
+     * file. Blocks until the download completed (seed duration 0) or until the
+     * client reports the SEEDING state (any other seed duration).
+     *
+     * @param torrentPath path of the .torrent file; its parent directory receives the files
+     * @param topologyId  id used to key the client so it can be stopped later
+     * @throws IOException if the torrent cannot be loaded or the client ends in the ERROR state
+     */
+    public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{
+        LOG.info("Initiating BitTorrent download.");
+        InetAddress netAddr = InetAddress.getLocalHost();
+        File torrentFile = new File(torrentPath);
+        File destDir = torrentFile.getParentFile();
+        LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
+        LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
+        SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
+
+        Client client = new Client(netAddr, st);
+        register(topologyId, client);
+        rebalanceRates();
+        client.share(this.seedDuration);
+        if(this.seedDuration == 0){
+            client.waitForCompletion();
+        } else {
+            LOG.info("Waiting for seeding to begin...");
+            // Keep waiting across interrupts as before, but remember them so the
+            // thread's interrupt status can be restored instead of silently dropped.
+            boolean interrupted = false;
+            while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+            if(interrupted){
+                Thread.currentThread().interrupt();
+            }
+        }
+        // The polling loop also exits on ERROR; surface that instead of logging success.
+        if(client.getState() == ClientState.ERROR){
+            stop(topologyId);
+            throw new IOException("BitTorrent download failed for topology " + topologyId);
+        }
+        LOG.info("BitTorrent download complete.");
+    }
+}