diff --git a/dist/__init__.py b/dist/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/dist/_importhack.py b/dist/_importhack.py
deleted file mode 100644
index 1dbaef6..0000000
--- a/dist/_importhack.py
+++ /dev/null
@@ -1,3 +0,0 @@
-import os, sys
-sys.path.insert(0,
-        os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
diff --git a/dist/addrs.py b/dist/addrs.py
deleted file mode 100644
index fe17537..0000000
--- a/dist/addrs.py
+++ /dev/null
@@ -1,4 +0,0 @@
-ip = '127.0.0.1'
-port = 12615
-names = 'tasks tasks_loprio workers responses'.split()
-addrs = dict((k, 'tcp://%s:%d' % (ip, port+i)) for i, k in enumerate(names))
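
For reference, the one-liner in addrs.py assigns one sequential TCP port per channel, starting at 12615. The dict it builds evaluates to:

    addrs = {
        'tasks':        'tcp://127.0.0.1:12615',
        'tasks_loprio': 'tcp://127.0.0.1:12616',
        'workers':      'tcp://127.0.0.1:12617',
        'responses':    'tcp://127.0.0.1:12618',
    }
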
- """ - gdb = db.connect('.') - - for gpath in gpaths: - try: - gnm, basename = gdb.get_anim(gpath) - except IOError: - continue - odir = os.path.join(outpath, basename) - if (os.path.isfile(os.path.join(odir, 'COMPLETE')) or - os.path.isfile(os.path.join(outpath, 'ref', basename+'.ts'))): - continue - gprof = profile.wrap(prof, gnm) - ghash = util.hash(gnm) - times = list(profile.enumerate_times(gprof)) - if not os.path.isdir(odir): - os.makedirs(odir) - with open(os.path.join(odir, 'NFRAMES'), 'w') as fp: - fp.write(str(len(times))) - outmod = output.get_output_for_profile(gprof) - for i, t in times: - opath = os.path.join(odir, '%05d' % i) - if not os.path.isfile(opath + outmod.suffix): - yield Task(opath, ghash, prof, gnm, t) - -def get_result(cli, task, rq): - try: - log, names, bufs = rq.get(timeout=TIMEOUT) - except queue.Empty: - cli.put(task, rq) - print '>>', task.id - log, names, bufs = rq.get() - - with open(task.id + '.log', 'wb') as fp: - fp.write(log) - - for name in reversed(names): - buf = bufs.pop() - with open(task.id + name, 'wb') as fp: - fp.write(buffer(buf)) - print '< ', task.id - -def main(addrs): - parser = profile.add_args() - parser.add_argument('genomes', nargs='+') - args = parser.parse_args() - prof_name, prof = profile.get_from_args(args) - - cli = RenderClient(addrs['tasks_loprio'], addrs['responses']) - - gen = iter_genomes(prof, 'out/%s' % prof_name, args.genomes) - try: - for task in gen: - rq = cli.put(task) - print ' >', task.id - spawn(get_result, cli, task, rq) - except KeyboardInterrupt: - print 'Interrupt received, flushing' - - while cli.taskmap: - for k, v in cli.taskmap.items(): - if not v.getters: - cli.taskmap.pop(k) - print 'Still waiting on %d tasks...' % len(cli.taskmap) - gevent.sleep(3) - -if __name__ == "__main__": - import addrs - main(addrs.addrs) diff --git a/dist/messages.py b/dist/messages.py deleted file mode 100644 index 792537c..0000000 --- a/dist/messages.py +++ /dev/null @@ -1,5 +0,0 @@ -from collections import namedtuple - -Task = namedtuple('Task', 'id hash profile anim times') -AddressedTask = namedtuple('AddressedTask', 'addr task') -FullTask = namedtuple('FullTask', 'addr task cubin packer') diff --git a/dist/server.py b/dist/server.py deleted file mode 100644 index 8f4dcf8..0000000 --- a/dist/server.py +++ /dev/null @@ -1,107 +0,0 @@ -#!/usr/bin/env python2 -from itertools import takewhile - -import gevent -from gevent import spawn, queue, event -import zmq.green as zmq -import cPickle as pickle - -import _importhack -from cuburn.render import Renderer - -from messages import * - -ctx = zmq.Context() - -def setup_task_listeners(addrs, tq, rq): - hisock = ctx.socket(zmq.REP) - losock = ctx.socket(zmq.REP) - hisock.bind(addrs['tasks']) - losock.bind(addrs['tasks_loprio']) - - loevt = event.Event() - loevt.set() - - @spawn - def listen_hi(): - while True: - if not hisock.poll(timeout=0): - # No messages pending. Set loevt, allowing messages from - # losock to be added to the queue. - loevt.set() - task = hisock.recv_pyobj() - loevt.clear() # Got message; pause listen_lo(). 
diff --git a/dist/server.py b/dist/server.py
deleted file mode 100644
index 8f4dcf8..0000000
--- a/dist/server.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#!/usr/bin/env python2
-from itertools import takewhile
-
-import gevent
-from gevent import spawn, queue, event
-import zmq.green as zmq
-import cPickle as pickle
-
-import _importhack
-from cuburn.render import Renderer
-
-from messages import *
-
-ctx = zmq.Context()
-
-def setup_task_listeners(addrs, tq, rq):
-    hisock = ctx.socket(zmq.REP)
-    losock = ctx.socket(zmq.REP)
-    hisock.bind(addrs['tasks'])
-    losock.bind(addrs['tasks_loprio'])
-
-    loevt = event.Event()
-    loevt.set()
-
-    @spawn
-    def listen_hi():
-        while True:
-            if not hisock.poll(timeout=0):
-                # No messages pending. Set loevt, allowing messages from
-                # losock to be added to the queue.
-                loevt.set()
-            task = hisock.recv_pyobj()
-            loevt.clear()  # Got message; pause listen_lo().
-            tq.put(task)
-            hisock.send('')
-
-    @spawn
-    def listen_lo():
-        while True:
-            loevt.wait()
-            tq.put(losock.recv_pyobj())
-            losock.send('')
-
-def setup_worker_listener(addrs, tq, rq):
-    wsock = ctx.socket(zmq.ROUTER)
-    wsock.bind(addrs['workers'])
-
-    readyq = queue.Queue()
-
-    compcache = {}
-
-    @spawn
-    def send_work():
-        for addr, task in tq:
-            print ' >', ' '.join(addr)
-            if task.hash not in compcache:
-                try:
-                    rsp = Renderer.compile(task.anim, arch='sm_35')
-                except:
-                    # Store exceptions, so that we don't try to endlessly
-                    # recompile bad genomes
-                    import traceback
-                    rsp = traceback.format_exc()
-                    print 'Error while compiling task:', rsp
-                compcache[task.hash] = rsp
-            else:
-                rsp = compcache[task.hash]
-            if isinstance(rsp, basestring):
-                continue
-            packer, lib, cubin = rsp
-            ctask = FullTask(addr, task, cubin, packer)
-            worker_addr = readyq.get()
-            wsock.send_multipart([worker_addr, '', pickle.dumps(ctask)])
-
-    @spawn
-    def read_rsps():
-        while True:
-            rsp = wsock.recv_multipart(copy=False)
-            if rsp[2].bytes != '':
-                print '< ', rsp[2].bytes, rsp[3].bytes
-                rq.put(rsp[2:])
-            readyq.put(rsp[0])
-
-def setup_responder(addrs, rq):
-    rsock = ctx.socket(zmq.ROUTER)
-    rsock.bind(addrs['responses'])
-
-    @spawn
-    def send_responses():
-        for rsp in rq:
-            rsock.send_multipart(rsp)
-    return send_responses
-
-def main(addrs):
-    # Channel holding (addr, task) pairs.
-    tq = queue.Channel()
-    # Queue holding response messages (as a list of raw zmq frames).
-    rq = queue.Queue()
-
-    setup_task_listeners(addrs, tq, rq)
-    setup_worker_listener(addrs, tq, rq)
-    # TODO: Will switch to a Nanny central wait loop
-    setup_responder(addrs, rq).join()
-
-if __name__ == "__main__":
-    import addrs
-    main(addrs.addrs)
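
Two patterns in server.py are worth calling out. Priority is implemented by gating the low-priority listener with a gevent Event that listen_hi() clears whenever a high-priority message is pending, and worker dispatch uses the ZMQ ready-queue idiom: a worker announces itself by sending a message, and send_work() only pushes tasks to ROUTER identities parked in readyq. A stripped-down sketch of the gating, with plain queues standing in for the sockets:

    import gevent
    from gevent import event, queue, spawn

    loevt = event.Event()
    loevt.set()                     # low-priority traffic allowed by default
    hi, lo, tq = queue.Queue(), queue.Queue(), queue.Queue()

    def listen_hi():
        while True:
            if hi.empty():
                loevt.set()         # nothing urgent pending; unblock listen_lo
            task = hi.get()
            loevt.clear()           # urgent message arrived; park listen_lo
            tq.put(task)

    def listen_lo():
        while True:
            loevt.wait()            # blocks while listen_hi holds the event clear
            tq.put(lo.get())

    spawn(listen_hi)
    spawn(listen_lo)
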
diff --git a/dist/worker.py b/dist/worker.py
deleted file mode 100644
index 3847587..0000000
--- a/dist/worker.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/usr/bin/env python2
-import sys
-import socket
-from cStringIO import StringIO
-
-import gevent
-from gevent import spawn, queue
-import zmq.green as zmq
-import pycuda.driver as cuda
-cuda.init()
-
-import _importhack
-from cuburn import render, profile, output
-from cuburn.genome import convert, db, use
-
-from messages import *
-
-class PrecompiledRenderer(render.Renderer):
-    def compile(self, gnm):
-        return self.packer, None, self.cubin
-    def __init__(self, gnm, gprof, packer, cubin):
-        self.packer, self.cubin = packer, cubin
-        super(PrecompiledRenderer, self).__init__(gnm, gprof)
-
-def main(worker_addr):
-    rmgr = render.RenderManager()
-
-    ctx = zmq.Context()
-    in_queue = queue.Queue(0)
-    out_queue = queue.Queue(0)
-
-    def request_loop():
-        sock = ctx.socket(zmq.REQ)
-        sock.connect(worker_addr)
-
-        # Start the request loop with an empty job
-        sock.send('')
-
-        hash = None
-        while True:
-            log = [('worker', socket.gethostname() + ':' +
-                    cuda.Context.get_current().get_device().pci_bus_id())]
-            addr, task, cubin, packer = sock.recv_pyobj()
-            gprof = profile.wrap(task.profile, task.anim)
-            if hash != task.hash:
-                rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
-            for t in task.times:
-                evt, buf = rmgr.queue_frame(rdr, task.anim, gprof, t)
-                while not evt.query():
-                    gevent.sleep(0.01)
-                out, frame_log = rdr.out.encode(buf)
-                log += frame_log
-                print 'Rendered', task.id, 'in', int(evt.time()), 'ms'
-            final_out, final_log = rdr.out.encode(None)
-            assert not (out and final_out), 'Got output from two sources!'
-            out = out or final_out
-            log += final_log
-            log = '\0'.join([k + ' ' + v for k, v in log])
-
-            suffixes, files = zip(*[(k, v.read())
-                                    for k, v in sorted(out.items())])
-            # TODO: reduce copies, generally spruce up the memory usage here
-            sock.send_multipart(addr + [log, '\0'.join(suffixes)] + list(files))
-
-    # Spawn two request loops to take advantage of CUDA pipelining.
-    spawn(request_loop)
-    request_loop()
-
-if __name__ == "__main__":
-    import addrs
-    dev = cuda.Device(int(sys.argv[1]))
-    cuctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
-    try:
-        main(addrs.addrs['workers'])
-    finally:
-        cuda.Context.pop()
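
The worker never blocks its OS thread on the GPU: it polls the CUDA event and yields, which is what lets the second request_loop greenlet fetch and prepare the next task while a frame renders. The idiom in isolation (a sketch with a hypothetical helper name, assuming pycuda):

    import gevent
    import pycuda.driver as cuda

    def green_sync(evt, interval=0.01):
        """Yield to other greenlets until the CUDA event fires."""
        while not evt.query():
            gevent.sleep(interval)
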
diff --git a/distribute.py b/distribute.py
new file mode 100755
index 0000000..ed0d659
--- /dev/null
+++ b/distribute.py
@@ -0,0 +1,274 @@
+#!/usr/bin/env python2
+
+import os
+import sys
+import socket
+import argparse
+import subprocess
+import traceback
+
+import gevent
+import gevent.event
+import gevent.queue
+import gevent.pool
+from gevent import monkey
+monkey.patch_all()
+
+import json
+import warnings
+from subprocess import Popen
+from itertools import ifilter
+from collections import namedtuple
+
+import numpy as np
+
+sys.path.insert(0, os.path.dirname(__file__))
+from cuburn import render, filters, output, profile
+from cuburn.genome import convert, use, db
+
+ready_str = 'worker ready'
+closing_encoder_str = 'closing encoder'
+output_file_str = 'here is a file for you'
+done_str = 'we done here'
+
+def write_str(out, val):
+    out.write(np.array([len(val)], '>u4').tostring())
+    out.write(val)
+    out.flush()
+
+def write_filelike(out, filelike):
+    filelike.seek(0, 2)
+    out.write(np.array([filelike.tell()], '>u8').tostring())
+    filelike.seek(0)
+    buf = filelike.read(1024 * 1024)
+    while buf:
+        out.write(buf)
+        buf = filelike.read(1024 * 1024)
+    out.flush()
+
+def read_str(infp):
+    sz_buf = infp.read(4)
+    assert len(sz_buf) == 4, 'Incomplete read of str size'
+    assert sz_buf[0] == '\0', 'No str should be that big'
+    sz = np.frombuffer(sz_buf, '>u4')[0]
+    msg = infp.read(sz)
+    assert len(msg) == sz, 'Incomplete read, expected %d got %d' % (sz, len(msg))
+    return msg
+
+def copy_filelike(infp, dst):
+    sz_buf = infp.read(8)
+    assert len(sz_buf) == 8, 'Incomplete read of filelike size'
+    assert sz_buf[0] == '\0', 'No filelike should be that big'
+    sz = np.frombuffer(sz_buf, '>u8')[0]
+    recvd = 0
+    while recvd < sz:
+        # int() because np.uint64 minus a Python int promotes to np.float64,
+        # and read() needs a plain int
+        chunk_sz = int(min(1024 * 1024, sz - recvd))
+        chunk = infp.read(chunk_sz)
+        assert len(chunk) == chunk_sz, (
+            'Incomplete chunk, expected %d (%s) got %d'
+            % (sz, `sz_buf`, len(chunk)))
+        dst.write(chunk)
+        recvd += len(chunk)
+
+def work(args):
+    addr = socket.gethostname().split('.')[0] + '/' + str(args.device)
+    write_str(sys.stdout, ready_str)
+
+    import pycuda.driver as cuda
+    cuda.init()
+    dev = cuda.Device(args.device)
+    cuctx = dev.make_context(flags=cuda.ctx_flags.SCHED_BLOCKING_SYNC)
+
+    try:
+        job_text = read_str(sys.stdin)
+        if job_text == done_str:
+            return
+        job_desc = json.loads(job_text)
+        prof, gnm, times, name = map(job_desc.get, 'profile genome times name'.split())
+        gprof = profile.wrap(prof, gnm)
+
+        rmgr = render.RenderManager()
+        arch = 'sm_{}{}'.format(
+            dev.get_attribute(cuda.device_attribute.COMPUTE_CAPABILITY_MAJOR),
+            dev.get_attribute(cuda.device_attribute.COMPUTE_CAPABILITY_MINOR))
+        rdr = render.Renderer(gnm, gprof, arch=arch)
+        last_render_time_ms = 0
+
+        def save(buf):
+            out, log = rdr.out.encode(buf)
+            for suffix, file_like in out.items():
+                write_str(sys.stdout, output_file_str)
+                write_str(sys.stdout, suffix)
+                write_filelike(sys.stdout, file_like)
+                if getattr(file_like, 'close', None):
+                    file_like.close()
+
+        evt = buf = next_evt = next_buf = None
+        for idx, t in enumerate(list(times) + [None]):
+            evt, buf = next_evt, next_buf
+            if t is not None:
+                next_evt, next_buf = rmgr.queue_frame(rdr, gnm, gprof, t)
+            if not evt: continue
+            if last_render_time_ms > 2000:
+                while not evt.query():
+                    gevent.sleep(0.2)
+            else:
+                evt.synchronize()
+            last_render_time_ms = evt.time()
+            print >> sys.stderr, '%30s: %s (%3d/%3d), %dms' % (
+                addr, name, idx, len(times), last_render_time_ms)
+            sys.stderr.flush()
+
+            save(buf)
+        write_str(sys.stdout, closing_encoder_str)
+        save(None)
+        write_str(sys.stdout, done_str)
+    finally:
+        cuda.Context.pop()
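
The dispatcher and its workers speak a small length-prefixed protocol over the worker process's stdin/stdout: strings are framed by write_str/read_str with a 4-byte big-endian size, file bodies by write_filelike/copy_filelike with an 8-byte size, and the leading-zero asserts are cheap sanity checks against a desynchronized stream. A round-trip check of the string framing, assuming the helpers above are in scope:

    from StringIO import StringIO   # read/write in-memory file

    buf = StringIO()
    write_str(buf, ready_str)
    buf.seek(0)
    assert read_str(buf) == ready_str
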
+Job = namedtuple('Job', 'genome name times retry_count')
+
+def dispatch(args):
+    pname, prof = profile.get_from_args(args)
+
+    workers = args.worker
+    if not workers:
+        try:
+            with open(os.path.expanduser('~/.cuburn-workers')) as fp:
+                workers = filter(None, fp.read().split())
+        except:
+            traceback.print_exc()
+            pass
+    if not workers:
+        print >> sys.stderr, ('No workers defined. Pass --worker or set up '
+                '~/.cuburn-workers with one worker per line.')
+        sys.exit(1)
+
+    gdb = db.connect(args.genomedb)
+
+    job_queue = gevent.queue.JoinableQueue(5)
+    active_job_group = gevent.pool.Group()
+    def fill_jobs():
+        for oid in args.flames:
+            ids = [oid]
+            if oid[0] == '@':
+                with open(oid[1:]) as fp:
+                    ids = fp.read().split('\n')
+            for id in ids:
+                gnm, basename = gdb.get_anim(id)
+                gprof = profile.wrap(prof, gnm)
+                for name, times in profile.enumerate_jobs(gprof, basename,
+                        args, resume=True):
+                    job_queue.put(Job(gnm, name, times, 0))
+    job_filler = gevent.spawn(fill_jobs)
+
+    def connect_to_worker(addr):
+        host, device = addr.split('/')
+        if host == 'localhost':
+            distribute_path = os.path.expanduser('~/.cuburn_dist/distribute.py')
+            args = [distribute_path, 'work', '--device', str(device)]
+            subp = subprocess.Popen(args, stdin=subprocess.PIPE,
+                                    stdout=subprocess.PIPE)
+            assert read_str(subp.stdout) == ready_str
+        else:
+            connect_timeout = 5
+            while True:
+                try:
+                    subp = subprocess.Popen(
+                        ['ssh', host, '.cuburn_dist/distribute.py', 'work',
+                         '--device', str(device)],
+                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+                    assert read_str(subp.stdout) == ready_str
+                    break
+                except:
+                    traceback.print_exc()
+                    gevent.sleep(connect_timeout)
+                    connect_timeout *= 2
+        return subp
+
+    exiting = False
+    worker_failure_counts = {}
+    def run_job(addr):
+        worker = connect_to_worker(addr)
+        job = job_queue.get()
+        evt = gevent.event.Event()
+        def _run_job():
+            try:
+                if job is None:
+                    write_str(worker.stdin, done_str)
+                    worker.stdin.close()
+                    return
+                job_desc = dict(profile=prof, genome=job.genome,
+                                times=list(job.times), name=job.name)
+                write_str(worker.stdin, json.dumps(job_desc))
+                worker.stdin.close()
+                while True:
+                    msg_name = read_str(worker.stdout)
+                    if msg_name == closing_encoder_str:
+                        evt.set()
+                    elif msg_name == output_file_str:
+                        filename = job.name + read_str(worker.stdout)
+                        with open(filename + '.tmp', 'w') as fp:
+                            copy_filelike(worker.stdout, fp)
+                        os.rename(filename + '.tmp', filename)
+                    else:
+                        assert msg_name == done_str, 'no known event ' + msg_name
+                        break
+                worker_failure_counts[addr] = 0
+            except:
+                print >> sys.stderr, traceback.format_exc()
+                worker_failure_counts[addr] = worker_failure_counts.get(addr, 0) + 1
+                if job.retry_count < 3:
+                    job_queue.put(Job(job.genome, job.name, job.times,
+                                      job.retry_count + 1))
+            finally:
+                job_queue.task_done()
+                evt.set()
+        greenlet = gevent.spawn(_run_job)
+        active_job_group.add(greenlet)
+        return greenlet, evt
+
+    def run_worker(addr):
+        while worker_failure_counts.get(addr, 0) < 4 and not exiting:
+            greenlet, evt = run_job(addr)
+            evt.wait()
+
+    worker_group = gevent.pool.Group()
+    for addr in workers:
+        worker_group.spawn(run_worker, addr)
+    job_filler.join()
+
+    # Flush all outstanding jobs and, possibly, retries
+    while True:
+        job_queue.join()
+        active_job_group.join()
+        if job_queue.empty():
+            break
+
+    # Close the remaining workers
+    exiting = True
+    map(job_queue.put, [None] * len(worker_group))
+    worker_group.join()
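
Shutdown is the poison-pill idiom: once the flush loop sees an empty queue, one None sentinel per worker greenlet is enqueued, so every blocked job_queue.get() wakes exactly once and run_job tells its worker process done_str. The pattern in isolation (hypothetical names):

    import gevent
    import gevent.queue

    q = gevent.queue.Queue()

    def consumer(n):
        while True:
            item = q.get()
            if item is None:        # sentinel: exit cleanly
                return
            print 'consumer %d got %r' % (n, item)

    workers = [gevent.spawn(consumer, n) for n in range(3)]
    map(q.put, ['a', 'b', 'c'])
    map(q.put, [None] * len(workers))   # one pill per consumer
    gevent.joinall(workers)
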
form "host/device_id")') + dispatch_parser.add_argument('-d', '--genomedb', metavar='PATH', type=str, + help="Path to genome database (file or directory, default '.')", + default='.') + profile.add_args(dispatch_parser) + dispatch_parser.set_defaults(func=dispatch) + + worker_parser = cmd_parser.add_parser( + 'work', help='Perform a task (controlled by a dispatcher).') + worker_parser.add_argument('--device', metavar='NUM', type=int, + help='GPU device number to use, 0-indexed.') + worker_parser.set_defaults(func=work) + + args = parser.parse_args() + args.func(args)