Redesign distribution: now based on ssh, not zmq

Steven Robertson 2017-05-15 12:04:16 -07:00
parent c7654357a6
commit 112a674520
8 changed files with 274 additions and 343 deletions
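The redesign replaces the zmq broker (dist/server.py and friends, deleted below) with direct ssh pipes: the dispatcher in distribute.py spawns 'distribute.py work' on each worker host over ssh and exchanges length-prefixed messages on stdin/stdout. A minimal sketch of that framing, assuming only numpy and the stdlib; frame/unframe are illustrative names mirroring write_str/read_str in the new file:

import numpy as np
from cStringIO import StringIO

def frame(msg):
    # 4-byte big-endian length prefix, as in write_str()
    return np.array([len(msg)], '>u4').tostring() + msg

def unframe(pipe):
    # Mirror of read_str(): fixed-size header, then exactly sz payload bytes
    sz = int(np.frombuffer(pipe.read(4), '>u4')[0])
    return pipe.read(sz)

# Round-trip through a StringIO standing in for the ssh pipe
pipe = StringIO(frame('worker ready'))
assert unframe(pipe) == 'worker ready'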

dist/__init__.py vendored (0 lines changed)

dist/_importhack.py vendored (3 lines deleted)

@@ -1,3 +0,0 @@
import os, sys
sys.path.insert(0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))

dist/addrs.py vendored (4 lines deleted)

@@ -1,4 +0,0 @@
ip = '127.0.0.1'
port = 12615
names = 'tasks tasks_loprio workers responses'.split()
addrs = dict((k, 'tcp://%s:%d' % (ip, port+i)) for i, k in enumerate(names))
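For reference, the dict built by this deleted helper expanded to the following literal mapping (derived directly from the loop above):

addrs = {
    'tasks':        'tcp://127.0.0.1:12615',
    'tasks_loprio': 'tcp://127.0.0.1:12616',
    'workers':      'tcp://127.0.0.1:12617',
    'responses':    'tcp://127.0.0.1:12618',
}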

dist/client.py vendored (148 lines deleted)

@@ -1,148 +0,0 @@
#!/usr/bin/env python2
import os
import uuid
import weakref
import numpy as np
import cPickle as pickle
import gevent
from gevent import spawn, queue, coros
import zmq.green as zmq
import _importhack
from cuburn import profile, output
from cuburn.genome import db, util
from messages import *
# TODO: remove this dependency (loading the output module to get the suffix
# requires a compiler / default instance)
import pycuda.autoinit
class RenderClient(object):
def __init__(self, task_addr, rsp_addr, ctx=None, start=True):
ctx = zmq.Context() if ctx is None else ctx
self.tsock = ctx.socket(zmq.REQ)
self.tsock.connect(task_addr)
self.cid = uuid.uuid1().hex
self.rsock = ctx.socket(zmq.DEALER)
self.rsock.setsockopt(zmq.IDENTITY, self.cid)
self.rsock.connect(rsp_addr)
self.tq = queue.Channel()
self.taskmap = weakref.WeakValueDictionary()
if start: self.start()
def put(self, task, rq=None):
"""
Add a task to the render queue. This method blocks. Returns the
queue to which the response will be sent.
"""
rq = queue.Queue() if rq is None else rq
self.tq.put((task, rq))
return rq
def start(self):
spawn(self._deal_tasks)
spawn(self._deal_rsps)
def _deal_tasks(self):
for task, rq in self.tq:
rid = uuid.uuid1().hex
self.taskmap[rid] = rq
atask = AddressedTask([self.cid, rid], task)
self.tsock.send_pyobj(atask)
# Wait for an (empty) response. This ratelimits tasks.
self.tsock.recv()
def _deal_rsps(self):
while True:
rsp = self.rsock.recv_multipart(copy=False)
rq = self.taskmap.get(rsp[0].bytes, None)
if rq: rq.put((rsp[1].bytes, rsp[2].bytes.split('\0'), rsp[3:]))
# Time (in seconds) before a job times out
# TODO: replace timeout mechanism with polling?
TIMEOUT=4800
# Max. queue length before request considered lost, as a multiple of the
# number of in-flight requests
QUEUE_LENGTH_FACTOR=2
RETRIES=2
def iter_genomes(prof, outpath, gpaths):
"""
Walk a list of genome paths, yielding them in an order suitable for
the `genomes` argument of `create_jobs()`.
"""
gdb = db.connect('.')
for gpath in gpaths:
try:
gnm, basename = gdb.get_anim(gpath)
except IOError:
continue
odir = os.path.join(outpath, basename)
if (os.path.isfile(os.path.join(odir, 'COMPLETE')) or
os.path.isfile(os.path.join(outpath, 'ref', basename+'.ts'))):
continue
gprof = profile.wrap(prof, gnm)
ghash = util.hash(gnm)
times = list(profile.enumerate_times(gprof))
if not os.path.isdir(odir):
os.makedirs(odir)
with open(os.path.join(odir, 'NFRAMES'), 'w') as fp:
fp.write(str(len(times)))
outmod = output.get_output_for_profile(gprof)
for i, t in times:
opath = os.path.join(odir, '%05d' % i)
if not os.path.isfile(opath + outmod.suffix):
yield Task(opath, ghash, prof, gnm, t)
def get_result(cli, task, rq):
try:
log, names, bufs = rq.get(timeout=TIMEOUT)
except queue.Empty:
cli.put(task, rq)
print '>>', task.id
log, names, bufs = rq.get()
with open(task.id + '.log', 'wb') as fp:
fp.write(log)
for name in reversed(names):
buf = bufs.pop()
with open(task.id + name, 'wb') as fp:
fp.write(buffer(buf))
print '< ', task.id
def main(addrs):
parser = profile.add_args()
parser.add_argument('genomes', nargs='+')
args = parser.parse_args()
prof_name, prof = profile.get_from_args(args)
cli = RenderClient(addrs['tasks_loprio'], addrs['responses'])
gen = iter_genomes(prof, 'out/%s' % prof_name, args.genomes)
try:
for task in gen:
rq = cli.put(task)
print ' >', task.id
spawn(get_result, cli, task, rq)
except KeyboardInterrupt:
print 'Interrupt received, flushing'
while cli.taskmap:
for k, v in cli.taskmap.items():
if not v.getters:
cli.taskmap.pop(k)
print 'Still waiting on %d tasks...' % len(cli.taskmap)
gevent.sleep(3)
if __name__ == "__main__":
import addrs
main(addrs.addrs)

dist/messages.py vendored (5 lines deleted)

@@ -1,5 +0,0 @@
from collections import namedtuple
Task = namedtuple('Task', 'id hash profile anim times')
AddressedTask = namedtuple('AddressedTask', 'addr task')
FullTask = namedtuple('FullTask', 'addr task cubin packer')

dist/server.py vendored (107 lines deleted)

@@ -1,107 +0,0 @@
#!/usr/bin/env python2
from itertools import takewhile
import gevent
from gevent import spawn, queue, event
import zmq.green as zmq
import cPickle as pickle
import _importhack
from cuburn.render import Renderer
from messages import *
ctx = zmq.Context()
def setup_task_listeners(addrs, tq, rq):
hisock = ctx.socket(zmq.REP)
losock = ctx.socket(zmq.REP)
hisock.bind(addrs['tasks'])
losock.bind(addrs['tasks_loprio'])
loevt = event.Event()
loevt.set()
@spawn
def listen_hi():
while True:
if not hisock.poll(timeout=0):
# No messages pending. Set loevt, allowing messages from
# losock to be added to the queue.
loevt.set()
task = hisock.recv_pyobj()
loevt.clear() # Got message; pause listen_lo().
tq.put(task)
hisock.send('')
@spawn
def listen_lo():
while True:
loevt.wait()
tq.put(losock.recv_pyobj())
losock.send('')
def setup_worker_listener(addrs, tq, rq):
wsock = ctx.socket(zmq.ROUTER)
wsock.bind(addrs['workers'])
readyq = queue.Queue()
compcache = {}
@spawn
def send_work():
for addr, task in tq:
print ' >', ' '.join(addr)
if task.hash not in compcache:
try:
rsp = Renderer.compile(task.anim, arch='sm_35')
except:
# Store exceptions, so that we don't try to endlessly
# recompile bad genomes
import traceback
rsp = traceback.format_exc()
print 'Error while compiling task:', rsp
compcache[task.hash] = rsp
else:
rsp = compcache[task.hash]
if isinstance(rsp, basestring):
continue
packer, lib, cubin = rsp
ctask = FullTask(addr, task, cubin, packer)
worker_addr = readyq.get()
wsock.send_multipart([worker_addr, '', pickle.dumps(ctask)])
@spawn
def read_rsps():
while True:
rsp = wsock.recv_multipart(copy=False)
if rsp[2].bytes != '':
print '< ', rsp[2].bytes, rsp[3].bytes
rq.put(rsp[2:])
readyq.put(rsp[0])
def setup_responder(addrs, rq):
rsock = ctx.socket(zmq.ROUTER)
rsock.bind(addrs['responses'])
@spawn
def send_responses():
for rsp in rq:
rsock.send_multipart(rsp)
return send_responses
def main(addrs):
# Channel holding (addr, task) pairs.
tq = queue.Channel()
# Queue holding response messages (as a list of raw zmq frames).
rq = queue.Queue()
setup_task_listeners(addrs, tq, rq)
setup_worker_listener(addrs, tq, rq)
# TODO: Will switch to a Nanny central wait loop
setup_responder(addrs, rq).join()
if __name__ == "__main__":
import addrs
main(addrs.addrs)

dist/worker.py vendored (76 lines deleted)

@@ -1,76 +0,0 @@
#!/usr/bin/env python2
import sys
import socket
from cStringIO import StringIO
import gevent
from gevent import spawn, queue
import zmq.green as zmq
import pycuda.driver as cuda
cuda.init()
import _importhack
from cuburn import render, profile, output
from cuburn.genome import convert, db, use
from messages import *
class PrecompiledRenderer(render.Renderer):
def compile(self, gnm):
return self.packer, None, self.cubin
def __init__(self, gnm, gprof, packer, cubin):
self.packer, self.cubin = packer, cubin
super(PrecompiledRenderer, self).__init__(gnm, gprof)
def main(worker_addr):
rmgr = render.RenderManager()
ctx = zmq.Context()
in_queue = queue.Queue(0)
out_queue = queue.Queue(0)
def request_loop():
sock = ctx.socket(zmq.REQ)
sock.connect(worker_addr)
# Start the request loop with an empty job
sock.send('')
hash = None
while True:
log = [('worker', socket.gethostname() + ':' +
cuda.Context.get_current().get_device().pci_bus_id())]
addr, task, cubin, packer = sock.recv_pyobj()
gprof = profile.wrap(task.profile, task.anim)
if hash != task.hash:
rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
hash = task.hash  # cache so back-to-back tasks for one genome skip setup
for t in task.times:
evt, buf = rmgr.queue_frame(rdr, task.anim, gprof, t)
while not evt.query():
gevent.sleep(0.01)
out, frame_log = rdr.out.encode(buf)
log += frame_log
print 'Rendered', task.id, 'in', int(evt.time()), 'ms'
final_out, final_log = rdr.out.encode(None)
assert not (out and final_out), 'Got output from two sources!'
out = out or final_out
log += final_log
log = '\0'.join([k + ' ' + v for k, v in log])
suffixes, files = zip(*[(k, v.read())
for k, v in sorted(out.items())])
# TODO: reduce copies, generally spruce up the memory usage here
sock.send_multipart(addr + [log, '\0'.join(suffixes)] + list(files))
# Spawn two request loops to take advantage of CUDA pipelining.
spawn(request_loop)
request_loop()
if __name__ == "__main__":
import addrs
dev = cuda.Device(int(sys.argv[1]))
cuctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
try:
main(addrs.addrs['workers'])
finally:
cuda.Context.pop()

distribute.py (new executable file, 274 lines added)

@@ -0,0 +1,274 @@
#!/usr/bin/env python2
import os
import sys
import socket
import argparse
import subprocess
import traceback
import gevent
import gevent.event
import gevent.queue
import gevent.pool
from gevent import monkey
monkey.patch_all()
import json
from collections import namedtuple
import numpy as np
sys.path.insert(0, os.path.dirname(__file__))
from cuburn import render, filters, output, profile
from cuburn.genome import convert, use, db
ready_str = 'worker ready'
closing_encoder_str = 'closing encoder'
output_file_str = 'here is a file for you'
done_str = 'we done here'
def write_str(out, val):
out.write(np.array([len(val)], '>u4').tostring())
out.write(val)
out.flush()
def write_filelike(out, filelike):
filelike.seek(0, 2)
out.write(np.array([filelike.tell()], '>u8').tostring())
filelike.seek(0)
buf = filelike.read(1024 * 1024)
while buf:
out.write(buf)
buf = filelike.read(1024 * 1024)
out.flush()
def read_str(infp):
sz_buf = infp.read(4)
assert len(sz_buf) == 4, 'Incomplete read of str size'
assert sz_buf[0] == '\0', 'No str should be that big'
sz = np.frombuffer(sz_buf, '>u4')[0]
msg = infp.read(sz)
assert len(msg) == sz, 'Incomplete read, expected %d got %d' % (sz, len(msg))
return msg
def copy_filelike(infp, dst):
sz_buf = infp.read(8)
assert len(sz_buf) == 8, 'Incomplete read of filelike size'
assert sz_buf[0] == '\0', 'No filelike should be that big'
sz = np.frombuffer(sz_buf, '>u8')[0]
recvd = 0
while recvd < sz:
# int() needed: np.uint64 minus a Python int promotes to np.float64,
# and file.read() wants a plain int
chunk_sz = int(min(1024 * 1024, sz - recvd))
chunk = infp.read(chunk_sz)
assert len(chunk) == chunk_sz, (
'Incomplete chunk, expected %d (%r) got %d' % (sz, sz_buf, len(chunk)))
dst.write(chunk)
recvd += len(chunk)
def work(args):
addr = socket.gethostname().split('.')[0] + '/' + str(args.device)
write_str(sys.stdout, ready_str)
import pycuda.driver as cuda
cuda.init()
dev = cuda.Device(args.device)
cuctx = dev.make_context(flags=cuda.ctx_flags.SCHED_BLOCKING_SYNC)
try:
job_text = read_str(sys.stdin)
if job_text == done_str:
return
job_desc = json.loads(job_text)
prof, gnm, times, name = map(job_desc.get, 'profile genome times name'.split())
gprof = profile.wrap(prof, gnm)
rmgr = render.RenderManager()
arch = 'sm_{}{}'.format(
dev.get_attribute(cuda.device_attribute.COMPUTE_CAPABILITY_MAJOR),
dev.get_attribute(cuda.device_attribute.COMPUTE_CAPABILITY_MINOR))
rdr = render.Renderer(gnm, gprof, arch=arch)
last_render_time_ms = 0
def save(buf):
out, log = rdr.out.encode(buf)
for suffix, file_like in out.items():
write_str(sys.stdout, output_file_str)
write_str(sys.stdout, suffix)
write_filelike(sys.stdout, file_like)
if getattr(file_like, 'close', None):
file_like.close()
evt = buf = next_evt = next_buf = None
for idx, t in enumerate(list(times) + [None]):
evt, buf = next_evt, next_buf
if t is not None:
next_evt, next_buf = rmgr.queue_frame(rdr, gnm, gprof, t)
if not evt: continue
if last_render_time_ms > 2000:
while not evt.query():
gevent.sleep(0.2)
else:
evt.synchronize()
last_render_time_ms = evt.time()
print >> sys.stderr, '%30s: %s (%3d/%3d), %dms' % (
addr, name, idx, len(times), last_render_time_ms)
sys.stderr.flush()
save(buf)
write_str(sys.stdout, closing_encoder_str)
save(None)
write_str(sys.stdout, done_str)
finally:
cuda.Context.pop()
Job = namedtuple('Job', 'genome name times retry_count')
def dispatch(args):
pname, prof = profile.get_from_args(args)
workers = args.worker
if not workers:
try:
with open(os.path.expanduser('~/.cuburn-workers')) as fp:
workers = filter(None, fp.read().split())
except:
traceback.print_exc()
pass
if not workers:
print >> sys.stderr, ('No workers defined. Pass --worker or set up '
'~/.cuburn-workers with one worker per line.')
sys.exit(1)
gdb = db.connect(args.genomedb)
job_queue = gevent.queue.JoinableQueue(5)
active_job_group = gevent.pool.Group()
def fill_jobs():
for oid in args.flames:
ids = [oid]
if oid[0] == '@':
with open(oid[1:]) as fp:
ids = fp.read().split('\n')
for id in ids:
gnm, basename = gdb.get_anim(id)
gprof = profile.wrap(prof, gnm)
for name, times in profile.enumerate_jobs(gprof, basename, args,
resume=True):
job_queue.put(Job(gnm, name, times, 0))
job_filler = gevent.spawn(fill_jobs)
def connect_to_worker(addr):
host, device = addr.split('/')
if host == 'localhost':
distribute_path = os.path.expanduser('~/.cuburn_dist/distribute.py')
cmd = [distribute_path, 'work', '--device', str(device)]
subp = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
assert read_str(subp.stdout) == ready_str
else:
connect_timeout = 5
while True:
try:
subp = subprocess.Popen(
['ssh', host, '.cuburn_dist/distribute.py', 'work',
'--device', str(device)],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
assert read_str(subp.stdout) == ready_str
break
except:
traceback.print_exc()
gevent.sleep(connect_timeout)
connect_timeout *= 2
return subp
exiting = False
worker_failure_counts = {}
def run_job(addr):
worker = connect_to_worker(addr)
job = job_queue.get()
evt = gevent.event.Event()
def _run_job():
try:
if job is None:
write_str(worker.stdin, done_str)
worker.stdin.close()
return
job_desc = dict(profile=prof, genome=job.genome, times=list(job.times),
name=job.name)
write_str(worker.stdin, json.dumps(job_desc))
worker.stdin.close()
while True:
msg_name = read_str(worker.stdout)
if msg_name == closing_encoder_str:
evt.set()
elif msg_name == output_file_str:
filename = job.name + read_str(worker.stdout)
with open(filename + '.tmp', 'w') as fp:
copy_filelike(worker.stdout, fp)
os.rename(filename + '.tmp', filename)
else:
assert msg_name == done_str, 'no known event ' + msg_name
break
worker_failure_counts[addr] = 0
except:
print >> sys.stderr, traceback.format_exc()
worker_failure_counts[addr] = worker_failure_counts.get(addr, 0) + 1
if job.retry_count < 3:
job_queue.put(Job(job.genome, job.name, job.times, job.retry_count + 1))
finally:
job_queue.task_done()
evt.set()
greenlet = gevent.spawn(_run_job)
active_job_group.add(greenlet)
return greenlet, evt
def run_worker(addr):
while worker_failure_counts.get(addr, 0) < 4 and not exiting:
greenlet, evt = run_job(addr)
evt.wait()
worker_group = gevent.pool.Group()
for addr in workers:
worker_group.spawn(run_worker, addr)
job_filler.join()
# Flush all outstanding jobs and, possibly, retries. JoinableQueue.join()
# returns None, so it can't drive the loop condition itself.
while True:
job_queue.join()
active_job_group.join()
if job_queue.empty():
break
# Close the remaining workers
exiting = True
map(job_queue.put, [None] * len(worker_group))
worker_group.join()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Render fractal flames on multiple GPUs.')
cmd_parser = parser.add_subparsers()
dispatch_parser = cmd_parser.add_parser(
'dispatch', help='Dispatch tasks to workers.')
dispatch_parser.add_argument('flames', metavar='ID', type=str, nargs='+',
#fromfile_prefix_chars='@',
help='Flames to render (prefix playlist with @)')
dispatch_parser.add_argument('--worker', metavar='ADDRESS', nargs='*',
help='Worker address (in the form "host/device_id")')
dispatch_parser.add_argument('-d', '--genomedb', metavar='PATH', type=str,
help="Path to genome database (file or directory, default '.')",
default='.')
profile.add_args(dispatch_parser)
dispatch_parser.set_defaults(func=dispatch)
worker_parser = cmd_parser.add_parser(
'work', help='Perform a task (controlled by a dispatcher).')
worker_parser.add_argument('--device', metavar='NUM', type=int,
help='GPU device number to use, 0-indexed.')
worker_parser.set_defaults(func=work)
args = parser.parse_args()
args.func(args)
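Typical use, per the argparse setup above, is './distribute.py dispatch FLAME_ID --worker host/0' on the controlling machine; the dispatcher launches 'distribute.py work' on each worker itself. A hedged sketch of that handshake against a locally spawned worker, mirroring connect_to_worker() above; the './distribute.py' path and device 0 are illustrative, and a CUDA device must be present for work() to get past initialization:

import subprocess
import numpy as np

subp = subprocess.Popen(['./distribute.py', 'work', '--device', '0'],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# The worker announces itself with a length-prefixed 'worker ready'.
sz = int(np.frombuffer(subp.stdout.read(4), '>u4')[0])
assert subp.stdout.read(sz) == 'worker ready'
# The done sentinel makes work() return cleanly without taking a job.
subp.stdin.write(np.array([len('we done here')], '>u4').tostring())
subp.stdin.write('we done here')
subp.stdin.close()
subp.wait()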