diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj index 70b80057e..3dba99789 100644 --- a/storm-core/src/clj/backtype/storm/bootstrap.clj +++ b/storm-core/src/clj/backtype/storm/bootstrap.clj @@ -41,6 +41,7 @@ (import (quote [backtype.storm.daemon.common StormBase Assignment SupervisorInfo WorkerHeartbeat])) (import (quote [backtype.storm.grouping CustomStreamGrouping])) + (import (quote [backtype.storm.utils HostPort])) (import (quote [java.io File FileOutputStream FileInputStream])) (import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList])) (import (quote [org.apache.commons.io FileUtils])) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d6..8ccf72c3f 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -5,7 +5,6 @@ (:use [backtype.storm util log config]) (:require [backtype.storm [zookeeper :as zk]]) (:require [backtype.storm.daemon [common :as common]]) - ) (defprotocol ClusterState @@ -120,11 +119,17 @@ (remove-storm! [this storm-id]) (report-error [this storm-id task-id error]) (errors [this storm-id task-id]) + (nimbus-info [this]) ;; fetch nimbus host + port as HostPort + (set-nimbus! [this info]) ;; announce nimbus host+port + (ui-port [this]) ;; fetch ui port as Integer + (set-ui-port! [this info]) ;; announce ui port (disconnect [this]) ) +(def NIMBUS-ROOT "nimbus") +(def UI-ROOT "ui") (def ASSIGNMENTS-ROOT "assignments") (def CODE-ROOT "code") (def STORMS-ROOT "storms") @@ -132,6 +137,8 @@ (def WORKERBEATS-ROOT "workerbeats") (def ERRORS-ROOT "errors") +(def NIMBUS-SUBTREE (str "/" NIMBUS-ROOT)) +(def UI-SUBTREE (str "/" UI-ROOT)) (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) (def STORMS-SUBTREE (str "/" STORMS-ROOT)) (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT)) @@ -222,6 +229,18 @@ (reify StormClusterState + (nimbus-info [this] + (maybe-deserialize (get-data cluster-state NIMBUS-SUBTREE false))) + + (set-nimbus! [this info] + (set-data cluster-state NIMBUS-SUBTREE (Utils/serialize info))) + + (ui-port [this] + (maybe-deserialize (get-data cluster-state UI-SUBTREE false))) + + (set-ui-port! [this info] + (set-data cluster-state UI-SUBTREE (Utils/serialize info))) + (assignments [this callback] (when callback (reset! assignments-callback callback)) diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index d1456eaa6..c44b5802d 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -47,8 +47,6 @@ (def LS-LOCAL-ASSIGNMENTS "local-assignments") (def LS-APPROVED-WORKERS "approved-workers") - - (defrecord WorkerHeartbeat [time-secs storm-id executors port]) (defrecord ExecutorStats [^long processed @@ -331,3 +329,4 @@ (->> executor->node+port (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port]))) (into {}))) + diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..c27c40d99 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -3,6 +3,7 @@ (:import [org.apache.thrift7.protocol TBinaryProtocol TBinaryProtocol$Factory]) (:import [org.apache.thrift7 TException]) (:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket]) + (:import [java.net InetAddress]) (:import [java.nio ByteBuffer]) (:import [java.io FileNotFoundException]) (:import [java.nio.channels Channels WritableByteChannel]) @@ -1125,10 +1126,25 @@ (waiting? [this] (timer-waiting? (:timer nimbus)))))) -(defn launch-server! [conf nimbus] +(defn announce-nimbus-info [nimbus host port] + (let [storm-cluster-state (:storm-cluster-state nimbus) + nimbus-host-port (HostPort. host port)] + (.set-nimbus! storm-cluster-state nimbus-host-port))) + +(defn config-with-nimbus-port-assigned [conf] + (let [port_in_conf (.intValue (Integer. (conf NIMBUS-THRIFT-PORT))) + nimbus_port (assign-server-port port_in_conf)] + (assoc (assoc conf + NIMBUS-THRIFT-PORT (Integer. nimbus_port)) + NIMBUS-HOST (.getCanonicalHostName (InetAddress/getLocalHost))))) + +(defn launch-server! [conf inimbus] (validate-distributed-mode! conf) - (let [service-handler (service-handler conf nimbus) - options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) + (let [extended-conf (config-with-nimbus-port-assigned conf) + nimbus-port (extended-conf NIMBUS-THRIFT-PORT) + nimbus (nimbus-data extended-conf inimbus) + service-handler (service-handler extended-conf inimbus) + options (-> (TNonblockingServerSocket. nimbus-port) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory.)) @@ -1136,7 +1152,8 @@ ) server (THsHaServer. options)] (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server)))) - (log-message "Starting Nimbus server...") + (log-message "Starting Nimbus server with port " nimbus-port) + (announce-nimbus-info nimbus (extended-conf NIMBUS-HOST) nimbus-port) (.serve server))) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 99d0be0b9..1cf1520e4 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -317,68 +317,76 @@ (.add processes-event-manager sync-processes) ))) +(defn assign-worker-ports [conf] + (let [new-ports (vec (for [port (get conf SUPERVISOR-SLOTS-PORTS)] (assign-server-port port)))] + (assoc conf SUPERVISOR-SLOTS-PORTS new-ports) + )) + ;; in local state, supervisor stores who its current assignments are ;; another thread launches events to restart any dead processes if necessary (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor] - (log-message "Starting Supervisor with conf " conf) - (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) - (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) - (let [supervisor (supervisor-data conf shared-context isupervisor) - [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] - sync-processes (partial sync-processes supervisor) - synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager) - heartbeat-fn (fn [] (.supervisor-heartbeat! - (:storm-cluster-state supervisor) - (:supervisor-id supervisor) - (SupervisorInfo. (current-time-secs) - (:my-hostname supervisor) - (:assignment-id supervisor) - (keys @(:curr-assignment supervisor)) - ;; used ports - (.getMetadata isupervisor) + (let [nimbusHostPort (.nimbus-info (cluster/mk-storm-cluster-state conf)) + conf (assoc (assoc conf NIMBUS-HOST (.host nimbusHostPort)) NIMBUS-THRIFT-PORT (.port nimbusHostPort)) + conf (assign-worker-ports conf)] + (log-message "Starting Supervisor with conf " conf) + (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) + (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) + (let [supervisor (supervisor-data conf shared-context isupervisor) + [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] + sync-processes (partial sync-processes supervisor) + synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager) + heartbeat-fn (fn [] (.supervisor-heartbeat! + (:storm-cluster-state supervisor) + (:supervisor-id supervisor) + (SupervisorInfo. (current-time-secs) + (:my-hostname supervisor) + (:assignment-id supervisor) + (keys @(:curr-assignment supervisor)) + ;; used ports + (.getMetadata isupervisor) (conf SUPERVISOR-SCHEDULER-META) ((:uptime supervisor)))))] - (heartbeat-fn) - ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) - (schedule-recurring (:timer supervisor) - 0 - (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS) - heartbeat-fn) - (when (conf SUPERVISOR-ENABLE) - ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up - ;; to date even if callbacks don't all work exactly right - (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor))) + (heartbeat-fn) + ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) (schedule-recurring (:timer supervisor) 0 - (conf SUPERVISOR-MONITOR-FREQUENCY-SECS) - (fn [] (.add processes-event-manager sync-processes)))) - (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor)) - (reify - Shutdownable - (shutdown [this] - (log-message "Shutting down supervisor " (:supervisor-id supervisor)) - (reset! (:active supervisor) false) + (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS) + heartbeat-fn) + (when (conf SUPERVISOR-ENABLE) + ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up + ;; to date even if callbacks don't all work exactly right + (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor))) + (schedule-recurring (:timer supervisor) + 0 + (conf SUPERVISOR-MONITOR-FREQUENCY-SECS) + (fn [] (.add processes-event-manager sync-processes)))) + (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor)) + (reify + Shutdownable + (shutdown [this] + (log-message "Shutting down supervisor " (:supervisor-id supervisor)) + (reset! (:active supervisor) false) (cancel-timer (:timer supervisor)) (.shutdown event-manager) (.shutdown processes-event-manager) (.disconnect (:storm-cluster-state supervisor))) - SupervisorDaemon - (get-conf [this] - conf) - (get-id [this] - (:supervisor-id supervisor)) - (shutdown-all-workers [this] - (let [ids (my-worker-ids conf)] - (doseq [id ids] - (shutdown-worker supervisor id) - ))) - DaemonCommon - (waiting? [this] - (or (not @(:active supervisor)) - (and - (timer-waiting? (:timer supervisor)) - (every? (memfn waiting?) managers))) - )))) + SupervisorDaemon + (get-conf [this] + conf) + (get-id [this] + (:supervisor-id supervisor)) + (shutdown-all-workers [this] + (let [ids (my-worker-ids conf)] + (doseq [id ids] + (shutdown-worker supervisor id) + ))) + DaemonCommon + (waiting? [this] + (or (not @(:active supervisor)) + (and + (timer-waiting? (:timer supervisor)) + (every? (memfn waiting?) managers))) + ))))) (defn kill-supervisor [supervisor] (.shutdown supervisor) diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 2eb92d814..0bb25f5f9 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -5,6 +5,7 @@ [common :as common] [worker :as worker] [executor :as executor]]) + (:require [backtype.storm.ui [core :as ui]]) (:require [backtype.storm [process-simulator :as psim]]) (:import [org.apache.commons.io FileUtils]) (:import [java.io File]) @@ -109,6 +110,8 @@ {STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT zk-port STORM-ZOOKEEPER-SERVERS ["localhost"]}) + daemon-conf (nimbus/config-with-nimbus-port-assigned daemon-conf) + ui-conf (ui/config-with-ui-port-assigned daemon-conf) nimbus-tmp (local-temp-path) port-counter (mk-counter supervisor-slot-port-min) nimbus (nimbus/service-handler @@ -127,6 +130,8 @@ supervisor-confs (if (sequential? supervisors) supervisors (repeat supervisors {}))] + (nimbus/announce-nimbus-info cluster-map (daemon-conf NIMBUS-HOST) (daemon-conf NIMBUS-THRIFT-PORT)) + (ui/announce-ui-port daemon-conf) (doseq [sc supervisor-confs] (add-supervisor cluster-map :ports ports-per-supervisor :conf sc)) cluster-map diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 4cb3660f0..ff8b19382 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -7,6 +7,7 @@ (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]]) (:use [ring.adapter.jetty :only [run-jetty]]) (:use [clojure.string :only [trim]]) + (:use [backtype.storm bootstrap]) (:import [backtype.storm.generated ExecutorSpecificStats ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats ErrorInfo ClusterSummary SupervisorSummary TopologySummary @@ -20,10 +21,10 @@ (:import [org.apache.commons.lang StringEscapeUtils]) (:gen-class)) -(def ^:dynamic *STORM-CONF* (read-storm-config)) +(bootstrap) -(defmacro with-nimbus [nimbus-sym & body] - `(thrift/with-nimbus-connection [~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF* NIMBUS-THRIFT-PORT)] +(defmacro with-nimbus [conf-sym nimbus-sym & body] + `(thrift/with-nimbus-connection [~nimbus-sym (~conf-sym NIMBUS-HOST) (~conf-sym NIMBUS-THRIFT-PORT)] ~@body )) @@ -122,8 +123,9 @@ (sorted-table ["Key" "Value"] (map #(vector (key %) (str (val %))) conf))) -(defn main-page [] - (with-nimbus nimbus +(defn main-page [conf] + (log-message "UI main page being constructed ...") + (with-nimbus conf nimbus (let [summ (.getClusterInfo ^Nimbus$Client nimbus)] (concat [[:h2 "Cluster Summary"]] @@ -470,8 +472,8 @@ (StringEscapeUtils/escapeJavaScript name) "', '" command "', " is-wait ", " default-wait ")")}]) -(defn topology-page [id window include-sys?] - (with-nimbus nimbus +(defn topology-page [conf id window include-sys?] + (with-nimbus conf nimbus (let [window (if window window ":all-time") window-hint (window-hint window) summ (.getTopologyInfo ^Nimbus$Client nimbus id) @@ -705,8 +707,8 @@ :sort-list "[[0,1]]" ))) -(defn component-page [topology-id component window include-sys?] - (with-nimbus nimbus +(defn component-page [conf topology-id component window include-sys?] + (with-nimbus conf nimbus (let [window (if window window ":all-time") summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id) topology (.getTopology ^Nimbus$Client nimbus topology-id) @@ -732,36 +734,37 @@ sys? (if (or (nil? sys?) (= "false" (:value sys?))) false true)] sys?)) -(defroutes main-routes +(defn main-routes [conf] + (routes (GET "/" [:as {cookies :cookies}] - (-> (main-page) + (-> (main-page conf) ui-template)) (GET "/topology/:id" [:as {cookies :cookies} id & m] (let [include-sys? (get-include-sys? cookies)] - (-> (topology-page id (:window m) include-sys?) + (-> (topology-page conf id (:window m) include-sys?) (concat [(mk-system-toggle-button include-sys?)]) ui-template))) (GET "/topology/:id/component/:component" [:as {cookies :cookies} id component & m] (let [include-sys? (get-include-sys? cookies)] - (-> (component-page id component (:window m) include-sys?) + (-> (component-page conf id component (:window m) include-sys?) (concat [(mk-system-toggle-button include-sys?)]) ui-template))) (POST "/topology/:id/activate" [id] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg)] (.activate nimbus name) (log-message "Activating topology '" name "'"))) (resp/redirect (str "/topology/" id))) (POST "/topology/:id/deactivate" [id] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg)] (.deactivate nimbus name) (log-message "Deactivating topology '" name "'"))) (resp/redirect (str "/topology/" id))) (POST "/topology/:id/rebalance/:wait-time" [id wait-time] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg) options (RebalanceOptions.)] @@ -770,7 +773,7 @@ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs"))) (resp/redirect (str "/topology/" id))) (POST "/topology/:id/kill/:wait-time" [id wait-time] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg) options (KillOptions.)] @@ -779,7 +782,7 @@ (log-message "Killing topology '" name "' with wait time: " wait-time " secs"))) (resp/redirect (str "/topology/" id))) (route/resources "/") - (route/not-found "Page not found")) + (route/not-found "Page not found"))) (defn exception->html [ex] (concat @@ -798,12 +801,25 @@ (resp/content-type "text/html")) )))) -(def app - (-> #'main-routes - (wrap-reload '[backtype.storm.ui.core]) - catch-errors)) +(defn app [conf] + (-> conf + main-routes + (wrap-reload '[backtype.storm.ui.core]) + catch-errors)) -(defn start-server! [] (run-jetty app {:port (Integer. (*STORM-CONF* UI-PORT)) - :join? false})) +(defn config-with-ui-port-assigned [conf] + (let [port-in-conf (.intValue (Integer. (conf UI-PORT))) + ui-port (assign-server-port port-in-conf)] + (assoc conf UI-PORT (Integer. ui-port)))) -(defn -main [] (start-server!)) +(defn announce-ui-port [conf] + (let [state (cluster/mk-storm-cluster-state conf) + ui-port (conf UI-PORT)] + (.set-ui-port! state ui-port))) + +(defn start-server! [conf] + (let [conf (config-with-ui-port-assigned conf)] + (announce-ui-port conf) + (run-jetty (app conf) {:port (conf UI-PORT) :join? false}))) + +(defn -main [] (start-server! (read-storm-config))) diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index d9383873d..380631ee6 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -1,5 +1,5 @@ (ns backtype.storm.util - (:import [java.net InetAddress]) + (:import [java.net InetAddress ServerSocket]) (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap]) (:import [java.io FileReader]) (:import [backtype.storm Config]) @@ -833,3 +833,13 @@ (meta form)) (list form x))) ([x form & more] `(-<> (-<> ~x ~form) ~@more))) + +; allocate an available server port if port_in_conf<=0 +(defn assign-server-port [port] + (if (pos? port) port + (let [serverSocket (ServerSocket. 0) + port_assigned (.getLocalPort serverSocket)] + (.close serverSocket) + (log-message "Grabbed port number " port_assigned " instead of the configured port " port) + port_assigned))) + diff --git a/storm-core/src/jvm/backtype/storm/utils/HostPort.java b/storm-core/src/jvm/backtype/storm/utils/HostPort.java new file mode 100644 index 000000000..232c935b0 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/HostPort.java @@ -0,0 +1,26 @@ +package backtype.storm.utils; + +import java.io.Serializable; + +public class HostPort implements Serializable { + private static final long serialVersionUID = -4903345013121505849L; + String _host; + int _port; + + public HostPort(String host, int port) { + _host = host; + _port = port; + } + + public String host() { + return _host; + } + + public int port() { + return _port; + } + + public String toString() { + return _host + ":" + _port; + } +} diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java index 8869b9d61..e28ec111c 100644 --- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java @@ -3,11 +3,15 @@ import backtype.storm.Config; import backtype.storm.security.auth.ThriftClient; import backtype.storm.generated.Nimbus; + +import java.util.List; import java.util.Map; import org.apache.thrift7.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.netflix.curator.framework.CuratorFramework; + public class NimbusClient extends ThriftClient { private Nimbus.Client _client; private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); @@ -16,6 +20,26 @@ public static NimbusClient getConfiguredClient(Map conf) { try { String nimbusHost = (String) conf.get(Config.NIMBUS_HOST); int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)); + if (nimbusPort <= 0) { + List zk_hosts = (List)conf.get(Config.STORM_ZOOKEEPER_SERVERS); + Object zk_port = conf.get(Config.STORM_ZOOKEEPER_PORT); + CuratorFramework zk_fw = Utils.newCurator(conf, + zk_hosts,zk_port, + (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), + null); //auth info + zk_fw.start(); + try { + byte[] zk_data = zk_fw.getData().forPath("/nimbus"); + HostPort nimbus_host_port = (HostPort) Utils.deserialize(zk_data); + LOG.debug("Nimbus spec at ZK: "+nimbus_host_port); + nimbusHost = nimbus_host_port.host(); + nimbusPort = nimbus_host_port.port(); + } catch (Exception e) { + LOG.warn("Failure in obtaining Nimbus host/port from Zookeeper "+zk_hosts+" port "+zk_port, e); + } finally { + zk_fw.close(); + } + } return new NimbusClient(conf, nimbusHost, nimbusPort); } catch (TTransportException ex) { throw new RuntimeException(ex); diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index a31402e5a..f2974da88 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -274,8 +274,10 @@ public static Integer getInt(Object o) { return (Integer) o; } else if (o instanceof Short) { return ((Short) o).intValue(); + } else if (o instanceof String) { + return Integer.parseInt((String) o); } else { - throw new IllegalArgumentException("Don't know how to convert " + o + " + to int"); + throw new IllegalArgumentException("Don't know how to convert " + o + " (class "+o.getClass().getName()+") to int"); } } diff --git a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj new file mode 100644 index 000000000..02af3e76c --- /dev/null +++ b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj @@ -0,0 +1,100 @@ +(ns backtype.storm.dynamic-port-test + (:use [clojure test]) + (:require [backtype.storm.daemon [nimbus :as nimbus]]) + (:require [backtype.storm.ui [core :as ui]]) + (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter]) + (:import [backtype.storm.security.auth ThriftServer]) + (:import [backtype.storm.scheduler INimbus]) + (:import [backtype.storm.utils NimbusClient]) + (:use [backtype.storm bootstrap testing]) + (:use [backtype.storm.daemon common]) + ) + +(bootstrap) + +(defn launch-nimbus-server [conf ^Nimbus$Iface service-handler] + (let [port (Integer. (conf NIMBUS-THRIFT-PORT)) + server (ThriftServer. conf (Nimbus$Processor. service-handler) port)] + (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server)))) + (.start (Thread. #(.serve server))) + (wait-for-condition #(.isServing server)) + server )) + +(defmacro with-server [[cluster-sym & args] & body] + `(let [~cluster-sym (mk-local-storm-cluster ~@args) + conf# (:daemon-conf ~cluster-sym) + service-handler# (:nimbus ~cluster-sym) + server# (launch-nimbus-server conf# service-handler#)] + (try + ~@body + (catch Throwable t# + (log-error t# "Error in cluster") + (throw t#) + ) + (finally + (try + (.stop server#) + (kill-local-storm-cluster ~cluster-sym) + (catch Throwable t1#)))) + )) + +(deftest test-dynamic-ports-for-nimbus-n-supervisors + (with-server [cluster + :supervisors 4 + :ports-per-supervisor [ 0 0 ] + :daemon-conf {STORM-LOCAL-MODE-ZMQ true + NIMBUS-THRIFT-PORT 0}] + (let [conf (:daemon-conf cluster) + topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)} + {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.) + :parallelism-hint 6)}) + results (complete-topology cluster + topology + ;; important for test that + ;; #tuples = multiple of 4 and 6 + :storm-conf {TOPOLOGY-WORKERS 3} + :mock-sources {"1" [["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ]})] + (is (pos? (Integer. (conf NIMBUS-THRIFT-PORT)))) + (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]])) + (read-tuples results "2")))))) + +(deftest test-ui-access-nimbus-server + (with-server [cluster + :supervisors 0 + :daemon-conf {STORM-LOCAL-MODE-ZMQ true + NIMBUS-THRIFT-PORT -1 + UI-PORT 0}] + (let [conf (:daemon-conf cluster) + ui-server-app (ui/app conf) + req {:uri "/" :request-method :get} + resp (ui-server-app req)] + (is (pos? (Integer. (conf NIMBUS-THRIFT-PORT)))) + (is (= 200 (:status resp))) + (is (pos? (.indexOf (:body resp) "Cluster Summary")))))) + +(deftest test-client-access-nimbus-server + (with-server [cluster + :supervisors 0 + :daemon-conf {STORM-LOCAL-MODE-ZMQ true + NIMBUS-THRIFT-PORT -2}] + (let [conf (assoc (:daemon-conf cluster) NIMBUS-THRIFT-PORT 0) + nimbus (NimbusClient/getConfiguredClient conf) + client (.getClient nimbus)] + (testing "Accessing Nimbus without knowing host/port" + (is (thrown-cause? NotAliveException + (.activate client "non_existing_topology")))) + (.close nimbus)))) +