diff --git a/cuburn/render.py b/cuburn/render.py
index e70c4de..e25df5a 100644
--- a/cuburn/render.py
+++ b/cuburn/render.py
@@ -23,6 +23,20 @@ from cuburn.genome.util import palette_decode
 RenderedImage = namedtuple('RenderedImage', 'buf idx gpu_time')
 Dimensions = namedtuple('Dimensions', 'w h aw ah astride')
 
+class DurationEvent(cuda.Event):
+    """
+    A CUDA event which is implicitly aware of a prior event for time
+    calculations.
+
+    Note that instances retain a reference to their prior, so an unbroken
+    chain of DurationEvents will leak. Use normal events as priors.
+    """
+    def __init__(self, prior):
+        super(DurationEvent, self).__init__()
+        self._prior = prior
+    def time(self):
+        return self.time_since(self._prior)
+
 class Framebuffers(object):
     """
     The largest memory allocations, and a stream to serialize their use.
@@ -340,13 +354,17 @@ class RenderManager(ClsMod):
         leave ``copy`` to True every time for now.
 
         The return value is a 2-tuple ``(evt, h_out)``, where ``evt`` is a
-        CUDA event and ``h_out`` is the return value of the output module's
+        DurationEvent and ``h_out`` is the return value of the output module's
         ``copy`` function. In the typical case, ``h_out`` will be a host
         allocation containing data in an appropriate format for the output
         module's file writer, and ``evt`` indicates when the asynchronous DMA
         copy which will populate ``h_out`` is complete. This can vary
         depending on the output module in use, though.
+
+        This method is absolutely _not_ threadsafe, but it's fine to use it
+        alongside non-threaded approaches to concurrency like coroutines.
         """
+        timing_event = cuda.Event().record(self.stream_b)
         # Note: we synchronize on the previous stream if buffers need to be
         # reallocated, which implicitly also syncs the current stream.
         dim = self.fb.set_dim(gprof.width, gprof.height, self.stream_b)
@@ -372,7 +390,7 @@ class RenderManager(ClsMod):
         rdr.out.convert(self.fb, gprof, dim, self.stream_a)
         self.filt_evt = cuda.Event().record(self.stream_a)
         h_out = rdr.out.copy(self.fb, dim, self.fb.pool, self.stream_a)
-        self.copy_evt = cuda.Event().record(self.stream_a)
+        self.copy_evt = DurationEvent(timing_event).record(self.stream_a)
 
         self.info_a, self.info_b = self.info_b, self.info_a
         self.stream_a, self.stream_b = self.stream_b, self.stream_a
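For callers of ``queue_frame``, the change above means unpacking a 2-tuple and, once the event has fired, optionally reading the frame's GPU time. A minimal sketch of that usage follows; ``rmgr``, ``rdr``, ``gnm``, ``gprof``, and ``tc`` stand in for a RenderManager, compiled renderer, genome, profile, and center time set up elsewhere, and the blocking wait is only for brevity (dist/worker.py below polls cooperatively instead):

    # Sketch only: assumes rmgr, rdr, gnm, gprof, and tc already exist.
    evt, h_out = rmgr.queue_frame(rdr, gnm, gprof, tc)

    # evt is a DurationEvent; it fires once the asynchronous DMA copy into
    # h_out is complete, so block here until the frame is ready.
    evt.synchronize()

    # time() reports milliseconds since the prior event recorded at the top
    # of queue_frame, i.e. roughly the GPU time spent on this frame.
    print 'frame took %d ms' % evt.time()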
diff --git a/dist/worker.py b/dist/worker.py
index b72eb9b..347def2 100644
--- a/dist/worker.py
+++ b/dist/worker.py
@@ -2,7 +2,9 @@ import sys
 
 from cStringIO import StringIO
 
-import zmq
+import gevent
+from gevent import spawn, queue
+import zmq.green as zmq
 
 import pycuda.driver as cuda
 cuda.init()
@@ -19,31 +21,46 @@ class PrecompiledRenderer(render.Renderer):
         self.packer, self.cubin = packer, cubin
         super(PrecompiledRenderer, self).__init__(gnm, gprof)
 
-def main(card_num, worker_addr):
-    dev = cuda.Device(card_num)
-    ctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
+def main(worker_addr):
     rmgr = render.RenderManager()
     ctx = zmq.Context()
 
-    sock = ctx.socket(zmq.REQ)
-    sock.connect(worker_addr)
+    in_queue = queue.Queue(0)
+    out_queue = queue.Queue(0)
 
-    # Start the request loop with an empty job
-    sock.send('')
-    hash = None
-    while True:
-        addr, task, cubin, packer = sock.recv_pyobj()
-        gprof = profile.wrap(task.profile, task.anim)
-        if hash != task.hash:
-            rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
-        buf = rmgr.queue_frame(rdr, task.anim, gprof, task.time)
-        ofile = StringIO()
-        output.PILOutput.save(buf, ofile, task.id[-3:])
-        ofile.seek(0)
-        sock.send_multipart(addr + [ofile.read()])
-        hash = task.hash
-        print 'Rendered', task.id
+    def request_loop():
+        sock = ctx.socket(zmq.REQ)
+        sock.connect(worker_addr)
+
+        # Start the request loop with an empty job
+        sock.send('')
+
+        hash = None
+        while True:
+            addr, task, cubin, packer = sock.recv_pyobj()
+            gprof = profile.wrap(task.profile, task.anim)
+            if hash != task.hash:
+                rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
+            evt, buf = rmgr.queue_frame(rdr, task.anim, gprof, task.time)
+            while not evt.query():
+                gevent.sleep(0.01)
+            ofile = StringIO()
+            output.PILOutput.save(buf, ofile, task.id[-3:])
+            ofile.seek(0)
+            sock.send_multipart(addr + [ofile.read()])
+            hash = task.hash
+
+            print 'Rendered', task.id, 'in', int(evt.time()), 'ms'
+
+    # Spawn two request loops to take advantage of CUDA pipelining.
+    spawn(request_loop)
+    request_loop()
 
 if __name__ == "__main__":
     import addrs
-    main(int(sys.argv[1]), addrs.addrs['workers'])
+    dev = cuda.Device(int(sys.argv[1]))
+    cuctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
+    try:
+        main(addrs.addrs['workers'])
+    finally:
+        cuda.Context.pop()
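The ``while not evt.query(): gevent.sleep(0.01)`` loop is what keeps a greenlet from monopolizing the process while its frame is still on the GPU: query() returns immediately, and sleeping between polls yields to the gevent hub so the second request_loop (and its zmq.green socket) can make progress. The same pattern could be factored into a small helper; the sketch below is illustrative only, and the helper name and poll interval are arbitrary choices, not part of cuburn:

    import gevent

    def wait_for_event(evt, poll_interval=0.01):
        """Yield to other greenlets until a CUDA event has completed."""
        # Event.query() never blocks; sleeping hands control back to the
        # gevent hub so other greenlets can run while the GPU works.
        while not evt.query():
            gevent.sleep(poll_interval)
        return evt

With two greenlets each running request_loop on its own REQ socket, one frame can be rendering while the other is being encoded and sent back, which is the CUDA pipelining the comment refers to.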