From 92ccf9a579877987c554b86533eac5ba62e84531 Mon Sep 17 00:00:00 2001
From: Steven Robertson
Date: Sat, 2 Jun 2012 10:08:48 -0700
Subject: [PATCH] Add new task distribution system

Still in crazy beta
---
 dist/__init__.py    |   0
 dist/_importhack.py |   3 +
 dist/addrs.py       |   4 +
 dist/client.py      | 135 +++++++++++++++++++++++
 dist/messages.py    |   5 +
 dist/server.py      | 107 ++++++++++++++++++
 dist/worker.py      |  49 +++++++++
 run_job.py          |  55 ----------
 worker.py           | 262 --------------------------------------------
 9 files changed, 303 insertions(+), 317 deletions(-)
 create mode 100644 dist/__init__.py
 create mode 100644 dist/_importhack.py
 create mode 100644 dist/addrs.py
 create mode 100644 dist/client.py
 create mode 100644 dist/messages.py
 create mode 100644 dist/server.py
 create mode 100644 dist/worker.py
 delete mode 100644 run_job.py
 delete mode 100755 worker.py

diff --git a/dist/__init__.py b/dist/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dist/_importhack.py b/dist/_importhack.py
new file mode 100644
index 0000000..1dbaef6
--- /dev/null
+++ b/dist/_importhack.py
@@ -0,0 +1,3 @@
+import os, sys
+sys.path.insert(0,
+        os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
diff --git a/dist/addrs.py b/dist/addrs.py
new file mode 100644
index 0000000..b8894f6
--- /dev/null
+++ b/dist/addrs.py
@@ -0,0 +1,4 @@
+ip = '10.1.10.2'
+port = 12615
+names = 'tasks tasks_loprio workers responses'.split()
+addrs = dict((k, 'tcp://%s:%d' % (ip, port+i)) for i, k in enumerate(names))
diff --git a/dist/client.py b/dist/client.py
new file mode 100644
index 0000000..9b07909
--- /dev/null
+++ b/dist/client.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env python2
+import os
+import uuid
+import weakref
+import numpy as np
+import cPickle as pickle
+
+import gevent
+from gevent import spawn, queue, coros
+import zmq.green as zmq
+
+import _importhack
+from cuburn import profile
+from cuburn.genome import db, util
+
+from messages import *
+
+class RenderClient(object):
+    def __init__(self, task_addr, rsp_addr, ctx=None, start=True):
+        ctx = zmq.Context() if ctx is None else ctx
+        self.tsock = ctx.socket(zmq.REQ)
+        self.tsock.connect(task_addr)
+
+        self.cid = uuid.uuid1().hex
+        self.rsock = ctx.socket(zmq.DEALER)
+        self.rsock.setsockopt(zmq.IDENTITY, self.cid)
+        self.rsock.connect(rsp_addr)
+
+        self.tq = queue.Queue(0)
+
+        self.taskmap = weakref.WeakValueDictionary()
+        if start: self.start()
+
+    def put(self, task, rq=None):
+        """
+        Add a task to the render queue. This method blocks. Returns the
+        queue to which the response will be sent.
+        """
+        rq = queue.Queue() if rq is None else rq
+        self.tq.put((task, rq))
+        return rq
+
+    def start(self):
+        spawn(self._deal_tasks)
+        spawn(self._deal_rsps)
+
+    def _deal_tasks(self):
+        for task, rq in self.tq:
+            rid = uuid.uuid1().hex
+            self.taskmap[rid] = rq
+            atask = AddressedTask([self.cid, rid], task)
+            self.tsock.send_pyobj(atask)
+            # Wait for an (empty) response. This ratelimits tasks.
+            self.tsock.recv()
+
+    def _deal_rsps(self):
+        while True:
+            rsp = self.rsock.recv_multipart(copy=False)
+            assert len(rsp) == 2
+            rq = self.taskmap.get(rsp[0].bytes, None)
+            if rq: rq.put(rsp[1])
+
+# Time (in seconds) before a job times out
+TIMEOUT=240
+
+# Max. queue length before request considered lost, as a multiple of the
+# number of in-flight requests
+QUEUE_LENGTH_FACTOR=2
+
+RETRIES=2
+
+def iter_genomes(prof, outpath, gpaths):
+    """
+    Walk a list of genome paths, yielding a Task for each frame that has
+    not yet been rendered.
+ """ + gdb = db.connect('.') + + for gpath in gpaths: + try: + gnm, basename = gdb.get_anim(gpath) + except IOError: + continue + odir = os.path.join(outpath, basename) + if (os.path.isfile(os.path.join(odir, 'COMPLETE')) or + os.path.isfile(os.path.join(outpath, 'ref', basename+'.ts'))): + continue + gprof = profile.wrap(prof, gnm) + ghash = util.hash(gnm) + times = list(profile.enumerate_times(gprof)) + if not os.path.isdir(odir): + os.makedirs(odir) + with open(os.path.join(odir, 'NFRAMES'), 'w') as fp: + fp.write(str(len(times))) + for i, t in times: + opath = os.path.join(odir, '%05d.%s' % (i, gprof.output_format)) + if not os.path.isfile(opath): + yield Task(opath, ghash, prof, gnm, t) + +def get_result(cli, task, rq): + try: + rsp = rq.get(timeout=TIMEOUT) + except queue.Empty: + cli.put(task, rq) + print '>>', task.id + rsp = rq.get() + + with open(task.id, 'wb') as fp: + fp.write(buffer(rsp)) + print '< ', task.id + +def main(addrs): + parser = profile.add_args() + parser.add_argument('genomes', nargs='+') + args = parser.parse_args() + prof_name, prof = profile.get_from_args(args) + + cli = RenderClient(addrs['tasks_loprio'], addrs['responses']) + + gen = iter_genomes(prof, 'out/%s' % prof_name, args.genomes) + try: + for task in gen: + rq = cli.put(task) + print ' >', task.id + spawn(get_result, cli, task, rq) + except KeyboardInterrupt: + print 'Interrupt received, flushing' + + while cli.taskmap: + print 'Still waiting on %d tasks...' % len(cli.taskmap) + gevent.sleep(3) + +if __name__ == "__main__": + import addrs + main(addrs.addrs) diff --git a/dist/messages.py b/dist/messages.py new file mode 100644 index 0000000..647de2d --- /dev/null +++ b/dist/messages.py @@ -0,0 +1,5 @@ +from collections import namedtuple + +Task = namedtuple('Task', 'id hash profile anim time') +AddressedTask = namedtuple('AddressedTask', 'addr task') +FullTask = namedtuple('FullTask', 'addr task cubin packer') diff --git a/dist/server.py b/dist/server.py new file mode 100644 index 0000000..d01e755 --- /dev/null +++ b/dist/server.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python2 + +import gevent +from gevent import spawn, queue, event +import zmq.green as zmq +import cPickle as pickle + +import _importhack +from cuburn.render import Renderer + +from messages import * + +ctx = zmq.Context() + +def setup_task_listeners(addrs, tq, rq): + hisock = ctx.socket(zmq.REP) + losock = ctx.socket(zmq.REP) + hisock.bind(addrs['tasks']) + losock.bind(addrs['tasks_loprio']) + + loevt = event.Event() + loevt.set() + + @spawn + def listen_hi(): + while True: + if not hisock.poll(timeout=0): + # No messages pending. Set loevt, allowing messages from + # losock to be added to the queue. + loevt.set() + task = hisock.recv_pyobj() + print 'OOOOOH! Got a hiprio evt' + loevt.clear() # Got message; pause listen_lo(). 
+            tq.put(task)
+            hisock.send('')
+
+    @spawn
+    def listen_lo():
+        while True:
+            loevt.wait()
+            tq.put(losock.recv_pyobj())
+            losock.send('')
+
+def setup_worker_listener(addrs, tq, rq):
+    wsock = ctx.socket(zmq.ROUTER)
+    wsock.bind(addrs['workers'])
+
+    readyq = queue.Queue()
+
+    compcache = {}
+
+    @spawn
+    def send_work():
+        for addr, task in tq:
+            print ' >', ' '.join(addr)
+            if task.hash not in compcache:
+                try:
+                    rsp = Renderer.compile(task.anim, arch='sm_21')
+                except:
+                    # Store exceptions, so that we don't try to endlessly
+                    # recompile bad genomes
+                    import traceback
+                    rsp = traceback.format_exc()
+                    print 'Error while compiling task:', rsp
+                compcache[task.hash] = rsp
+            else:
+                rsp = compcache[task.hash]
+            if isinstance(rsp, basestring):
+                continue
+            packer, lib, cubin = rsp
+            ctask = FullTask(addr, task, cubin, packer)
+            worker_addr = readyq.get()
+            wsock.send_multipart([worker_addr, '', pickle.dumps(ctask)])
+
+    @spawn
+    def read_rsps():
+        while True:
+            rsp = wsock.recv_multipart(copy=False)
+            if rsp[2].bytes != '':
+                print '< ', ' '.join([r.bytes for r in rsp[1:-1]])
+                rq.put(rsp[2:])
+            readyq.put(rsp[0])
+
+def setup_responder(addrs, rq):
+    rsock = ctx.socket(zmq.ROUTER)
+    rsock.bind(addrs['responses'])
+
+    @spawn
+    def send_responses():
+        for rsp in rq:
+            rsock.send_multipart(rsp)
+    return send_responses
+
+def main(addrs):
+    # Channel holding (addr, task) pairs.
+    tq = queue.Queue(0)
+    # Queue holding response messages (as a list of raw zmq frames).
+    rq = queue.Queue()
+
+    setup_task_listeners(addrs, tq, rq)
+    setup_worker_listener(addrs, tq, rq)
+    # TODO: Will switch to a Nanny central wait loop
+    setup_responder(addrs, rq).join()
+
+if __name__ == "__main__":
+    import addrs
+    main(addrs.addrs)
diff --git a/dist/worker.py b/dist/worker.py
new file mode 100644
index 0000000..b72eb9b
--- /dev/null
+++ b/dist/worker.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python2
+import sys
+from cStringIO import StringIO
+
+import zmq
+import pycuda.driver as cuda
+cuda.init()
+
+import _importhack
+from cuburn import render, profile, output
+from cuburn.genome import convert, db, use
+
+from messages import *
+
+class PrecompiledRenderer(render.Renderer):
+    def compile(self, gnm):
+        return self.packer, None, self.cubin
+    def __init__(self, gnm, gprof, packer, cubin):
+        self.packer, self.cubin = packer, cubin
+        super(PrecompiledRenderer, self).__init__(gnm, gprof)
+
+def main(card_num, worker_addr):
+    dev = cuda.Device(card_num)
+    ctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
+    rmgr = render.RenderManager()
+
+    ctx = zmq.Context()
+    sock = ctx.socket(zmq.REQ)
+    sock.connect(worker_addr)
+
+    # Start the request loop with an empty job
+    sock.send('')
+    hash = None
+    while True:
+        addr, task, cubin, packer = sock.recv_pyobj()
+        gprof = profile.wrap(task.profile, task.anim)
+        if hash != task.hash:
+            rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
+        buf = rmgr.queue_frame(rdr, task.anim, gprof, task.time)
+        ofile = StringIO()
+        output.PILOutput.save(buf, ofile, task.id[-3:])
+        ofile.seek(0)
+        sock.send_multipart(addr + [ofile.read()])
+        hash = task.hash
+        print 'Rendered', task.id
+
+if __name__ == "__main__":
+    import addrs
+    main(int(sys.argv[1]), addrs.addrs['workers'])
diff --git a/run_job.py b/run_job.py
deleted file mode 100644
index 0193b12..0000000
--- a/run_job.py
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/usr/bin/python
-
-# Temporary helper script while I update main.py
-
-import os
-import sys
-import time
-import argparse
-import multiprocessing
-from subprocess import Popen
-from ctypes import *
-from itertools import ifilter
-
-import numpy as np
-import Image
-import scipy
-import pycuda.autoinit
-
-import cuburn.render
-import cuburn.genome
-
-import pycuda.compiler
-import pycuda.driver as cuda
-import cuburn.code.interp
-
-np.set_printoptions(precision=5, edgeitems=20)
-
-real_stdout = sys.stdout
-
-def save(rimg, pfx):
-    noalpha = rimg.buf[:,:,:3]
-    name = pfx + str(rimg.idx)
-    img = scipy.misc.toimage(noalpha, cmin=0, cmax=1)
-    img.save(name+'.png')
-    print name, rimg.gpu_time
-    sys.stdout.flush()
-
-def main(jobfilepath, outprefix):
-    # This includes the genomes and other cruft, a dedicated reader will be
-    # built in time tho
-    info = cuburn.genome.load_info(open(jobfilepath).read())
-
-    times = np.linspace(0, 1, info.duration * info.fps + 1)
-    #rtimes = zip(['%05d' % i for i in range(len(times))[1:]], times, times[1:])
-    rtimes = [('still', times[0], times[0]), ('motion', times[1], times[2])]
-
-    renderer = cuburn.render.Renderer(info)
-    renderer.compile()
-    renderer.load()
-
-    for out in renderer.render(rtimes):
-        save(out, outprefix)
-
-if __name__ == "__main__":
-    main(sys.argv[1], sys.argv[2] if len(sys.argv) > 2 else 'out/')
diff --git a/worker.py b/worker.py
deleted file mode 100755
index 366fea7..0000000
--- a/worker.py
+++ /dev/null
@@ -1,262 +0,0 @@
-#!/usr/bin/env python2
-# Render from a job server
-
-import re
-import os
-import sys
-import time
-import uuid
-import json
-import socket
-import itertools
-from collections import namedtuple
-from subprocess import check_call, check_output
-from cStringIO import StringIO
-
-import scipy
-import redis
-
-sys.path.insert(0, os.path.dirname(__file__))
-from cuburn import render
-from cuburn.genome import convert, db, use
-
-import pycuda.driver as cuda
-
-pycuda = None
-
-# The default maximum number of waiting jobs. Also used to determine when a
-# job has expired.
-QUEUE_LENGTH=10
-
-def partition(pred, arg):
-    return filter(pred, arg), filter(lambda a: not pred(a), arg)
-
-def git_rev():
-    os.environ['GIT_DIR'] = os.path.join(os.path.dirname(__file__) or '.', '.git')
-    if 'FLOCK_PATH_IGNORE' not in os.environ:
-        if check_output('git status -z -uno'.split()):
-            return None
-    return check_output('git rev-parse HEAD'.split()).strip()[:10]
-
-uu = lambda t: ':'.join((t, uuid.uuid1().hex))
-
-def get_temperature():
-    id = pycuda.autoinit.device.pci_bus_id()
-    try:
-        out = check_output('nvidia-smi -q -d TEMPERATURE'.split())
-    except:
-        return ''
-    idx = out.find('\nGPU ' + id)
-    if idx >= 0:
-        out.find('Gpu', idx)
-        if idx >= 0:
-            idx = out.find(':')
-            if idx >= 0:
-                return out[idx+1:idx+3]
-    return ''
-
-def work(server):
-    global pycuda
-    import pycuda.autoinit
-    rev = git_rev()
-    assert rev, 'Repository must be clean!'
-    r = redis.StrictRedis(server)
-    wid = uu('workers')
-    r.sadd('renderpool:' + rev + ':workers', wid)
-    r.hmset(wid, {
-        'host': socket.gethostname(),
-        'devid': pycuda.autoinit.device.pci_bus_id(),
-        'temp': get_temperature()
-    })
-    r.expire(wid, 180)
-    last_ping = time.time()
-
-    idx = evt = buf = None
-    last_idx = last_buf = last_evt = two_evts_ago = None
-    last_pid = last_gid = rdr = None
-
-    mgr = render.RenderManager()
-
-    while True:
-        task = r.blpop('renderpool:' + rev + ':queue', 10)
-        now = time.time()
-        if now > last_ping - 60:
-            r.hset(wid, 'temp', get_temperature())
-            r.expire(wid, 180)
-            last_ping = now
-
-        # last_evt will be populated during normal queue operation (when evt
-        # contains the most recent event), as well as when the render queue is
-        # flushing due to not receiving a new task before the timeout.
-        if last_idx is not None:
-            while not last_evt.query():
-                # This delay could probably be a lot higher with zero impact
-                # on throughput for Fermi cards
-                time.sleep(0.05)
-
-            sid, sidx, ftag = last_idx
-            obuf = StringIO()
-            rdr.out.save(last_buf, obuf, 'jpeg')
-            obuf.seek(0)
-            gpu_time = last_evt.time_since(two_evts_ago)
-            head = ' '.join([sidx, str(gpu_time), ftag])
-            r.rpush(sid + ':queue', head + '\0' + obuf.read())
-            print 'Pushed frame: %s' % head
-
-        two_evts_ago, last_evt = last_evt, evt
-        last_idx, last_buf = idx, buf
-
-        if not task:
-            idx = evt = buf = None
-            continue
-
-        copy = False
-        sid, sidx, pid, gid, ftime, ftag = task[1].split(' ', 5)
-        if pid != last_pid or gid != last_gid or not rdr:
-            gnm = json.loads(r.get(gid))
-            gprof, ignored_times = use.wrap_genome(json.loads(r.get(pid)), gnm)
-            rdr = render.Renderer(gnm, gprof)
-            last_pid, last_gid = pid, gid
-            copy = True
-
-        if last_evt is None:
-            # Create a dummy event for timing
-            last_evt = cuda.Event().record(mgr.stream_a)
-
-        evt, buf = mgr.queue_frame(rdr, gnm, gprof, float(ftime), copy)
-        idx = sid, sidx, ftag
-
-def iter_genomes(prof, gpaths, pname='540p'):
-    """
-    Walk a list of genome paths, yielding them in an order suitable for
-    the `genomes` argument of `create_jobs()`.
-    """
-    gdb = db.connect('.')
-
-    for gpath in gpaths:
-        gname = os.path.basename(gpath).rsplit('.', 1)[0]
-        odir = 'out/%s/%s' % (pname, gname)
-        if os.path.isfile(os.path.join(odir, 'COMPLETE')):
-            continue
-        with open(gpath) as fp:
-            gsrc = fp.read()
-        gnm = convert.edge_to_anim(gdb, json.loads(gsrc))
-        gsrc = json.dumps(gnm)
-        gprof, times = use.wrap_genome(prof, gnm)
-        gtimes = []
-        for i, t in enumerate(times):
-            opath = os.path.join(odir, '%05d.jpg' % (i+1))
-            if not os.path.isfile(opath):
-                gtimes.append((t, opath))
-        if gtimes:
-            if not os.path.isdir(odir):
-                os.makedirs(odir)
-            with open(os.path.join(odir, 'NFRAMES'), 'w') as fp:
-                fp.write(str(len(times)) + '\n')
-            yield gsrc, gtimes
-
-def create_jobs(r, psrc, genomes):
-    """Attention politicians: it really is this easy.
-
-    `genomes` is an iterable of 2-tuples (gsrc, gframes), where `gframes` is an
-    iterable of 2-tuples (ftime, fid).
-    """
-    pid = uu('profile')
-    r.set(pid, psrc)
-    for gsrc, gframes in genomes:
-        # TODO: SHA-based? I guess that depends on whether we do precompilation
-        # on the HTTP server which accepts job requests (and on whether the
-        # grid remains homogeneous).
-        gid = uu('genome')
-        r.set(gid, gsrc)
-        r.publish('precompile', gid)
-
-        for ftime, fid in gframes:
-            yield pid, gid, str(ftime), fid
-
-def run_jobs(r, rev, jobs):
-    # TODO: session properties
-    sid = uu('session')
-    qid = sid + ':queue'
-    pending = {}    # sidx -> job, for any job currently in the queue
-    waiting = []    # sidx of jobs in queue normally
-    retry = []      # sidx of jobs in queue a second time
-
-    def push(i, job):
-        j = ' '.join((sid, str(i)) + job)
-        r.rpush('renderpool:' + rev + ':queue', j)
-
-    def pull(block):
-        if block:
-            ret = r.blpop(qid, 180)
-            if ret is None:
-                # TODO: better exception
-                raise ValueError("Timeout")
-            ret = ret[1]
-        else:
-            ret = r.lpop(qid)
-            if ret is None: return
-        tags, jpg = ret.split('\0', 1)
-        sidx, gpu_time, ftag = tags.split(' ', 2)
-        sidx, gpu_time = int(sidx), float(gpu_time)
-        if sidx in waiting:
-            waiting.remove(sidx)
-        if sidx in retry:
-            retry.remove(sidx)
-        if sidx in pending:
-            pending.pop(sidx)
-        else:
-            print 'Got two responses for %d' % sidx
-        if retry and retry[0] < sidx - 4 * QUEUE_LENGTH:
-            # TODO: ensure that this doesn't happen accidentally; raise an
-            # appropriate exception when it does
-            print "Double retry!"
-        expired, waiting[:] = partition(lambda w: w < sidx - QUEUE_LENGTH,
-                                        waiting)
-        for i in expired:
-            push(i, pending[i])
-            retry.append(i)
-        return sidx, gpu_time, ftag, jpg
-
-    try:
-        for sidx, job in enumerate(jobs):
-            while len(pending) > QUEUE_LENGTH:
-                yield pull(True)
-            ret = pull(False)
-            if ret:
-                yield ret
-            pending[sidx] = job
-            waiting.append(sidx)
-            push(sidx, job)
-    except KeyboardInterrupt:
-        print 'Interrupt received, flushing already-dispatched frames'
-
-    while pending:
-        print '%d...' % len(pending)
-        yield pull(True)
-
-def client(ppath, gpaths):
-    rev = git_rev()
-    assert rev, 'Repository must be clean!'
-    r = redis.StrictRedis()
-    if not r.scard('renderpool:' + rev + ':workers'):
-        # TODO: expire workers when they disconnect
-        print 'No workers available at local cuburn revision, exiting.'
-        return
-
-    with open(ppath) as fp:
-        psrc = fp.read()
-    prof = json.loads(psrc)
-    pname = os.path.basename(ppath).rsplit('.', 1)[0]
-
-    jobiter = create_jobs(r, psrc, iter_genomes(prof, gpaths, pname))
-    for sidx, gpu_time, ftag, jpg in run_jobs(r, rev, jobiter):
-        with open(ftag, 'w') as fp:
-            fp.write(jpg)
-        print 'Wrote %s (took %g msec)' % (ftag, gpu_time)
-
-if __name__ == "__main__":
-    if sys.argv[1] == 'work':
-        work(sys.argv[2] if len(sys.argv) > 2 else 'localhost')
-    else:
-        client(sys.argv[1], sys.argv[2:])
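
Usage sketch (illustration only, not part of the commit): dist/server.py and
dist/worker.py take their endpoints from dist/addrs.py, and dist/client.py
drives them through RenderClient. Assuming the server and at least one worker
are already running, that the snippet runs from inside dist/ (as client.py
does), and that `my_task` stands in for a real messages.Task like the ones
iter_genomes() yields, a caller could do roughly:

    import addrs
    from client import RenderClient

    cli = RenderClient(addrs.addrs['tasks_loprio'], addrs.addrs['responses'])
    rq = cli.put(my_task)   # `my_task` is a placeholder Task; put() returns
                            # the gevent queue the response will land on
    frame = rq.get()        # blocks this greenlet until the frame arrives
    with open(my_task.id, 'wb') as fp:
        fp.write(buffer(frame))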