Add new task distribution system

Still in crazy beta
Steven Robertson
2012-06-02 10:08:48 -07:00
parent b13589cc33
commit 92ccf9a579
9 changed files with 303 additions and 317 deletions

dist/__init__.py (new file, empty)

dist/_importhack.py (new file, 3 lines)

@@ -0,0 +1,3 @@
import os, sys
sys.path.insert(0,
        os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))

dist/addrs.py (new file, 4 lines)

@@ -0,0 +1,4 @@
ip = '10.1.10.2'
port = 12615
names = 'tasks tasks_loprio workers responses'.split()
addrs = dict((k, 'tcp://%s:%d' % (ip, port+i)) for i, k in enumerate(names))
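
For reference, with the IP and base port hard-coded above, the comprehension assigns one consecutive TCP port per channel name. The resulting map is equivalent to the literal below (shown only for illustration):

# Equivalent literal form of the computed `addrs` dict (illustration only):
addrs = {
    'tasks':        'tcp://10.1.10.2:12615',
    'tasks_loprio': 'tcp://10.1.10.2:12616',
    'workers':      'tcp://10.1.10.2:12617',
    'responses':    'tcp://10.1.10.2:12618',
}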

dist/client.py (new file, 135 lines)

@@ -0,0 +1,135 @@
#!/usr/bin/env python2
import os
import uuid
import weakref
import numpy as np
import cPickle as pickle

import gevent
from gevent import spawn, queue, coros
import zmq.green as zmq

import _importhack
from cuburn import profile
from cuburn.genome import db, util

from messages import *

class RenderClient(object):
    def __init__(self, task_addr, rsp_addr, ctx=None, start=True):
        ctx = zmq.Context() if ctx is None else ctx
        self.tsock = ctx.socket(zmq.REQ)
        self.tsock.connect(task_addr)

        self.cid = uuid.uuid1().hex
        self.rsock = ctx.socket(zmq.DEALER)
        self.rsock.setsockopt(zmq.IDENTITY, self.cid)
        self.rsock.connect(rsp_addr)

        self.tq = queue.Queue(0)
        self.taskmap = weakref.WeakValueDictionary()
        if start: self.start()

    def put(self, task, rq=None):
        """
        Add a task to the render queue. This method blocks. Returns the
        queue to which the response will be sent.
        """
        rq = queue.Queue() if rq is None else rq
        self.tq.put((task, rq))
        return rq

    def start(self):
        spawn(self._deal_tasks)
        spawn(self._deal_rsps)

    def _deal_tasks(self):
        for task, rq in self.tq:
            rid = uuid.uuid1().hex
            self.taskmap[rid] = rq
            atask = AddressedTask([self.cid, rid], task)
            self.tsock.send_pyobj(atask)
            # Wait for an (empty) response. This ratelimits tasks.
            self.tsock.recv()

    def _deal_rsps(self):
        while True:
            rsp = self.rsock.recv_multipart(copy=False)
            assert len(rsp) == 2
            rq = self.taskmap.get(rsp[0].bytes, None)
            if rq: rq.put(rsp[1])

# Time (in seconds) before a job times out
TIMEOUT = 240

# Max. queue length before request considered lost, as a multiple of the
# number of in-flight requests
QUEUE_LENGTH_FACTOR = 2

RETRIES = 2

def iter_genomes(prof, outpath, gpaths):
    """
    Walk a list of genome paths, yielding them in an order suitable for
    the `genomes` argument of `create_jobs()`.
    """
    gdb = db.connect('.')

    for gpath in gpaths:
        try:
            gnm, basename = gdb.get_anim(gpath)
        except IOError:
            continue
        odir = os.path.join(outpath, basename)
        if (os.path.isfile(os.path.join(odir, 'COMPLETE')) or
                os.path.isfile(os.path.join(outpath, 'ref', basename+'.ts'))):
            continue
        gprof = profile.wrap(prof, gnm)
        ghash = util.hash(gnm)
        times = list(profile.enumerate_times(gprof))
        if not os.path.isdir(odir):
            os.makedirs(odir)
        with open(os.path.join(odir, 'NFRAMES'), 'w') as fp:
            fp.write(str(len(times)))
        for i, t in times:
            opath = os.path.join(odir, '%05d.%s' % (i, gprof.output_format))
            if not os.path.isfile(opath):
                yield Task(opath, ghash, prof, gnm, t)

def get_result(cli, task, rq):
    try:
        rsp = rq.get(timeout=TIMEOUT)
    except queue.Empty:
        cli.put(task, rq)
        print '>>', task.id
        rsp = rq.get()

    with open(task.id, 'wb') as fp:
        fp.write(buffer(rsp))
    print '< ', task.id

def main(addrs):
    parser = profile.add_args()
    parser.add_argument('genomes', nargs='+')
    args = parser.parse_args()
    prof_name, prof = profile.get_from_args(args)

    cli = RenderClient(addrs['tasks_loprio'], addrs['responses'])

    gen = iter_genomes(prof, 'out/%s' % prof_name, args.genomes)
    try:
        for task in gen:
            rq = cli.put(task)
            print ' >', task.id
            spawn(get_result, cli, task, rq)
    except KeyboardInterrupt:
        print 'Interrupt received, flushing'

    while cli.taskmap:
        print 'Still waiting on %d tasks...' % len(cli.taskmap)
        gevent.sleep(3)

if __name__ == "__main__":
    import addrs
    main(addrs.addrs)
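
A minimal sketch of how the client API above is meant to be used outside of main(); the profile and genome values are hypothetical placeholders standing in for the objects normally produced by profile.get_from_args() and db.connect().get_anim():

# Illustrative only; placeholder profile/genome, real ones come from cuburn.
import addrs
from messages import Task

prof = {}    # hypothetical placeholder for a render profile
gnm = {}     # hypothetical placeholder for a genome
cli = RenderClient(addrs.addrs['tasks_loprio'], addrs.addrs['responses'])
rq = cli.put(Task('out/example/00001.jpg', 'deadbeef', prof, gnm, 0.5))
frame = rq.get(timeout=TIMEOUT)     # zmq frame holding the encoded image
with open('out/example/00001.jpg', 'wb') as fp:
    fp.write(buffer(frame))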

dist/messages.py (new file, 5 lines)

@@ -0,0 +1,5 @@
from collections import namedtuple
Task = namedtuple('Task', 'id hash profile anim time')
AddressedTask = namedtuple('AddressedTask', 'addr task')
FullTask = namedtuple('FullTask', 'addr task cubin packer')
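
Roughly, these tuples nest as a task moves through the pipeline: the client wraps a Task in an AddressedTask, and the server attaches the compiled cubin and packer to make a FullTask for the worker. A schematic (all values are placeholders):

# Schematic only; every value below is a placeholder.
from messages import Task, AddressedTask, FullTask

task  = Task('00001.jpg', 'somehash', {}, {}, 0.5)
atask = AddressedTask(['client-uuid', 'request-uuid'], task)            # client -> server
ftask = FullTask(atask.addr, atask.task, '<cubin bytes>', '<packer>')   # server -> worker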

dist/server.py (new file, 107 lines)

@@ -0,0 +1,107 @@
#!/usr/bin/env python2
import gevent
from gevent import spawn, queue, event
import zmq.green as zmq
import cPickle as pickle

import _importhack
from cuburn.render import Renderer

from messages import *

ctx = zmq.Context()

def setup_task_listeners(addrs, tq, rq):
    hisock = ctx.socket(zmq.REP)
    losock = ctx.socket(zmq.REP)
    hisock.bind(addrs['tasks'])
    losock.bind(addrs['tasks_loprio'])

    loevt = event.Event()
    loevt.set()

    @spawn
    def listen_hi():
        while True:
            if not hisock.poll(timeout=0):
                # No messages pending. Set loevt, allowing messages from
                # losock to be added to the queue.
                loevt.set()
            task = hisock.recv_pyobj()
            print 'OOOOOH! Got a hiprio evt'
            loevt.clear()   # Got message; pause listen_lo().
            tq.put(task)
            hisock.send('')

    @spawn
    def listen_lo():
        while True:
            loevt.wait()
            tq.put(losock.recv_pyobj())
            losock.send('')

def setup_worker_listener(addrs, tq, rq):
    wsock = ctx.socket(zmq.ROUTER)
    wsock.bind(addrs['workers'])

    readyq = queue.Queue()

    compcache = {}

    @spawn
    def send_work():
        for addr, task in tq:
            print ' >', ' '.join(addr)
            if task.hash not in compcache:
                try:
                    rsp = Renderer.compile(task.anim, arch='sm_21')
                except:
                    # Store exceptions, so that we don't try to endlessly
                    # recompile bad genomes
                    import traceback
                    rsp = traceback.format_exc()
                    print 'Error while compiling task:', rsp
                compcache[task.hash] = rsp
            else:
                rsp = compcache[task.hash]
            if isinstance(rsp, basestring):
                continue
            packer, lib, cubin = rsp
            ctask = FullTask(addr, task, cubin, packer)
            worker_addr = readyq.get()
            wsock.send_multipart([worker_addr, '', pickle.dumps(ctask)])

    @spawn
    def read_rsps():
        while True:
            rsp = wsock.recv_multipart(copy=False)
            if rsp[2].bytes != '':
                print '< ', ' '.join([r.bytes for r in rsp[1:-1]])
                rq.put(rsp[2:])
            readyq.put(rsp[0])

def setup_responder(addrs, rq):
    rsock = ctx.socket(zmq.ROUTER)
    rsock.bind(addrs['responses'])

    @spawn
    def send_responses():
        for rsp in rq:
            rsock.send_multipart(rsp)

    return send_responses

def main(addrs):
    # Channel holding (addr, task) pairs.
    tq = queue.Queue(0)
    # Queue holding response messages (as a list of raw zmq frames).
    rq = queue.Queue()

    setup_task_listeners(addrs, tq, rq)
    setup_worker_listener(addrs, tq, rq)
    # TODO: Will switch to a Nanny central wait loop
    setup_responder(addrs, rq).join()

if __name__ == "__main__":
    import addrs
    main(addrs.addrs)
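
The priority split above hinges on the gevent Event: listen_lo only pulls from the low-priority socket while loevt is set, and listen_hi clears it whenever a high-priority task is pending. A toy sketch of that gating pattern in isolation (plain queues, no zmq; illustration only):

# Toy illustration of the loevt gating pattern (no sockets involved).
import gevent
from gevent import spawn, queue, event

out = queue.Queue()
gate = event.Event()
gate.set()

def lo_producer():
    for i in range(5):
        gate.wait()           # blocks while the high-priority producer holds the gate
        out.put('lo-%d' % i)
        gevent.sleep(0)

def hi_producer():
    gate.clear()              # pause the low-priority producer
    out.put('hi')
    gate.set()

spawn(lo_producer)
spawn(hi_producer)
gevent.sleep(0.1)
while not out.empty():
    print out.get()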

dist/worker.py (new file, 49 lines)

@@ -0,0 +1,49 @@
#!/usr/bin/env python2
import sys
from cStringIO import StringIO

import zmq
import pycuda.driver as cuda
cuda.init()

import _importhack
from cuburn import render, profile, output
from cuburn.genome import convert, db, use

from messages import *

class PrecompiledRenderer(render.Renderer):
    def compile(self, gnm):
        return self.packer, None, self.cubin
    def __init__(self, gnm, gprof, packer, cubin):
        self.packer, self.cubin = packer, cubin
        super(PrecompiledRenderer, self).__init__(gnm, gprof)

def main(card_num, worker_addr):
    dev = cuda.Device(card_num)
    ctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
    rmgr = render.RenderManager()

    # Keep the CUDA context referenced above; use a distinct name for the
    # zmq context so it isn't clobbered.
    zctx = zmq.Context()
    sock = zctx.socket(zmq.REQ)
    sock.connect(worker_addr)

    # Start the request loop with an empty job
    sock.send('')

    hash = None
    while True:
        addr, task, cubin, packer = sock.recv_pyobj()
        gprof = profile.wrap(task.profile, task.anim)
        if hash != task.hash:
            rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
        buf = rmgr.queue_frame(rdr, task.anim, gprof, task.time)
        ofile = StringIO()
        output.PILOutput.save(buf, ofile, task.id[-3:])
        ofile.seek(0)
        sock.send_multipart(addr + [ofile.read()])
        hash = task.hash
        print 'Rendered', task.id

if __name__ == "__main__":
    import addrs
    main(int(sys.argv[1]), addrs.addrs['workers'])
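
Putting the pieces together, the intended startup order appears to be: the server on the host named in dist/addrs.py, one worker per CUDA device (the device index is the worker's sole argument), then the client with a list of genome paths. Illustrative commands only; paths are placeholders, and any options added by profile.add_args() can be passed to the client as well:

# On the host at 10.1.10.2 (the address hard-coded in dist/addrs.py):
#   python2 dist/server.py
# On each render node, one process per CUDA device index:
#   python2 dist/worker.py 0
# On any machine that can reach the server:
#   python2 dist/client.py path/to/genomes/*.json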