Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storm topology online update #741

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,7 @@ topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
topology.update: false
topology.update.interval.secs: 10

dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
1 change: 1 addition & 0 deletions storm-core/src/clj/backtype/storm/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@
(if (contains? executor-stats t)
{t {:time-secs (:time-secs worker-hb)
:uptime (:uptime worker-hb)
:topology-version (:topology-version worker-hb)
:stats (get executor-stats t)}})))
(into {}))))

Expand Down
8 changes: 8 additions & 0 deletions storm-core/src/clj/backtype/storm/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,16 @@
[conf id]
(str (worker-root conf id) "/heartbeats"))

(defn worker-version-root
[conf id]
(str (worker-root conf id) "/version"))

;; workers heartbeat here with pid and timestamp
;; if supervisor stops receiving heartbeat, it kills and restarts the process
;; in local mode, keep a global map of ids to threads for simulating process management
(defn ^LocalState worker-state [conf id]
(LocalState. (worker-heartbeats-root conf id)))

(defn ^LocalState worker-version [conf id]
(LocalState. (worker-version-root conf id)))

7 changes: 7 additions & 0 deletions storm-core/src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
(waiting? [this]))

(def LS-WORKER-HEARTBEAT "worker-heartbeat")
(def LS-WORKER-VERSION "worker-version")

;; LocalState constants
(def LS-ID "supervisor-id")
Expand All @@ -57,6 +58,12 @@
^long transferred
^long failed])

(defn gen-topology-version [storm-id]
(str storm-id "-v" (current-time-string)))

(defn get-topology-id [topology-version]
(.substring topology-version 0 (.lastIndexOf topology-version "-v")))

(defn new-executor-stats []
(ExecutorStats. 0 0 0 0 0))

Expand Down
60 changes: 55 additions & 5 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@
(:old-status status))
}})

(defn nil-to-unknown [x]
(if x
x
"unknown"))

(defn topology-status [nimbus storm-id]
(-> nimbus :storm-cluster-state (.storm-base storm-id nil) :status))

Expand Down Expand Up @@ -701,7 +706,7 @@
(.assignSlots inimbus topologies))
))

(defn- start-storm [nimbus storm-name storm-id topology-initial-status]
(defn- start-storm [nimbus storm-name storm-id topology-initial-status topology-version]
{:pre [(#{:active :inactive} topology-initial-status)]}
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
Expand All @@ -713,7 +718,9 @@
storm-id
(StormBase. storm-name
(current-time-secs)
{:type topology-initial-status}
{:type topology-initial-status
:topology-version topology-version
:update-duration-secs 1}
(storm-conf TOPOLOGY-WORKERS)
num-executors))))

Expand Down Expand Up @@ -916,6 +923,7 @@
topology)
(swap! (:submitted-count nimbus) inc)
(let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
topology-version (gen-topology-version storm-id)
storm-conf (normalize-conf
conf
(-> serializedConf
Expand All @@ -938,7 +946,7 @@
(.setup-heartbeats! storm-cluster-state storm-id)
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
TopologyInitialStatus/ACTIVE :active}]
(start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
(start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)) topology-version))
(mk-assignments nimbus)))
(catch Throwable e
(log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
Expand All @@ -948,6 +956,44 @@
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
(.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
(SubmitOptions. TopologyInitialStatus/ACTIVE)))

(^void updateTopology
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
(validate-topology-name! storm-name)
(check-storm-active! nimbus storm-name true)
(.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
storm-name
(from-json serializedConf)
topology)
(let [storm-id (get-storm-id (:storm-cluster-state nimbus) storm-name)
topology-version (gen-topology-version storm-id)
storm-conf (normalize-conf
conf
(-> serializedConf
from-json
(assoc STORM-ID storm-id)
(assoc TOPOLOGY-NAME storm-name))
topology)
total-storm-conf (merge conf storm-conf)
topology (normalize-topology total-storm-conf topology)
topology (if (total-storm-conf TOPOLOGY-OPTIMIZE)
(optimize-topology topology)
topology)
storm-cluster-state (:storm-cluster-state nimbus)]
(system-topology! total-storm-conf topology) ;; this validates the structure of the topology
(log-message "Received topology update for " storm-name " with conf " storm-conf)
(locking (:submit-lock nimbus)
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(let [assignment (.assignment-info storm-cluster-state storm-id nil)
storm-id->supervisors (:node->host assignment)
intervals (total-storm-conf TOPOLOGY-UPDATE-INTERVAL-SECS)
update-duration-secs (* (count storm-id->supervisors) intervals)]
(set-topology-status! nimbus storm-id
(merge (topology-status nimbus storm-id)
{:topology-version topology-version
:update-duration-secs update-duration-secs})))
)
))

(^void killTopology [this ^String name]
(.killTopologyWithOpts this name (KillOptions.)))
Expand Down Expand Up @@ -1077,7 +1123,8 @@
set
count)
(time-delta (:launch-time-secs base))
(extract-status-str base))
(extract-status-str base)
(nil-to-unknown (-> base :status :topology-version)))
))]
(ClusterSummary. supervisor-summaries
nimbus-uptime
Expand Down Expand Up @@ -1105,7 +1152,9 @@
(-> executor first task->component)
host
port
(nil-to-zero (:uptime heartbeat)))
(nil-to-zero (:uptime heartbeat))
(nil-to-unknown (:topology-version heartbeat))
)
(.set_stats stats))
))
]
Expand All @@ -1115,6 +1164,7 @@
executor-summaries
(extract-status-str base)
errors
(nil-to-unknown (-> base :status :topology-version))
)
))

