Last active
November 21, 2019 14:16
-
-
Save andre-merzky/a6f1eb33dc5c55c51438e041a0349ae7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import json | |
import radical.utils as ru | |
import subprocess as sp | |
# module-wide profiler: the functions below call `prof.prof(<event name>)`
# at the start and end of each phase (startup, submit, collect, stop)
prof = ru.Profiler('flux.test')
# module-wide reporter: used below for all console output
# (`title`, `header`, `info`, `progress`, `plain`, `ok`)
rep = ru.Reporter('flux.test')
# ------------------------------------------------------------------------------ | |
# | |
def start_flux(flux_exe='/home/merzky/projects/flux/install/bin/flux'):
    '''
    Run `flux start`, which opens a new subshell.  Feed `flux env` on stdin
    of that shell and parse the output.  Every `export KEY="VALUE"` line is
    exported to `os.environ`; the entries of `PYTHONPATH` are additionally
    appended to `sys.path`, so that `import flux` then works.

    flux_exe: path of the `flux` executable to run (`<flux_exe> start`).

    Returns the `subprocess.Popen` handle of the running flux shell; its
    stdin/stdout are text-mode pipes.

    Raises `RuntimeError` if the flux shell closes its output before the
    environment was read completely.
    '''

    rep.header('flux startup')
    prof.prof('flux_start')

    # text-mode pipes, so that str is written and read on Python 2 and 3 alike
    proc = sp.Popen([flux_exe, 'start'], stdin=sp.PIPE, stdout=sp.PIPE,
                    stderr=sp.STDOUT, universal_newlines=True)

    # `echo "OK"` marks the end of the `flux env` output.  The pipe may be
    # buffered: without the flush, the shell would never see the commands and
    # the `readline()` below would block forever.
    proc.stdin.write('flux env\necho "OK"\n')
    proc.stdin.flush()

    rep.info('parse env\n')
    while True:

        raw = proc.stdout.readline()
        if not raw:
            # EOF: the shell died - `readline()` would return '' forever
            proc.wait()
            raise RuntimeError('flux start terminated before its environment'
                               ' was read (exit code %s)' % proc.returncode)

        line = raw.strip()
        if line == 'OK':
            break

        if not line.startswith('export '):
            continue

        # 'export KEY="VALUE"' -> KEY, VALUE
        k, sep, v = line.split(' ', 1)[1].partition('=')
        if not sep:
            # `export KEY` without a value: nothing to set
            continue

        v = v.strip('"')
        os.environ[k] = v

        if k == 'PYTHONPATH':
            # skip empty entries: '' in sys.path means "current directory"
            sys.path.extend([p for p in v.split(':') if p])
            rep.info('  ppath %s\n' % v)

        elif k == 'FLUX_URI':
            rep.info('  uri   %s\n' % v)

    rep.ok('>>ok\n')
    prof.prof('flux_started')

    return proc
# ------------------------------------------------------------------------------ | |
# | |
def stop_flux(proc):
    '''
    Send `exit` to the flux shell stdin to trigger termination, print all
    output found up to completion, and wait for the process to finish.

    proc: `subprocess.Popen` handle of the flux shell, with stdin and stdout
          connected to pipes which accept / return `str`.
    '''

    rep.header('flux stop')
    rep.info('terminate\n')
    prof.prof('flux_stop')

    try:
        proc.stdin.write('exit\n')
        proc.stdin.flush()
        # closing stdin also signals EOF to the shell
        proc.stdin.close()
    except (IOError, OSError):
        # the shell is already gone (broken pipe): nothing left to terminate,
        # but remaining output is still drained and the process reaped below
        pass

    # read until EOF - a blank line in the output must not end the loop early
    while True:
        raw = proc.stdout.readline()
        if not raw:
            break
        line = raw.strip()
        if line:
            rep.plain('  -> %s\n' % line)

    # release the pipe and reap the child so that no zombie is left behind
    proc.stdout.close()
    proc.wait()

    rep.ok('>>ok\n')
    prof.prof('flux_stopped')
# ------------------------------------------------------------------------------ | |
# | |
def use_flux(njobs=256, spec='spec.json'):
    '''
    Submit `njobs` tasks, and listen for state transition events.  Once
    `njobs` `INACTIVE` events are found, we are done.

    njobs: number of jobs to submit and to wait for.
    spec : path of the JSON file holding the job specification; the same
           spec is submitted for every job.

    The `job-state` event subscription is removed again on return, also when
    submission or collection fails.

    FIXME: how to obtain exit codes?
    '''

    rep.header('execute workload')

    # imported here and not at module level: `flux` only becomes importable
    # once `sys.path` has been extended by `start_flux()`
    import flux
    from flux import job

    jobspec = json.dumps(ru.read_json(spec))

    h = flux.Flux()

    # subscribe before submitting, so that no state event is missed
    h.event_subscribe('job-state')

    try:
        rep.info('submit ')
        prof.prof('submit_start')
        for _ in range(njobs):
            job.submit(h, jobspec)
            rep.progress('.')
        rep.ok('>>ok\n')
        prof.prof('submit_stop')

        done = 0
        rep.info('collect ')
        prof.prof('collect_start')
        while done < njobs:
            event = h.event_recv()
            # NOTE(review): every `INACTIVE` transition is counted, the first
            #               element of each pair (presumably the job id) is
            #               not matched against the jobs submitted above -
            #               confirm that no other jobs run in this instance.
            for _, state in event.payload['transitions']:
                if state == 'INACTIVE':
                    done += 1
                    rep.progress('+')
        rep.ok('>>ok\n')
        prof.prof('collect_stop')

    finally:
        h.event_unsubscribe('job-state')
# ------------------------------------------------------------------------------ | |
# | |
# Script driver: start a flux instance, run the workload twice against it,
# then shut the instance down.
rep.title('FLUX example')

proc = start_flux()
try:
    use_flux()
    use_flux()
finally:
    # always terminate the flux shell, also when a workload run fails -
    # otherwise the `flux start` subprocess would be left behind
    stop_flux(proc)

rep.header('done')
# ------------------------------------------------------------------------------ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment