Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-150: Replace jar distribution strategy with bittorrent #71

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
567f72a
add ttorrent BitTorrent client dependency to storm core
ptgoetz Jul 17, 2013
abe3414
Add bittorrent tracker and client code for use by nimbus and supervisor
ptgoetz Jul 17, 2013
dcb609a
change nimbus and supervisor to use bittorrent for jar file distribut…
ptgoetz Jul 17, 2013
f70022d
add bittorrent configuration defaults
ptgoetz Jul 17, 2013
539cf33
revert some changes that got lost by a stash apply
ptgoetz Jul 17, 2013
4a76e4c
add configuration values for bittorrent rate limits and seeding
ptgoetz Jul 24, 2013
17f1acc
implement configurable bitorrent rate limits and seeding for nimbus/s…
ptgoetz Jul 24, 2013
3b56e94
modify nimbus/supervisor to exchange all topology code (.jar, .ser) v…
ptgoetz Jul 24, 2013
bf5de3f
fix local mode supervisor
ptgoetz Sep 14, 2013
dec77b7
only start bittorrent tracker in distributed mode.
ptgoetz Sep 14, 2013
efd84a6
only start Nimbus bittorrent tracker in distributed mode.
ptgoetz Sep 14, 2013
2209da9
address issues identified in #629 review.
ptgoetz Sep 26, 2013
3b8059c
synchronize rebalanceRates() method and correct indentation.
ptgoetz Sep 26, 2013
24ae037
Rename BaseTracker and SupervisorTracker to BasePeer and SupervisorPeer
ptgoetz Sep 27, 2013
755bebf
organize imports
ptgoetz Sep 27, 2013
f784ef3
Rename BaseTracker, SupervisorTracker --> BasePeer, SupervisorPeer
ptgoetz Sep 27, 2013
ff79d6f
exclude log4j dependencies from ttorrent dependency
ptgoetz Oct 8, 2013
6b85db9
Merge branch 'master' into bittorrent
ptgoetz Mar 13, 2014
4df45cb
update dependencies and clojure code
ptgoetz Mar 13, 2014
cc56dac
fix for multilang resources being extracted to the wrong location
ptgoetz Apr 10, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ storm.messaging.transport: "backtype.storm.messaging.netty.Context"
### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.bittorrent.port: 6969
nimbus.bittorrent.bind.address: 0.0.0.0
nimbus.bittorrent.max.upload.rate: 0.0
nimbus.bittorrent.max.download.rate: 0.0
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
Expand Down Expand Up @@ -90,6 +94,10 @@ supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

supervisor.bittorrent.max.upload.rate: 0.0
supervisor.bittorrent.max.download.rate: 0.0
supervisor.bittorrent.seed.duration: 0

### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
<clojure.version>1.4.0</clojure.version>
<compojure.version>1.1.3</compojure.version>
<hiccup.version>0.3.6</hiccup.version>
<commons-io.verson>1.4</commons-io.verson>
<commons-io.verson>2.4</commons-io.verson>
<commons-lang.version>2.5</commons-lang.version>
<commons-exec.version>1.1</commons-exec.version>
<clj-time.version>0.4.1</clj-time.version>
Expand Down Expand Up @@ -412,6 +412,17 @@
<artifactId>netty</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.turn</groupId>
<artifactId>ttorrent</artifactId>
<version>1.4</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>tools.nrepl</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions storm-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@
</dependency>

<!--java-->
<dependency>
<groupId>com.turn</groupId>
<artifactId>ttorrent</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
10 changes: 8 additions & 2 deletions storm-core/src/clj/backtype/storm/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@
([conf storm-id]
(str (master-stormdist-root conf) file-path-separator storm-id)))

(defn master-stormtorrent-path [stormroot storm-id]
(str stormroot file-path-separator storm-id ".torrent"))

(defn master-stormjar-path [stormroot]
(str stormroot file-path-separator "stormjar.jar"))

