Last active
August 29, 2015 14:05
-
-
Save dankrause/0309d3509e9cc74c3e31 to your computer and use it in GitHub Desktop.
Toy finite state machine implementation in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
class FSMException(Exception): | |
pass | |
class DuplicateRule(FSMException): | |
pass | |
class InvalidStateTransition(FSMException): | |
pass | |
class State(object): | |
def __init__(self, func): | |
self._func = func | |
def __call__(self, event, old_state): | |
return self._func(event, old_state) | |
@property | |
def name(self): | |
return self._func.__name__ | |
class TerminalState(State): | |
pass | |
class Event(dict): | |
def __init__(self, name, *args, **kwargs): | |
self._name = name | |
super(Event, self).__init__(*args, **kwargs) | |
@property | |
def name(self): | |
return self._name | |
class FiniteStateMachine(object): | |
def __init__(self): | |
self._state = None | |
self._rules = {} | |
def add_rule(self, event_types, valid_states, end_state): | |
for trigger in list(itertools.product(event_types, valid_states)): | |
if trigger in self._rules: | |
raise DuplicateRule(*trigger) | |
self._rules[trigger] = end_state | |
def start(self, event): | |
while self._state is None or not isinstance(self._state, TerminalState): | |
trigger = (event.name, self.state) | |
if trigger not in self._rules: | |
raise InvalidStateTransition(*trigger) | |
old_state = self.state | |
self._state = self._rules[trigger] | |
event = self._state(event, old_state) | |
def reset(self): | |
self._state = None | |
@property | |
def state(self): | |
return self._state |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fsm import * | |
import time | |
@State | |
def starting(event, old_state): | |
print "Initializing... {}".format(event) | |
time.sleep(2) | |
return Event("run", **event) | |
@State | |
def running(event, old_state): | |
print "Running... {}".format(event) | |
time.sleep(1) | |
countdown = event["countdown"] | |
total = event.get("total", 0) + countdown | |
if countdown == 0: | |
return Event("stop", total=total) | |
else: | |
return Event("run", countdown=countdown-1, total=total) | |
@State | |
def stopping(event, old_state): | |
print "Shutting Down... {}".format(event) | |
time.sleep(1) | |
if event["total"] < 50: | |
return Event("error", reason="total is less than 50", total=event["total"]) | |
else: | |
return Event("halt", **event) | |
@TerminalState | |
def stopped(event, old_state): | |
print "Stopped. {}".format(event) | |
@TerminalState | |
def error(event, old_state): | |
print "An error occurrred in {}: {}".format(old_state, event) | |
def build_fsm(): | |
f = FiniteStateMachine() | |
f.add_rule(["start"], [None, error, stopped], starting) | |
f.add_rule(["run"], [starting, running], running) | |
f.add_rule(["error"], [None, starting, running, stopping], error) | |
f.add_rule(["stop"], [running], stopping) | |
f.add_rule(["halt"], [stopping], stopped) | |
return f | |
fsm = build_fsm() | |
fsm.start(Event("start", countdown=10)) | |
fsm.reset() | |
fsm.start(Event("start", countdown=5)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment