-
Notifications
You must be signed in to change notification settings - Fork 27
/
fsm.py
236 lines (200 loc) · 8.78 KB
/
fsm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import logging
import os
from enum import Enum
import graphviz as gv
# @brief generic hierarchical state machine class.
#
# states can have substates. If the machine is in a state, then it is also implicitly in that state's parent state
# this basically provides for polymorphism/subclassing of state machines
#
# There are three methods corresponding to each state:
# * on_enter_STATE
# * execute_STATE
# * on_exit_STATE
#
# Subclasses of StateMachine can optionally implement them and they will
# automatically be called at the appropriate times.
class StateMachine(object):
    """Generic hierarchical state machine.

    States are ``Enum`` members registered with :meth:`add_state`; each state
    may name a parent state.  While the machine is in a state it is also
    considered to be in every ancestor of that state (see
    :meth:`is_in_state`).

    For every state ``STATE`` an instance may provide any of the following
    optional zero-argument callables, which are looked up by name on ``self``:

    * ``on_enter_STATE`` -- called by :meth:`transition` when STATE is entered
    * ``execute_STATE``  -- called by :meth:`spin` while the machine is in STATE
    * ``on_exit_STATE``  -- called by :meth:`transition` when STATE is left
    """

    def __init__(self, start_state, end_state):
        """Create a machine with no registered states and no current state.

        :param start_state: state entered by the first :meth:`spin` and by
            :meth:`restart`.
        :param end_state: stored on the instance; not otherwise used by this
            class.
        """
        # Maps every registered state to its parent state (None = top level).
        self._state_hierarchy = {}
        # Maps from_state -> {to_state: {'condition': callable, 'name': str}}.
        self._transitions = {}
        self._start_state = start_state
        self._end_state = end_state
        # None until the first transition has happened.
        self._state = None

    @property
    def start_state(self):
        """The state the machine starts in / restarts into."""
        return self._start_state

    @property
    def state(self):
        """The current state, or None if no transition has happened yet."""
        return self._state

    def restart(self):
        """Reset the machine by transitioning back to the start state."""
        self.transition(self.start_state)

    def add_state(self, state, parent_state=None):
        """Register ``state``, optionally as a substate of ``parent_state``.

        Registering a state again overwrites its previous parent.

        :raises TypeError: if ``state`` is not an ``Enum`` member.
        """
        if not isinstance(state, Enum):
            raise TypeError(
                "State should be an Enum type, got " + str(type(state)))
        self._state_hierarchy[state] = parent_state

    def add_transition(self, from_state, to_state, condition, event_name):
        """Register an edge ``from_state -> to_state``.

        ``condition`` is a zero-argument callable evaluated by :meth:`spin`;
        ``event_name`` labels the edge in :meth:`as_graphviz`.  Adding an edge
        that already exists overwrites the old one.
        """
        edges = self._transitions.setdefault(from_state, {})
        edges[to_state] = {'condition': condition, 'name': event_name}

    def _call_state_hook(self, prefix, state):
        """Call ``<prefix><state.name>()`` on self if that attribute exists."""
        hook = getattr(self, prefix + state.name, None)
        if hook is not None:
            hook()

    def spin(self):
        """Run one step of the machine.

        If the machine has a current state, the ``execute_*`` hooks of its
        ancestors (outermost first) and then of the state itself are called.
        Afterwards, if there is still no current state the machine enters the
        start state; otherwise the conditions of all edges leaving the current
        state are evaluated and the first one that holds is taken.  When more
        than one holds, a warning is logged and the first is still taken.

        If the state changed during this step, ``StateMachine.spin`` is run
        again, so a chain of immediately-satisfied transitions is followed in
        a single call.
        """
        previous_state = self.state

        if self.state is not None:
            # The list is built once; a hook that changes state does not
            # affect which execute_* hooks run in this step.
            for state in self.ancestors_of_state(self.state) + [self.state]:
                self._call_state_hook("execute_", state)

        if self.state is None:
            self.transition(self.start_state)
        else:
            outgoing = self._transitions.get(self.state, {})
            next_states = [
                next_state
                for next_state, edge in outgoing.items()
                if edge['condition']()
            ]
            if len(next_states) > 1:
                logging.warning(
                    "Ambiguous fsm transitions from state'%s'. The following "
                    "states are reachable now: %s; Proceeding by taking the "
                    "first option.", self.state, next_states)
            if next_states:
                self.transition(next_states[0])

        # Trace line; the single-argument call form prints identically on
        # Python 2 and Python 3.
        print(' :from ' + str(previous_state) + ' to ' + str(self.state) +
              ' :')

        # If a transition occurred during the spin, spin again.  This
        # recurses, so a cycle of always-true conditions would never settle.
        if previous_state != self.state:
            StateMachine.spin(self)

    def transition(self, new_state):
        """Make ``new_state`` the current state, firing exit/enter hooks.

        ``on_exit_*`` is called for the current state and for each of its
        ancestors that does not contain ``new_state``; the ancestors are
        visited outermost first, the current state last.  Then ``on_enter_*``
        is called for ``new_state`` and for each of its ancestors that does
        not contain the current state, again outermost first.  States shared
        by both sides are neither exited nor entered.
        """
        if self.state is not None:
            for state in self.ancestors_of_state(self.state) + [self.state]:
                if not self.state_is_substate(new_state, state):
                    self._call_state_hook("on_exit_", state)

        for state in self.ancestors_of_state(new_state) + [new_state]:
            if not self.state_is_substate(self.state, state):
                self._call_state_hook("on_enter_", state)

        self._state = new_state

    def is_in_state(self, state):
        """Return True if the current state is ``state`` or a descendant."""
        return self.state_is_substate(self.state, state)

    def state_is_substate(self, state, possible_parent):
        """Return True if ``state`` is ``possible_parent`` or descends from it.

        Returns False when ``state`` is None.
        """
        ancestor = state
        while ancestor is not None:
            if possible_parent == ancestor:
                return True
            ancestor = self._state_hierarchy[ancestor]
        return False

    def corresponding_ancestor_state(self, ancestors):
        """Return the nearest of ``ancestors`` containing the current state.

        Walks from the current state up through its parents and returns the
        first state found in ``ancestors`` (which may be the current state
        itself), or None if there is none.
        """
        state = self.state
        while state is not None:
            if state in ancestors:
                return state
            state = self._state_hierarchy[state]
        return None

    def ancestors_of_state(self, state):
        """Return the ancestors of ``state``, outermost first.

        If B is a child of A and C is a child of B then
        ``ancestors_of_state(C) == [A, B]``.  A state without a parent yields
        an empty list.
        """
        ancestors = []
        parent = self._state_hierarchy[state]
        while parent is not None:
            ancestors.append(parent)
            parent = self._state_hierarchy[parent]
        ancestors.reverse()
        return ancestors

    def as_graphviz(self):
        """Return a ``graphviz.Digraph`` describing states and transitions.

        States that have children become dotted clusters, all other states
        become nodes (the start state is drawn as a diamond), and every
        registered transition becomes an edge labelled with its event name.
        """
        g = gv.Digraph(self.__class__.__name__, format='png')

        # Computed once instead of rescanning the values for every state.
        parent_states = set(self._state_hierarchy.values())

        # One cluster per state that has children; None maps to the root graph.
        subgraphs = {None: g}
        cluster_index = 0
        for state in self._state_hierarchy:
            if state in parent_states:
                subgraphs[state] = gv.Digraph(
                    'cluster_' + str(cluster_index),
                    graph_attr={'label': state.__module__ + "::" + state.name,
                                'style': 'dotted'})
                cluster_index += 1

        # Childless states are drawn as nodes inside their parent's graph.
        for state, parent in self._state_hierarchy.items():
            if state not in parent_states:
                shape = 'diamond' if state == self.start_state else 'ellipse'
                subgraphs[parent].node(
                    state.name,
                    label=state.__module__ + "::" + state.name,
                    shape=shape)

        # Digraph.subgraph() copies the child's body at call time, so attach
        # the deepest clusters first; otherwise a cluster nested inside
        # another cluster could be attached after its parent was already
        # copied into the grandparent and would be missing from the output.
        clusters = sorted(
            (state for state in subgraphs if state is not None),
            key=lambda state: len(self.ancestors_of_state(state)),
            reverse=True)
        for state in clusters:
            subgraphs[self._state_hierarchy[state]].subgraph(subgraphs[state])

        for start, edges in self._transitions.items():
            for end, event in edges.items():
                g.edge(start.name,
                       end.name,
                       label=event['name'],
                       decorate='True')

        return g

    def write_diagram_png(self):
        """Render the state diagram into ``Digraph/`` under the current
        working directory, using the class name as the file name.

        The directory is created if it does not exist yet.
        """
        g = self.as_graphviz()
        out_dir = os.path.join(os.getcwd(), 'Digraph')
        if not os.path.isdir(out_dir):
            os.makedirs(out_dir)
        g.render(os.path.join(out_dir, self.__class__.__name__), cleanup=True)