Expand Down Expand Up @@ -165,8 +168,11 @@
([conf storm-id]
(str (supervisor-stormdist-root conf) file-path-separator (java.net.URLEncoder/encode storm-id))))

(defn supervisor-stormjar-path [stormroot]
(str stormroot file-path-separator "stormjar.jar"))
(defn supervisor-stormtorrent-path [stormroot storm-id]
(str stormroot file-path-separator storm-id ".torrent"))

(defn supervisor-stormjar-path [stormroot storm-id]
(str stormroot file-path-separator storm-id file-path-separator "stormjar.jar"))

(defn supervisor-stormcode-path [stormroot]
(str stormroot file-path-separator "stormcode.ser"))
Expand Down
19 changes: 17 additions & 2 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
(:import [backtype.storm.torrent NimbusTracker])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.config :only [validate-configs-with-schemas]])
(:use [backtype.storm.daemon common])
Expand Down Expand Up @@ -59,6 +60,8 @@
scheduler
))

(defmulti mk-bt-tracker cluster-mode)

(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
Expand All @@ -76,6 +79,7 @@
(halt-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
:bt-tracker (mk-bt-tracker conf)
}))

(defn inbox [nimbus]
Expand Down Expand Up @@ -306,13 +310,14 @@
;; need to somehow maintain stream/component ids inside tuples
topology)

(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology]
(let [stormroot (master-stormdist-root conf storm-id)]
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
(if (:bt-tracker nimbus) (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id))
))

(defn- read-storm-topology [conf storm-id]
Expand Down Expand Up @@ -830,6 +835,7 @@
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
(log-message "Cleaning up " id)
(if (:bt-tracker nimbus) (.stop (:bt-tracker nimbus) id))
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
(rmr (master-stormdist-root conf id))
Expand Down Expand Up @@ -955,7 +961,7 @@
;; lock protects against multiple topologies being submitted at once and
;; cleanup thread killing topology in b/w assignment and starting the topology
(locking (:submit-lock nimbus)
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
TopologyInitialStatus/ACTIVE :active}]
Expand Down Expand Up @@ -1177,12 +1183,21 @@
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
))

(defmethod mk-bt-tracker :distributed [conf]
(NimbusTracker. conf)
)


;; local implementation

(defmethod setup-jar :local [conf & args]
nil
)

(defmethod mk-bt-tracker :local [conf]
nil
)

(defn -launch [nimbus]
(launch-server! (read-storm-config) nimbus))

Expand Down
34 changes: 23 additions & 11 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
;; limitations under the License.
(ns backtype.storm.daemon.supervisor
(:import [backtype.storm.scheduler ISupervisor])
(:import [backtype.storm.torrent SupervisorPeer])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]])
Expand All @@ -24,6 +25,7 @@
(bootstrap)

(defmulti download-storm-code cluster-mode)
(defmulti mk-bt-tracker cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))

;; used as part of a map from port to this
Expand Down Expand Up @@ -199,6 +201,7 @@
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
))
:bt-tracker (mk-bt-tracker conf)
})

(defn sync-processes [supervisor]
Expand Down Expand Up @@ -237,6 +240,9 @@
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
(if (:bt-tracker supervisor)
(.stop (:bt-tracker supervisor) (:storm-id heartbeat))
)
(shutdown-worker supervisor id)
))
(doseq [id (vals new-worker-ids)]
Expand Down Expand Up @@ -322,7 +328,7 @@
storm-id
" from "
master-code-dir)
(download-storm-code conf storm-id master-code-dir)
(download-storm-code conf storm-id master-code-dir supervisor)
(log-message "Finished downloading code for storm id "
storm-id
" from "
Expand Down Expand Up @@ -425,26 +431,27 @@
;; distributed implementation

(defmethod download-storm-code
:distributed [conf storm-id master-code-dir]
:distributed [conf storm-id master-code-dir supervisor]
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
stormroot (supervisor-stormdist-root conf storm-id)]
(FileUtils/forceMkdir (File. tmproot))

(Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
(Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
(Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
(FileUtils/forceMkdir (File. (supervisor-stormdist-root conf)))
(Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir storm-id) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id))
(.download (:bt-tracker supervisor) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id) storm-id)
(extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR stormroot)
))