Expand Down
102 changes: 75 additions & 27 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@
[id (read-worker-heartbeat conf id)]))
))

(defn read-worker-version [conf id]
(let [local-state (worker-version conf id)]
(.get local-state LS-WORKER-VERSION)
))

(defn read-worker-versions
"Returns map from worker id to topology-version"
[conf]
(let [ids (my-worker-ids conf)]
(into {}
(dofor [id ids]
[id (read-worker-version conf id)]))
))


(defn matches-an-assignment? [worker-heartbeat assigned-executors]
(let [local-assignment (assigned-executors (:port worker-heartbeat))]
Expand All @@ -89,6 +103,8 @@
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
id->heartbeat (read-worker-heartbeats conf)
id->version (read-worker-versions conf)
storm-id->topology-version (:storm-id->topology-version supervisor)
approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
(into
{}
Expand All @@ -99,12 +115,20 @@
:disallowed
(not hb)
:not-started
(and
(@(:storm-id->update-time supervisor) (:storm-id hb))
(< (@(:storm-id->update-time supervisor) (:storm-id hb)) (current-time-secs))
(id->version id)
(not (= (@(:storm-id->topology-version supervisor) (:storm-id hb)) (id->version id))))
:update
(> (- now (:time-secs hb))
(conf SUPERVISOR-WORKER-TIMEOUT-SECS))
:timed-out
true
:valid)]
(log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
(log-debug "Worker " id " is " state " version: " (id->version id) ": " (pr-str hb) " at supervisor time-secs " now)
(if (= :update state)
(log-message "Worker " id " is " state " update-time " (@(:storm-id->update-time supervisor) (:storm-id hb)) " topology-version " (id->version id) " to " (@(:storm-id->topology-version supervisor) (:storm-id hb))))
[id [state hb]]
))
)))
Expand Down Expand Up @@ -139,6 +163,7 @@
(defn try-cleanup-worker [conf id]
(try
(rmr (worker-heartbeats-root conf id))
(rmr (worker-version-root conf id))
;; this avoids a race condition with worker or subprocess writing pid around same time
(rmpath (worker-pids-root conf id))
(rmpath (worker-root conf id))
Expand Down Expand Up @@ -167,6 +192,8 @@
:active (atom true)
:uptime (uptime-computer)
:worker-thread-pids-atom (atom {})
:storm-id->topology-version (atom {})
:storm-id->update-time (atom {})
:storm-cluster-state (cluster/mk-storm-cluster-state conf)
:local-state (supervisor-state conf)
:supervisor-id (.getSupervisorId isupervisor)
Expand Down Expand Up @@ -230,7 +257,9 @@
(wait-for-workers-launch
conf
(dofor [[port assignment] reassign-executors]
(let [id (new-worker-ids port)]
(let [id (new-worker-ids port)
topology-version (@(:storm-id->topology-version supervisor) (:storm-id assignment))
topology-version (if topology-version topology-version (:storm-id assignment))]
(log-message "Launching worker with assignment "
(pr-str assignment)
" for this supervisor "
Expand All @@ -239,11 +268,14 @@
port
" with id "
id
" topology-version "
topology-version
)
(launch-worker supervisor
(:storm-id assignment)
port
id)
id
topology-version)
id)))
))

Expand Down Expand Up @@ -281,18 +313,31 @@
;; - should this be done separately from usual monitoring?
;; should we only download when topology is assigned to this supervisor?
(doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))
(log-message "Downloading code for storm id "
storm-id
" from "
master-code-dir)
(download-storm-code conf storm-id master-code-dir)
(log-message "Finished downloading code for storm id "
storm-id
" from "
master-code-dir)
))
(let [storm-base (.storm-base (:storm-cluster-state supervisor) storm-id nil)
topology-version (-> storm-base :status :topology-version)]
(if-not storm-base
(log-warn "storm-id " storm-id " storm-base is nil")
(if-not topology-version
(log-warn "storm-id " storm-id " " topology-version " topology-version is nil")
(do
(when (and (not (downloaded-storm-ids topology-version))
(assigned-storm-ids storm-id))
(log-message "Downloading code for storm id "
storm-id
" from "
master-code-dir)
(download-storm-code conf storm-id master-code-dir topology-version)
(log-message "Finished downloading code for storm id "
storm-id
" from "
master-code-dir))
(when-not (= (@(:storm-id->topology-version supervisor) storm-id) topology-version)
(let [rand (Random. (Utils/secureRandomLong))
wait-time (.nextInt rand (-> storm-base :status :update-duration-secs))
update-time (+ (current-time-secs) wait-time)]
(log-message storm-id " topology-version change from " (@(:storm-id->topology-version supervisor) storm-id) " to " topology-version " wait " wait-time " secs until " update-time " to do update restart")
(swap! (:storm-id->update-time supervisor) assoc storm-id update-time))
(swap! (:storm-id->topology-version supervisor) assoc storm-id topology-version)))))))

(log-debug "Writing new assignment "
(pr-str new-assignment))
Expand All @@ -309,7 +354,8 @@
;; synchronize-supervisor doesn't try to launch workers for which the
;; resources don't exist
(doseq [storm-id downloaded-storm-ids]
(when-not (assigned-storm-ids storm-id)
(when-not (or (storm-code-map storm-id)
(storm-code-map (get-topology-id storm-id)))
(log-message "Removing code for storm id "
storm-id)
(rmr (supervisor-stormdist-root conf storm-id))
Expand Down Expand Up @@ -387,10 +433,10 @@
;; distributed implementation

(defmethod download-storm-code
:distributed [conf storm-id master-code-dir]
:distributed [conf storm-id master-code-dir topology-version]
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid))
stormroot (supervisor-stormdist-root conf storm-id)]
stormroot (supervisor-stormdist-root conf topology-version)]
(FileUtils/forceMkdir (File. tmproot))

(Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
Expand All @@ -402,12 +448,12 @@


(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
:distributed [supervisor storm-id port worker-id topology-version]
(let [conf (:conf supervisor)
storm-home (System/getProperty "storm.home")
stormroot (supervisor-stormdist-root conf storm-id)
stormroot (supervisor-stormdist-root conf topology-version)
stormjar (supervisor-stormjar-path stormroot)
storm-conf (read-supervisor-storm-conf conf storm-id)
storm-conf (read-supervisor-storm-conf conf topology-version)
classpath (add-to-classpath (current-classpath) [stormjar])
childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
"%ID%"
Expand All @@ -423,7 +469,7 @@
" -Dworker.port=" port
" -cp " classpath " backtype.storm.daemon.worker "
(java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor)
" " port " " worker-id)]
" " port " " worker-id " " topology-version)]
(log-message "Launching worker with command: " command)
(launch-process command :environment {"LD_LIBRARY_PATH" (conf JAVA-LIBRARY-PATH)})
))
Expand All @@ -437,9 +483,10 @@
first ))

(defmethod download-storm-code
:local [conf storm-id master-code-dir]
(let [stormroot (supervisor-stormdist-root conf storm-id)]
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
:local [conf storm-id master-code-dir topology-version]
(let [stormroot (supervisor-stormdist-root conf topology-version)]
(when-not (exists-file? stormroot)
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot)))
(let [classloader (.getContextClassLoader (Thread/currentThread))
resources-jar (resources-jar)
url (.getResource classloader RESOURCES-SUBDIR)
Expand All @@ -457,15 +504,16 @@
)))

(defmethod launch-worker
:local [supervisor storm-id port worker-id]
:local [supervisor storm-id port worker-id topology-version]
(let [conf (:conf supervisor)
pid (uuid)
worker (worker/mk-worker conf
(:shared-context supervisor)
storm-id
(:assignment-id supervisor)
port
worker-id)]
worker-id
topology-version)]
(psim/register-process pid worker)
(swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
))
Expand Down
Loading