Created
July 8, 2014 09:09
-
-
Save nikicat/be2bbc889d7e3555d7d4 to your computer and use it in GitHub Desktop.
gns prototype
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import storage # storage interface | |
class Job:
    """Represents a job: some code to run.

    Could be used from different processes to control job execution
    and status.

    NOTE(review): the status constants (NEW, RUNNING, SLEEPING, EXCEPTION),
    ``continulet``, ``import_module``, ``get_path`` and the
    ``monkey_patch_*`` helpers are looked up at module level; confirm they
    are all defined or imported in this file.
    """

    def __init__(
        self,
        version: '0123456789abcdefefdcba9876543210',
        module: 'somepkg.somemod',
        func: 'megafunction',
        args: '{"arg1": 123, "var2": "blah"}',
        wakeupat=0,
    ):
        """Create a job in the NEW state.

        The string annotations on the parameters are example values.

        Args:
            version: passed to ``get_path``; the result is appended to
                ``sys.path`` when the job starts.
            module: name handed to ``import_module`` when the job starts.
            func: name of the attribute of that module to call.
            args: keyword arguments for the call, either a mapping or a
                JSON object encoded as a string.
            wakeupat: timestamp (compared against ``time.time()``) before
                which the job is not resumed.
        """
        self._version = version
        self._module = module
        self._func = func
        self._args = args
        self._status = NEW
        self._id = None
        self._exception = None
        self._retcode = None
        self._wakeupat = wakeupat
        self._cont = None

    def __getstate__(self):
        """returns serialized representation (not implemented yet)"""
        pass

    def __setstate__(self, state):
        """load from serialized representation (not implemented yet)"""
        pass

    @staticmethod
    def _remaining_delay(wakeupat, now):
        """Return the non-negative time left from *now* until *wakeupat*."""
        return max(0, wakeupat - now)

    @staticmethod
    def _call_kwargs(args):
        """Return *args* as a keyword mapping, decoding a JSON string if given."""
        if isinstance(args, str):
            import json

            return json.loads(args)
        return args

    def run(self):
        """runs job continulet until it leaves the NEW/RUNNING/SLEEPING states

        dumps state (``storage.savejob``) after each step
        """
        while True:
            if self._status == NEW:
                monkey_patch_sys()
                monkey_patch_sleep()
                monkey_patch_socket()
                sys.path.append(get_path(self._version))
                module = import_module(self._module)
                function = getattr(module, self._func)
                kwargs = self._call_kwargs(self._args)

                def trampoline(cont):
                    # A continulet calls its target with itself as the first
                    # argument, so the parameter is required even if unused.
                    # Keeping the result in _retcode is presumably what that
                    # attribute is for - TODO confirm.
                    self._retcode = function(**kwargs)

                self._cont = continulet(trampoline)
                # Go through SLEEPING first so that wakeupat is honoured
                # before the first switch into the job.
                self._status = SLEEPING
            elif self._status == RUNNING:
                try:
                    # The patched sleep() switches back with
                    # (status, wakeup time); a bare (status,) is accepted too.
                    # NOTE(review): when the job function returns, the
                    # trampoline yields None, which fails to unpack and lands
                    # on the EXCEPTION path; no "finished" status is visible
                    # in this file.
                    status, *rest = self._cont.switch(self._exception)
                except Exception as e:
                    self._exception = e
                    self._status = EXCEPTION
                else:
                    self._status = status
                    if rest:
                        self._wakeupat = rest[0]
            elif self._status == SLEEPING:
                # FIXME: here should be something more intelligent...
                # NOTE(review): the NEW branch calls monkey_patch_sleep(),
                # which replaces time.sleep, so this call goes through the
                # patched version - confirm that is intended.
                time.sleep(self._remaining_delay(self._wakeupat, time.time()))
                self._status = RUNNING
            else:
                # Any other status (e.g. EXCEPTION) is not driven by this
                # loop; it was already saved at the end of the previous step.
                return
            storage.savejob(self)

    def wait(self, timeout):
        """Reload this job from storage while holding ``self.lock(timeout)``.

        NOTE(review): ``lock`` is not defined on this class in the visible
        code - confirm where it comes from.
        """
        with self.lock(timeout):
            storage.loadjob(self)
def stop_job(jobid, timeout=5):
    """sets special flag (STOP) on the job and waits for it

    Args:
        jobid: identifier of the job, as used by ``storage``.
        timeout: passed through to ``storage.wait``.
    """
    storage.send(jobid, STOP)
    # There is no ``self`` in a module-level function; wait on the job that
    # was just signalled.
    storage.wait(jobid, timeout)
def create_job(**kwargs):
    """Build a ``Job`` from *kwargs*, save it and return what ``storage.savejob`` returns."""
    return storage.savejob(Job(**kwargs))
def process_jobs():
    """Loop forever, handing every id from ``storage.getjobs()`` to ``process_job``."""
    while True:
        for pending_id in storage.getjobs():
            process_job(pending_id)
def process_job(jobid):
    """Lock the job, load it from storage, run it and then remove it."""
    job_lock = storage.lockjob(jobid, timeout=0)
    with job_lock:
        storage.loadjob(jobid).run()
        # removes job to garbage (optional)
        storage.removejob(jobid)
def gc_thread(self):
    """Loop forever, erasing removed jobs that ``job_is_too_old`` accepts.

    Args:
        self: unused in the body; kept so existing callers keep working.
    """
    while True:
        for jobid in storage.getremovedjobs():
            # ``job`` was never bound here; load it from storage by id.
            # NOTE(review): assumes removed jobs can still be loaded with
            # storage.loadjob - confirm against the storage interface.
            job = storage.loadjob(jobid)
            if job_is_too_old(job):
                storage.erasejob(jobid)
def monkey_patch_sleep():
    """Replace ``time.sleep`` with a version that switches out via ``continulet``.

    The replacement switches with ``(SLEEPING, wakeup time)`` and, once
    resumed, raises whatever non-None value it was resumed with.
    """

    def sleep(timeout):
        # NOTE(review): ``switch`` is called on the ``continulet`` name
        # itself rather than on a job's continulet instance - confirm.
        resumed_with = continulet.switch((SLEEPING, time.time() + timeout))
        if resumed_with is None:
            return
        raise resumed_with

    time.sleep = sleep
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment