From c613f10ed0f824304ac4b5b1a9d3cc67ef3f4caa Mon Sep 17 00:00:00 2001 From: anfeng Date: Fri, 5 Jul 2013 17:53:06 -0700 Subject: [PATCH 1/8] enable nimbus to be asisgned with available port, instead of preconfifured port --- .../src/clj/backtype/storm/bootstrap.clj | 2 +- storm-core/src/clj/backtype/storm/cluster.clj | 10 ++ .../src/clj/backtype/storm/daemon/common.clj | 13 ++- .../src/clj/backtype/storm/daemon/nimbus.clj | 25 ++++- .../clj/backtype/storm/daemon/supervisor.clj | 102 +++++++++--------- storm-core/src/clj/backtype/storm/testing.clj | 2 + .../clj/backtype/storm/dynamic_port_test.clj | 41 +++++++ 7 files changed, 139 insertions(+), 56 deletions(-) create mode 100644 storm-core/test/clj/backtype/storm/dynamic_port_test.clj diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj index 70b80057e..ede15c667 100644 --- a/storm-core/src/clj/backtype/storm/bootstrap.clj +++ b/storm-core/src/clj/backtype/storm/bootstrap.clj @@ -39,7 +39,7 @@ KillOptions SubmitOptions RebalanceOptions JavaObject JavaObjectArg TopologyInitialStatus])) (import (quote [backtype.storm.daemon.common StormBase Assignment - SupervisorInfo WorkerHeartbeat])) + SupervisorInfo WorkerHeartbeat HostPort])) (import (quote [backtype.storm.grouping CustomStreamGrouping])) (import (quote [java.io File FileOutputStream FileInputStream])) (import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList])) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d6..fa8395718 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -120,11 +120,14 @@ (remove-storm! [this storm-id]) (report-error [this storm-id task-id error]) (errors [this storm-id task-id]) + (nimbus-info [this]) ;; fetch nimbus host + port as NimbusHostPort record + (set-nimbus! [this info]) ;; announce nimbus host+port (disconnect [this]) ) +(def NIMBUS-ROOT "nimbus") (def ASSIGNMENTS-ROOT "assignments") (def CODE-ROOT "code") (def STORMS-ROOT "storms") @@ -132,6 +135,7 @@ (def WORKERBEATS-ROOT "workerbeats") (def ERRORS-ROOT "errors") +(def NIMBUS-SUBTREE (str "/" NIMBUS-ROOT)) (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) (def STORMS-SUBTREE (str "/" STORMS-ROOT)) (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT)) @@ -222,6 +226,12 @@ (reify StormClusterState + (nimbus-info [this] + (maybe-deserialize (get-data cluster-state NIMBUS-SUBTREE false))) + + (set-nimbus! [this info] + (set-data cluster-state NIMBUS-SUBTREE (Utils/serialize info))) + (assignments [this callback] (when callback (reset! assignments-callback callback)) diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index d1456eaa6..885d495e0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -1,5 +1,6 @@ (ns backtype.storm.daemon.common (:use [backtype.storm log config util]) + (:import [java.net ServerSocket]) (:import [backtype.storm.generated StormTopology InvalidTopologyException GlobalStreamId]) (:import [backtype.storm.utils Utils]) @@ -47,7 +48,7 @@ (def LS-LOCAL-ASSIGNMENTS "local-assignments") (def LS-APPROVED-WORKERS "approved-workers") - +(defrecord HostPort [host port]) (defrecord WorkerHeartbeat [time-secs storm-id executors port]) @@ -331,3 +332,13 @@ (->> executor->node+port (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port]))) (into {}))) + +; allocate an available server port if port_in_conf<=0 +(defn assign-server-port [port] + (if (pos? port) port + (let [serverSocket (ServerSocket. 0) + port_assigned (.getLocalPort serverSocket)] + (.close serverSocket) + (log-message "Grabbed port number " port_assigned " instead of the configured port " port) + port_assigned))) + diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..6d781a4d6 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -3,6 +3,7 @@ (:import [org.apache.thrift7.protocol TBinaryProtocol TBinaryProtocol$Factory]) (:import [org.apache.thrift7 TException]) (:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket]) + (:import [java.net InetAddress]) (:import [java.nio ByteBuffer]) (:import [java.io FileNotFoundException]) (:import [java.nio.channels Channels WritableByteChannel]) @@ -1125,10 +1126,25 @@ (waiting? [this] (timer-waiting? (:timer nimbus)))))) -(defn launch-server! [conf nimbus] +(defn announce-nimbus-info [nimbus host port] + (let [storm-cluster-state (:storm-cluster-state nimbus) + nimbus-host-port (HostPort. host port)] + (.set-nimbus! storm-cluster-state nimbus-host-port))) + +(defn config-with-nimbus-port-assigned [conf] + (let [port_in_conf (.intValue (Integer. (conf NIMBUS-THRIFT-PORT))) + nimbus_port (assign-server-port port_in_conf)] + (assoc (assoc conf + NIMBUS-THRIFT-PORT (.toString (Integer. nimbus_port))) + NIMBUS-HOST (.getCanonicalHostName (InetAddress/getLocalHost))))) + +(defn launch-server! [conf inimbus] (validate-distributed-mode! conf) - (let [service-handler (service-handler conf nimbus) - options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) + (let [extended-conf (config-with-nimbus-port-assigned conf) + nimbus-port (extended-conf NIMBUS-THRIFT-PORT) + nimbus (nimbus-data extended-conf inimbus) + service-handler (service-handler extended-conf inimbus) + options (-> (TNonblockingServerSocket. nimbus-port) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory.)) @@ -1136,7 +1152,8 @@ ) server (THsHaServer. options)] (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server)))) - (log-message "Starting Nimbus server...") + (log-message "Starting Nimbus server with port " nimbus-port) + (announce-nimbus-info nimbus (extended-conf NIMBUS-HOST) nimbus-port) (.serve server))) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 99d0be0b9..2cc0b2acf 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -321,64 +321,66 @@ ;; another thread launches events to restart any dead processes if necessary (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor] (log-message "Starting Supervisor with conf " conf) - (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) - (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) - (let [supervisor (supervisor-data conf shared-context isupervisor) - [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] - sync-processes (partial sync-processes supervisor) - synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager) - heartbeat-fn (fn [] (.supervisor-heartbeat! - (:storm-cluster-state supervisor) - (:supervisor-id supervisor) - (SupervisorInfo. (current-time-secs) - (:my-hostname supervisor) - (:assignment-id supervisor) - (keys @(:curr-assignment supervisor)) - ;; used ports - (.getMetadata isupervisor) + (let [nimbusHostPort (.nimbus-info (cluster/mk-storm-cluster-state conf)) + conf (assoc (assoc conf NIMBUS-HOST (:host nimbusHostPort)) NIMBUS-THRIFT-PORT (:port nimbusHostPort))] + (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) + (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) + (let [supervisor (supervisor-data conf shared-context isupervisor) + [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] + sync-processes (partial sync-processes supervisor) + synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager) + heartbeat-fn (fn [] (.supervisor-heartbeat! + (:storm-cluster-state supervisor) + (:supervisor-id supervisor) + (SupervisorInfo. (current-time-secs) + (:my-hostname supervisor) + (:assignment-id supervisor) + (keys @(:curr-assignment supervisor)) + ;; used ports + (.getMetadata isupervisor) (conf SUPERVISOR-SCHEDULER-META) ((:uptime supervisor)))))] - (heartbeat-fn) - ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) - (schedule-recurring (:timer supervisor) - 0 - (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS) - heartbeat-fn) - (when (conf SUPERVISOR-ENABLE) - ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up - ;; to date even if callbacks don't all work exactly right - (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor))) + (heartbeat-fn) + ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) (schedule-recurring (:timer supervisor) 0 - (conf SUPERVISOR-MONITOR-FREQUENCY-SECS) - (fn [] (.add processes-event-manager sync-processes)))) - (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor)) - (reify - Shutdownable - (shutdown [this] - (log-message "Shutting down supervisor " (:supervisor-id supervisor)) - (reset! (:active supervisor) false) + (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS) + heartbeat-fn) + (when (conf SUPERVISOR-ENABLE) + ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up + ;; to date even if callbacks don't all work exactly right + (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor))) + (schedule-recurring (:timer supervisor) + 0 + (conf SUPERVISOR-MONITOR-FREQUENCY-SECS) + (fn [] (.add processes-event-manager sync-processes)))) + (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor)) + (reify + Shutdownable + (shutdown [this] + (log-message "Shutting down supervisor " (:supervisor-id supervisor)) + (reset! (:active supervisor) false) (cancel-timer (:timer supervisor)) (.shutdown event-manager) (.shutdown processes-event-manager) (.disconnect (:storm-cluster-state supervisor))) - SupervisorDaemon - (get-conf [this] - conf) - (get-id [this] - (:supervisor-id supervisor)) - (shutdown-all-workers [this] - (let [ids (my-worker-ids conf)] - (doseq [id ids] - (shutdown-worker supervisor id) - ))) - DaemonCommon - (waiting? [this] - (or (not @(:active supervisor)) - (and - (timer-waiting? (:timer supervisor)) - (every? (memfn waiting?) managers))) - )))) + SupervisorDaemon + (get-conf [this] + conf) + (get-id [this] + (:supervisor-id supervisor)) + (shutdown-all-workers [this] + (let [ids (my-worker-ids conf)] + (doseq [id ids] + (shutdown-worker supervisor id) + ))) + DaemonCommon + (waiting? [this] + (or (not @(:active supervisor)) + (and + (timer-waiting? (:timer supervisor)) + (every? (memfn waiting?) managers))) + ))))) (defn kill-supervisor [supervisor] (.shutdown supervisor) diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 2eb92d814..e7a713a14 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -109,6 +109,7 @@ {STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT zk-port STORM-ZOOKEEPER-SERVERS ["localhost"]}) + daemon-conf (nimbus/config-with-nimbus-port-assigned daemon-conf) nimbus-tmp (local-temp-path) port-counter (mk-counter supervisor-slot-port-min) nimbus (nimbus/service-handler @@ -127,6 +128,7 @@ supervisor-confs (if (sequential? supervisors) supervisors (repeat supervisors {}))] + (nimbus/announce-nimbus-info cluster-map (daemon-conf NIMBUS-HOST) (daemon-conf NIMBUS-THRIFT-PORT)) (doseq [sc supervisor-confs] (add-supervisor cluster-map :ports ports-per-supervisor :conf sc)) cluster-map diff --git a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj new file mode 100644 index 000000000..4e988f822 --- /dev/null +++ b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj @@ -0,0 +1,41 @@ +(ns backtype.storm.dynamic-port-test + (:use [clojure test]) + (:require [backtype.storm.daemon [nimbus :as nimbus]]) + + (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter]) + (:import [backtype.storm.scheduler INimbus]) + (:use [backtype.storm bootstrap testing]) + (:use [backtype.storm.daemon common]) + ) + +(bootstrap) + +(deftest test-dynamic-nimbus-port + (with-simulated-time-local-cluster [cluster :supervisors 4 + :daemon-conf {STORM-LOCAL-MODE-ZMQ true + NIMBUS-THRIFT-PORT 0}] + (let [topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)} + {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.) + :parallelism-hint 6)}) + results (complete-topology cluster + topology + ;; important for test that + ;; #tuples = multiple of 4 and 6 + :storm-conf {TOPOLOGY-WORKERS 3} + :mock-sources {"1" [["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ["a"] ["b"] + ]} + )] + (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]])) + (read-tuples results "2")))))) From af4097311ec8f968a6d9672f76aef2a3287d81c3 Mon Sep 17 00:00:00 2001 From: anfeng Date: Sat, 6 Jul 2013 13:44:25 -0700 Subject: [PATCH 2/8] UI server uses Nimbus host/port from ZK --- storm-core/src/clj/backtype/storm/ui/core.clj | 59 ++++++++++--------- .../clj/backtype/storm/dynamic_port_test.clj | 59 ++++++++++++++++--- 2 files changed, 83 insertions(+), 35 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 4cb3660f0..906c452f6 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -7,6 +7,7 @@ (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]]) (:use [ring.adapter.jetty :only [run-jetty]]) (:use [clojure.string :only [trim]]) + (:use [backtype.storm bootstrap]) (:import [backtype.storm.generated ExecutorSpecificStats ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats ErrorInfo ClusterSummary SupervisorSummary TopologySummary @@ -20,12 +21,12 @@ (:import [org.apache.commons.lang StringEscapeUtils]) (:gen-class)) -(def ^:dynamic *STORM-CONF* (read-storm-config)) - -(defmacro with-nimbus [nimbus-sym & body] - `(thrift/with-nimbus-connection [~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF* NIMBUS-THRIFT-PORT)] - ~@body - )) +(defmacro with-nimbus [conf-sym nimbus-sym & body] + `(let [state# (cluster/mk-storm-cluster-state ~conf-sym) + hostPort# (.nimbus-info state#)] + (thrift/with-nimbus-connection [~nimbus-sym (:host hostPort#) (Integer. (:port hostPort#))] + ~@body + ))) (defn get-filled-stats [summs] (->> summs @@ -122,8 +123,9 @@ (sorted-table ["Key" "Value"] (map #(vector (key %) (str (val %))) conf))) -(defn main-page [] - (with-nimbus nimbus +(defn main-page [conf] + (log-message "UI main page being constructed ...") + (with-nimbus conf nimbus (let [summ (.getClusterInfo ^Nimbus$Client nimbus)] (concat [[:h2 "Cluster Summary"]] @@ -470,8 +472,8 @@ (StringEscapeUtils/escapeJavaScript name) "', '" command "', " is-wait ", " default-wait ")")}]) -(defn topology-page [id window include-sys?] - (with-nimbus nimbus +(defn topology-page [conf id window include-sys?] + (with-nimbus conf nimbus (let [window (if window window ":all-time") window-hint (window-hint window) summ (.getTopologyInfo ^Nimbus$Client nimbus id) @@ -705,8 +707,8 @@ :sort-list "[[0,1]]" ))) -(defn component-page [topology-id component window include-sys?] - (with-nimbus nimbus +(defn component-page [conf topology-id component window include-sys?] + (with-nimbus conf nimbus (let [window (if window window ":all-time") summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id) topology (.getTopology ^Nimbus$Client nimbus topology-id) @@ -732,36 +734,37 @@ sys? (if (or (nil? sys?) (= "false" (:value sys?))) false true)] sys?)) -(defroutes main-routes +(defn main-routes [conf] + (routes (GET "/" [:as {cookies :cookies}] - (-> (main-page) + (-> (main-page conf) ui-template)) (GET "/topology/:id" [:as {cookies :cookies} id & m] (let [include-sys? (get-include-sys? cookies)] - (-> (topology-page id (:window m) include-sys?) + (-> (topology-page conf id (:window m) include-sys?) (concat [(mk-system-toggle-button include-sys?)]) ui-template))) (GET "/topology/:id/component/:component" [:as {cookies :cookies} id component & m] (let [include-sys? (get-include-sys? cookies)] - (-> (component-page id component (:window m) include-sys?) + (-> (component-page conf id component (:window m) include-sys?) (concat [(mk-system-toggle-button include-sys?)]) ui-template))) (POST "/topology/:id/activate" [id] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg)] (.activate nimbus name) (log-message "Activating topology '" name "'"))) (resp/redirect (str "/topology/" id))) (POST "/topology/:id/deactivate" [id] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg)] (.deactivate nimbus name) (log-message "Deactivating topology '" name "'"))) (resp/redirect (str "/topology/" id))) (POST "/topology/:id/rebalance/:wait-time" [id wait-time] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg) options (RebalanceOptions.)] @@ -770,7 +773,7 @@ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs"))) (resp/redirect (str "/topology/" id))) (POST "/topology/:id/kill/:wait-time" [id wait-time] - (with-nimbus nimbus + (with-nimbus conf nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg) options (KillOptions.)] @@ -779,7 +782,7 @@ (log-message "Killing topology '" name "' with wait time: " wait-time " secs"))) (resp/redirect (str "/topology/" id))) (route/resources "/") - (route/not-found "Page not found")) + (route/not-found "Page not found"))) (defn exception->html [ex] (concat @@ -798,12 +801,14 @@ (resp/content-type "text/html")) )))) -(def app - (-> #'main-routes - (wrap-reload '[backtype.storm.ui.core]) - catch-errors)) +(defn app [conf] + (-> conf + main-routes + (wrap-reload '[backtype.storm.ui.core]) + catch-errors)) -(defn start-server! [] (run-jetty app {:port (Integer. (*STORM-CONF* UI-PORT)) +(defn start-server! [conf] (run-jetty (app conf) + {:port (Integer. (conf UI-PORT)) :join? false})) -(defn -main [] (start-server!)) +(defn -main [] (start-server! (read-storm-config))) \ No newline at end of file diff --git a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj index 4e988f822..6500cc7bd 100644 --- a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj +++ b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj @@ -1,8 +1,9 @@ (ns backtype.storm.dynamic-port-test (:use [clojure test]) - (:require [backtype.storm.daemon [nimbus :as nimbus]]) - + (:require [backtype.storm.daemon [nimbus :as nimbus]]) + (:require [backtype.storm.ui [core :as ui]]) (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter]) + (:import [backtype.storm.security.auth ThriftServer]) (:import [backtype.storm.scheduler INimbus]) (:use [backtype.storm bootstrap testing]) (:use [backtype.storm.daemon common]) @@ -10,11 +11,39 @@ (bootstrap) +(defn launch-nimbus-server [conf ^Nimbus$Iface service-handler] + (let [port (Integer. (conf NIMBUS-THRIFT-PORT)) + server (ThriftServer. conf (Nimbus$Processor. service-handler) port)] + (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server)))) + (.start (Thread. #(.serve server))) + (wait-for-condition #(.isServing server)) + server )) + +(defmacro with-server [[cluster-sym & args] & body] + `(let [~cluster-sym (mk-local-storm-cluster ~@args) + conf# (:daemon-conf ~cluster-sym) + service-handler# (:nimbus ~cluster-sym) + server# (launch-nimbus-server conf# service-handler#)] + (try + ~@body + (catch Throwable t# + (log-error t# "Error in cluster") + (throw t#) + ) + (finally + (try + (kill-local-storm-cluster ~cluster-sym) + (.stop server#) + (catch Throwable t1#)))) + )) + (deftest test-dynamic-nimbus-port - (with-simulated-time-local-cluster [cluster :supervisors 4 - :daemon-conf {STORM-LOCAL-MODE-ZMQ true - NIMBUS-THRIFT-PORT 0}] - (let [topology (thrift/mk-topology + (with-server [cluster + :supervisors 4 + :daemon-conf {STORM-LOCAL-MODE-ZMQ true + NIMBUS-THRIFT-PORT 0}] + (let [conf (:daemon-conf cluster) + topology (thrift/mk-topology {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)} {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.) :parallelism-hint 6)}) @@ -35,7 +64,21 @@ ["a"] ["b"] ["a"] ["b"] ["a"] ["b"] - ]} - )] + ]})] + (is (pos? (Integer. (conf NIMBUS-THRIFT-PORT)))) (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]])) (read-tuples results "2")))))) + +(deftest test-ui-access-nimbus + (with-server [cluster + :supervisors 0 + :daemon-conf {STORM-LOCAL-MODE-ZMQ true + NIMBUS-THRIFT-PORT -1}] + (let [conf (:daemon-conf cluster) + ui-server-app (ui/app conf) + req {:uri "/" :request-method :get} + resp (ui-server-app req)] + (is (pos? (Integer. (conf NIMBUS-THRIFT-PORT)))) + (log-message "ui server app:" ui-server-app) + (is (= 200 (:status resp))) + (is (pos? (.indexOf (:body resp) "Cluster Summary")))))) From c506397aef65402c1388d1aeecb6e995e076e9d7 Mon Sep 17 00:00:00 2001 From: anfeng Date: Sat, 6 Jul 2013 14:07:35 -0700 Subject: [PATCH 3/8] minor format changes --- storm-core/src/clj/backtype/storm/ui/core.clj | 4 +++- storm-core/test/clj/backtype/storm/dynamic_port_test.clj | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 906c452f6..0ffa79f0e 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -21,6 +21,8 @@ (:import [org.apache.commons.lang StringEscapeUtils]) (:gen-class)) +(bootstrap) + (defmacro with-nimbus [conf-sym nimbus-sym & body] `(let [state# (cluster/mk-storm-cluster-state ~conf-sym) hostPort# (.nimbus-info state#)] @@ -811,4 +813,4 @@ {:port (Integer. (conf UI-PORT)) :join? false})) -(defn -main [] (start-server! (read-storm-config))) \ No newline at end of file +(defn -main [] (start-server! (read-storm-config))) diff --git a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj index 6500cc7bd..97d2d2bd0 100644 --- a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj +++ b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj @@ -32,8 +32,8 @@ ) (finally (try - (kill-local-storm-cluster ~cluster-sym) (.stop server#) + (kill-local-storm-cluster ~cluster-sym) (catch Throwable t1#)))) )) From 55cc96303be3afb0cfd222dcf3f92e795f5dbf2f Mon Sep 17 00:00:00 2001 From: anfeng Date: Sat, 6 Jul 2013 23:50:05 -0700 Subject: [PATCH 4/8] Java NimbusClient is now leveraging ZK info for Nimbus connection --- .../src/clj/backtype/storm/bootstrap.clj | 3 ++- storm-core/src/clj/backtype/storm/cluster.clj | 1 - .../src/clj/backtype/storm/daemon/common.clj | 2 -- .../src/clj/backtype/storm/daemon/nimbus.clj | 2 +- .../clj/backtype/storm/daemon/supervisor.clj | 2 +- storm-core/src/clj/backtype/storm/ui/core.clj | 2 +- .../jvm/backtype/storm/utils/HostPort.java | 26 +++++++++++++++++++ .../backtype/storm/utils/NimbusClient.java | 24 +++++++++++++++++ .../clj/backtype/storm/dynamic_port_test.clj | 17 ++++++++++-- 9 files changed, 70 insertions(+), 9 deletions(-) create mode 100644 storm-core/src/jvm/backtype/storm/utils/HostPort.java diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj index ede15c667..3dba99789 100644 --- a/storm-core/src/clj/backtype/storm/bootstrap.clj +++ b/storm-core/src/clj/backtype/storm/bootstrap.clj @@ -39,8 +39,9 @@ KillOptions SubmitOptions RebalanceOptions JavaObject JavaObjectArg TopologyInitialStatus])) (import (quote [backtype.storm.daemon.common StormBase Assignment - SupervisorInfo WorkerHeartbeat HostPort])) + SupervisorInfo WorkerHeartbeat])) (import (quote [backtype.storm.grouping CustomStreamGrouping])) + (import (quote [backtype.storm.utils HostPort])) (import (quote [java.io File FileOutputStream FileInputStream])) (import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList])) (import (quote [org.apache.commons.io FileUtils])) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index fa8395718..bca259cb1 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -5,7 +5,6 @@ (:use [backtype.storm util log config]) (:require [backtype.storm [zookeeper :as zk]]) (:require [backtype.storm.daemon [common :as common]]) - ) (defprotocol ClusterState diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index 885d495e0..6d08107f9 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -48,8 +48,6 @@ (def LS-LOCAL-ASSIGNMENTS "local-assignments") (def LS-APPROVED-WORKERS "approved-workers") -(defrecord HostPort [host port]) - (defrecord WorkerHeartbeat [time-secs storm-id executors port]) (defrecord ExecutorStats [^long processed diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 6d781a4d6..c27c40d99 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -1135,7 +1135,7 @@ (let [port_in_conf (.intValue (Integer. (conf NIMBUS-THRIFT-PORT))) nimbus_port (assign-server-port port_in_conf)] (assoc (assoc conf - NIMBUS-THRIFT-PORT (.toString (Integer. nimbus_port))) + NIMBUS-THRIFT-PORT (Integer. nimbus_port)) NIMBUS-HOST (.getCanonicalHostName (InetAddress/getLocalHost))))) (defn launch-server! [conf inimbus] diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 2cc0b2acf..e19d19420 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -322,7 +322,7 @@ (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor] (log-message "Starting Supervisor with conf " conf) (let [nimbusHostPort (.nimbus-info (cluster/mk-storm-cluster-state conf)) - conf (assoc (assoc conf NIMBUS-HOST (:host nimbusHostPort)) NIMBUS-THRIFT-PORT (:port nimbusHostPort))] + conf (assoc (assoc conf NIMBUS-HOST (.host nimbusHostPort)) NIMBUS-THRIFT-PORT (.port nimbusHostPort))] (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) (let [supervisor (supervisor-data conf shared-context isupervisor) diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 0ffa79f0e..f78c47e4b 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -26,7 +26,7 @@ (defmacro with-nimbus [conf-sym nimbus-sym & body] `(let [state# (cluster/mk-storm-cluster-state ~conf-sym) hostPort# (.nimbus-info state#)] - (thrift/with-nimbus-connection [~nimbus-sym (:host hostPort#) (Integer. (:port hostPort#))] + (thrift/with-nimbus-connection [~nimbus-sym (.host hostPort#) (.port hostPort#)] ~@body ))) diff --git a/storm-core/src/jvm/backtype/storm/utils/HostPort.java b/storm-core/src/jvm/backtype/storm/utils/HostPort.java new file mode 100644 index 000000000..232c935b0 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/HostPort.java @@ -0,0 +1,26 @@ +package backtype.storm.utils; + +import java.io.Serializable; + +public class HostPort implements Serializable { + private static final long serialVersionUID = -4903345013121505849L; + String _host; + int _port; + + public HostPort(String host, int port) { + _host = host; + _port = port; + } + + public String host() { + return _host; + } + + public int port() { + return _port; + } + + public String toString() { + return _host + ":" + _port; + } +} diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java index 8869b9d61..525bf8c1b 100644 --- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java @@ -3,11 +3,15 @@ import backtype.storm.Config; import backtype.storm.security.auth.ThriftClient; import backtype.storm.generated.Nimbus; + +import java.util.List; import java.util.Map; import org.apache.thrift7.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.netflix.curator.framework.CuratorFramework; + public class NimbusClient extends ThriftClient { private Nimbus.Client _client; private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); @@ -16,6 +20,26 @@ public static NimbusClient getConfiguredClient(Map conf) { try { String nimbusHost = (String) conf.get(Config.NIMBUS_HOST); int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)); + if (nimbusPort <= 0) { + List zk_hosts = (List)conf.get(Config.STORM_ZOOKEEPER_SERVERS); + Object zk_port = conf.get(Config.STORM_ZOOKEEPER_PORT); + CuratorFramework zk_fw = Utils.newCurator(conf, + zk_hosts,zk_port, + (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), + null); //auth info + zk_fw.start(); + try { + byte[] zk_data = zk_fw.getData().forPath("/nimbus"); + HostPort nimbus_host_port = (HostPort) Utils.deserialize(zk_data); + LOG.info("Nimbus spec at ZK: "+nimbus_host_port); + nimbusHost = nimbus_host_port.host(); + nimbusPort = nimbus_host_port.port(); + } catch (Exception e) { + LOG.warn("Failure in obtaining Nimbus host/port from Zookeeper "+zk_hosts+" port "+zk_port, e); + } finally { + zk_fw.close(); + } + } return new NimbusClient(conf, nimbusHost, nimbusPort); } catch (TTransportException ex) { throw new RuntimeException(ex); diff --git a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj index 97d2d2bd0..c49dceaae 100644 --- a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj +++ b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj @@ -5,6 +5,7 @@ (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter]) (:import [backtype.storm.security.auth ThriftServer]) (:import [backtype.storm.scheduler INimbus]) + (:import [backtype.storm.utils NimbusClient]) (:use [backtype.storm bootstrap testing]) (:use [backtype.storm.daemon common]) ) @@ -69,7 +70,7 @@ (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]])) (read-tuples results "2")))))) -(deftest test-ui-access-nimbus +(deftest test-ui-access-nimbus-server (with-server [cluster :supervisors 0 :daemon-conf {STORM-LOCAL-MODE-ZMQ true @@ -79,6 +80,18 @@ req {:uri "/" :request-method :get} resp (ui-server-app req)] (is (pos? (Integer. (conf NIMBUS-THRIFT-PORT)))) - (log-message "ui server app:" ui-server-app) (is (= 200 (:status resp))) (is (pos? (.indexOf (:body resp) "Cluster Summary")))))) + +(deftest test-client-access-nimbus-server + (with-server [cluster + :supervisors 0 + :daemon-conf {STORM-LOCAL-MODE-ZMQ true + NIMBUS-THRIFT-PORT -2}] + (let [conf (assoc (:daemon-conf cluster) NIMBUS-THRIFT-PORT 0) + nimbus (NimbusClient/getConfiguredClient conf) + client (.getClient nimbus)] + (testing "Accessing Nimbus without knowing host/port" + (is (thrown-cause? NotAliveException + (.activate client "non_existing_topology")))) + (.close nimbus)))) From cb1cdeef00f420b8bbf2699212125ead96bb5ab7 Mon Sep 17 00:00:00 2001 From: anfeng Date: Sun, 7 Jul 2013 09:07:18 -0700 Subject: [PATCH 5/8] Utils.getInt() uses Integer.parseInt(String) --- storm-core/src/jvm/backtype/storm/utils/NimbusClient.java | 4 +++- storm-core/src/jvm/backtype/storm/utils/Utils.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java index 525bf8c1b..c18974579 100644 --- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java @@ -31,9 +31,11 @@ public static NimbusClient getConfiguredClient(Map conf) { try { byte[] zk_data = zk_fw.getData().forPath("/nimbus"); HostPort nimbus_host_port = (HostPort) Utils.deserialize(zk_data); - LOG.info("Nimbus spec at ZK: "+nimbus_host_port); + LOG.debug("Nimbus spec at ZK: "+nimbus_host_port); nimbusHost = nimbus_host_port.host(); nimbusPort = nimbus_host_port.port(); + conf.put(Config.NIMBUS_HOST, nimbusHost); + conf.put(Config.NIMBUS_THRIFT_PORT, (Integer)nimbusPort); } catch (Exception e) { LOG.warn("Failure in obtaining Nimbus host/port from Zookeeper "+zk_hosts+" port "+zk_port, e); } finally { diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index a31402e5a..f2974da88 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -274,8 +274,10 @@ public static Integer getInt(Object o) { return (Integer) o; } else if (o instanceof Short) { return ((Short) o).intValue(); + } else if (o instanceof String) { + return Integer.parseInt((String) o); } else { - throw new IllegalArgumentException("Don't know how to convert " + o + " + to int"); + throw new IllegalArgumentException("Don't know how to convert " + o + " (class "+o.getClass().getName()+") to int"); } } From ba508699e6802543c573f01afe5d1c326677ff65 Mon Sep 17 00:00:00 2001 From: anfeng Date: Wed, 10 Jul 2013 10:56:32 -0700 Subject: [PATCH 6/8] enable UI port to be dynamically assigned --- storm-core/src/clj/backtype/storm/cluster.clj | 12 +++++++++++- .../src/clj/backtype/storm/daemon/common.clj | 10 ---------- storm-core/src/clj/backtype/storm/testing.clj | 2 ++ storm-core/src/clj/backtype/storm/ui/core.clj | 17 ++++++++++++++--- storm-core/src/clj/backtype/storm/util.clj | 12 +++++++++++- .../clj/backtype/storm/dynamic_port_test.clj | 3 ++- 6 files changed, 40 insertions(+), 16 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index bca259cb1..8ccf72c3f 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -119,14 +119,17 @@ (remove-storm! [this storm-id]) (report-error [this storm-id task-id error]) (errors [this storm-id task-id]) - (nimbus-info [this]) ;; fetch nimbus host + port as NimbusHostPort record + (nimbus-info [this]) ;; fetch nimbus host + port as HostPort (set-nimbus! [this info]) ;; announce nimbus host+port + (ui-port [this]) ;; fetch ui port as Integer + (set-ui-port! [this info]) ;; announce ui port (disconnect [this]) ) (def NIMBUS-ROOT "nimbus") +(def UI-ROOT "ui") (def ASSIGNMENTS-ROOT "assignments") (def CODE-ROOT "code") (def STORMS-ROOT "storms") @@ -135,6 +138,7 @@ (def ERRORS-ROOT "errors") (def NIMBUS-SUBTREE (str "/" NIMBUS-ROOT)) +(def UI-SUBTREE (str "/" UI-ROOT)) (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) (def STORMS-SUBTREE (str "/" STORMS-ROOT)) (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT)) @@ -231,6 +235,12 @@ (set-nimbus! [this info] (set-data cluster-state NIMBUS-SUBTREE (Utils/serialize info))) + (ui-port [this] + (maybe-deserialize (get-data cluster-state UI-SUBTREE false))) + + (set-ui-port! [this info] + (set-data cluster-state UI-SUBTREE (Utils/serialize info))) + (assignments [this callback] (when callback (reset! assignments-callback callback)) diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index 6d08107f9..c44b5802d 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -1,6 +1,5 @@ (ns backtype.storm.daemon.common (:use [backtype.storm log config util]) - (:import [java.net ServerSocket]) (:import [backtype.storm.generated StormTopology InvalidTopologyException GlobalStreamId]) (:import [backtype.storm.utils Utils]) @@ -331,12 +330,3 @@ (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port]))) (into {}))) -; allocate an available server port if port_in_conf<=0 -(defn assign-server-port [port] - (if (pos? port) port - (let [serverSocket (ServerSocket. 0) - port_assigned (.getLocalPort serverSocket)] - (.close serverSocket) - (log-message "Grabbed port number " port_assigned " instead of the configured port " port) - port_assigned))) - diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index e7a713a14..963898ede 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -5,6 +5,7 @@ [common :as common] [worker :as worker] [executor :as executor]]) + (:require [backtype.storm.ui [core :as ui]]) (:require [backtype.storm [process-simulator :as psim]]) (:import [org.apache.commons.io FileUtils]) (:import [java.io File]) @@ -110,6 +111,7 @@ STORM-ZOOKEEPER-PORT zk-port STORM-ZOOKEEPER-SERVERS ["localhost"]}) daemon-conf (nimbus/config-with-nimbus-port-assigned daemon-conf) + ui-conf (ui/config-with-ui-port-assigned daemon-conf) nimbus-tmp (local-temp-path) port-counter (mk-counter supervisor-slot-port-min) nimbus (nimbus/service-handler diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index f78c47e4b..eb303ebbc 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -809,8 +809,19 @@ (wrap-reload '[backtype.storm.ui.core]) catch-errors)) -(defn start-server! [conf] (run-jetty (app conf) - {:port (Integer. (conf UI-PORT)) - :join? false})) +(defn config-with-ui-port-assigned [conf] + (let [port-in-conf (.intValue (Integer. (conf UI-PORT))) + ui-port (assign-server-port port-in-conf)] + (assoc conf UI-PORT (Integer. ui-port)))) + +(defn announce-ui-port [conf] + (let [state (cluster/mk-storm-cluster-state conf) + ui-port (conf UI-PORT)] + (.set-ui-port! state ui-port))) + +(defn start-server! [conf] + (let [conf (config-with-ui-port-assigned conf)] + (announce-ui-port conf) + (run-jetty (app conf) {:port (conf UI-PORT) :join? false}))) (defn -main [] (start-server! (read-storm-config))) diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index d9383873d..380631ee6 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -1,5 +1,5 @@ (ns backtype.storm.util - (:import [java.net InetAddress]) + (:import [java.net InetAddress ServerSocket]) (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap]) (:import [java.io FileReader]) (:import [backtype.storm Config]) @@ -833,3 +833,13 @@ (meta form)) (list form x))) ([x form & more] `(-<> (-<> ~x ~form) ~@more))) + +; allocate an available server port if port_in_conf<=0 +(defn assign-server-port [port] + (if (pos? port) port + (let [serverSocket (ServerSocket. 0) + port_assigned (.getLocalPort serverSocket)] + (.close serverSocket) + (log-message "Grabbed port number " port_assigned " instead of the configured port " port) + port_assigned))) + diff --git a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj index c49dceaae..eb5cac630 100644 --- a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj +++ b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj @@ -74,7 +74,8 @@ (with-server [cluster :supervisors 0 :daemon-conf {STORM-LOCAL-MODE-ZMQ true - NIMBUS-THRIFT-PORT -1}] + NIMBUS-THRIFT-PORT -1 + UI-PORT 0}] (let [conf (:daemon-conf cluster) ui-server-app (ui/app conf) req {:uri "/" :request-method :get} From 8c9612f8848b48ddeec7f1b3e1400374cdb10488 Mon Sep 17 00:00:00 2001 From: anfeng Date: Wed, 10 Jul 2013 13:34:24 -0700 Subject: [PATCH 7/8] UI server updates nimbus info from ZK, and then keeps in its local conf --- storm-core/src/clj/backtype/storm/testing.clj | 1 + storm-core/src/clj/backtype/storm/ui/core.clj | 8 +++----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 963898ede..0bb25f5f9 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -131,6 +131,7 @@ supervisors (repeat supervisors {}))] (nimbus/announce-nimbus-info cluster-map (daemon-conf NIMBUS-HOST) (daemon-conf NIMBUS-THRIFT-PORT)) + (ui/announce-ui-port daemon-conf) (doseq [sc supervisor-confs] (add-supervisor cluster-map :ports ports-per-supervisor :conf sc)) cluster-map diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index eb303ebbc..ff8b19382 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -24,11 +24,9 @@ (bootstrap) (defmacro with-nimbus [conf-sym nimbus-sym & body] - `(let [state# (cluster/mk-storm-cluster-state ~conf-sym) - hostPort# (.nimbus-info state#)] - (thrift/with-nimbus-connection [~nimbus-sym (.host hostPort#) (.port hostPort#)] - ~@body - ))) + `(thrift/with-nimbus-connection [~nimbus-sym (~conf-sym NIMBUS-HOST) (~conf-sym NIMBUS-THRIFT-PORT)] + ~@body + )) (defn get-filled-stats [summs] (->> summs From 1b3bc15173ac4ddd3da3ee30f04a849b1e1aaabd Mon Sep 17 00:00:00 2001 From: anfeng Date: Thu, 18 Jul 2013 08:50:55 -0700 Subject: [PATCH 8/8] assign port #s for supervisors --- .../src/clj/backtype/storm/daemon/supervisor.clj | 10 ++++++++-- .../src/jvm/backtype/storm/utils/NimbusClient.java | 2 -- .../test/clj/backtype/storm/dynamic_port_test.clj | 4 +++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index e19d19420..1cf1520e4 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -317,12 +317,18 @@ (.add processes-event-manager sync-processes) ))) +(defn assign-worker-ports [conf] + (let [new-ports (vec (for [port (get conf SUPERVISOR-SLOTS-PORTS)] (assign-server-port port)))] + (assoc conf SUPERVISOR-SLOTS-PORTS new-ports) + )) + ;; in local state, supervisor stores who its current assignments are ;; another thread launches events to restart any dead processes if necessary (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor] - (log-message "Starting Supervisor with conf " conf) (let [nimbusHostPort (.nimbus-info (cluster/mk-storm-cluster-state conf)) - conf (assoc (assoc conf NIMBUS-HOST (.host nimbusHostPort)) NIMBUS-THRIFT-PORT (.port nimbusHostPort))] + conf (assoc (assoc conf NIMBUS-HOST (.host nimbusHostPort)) NIMBUS-THRIFT-PORT (.port nimbusHostPort)) + conf (assign-worker-ports conf)] + (log-message "Starting Supervisor with conf " conf) (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) (let [supervisor (supervisor-data conf shared-context isupervisor) diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java index c18974579..e28ec111c 100644 --- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java @@ -34,8 +34,6 @@ public static NimbusClient getConfiguredClient(Map conf) { LOG.debug("Nimbus spec at ZK: "+nimbus_host_port); nimbusHost = nimbus_host_port.host(); nimbusPort = nimbus_host_port.port(); - conf.put(Config.NIMBUS_HOST, nimbusHost); - conf.put(Config.NIMBUS_THRIFT_PORT, (Integer)nimbusPort); } catch (Exception e) { LOG.warn("Failure in obtaining Nimbus host/port from Zookeeper "+zk_hosts+" port "+zk_port, e); } finally { diff --git a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj index eb5cac630..02af3e76c 100644 --- a/storm-core/test/clj/backtype/storm/dynamic_port_test.clj +++ b/storm-core/test/clj/backtype/storm/dynamic_port_test.clj @@ -38,9 +38,10 @@ (catch Throwable t1#)))) )) -(deftest test-dynamic-nimbus-port +(deftest test-dynamic-ports-for-nimbus-n-supervisors (with-server [cluster :supervisors 4 + :ports-per-supervisor [ 0 0 ] :daemon-conf {STORM-LOCAL-MODE-ZMQ true NIMBUS-THRIFT-PORT 0}] (let [conf (:daemon-conf cluster) @@ -96,3 +97,4 @@ (is (thrown-cause? NotAliveException (.activate client "non_existing_topology")))) (.close nimbus)))) +