Make dist worker work with pipelining

Steven Robertson 2012-07-04 23:54:22 -07:00
parent a866d058fe
commit bb852ff255
2 changed files with 59 additions and 24 deletions


@@ -23,6 +23,20 @@ from cuburn.genome.util import palette_decode
 
 RenderedImage = namedtuple('RenderedImage', 'buf idx gpu_time')
 Dimensions = namedtuple('Dimensions', 'w h aw ah astride')
 
+class DurationEvent(cuda.Event):
+    """
+    A CUDA event which is implicitly aware of a prior event for time
+    calculations.
+
+    Note that instances retain a reference to their prior, so an unbroken
+    chain of DurationEvents will leak. Use normal events as priors.
+    """
+    def __init__(self, prior):
+        super(DurationEvent, self).__init__()
+        self._prior = prior
+
+    def time(self):
+        return self.time_since(self._prior)
+
 class Framebuffers(object):
     """
     The largest memory allocations, and a stream to serialize their use.
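
For context, a DurationEvent chain can be exercised in isolation. The following is a minimal sketch, not cuburn code: it assumes an active PyCUDA context (pycuda.autoinit here, where cuburn manages its own) and that DurationEvent is in scope as defined above.

    import pycuda.autoinit          # assumption: standalone context setup
    import pycuda.driver as cuda

    stream = cuda.Stream()
    # Use a plain Event as the prior, per the docstring's warning: chaining
    # DurationEvents as priors keeps every event in the chain alive and leaks.
    prior = cuda.Event().record(stream)
    # ... enqueue kernels and async copies on `stream` here ...
    done = DurationEvent(prior).record(stream)
    done.synchronize()              # `prior` is earlier on the same stream, so
                                    # it has also completed by this point
    print 'GPU section took %g ms' % done.time()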
@@ -340,13 +354,17 @@ class RenderManager(ClsMod):
         leave ``copy`` to True every time for now.
 
         The return value is a 2-tuple ``(evt, h_out)``, where ``evt`` is a
-        CUDA event and ``h_out`` is the return value of the output module's
+        DurationEvent and ``h_out`` is the return value of the output module's
         ``copy`` function. In the typical case, ``h_out`` will be a host
         allocation containing data in an appropriate format for the output
         module's file writer, and ``evt`` indicates when the asynchronous
         DMA copy which will populate ``h_out`` is complete. This can vary
         depending on the output module in use, though.
+
+        This method is absolutely _not_ threadsafe, but it's fine to use it
+        alongside non-threaded approaches to concurrency like coroutines.
         """
+        timing_event = cuda.Event().record(self.stream_b)
         # Note: we synchronize on the previous stream if buffers need to be
         # reallocated, which implicitly also syncs the current stream.
         dim = self.fb.set_dim(gprof.width, gprof.height, self.stream_b)
@@ -372,7 +390,7 @@ class RenderManager(ClsMod):
         rdr.out.convert(self.fb, gprof, dim, self.stream_a)
         self.filt_evt = cuda.Event().record(self.stream_a)
         h_out = rdr.out.copy(self.fb, dim, self.fb.pool, self.stream_a)
-        self.copy_evt = cuda.Event().record(self.stream_a)
+        self.copy_evt = DurationEvent(timing_event).record(self.stream_a)
 
         self.info_a, self.info_b = self.info_b, self.info_a
         self.stream_a, self.stream_b = self.stream_b, self.stream_a
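
The new return convention is consumed like this; a hedged sketch, assuming `rmgr`, `rdr`, `anim`, `gprof`, and `frame_time` are set up as in dist/worker.py below.

    import gevent

    evt, h_out = rmgr.queue_frame(rdr, anim, gprof, frame_time)
    # Poll rather than evt.synchronize(): under gevent, this lets another
    # greenlet drive the GPU while this frame's render and copy complete.
    while not evt.query():
        gevent.sleep(0.01)
    print 'render + copy took', int(evt.time()), 'ms'   # DurationEvent.time()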

dist/worker.py

@@ -2,7 +2,9 @@
 import sys
 from cStringIO import StringIO
 
-import zmq
+import gevent
+from gevent import spawn, queue
+import zmq.green as zmq
 import pycuda.driver as cuda
 
 cuda.init()
@@ -19,31 +21,46 @@ class PrecompiledRenderer(render.Renderer):
         self.packer, self.cubin = packer, cubin
         super(PrecompiledRenderer, self).__init__(gnm, gprof)
 
-def main(card_num, worker_addr):
-    dev = cuda.Device(card_num)
-    ctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
+def main(worker_addr):
     rmgr = render.RenderManager()
     ctx = zmq.Context()
+    in_queue = queue.Queue(0)
+    out_queue = queue.Queue(0)
 
-    sock = ctx.socket(zmq.REQ)
-    sock.connect(worker_addr)
-
-    # Start the request loop with an empty job
-    sock.send('')
-
-    hash = None
-    while True:
-        addr, task, cubin, packer = sock.recv_pyobj()
-        gprof = profile.wrap(task.profile, task.anim)
-        if hash != task.hash:
-            rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
-        buf = rmgr.queue_frame(rdr, task.anim, gprof, task.time)
-
-        ofile = StringIO()
-        output.PILOutput.save(buf, ofile, task.id[-3:])
-        ofile.seek(0)
-
-        sock.send_multipart(addr + [ofile.read()])
-        hash = task.hash
-        print 'Rendered', task.id
+    def request_loop():
+        sock = ctx.socket(zmq.REQ)
+        sock.connect(worker_addr)
+
+        # Start the request loop with an empty job
+        sock.send('')
+
+        hash = None
+        while True:
+            addr, task, cubin, packer = sock.recv_pyobj()
+            gprof = profile.wrap(task.profile, task.anim)
+            if hash != task.hash:
+                rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
+            evt, buf = rmgr.queue_frame(rdr, task.anim, gprof, task.time)
+            while not evt.query():
+                gevent.sleep(0.01)
+
+            ofile = StringIO()
+            output.PILOutput.save(buf, ofile, task.id[-3:])
+            ofile.seek(0)
+
+            sock.send_multipart(addr + [ofile.read()])
+            hash = task.hash
+            print 'Rendered', task.id, 'in', int(evt.time()), 'ms'
+
+    # Spawn two request loops to take advantage of CUDA pipelining.
+    spawn(request_loop)
+    request_loop()
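
The two-greenlet structure generalizes beyond cuburn. Below is a standalone sketch of the same pipelining pattern, assuming only gevent and pyzmq; the endpoint and handle() are hypothetical stand-ins for the broker address and the queue_frame/polling sequence above. Because zmq.green sockets yield to the gevent hub on blocking calls, one loop can sit in recv() or sleep on the GPU while the other keeps the device's work queue full.

    import gevent
    from gevent import spawn
    import zmq.green as zmq

    ctx = zmq.Context()

    def handle(job):
        # Hypothetical stand-in for queue_frame plus the evt.query() poll.
        gevent.sleep(0.1)
        return 'rendered ' + job

    def request_loop():
        sock = ctx.socket(zmq.REQ)
        sock.connect('tcp://localhost:5555')   # hypothetical broker address
        sock.send('')                          # prime the loop with an empty job
        while True:
            # REQ alternation: recv a job, send its result, repeat.
            sock.send(handle(sock.recv()))

    # While one loop blocks, the other submits work: that's the pipeline.
    spawn(request_loop)
    request_loop()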
 if __name__ == "__main__":
     import addrs
-    main(int(sys.argv[1]), addrs.addrs['workers'])
+    dev = cuda.Device(int(sys.argv[1]))
+    cuctx = dev.make_context(cuda.ctx_flags.SCHED_BLOCKING_SYNC)
+    try:
+        main(addrs.addrs['workers'])
+    finally:
+        cuda.Context.pop()