(defmethod mk-bt-tracker
:distributed [conf]
(SupervisorPeer. conf)
)

(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
storm-home (System/getProperty "storm.home")
stormroot (supervisor-stormdist-root conf storm-id)
stormjar (supervisor-stormjar-path stormroot)
stormjar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id)
storm-conf (read-supervisor-storm-conf conf storm-id)
classpath (add-to-classpath (current-classpath) [stormjar])
childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
Expand Down Expand Up @@ -475,7 +482,7 @@
first ))

(defmethod download-storm-code
:local [conf storm-id master-code-dir]
:local [conf storm-id master-code-dir supervisor]
(let [stormroot (supervisor-stormdist-root conf storm-id)]
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
(let [classloader (.getContextClassLoader (Thread/currentThread))
Expand All @@ -494,6 +501,11 @@
))
)))

(defmethod mk-bt-tracker
:local [conf]
nil
)

(defmethod launch-worker
:local [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
Expand Down
46 changes: 46 additions & 0 deletions storm-core/src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,31 @@ public class Config extends HashMap<String, Object> {
public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;

/**
* Which port the Nimbus BitTorrent tracker should bind to.
*/
public static final String NIMBUS_BITTORRENT_PORT = "nimbus.bittorrent.port";
public static final Object NIMBUS_BITTORRENT_PORT_SCHEMA = Number.class;

/**
* Which network interface the BitTorrent tracker should listen on. The
* default of 0.0.0.0 will listen on all interfaces. This can be an IP
* address or a hostname.
*/
public static final String NIMBUS_BITTORRENT_BIND_ADDRESS = "nimbus.bittorrent.bind.address";
public static final Object NIMBUS_BITTORRENT_BIND_ADDRESS_SCHEMA = String.class;

/**
* Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String NIMBUS_BITTORRENT_MAX_UPLOAD_RATE = "nimbus.bittorrent.max.upload.rate";
public static final Object NIMBUS_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class;

/**
* Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE = "nimbus.bittorrent.max.download.rate";
public static final Object NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class;
/**
* The maximum buffer size thrift should use when reading messages.
*/
Expand Down Expand Up @@ -453,6 +478,27 @@ public class Config extends HashMap<String, Object> {
*/
public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;

/**
* Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE = "supervisor.bittorrent.max.upload.rate";
public static final Object SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class;

/**
* Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE = "supervisor.bittorrent.max.download.rate";
public static final Object SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class;

/**
* Time in seconds that a supervisor should seed after completing a topology torrent download.
* A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor
* should seed indefinitely (until the topology is killed).
*/
public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration";
public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class;


/**
* The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
Expand Down
34 changes: 34 additions & 0 deletions storm-core/src/jvm/backtype/storm/torrent/BasePeer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package backtype.storm.torrent;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs the Apache Header.


import java.util.HashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.turn.ttorrent.client.Client;

public abstract class BasePeer {
private static final Logger LOG = LoggerFactory.getLogger(BasePeer.class);

protected HashMap<String, Client> clients = new HashMap<String, Client>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not very clear here what the key is to the clients. Could we either have a comment that this is topology ID, or a clearer name.

protected Double maxDownload;
protected Double maxUpload;

protected synchronized void rebalanceRates(){
int clientCount = this.clients.size();
if(clientCount > 0){
double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
for(Client client : this.clients.values()) {
client.setMaxDownloadRate(maxDl);
client.setMaxUploadRate(maxUl);
}
}
}

protected static String format(double val){
return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
}
}
Loading