cuburn/dist/client.py

#!/usr/bin/env python2
import os
import uuid
import weakref
import numpy as np
import cPickle as pickle

import gevent
from gevent import spawn, queue, coros
import zmq.green as zmq

import _importhack
from cuburn import profile, output
from cuburn.genome import db, util

from messages import *

# TODO: remove this dependency (loading the output module to get the suffix
# requires a compiler / default instance)
import pycuda.autoinit
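
# Client half of the distributed render protocol: tasks are pickled and sent
# over a REQ socket, and results arrive on a DEALER socket whose identity is
# this client's UUID. Each task is registered under a fresh request id so
# that _deal_rsps can hand its response to the queue returned by put().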
class RenderClient(object):
    def __init__(self, task_addr, rsp_addr, ctx=None, start=True):
        ctx = zmq.Context() if ctx is None else ctx
        self.tsock = ctx.socket(zmq.REQ)
        self.tsock.connect(task_addr)

        self.cid = uuid.uuid1().hex

        self.rsock = ctx.socket(zmq.DEALER)
        self.rsock.setsockopt(zmq.IDENTITY, self.cid)
        self.rsock.connect(rsp_addr)

        self.tq = queue.Channel()
        self.taskmap = weakref.WeakValueDictionary()
        if start: self.start()

    def put(self, task, rq=None):
        """
        Add a task to the render queue. This method blocks. Returns the
        queue to which the response will be sent.
        """
        rq = queue.Queue() if rq is None else rq
        self.tq.put((task, rq))
        return rq

    def start(self):
        spawn(self._deal_tasks)
        spawn(self._deal_rsps)

    def _deal_tasks(self):
        for task, rq in self.tq:
            rid = uuid.uuid1().hex
            self.taskmap[rid] = rq
            atask = AddressedTask([self.cid, rid], task)
            self.tsock.send_pyobj(atask)
            # Wait for an (empty) response. This ratelimits tasks.
            self.tsock.recv()
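
    # Response frames: [request id, log text, '\0'-joined output names, one
    # buffer per name]. The request id picks the destination queue; responses
    # for tasks nobody is waiting on anymore are silently dropped.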
    def _deal_rsps(self):
        while True:
            rsp = self.rsock.recv_multipart(copy=False)
            rq = self.taskmap.get(rsp[0].bytes, None)
            if rq: rq.put((rsp[1].bytes, rsp[2].bytes.split('\0'), rsp[3:]))

# Time (in seconds) before a job times out
# TODO: replace timeout mechanism with polling?
TIMEOUT = 4800

# Max. queue length before request considered lost, as a multiple of the
# number of in-flight requests
QUEUE_LENGTH_FACTOR = 2

RETRIES = 2

def iter_genomes(prof, outpath, gpaths):
    """
    Walk a list of genome paths, yielding them in an order suitable for
    the `genomes` argument of `create_jobs()`.
    """
    gdb = db.connect('.')

    for gpath in gpaths:
        try:
            gnm, basename = gdb.get_anim(gpath)
        except IOError:
            continue
        odir = os.path.join(outpath, basename)
        # Skip genomes that already have a COMPLETE marker or a reference
        # encode ('ref/<basename>.ts') on disk
        if (os.path.isfile(os.path.join(odir, 'COMPLETE')) or
                os.path.isfile(os.path.join(outpath, 'ref', basename+'.ts'))):
            continue
        gprof = profile.wrap(prof, gnm)
        ghash = util.hash(gnm)
        times = list(profile.enumerate_times(gprof))
        if not os.path.isdir(odir):
            os.makedirs(odir)
        with open(os.path.join(odir, 'NFRAMES'), 'w') as fp:
            fp.write(str(len(times)))
        outmod = output.get_output_for_profile(gprof)
        for i, t in times:
            opath = os.path.join(odir, '%05d' % i)
            if not os.path.isfile(opath + outmod.suffix):
                yield Task(opath, ghash, prof, gnm, t)
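
# Wait for a single task's response, resubmitting the task once if TIMEOUT
# expires, then write the log and each returned buffer to files named after
# the task id.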
def get_result(cli, task, rq):
    try:
        log, names, bufs = rq.get(timeout=TIMEOUT)
    except queue.Empty:
        cli.put(task, rq)
        print '>>', task.id
        log, names, bufs = rq.get()

    with open(task.id + '.log', 'wb') as fp:
        fp.write(log)
    for name in reversed(names):
        buf = bufs.pop()
        with open(task.id + name, 'wb') as fp:
            fp.write(buffer(buf))
    print '< ', task.id
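
# Submit every pending frame as a low-priority task and spawn a greenlet per
# task to collect its result; before exiting, wait until every outstanding
# response has landed.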
def main(addrs):
    parser = profile.add_args()
    parser.add_argument('genomes', nargs='+')
    args = parser.parse_args()
    prof_name, prof = profile.get_from_args(args)

    cli = RenderClient(addrs['tasks_loprio'], addrs['responses'])

    gen = iter_genomes(prof, 'out/%s' % prof_name, args.genomes)
    try:
        for task in gen:
            rq = cli.put(task)
            print ' >', task.id
            spawn(get_result, cli, task, rq)
    except KeyboardInterrupt:
        print 'Interrupt received, flushing'

    while cli.taskmap:
        for k, v in cli.taskmap.items():
            if not v.getters:
                cli.taskmap.pop(k)
        print 'Still waiting on %d tasks...' % len(cli.taskmap)
        gevent.sleep(3)

if __name__ == "__main__":
    import addrs
    main(addrs.addrs)