Mirror of https://github.com/stevenrobertson/cuburn.git (synced 2025-07-02 06:16:53 -04:00)

Support x264 10-bit output format.

dist/client.py (vendored) | 32
@@ -10,11 +10,15 @@ from gevent import spawn, queue, coros
 import zmq.green as zmq
 
 import _importhack
-from cuburn import profile
+from cuburn import profile, output
 from cuburn.genome import db, util
 
 from messages import *
 
+# TODO: remove this dependency (loading the output module to get the suffix
+# requires a compiler / default instance)
+import pycuda.autoinit
+
 class RenderClient(object):
     def __init__(self, task_addr, rsp_addr, ctx=None, start=True):
         ctx = zmq.Context() if ctx is None else ctx
@@ -56,12 +60,12 @@ class RenderClient(object):
     def _deal_rsps(self):
         while True:
             rsp = self.rsock.recv_multipart(copy=False)
-            assert len(rsp) == 2
             rq = self.taskmap.get(rsp[0].bytes, None)
-            if rq: rq.put(rsp[1])
+            if rq: rq.put((rsp[1].bytes, rsp[2].bytes.split('\0'), rsp[3:]))
 
 # Time (in seconds) before a job times out
-TIMEOUT=240
+# TODO: replace timeout mechanism with polling?
+TIMEOUT=2400
 
 # Max. queue length before request considered lost, as a multiple of the
 # number of in-flight requests
@@ -92,21 +96,27 @@ def iter_genomes(prof, outpath, gpaths):
             os.makedirs(odir)
         with open(os.path.join(odir, 'NFRAMES'), 'w') as fp:
             fp.write(str(len(times)))
+        outmod = output.get_output_for_profile(gprof)
         for i, t in times:
-            opath = os.path.join(odir, '%05d.%s' % (i, gprof.output_format))
-            if not os.path.isfile(opath):
+            opath = os.path.join(odir, '%05d' % i)
+            if not os.path.isfile(opath + outmod.suffix):
                 yield Task(opath, ghash, prof, gnm, t)
 
 def get_result(cli, task, rq):
     try:
-        rsp = rq.get(timeout=TIMEOUT)
+        log, names, bufs = rq.get(timeout=TIMEOUT)
     except queue.Empty:
         cli.put(task, rq)
         print '>>', task.id
-        rsp = rq.get()
+        log, names, bufs = rq.get()
 
-    with open(task.id, 'wb') as fp:
-        fp.write(buffer(rsp))
+    with open(task.id + '.log', 'wb') as fp:
+        fp.write(log)
+
+    for name in reversed(names):
+        buf = bufs.pop()
+        with open(task.id + name, 'wb') as fp:
+            fp.write(buffer(buf))
     print '< ', task.id
 
 def main(addrs):
@@ -128,6 +138,8 @@ def main(addrs):
 
     while cli.taskmap:
         print 'Still waiting on %d tasks...' % len(cli.taskmap)
+        for i in cli.taskmap.items():
+            print i
         gevent.sleep(3)
 
 if __name__ == "__main__":
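
For orientation, this is the response framing the client now expects: frame 0 identifies the request, frame 1 carries the worker log, frame 2 is a NUL-joined list of filename suffixes, and the remaining frames are one output buffer per suffix. A minimal sketch of the unpacking and write-out, paraphrasing _deal_rsps and get_result above (assumes pyzmq Frame objects, i.e. recv_multipart(copy=False)):

    def unpack_and_write(task_id, frames):
        # frames: [request_id, log, suffixes, buf0, buf1, ...]
        log = frames[1].bytes
        names = frames[2].bytes.split('\0')
        bufs = frames[3:]
        with open(task_id + '.log', 'wb') as fp:
            fp.write(log)
        for name, buf in zip(names, bufs):   # e.g. name = '.h264' (illustrative)
            with open(task_id + name, 'wb') as fp:
                fp.write(buffer(buf))

get_result pairs names and buffers by popping from the end instead of zipping, presumably so each buffer reference is dropped as soon as it has been written.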

dist/messages.py (vendored) | 2
@@ -1,5 +1,5 @@
 from collections import namedtuple
 
-Task = namedtuple('Task', 'id hash profile anim time')
+Task = namedtuple('Task', 'id hash profile anim times')
 AddressedTask = namedtuple('AddressedTask', 'addr task')
 FullTask = namedtuple('FullTask', 'addr task cubin packer')
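
The only change here is the last Task field: a task now carries a batch of frame times rather than a single one, matching the worker's new per-task frame loop. A toy construction with illustrative values only (a real task carries a profile dict, a genome, and an output-relative path):

    from collections import namedtuple

    Task = namedtuple('Task', 'id hash profile anim times')

    task = Task(id='out/deadbeef/00001', hash='deadbeef', profile={}, anim={},
                times=[0.0, 1.0 / 24, 2.0 / 24])
    print task.times   # the worker iterates these instead of reading a single .time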

dist/server.py (vendored) | 4
@@ -1,4 +1,5 @@
 #!/usr/bin/env python2
+from itertools import takewhile
 
 import gevent
 from gevent import spawn, queue, event
@@ -29,7 +30,6 @@ def setup_task_listeners(addrs, tq, rq):
         # losock to be added to the queue.
         loevt.set()
         task = hisock.recv_pyobj()
-        print 'OOOOOH! Got a hiprio evt'
         loevt.clear() # Got message; pause listen_lo().
         tq.put(task)
         hisock.send('')
@@ -77,7 +77,7 @@ def setup_worker_listener(addrs, tq, rq):
         while True:
             rsp = wsock.recv_multipart(copy=False)
             if rsp[2].bytes != '':
-                print '< ', ' '.join([r.bytes for r in rsp[2:-1]])
+                print '< ', rsp[2].bytes, rsp[3].bytes
                 rq.put(rsp[2:])
             readyq.put(rsp[0])
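
The worker listener above follows the usual ZeroMQ ROUTER envelope convention: a worker's message arrives as [worker_id, '', payload...], so rsp[0] names the worker and rsp[2:] is what it actually sent. A condensed sketch of that relay logic under those assumptions (wsock, rq, and readyq named as in the function above):

    def relay_worker_responses(wsock, rq, readyq):
        while True:
            rsp = wsock.recv_multipart(copy=False)
            # An empty payload is just a ready announcement; anything else
            # is an addressed response headed back toward a client.
            if rsp[2].bytes != '':
                rq.put(rsp[2:])
            readyq.put(rsp[0])   # either way, the worker may take another task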

dist/worker.py (vendored) | 28
@@ -1,5 +1,6 @@
 #!/usr/bin/env python2
 import sys
+import socket
 from cStringIO import StringIO
 
 import gevent
@@ -37,20 +38,29 @@ def main(worker_addr):
     hash = None
     while True:
+        log = [('worker', socket.gethostname() + ':' +
+                cuda.Context.get_current().get_device().pci_bus_id())]
         addr, task, cubin, packer = sock.recv_pyobj()
         gprof = profile.wrap(task.profile, task.anim)
         if hash != task.hash:
             rdr = PrecompiledRenderer(task.anim, gprof, packer, cubin)
-        evt, buf = rmgr.queue_frame(rdr, task.anim, gprof, task.time)
-        while not evt.query():
-            gevent.sleep(0.01)
-        ofile = StringIO()
-        output.PILOutput.save(buf, ofile, task.id[-3:])
-        ofile.seek(0)
-        sock.send_multipart(addr + [ofile.read()])
         hash = task.hash
-
-        print 'Rendered', task.id, 'in', int(evt.time()), 'ms'
+        for t in task.times:
+            evt, buf = rmgr.queue_frame(rdr, task.anim, gprof, t)
+            while not evt.query():
+                gevent.sleep(0.01)
+            out, frame_log = rdr.out.encode(buf)
+            log += frame_log
+            print 'Rendered', task.id, 'in', int(evt.time()), 'ms'
+        final_out, final_log = rdr.out.encode(None)
+        assert not (out and final_out), 'Got output from two sources!'
+        out = out or final_out
+        log += final_log
+        log = '\0'.join([k + ' ' + v for k, v in log])
+
+        suffixes, files = zip(*[(k, v.read())
+                                for k, v in sorted(out.items())])
+        # TODO: reduce copies, generally spruce up the memory usage here
+        sock.send_multipart(addr + [log, '\0'.join(suffixes)] + list(files))
 
     # Spawn two request loops to take advantage of CUDA pipelining.
     spawn(request_loop)
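
The worker loop above encodes frame buffers one at a time and then flushes the encoder with a final encode(None) call; for a stream format like 10-bit x264 the per-frame calls can buffer into the container and only the flush yields finished files, which is what the 'two sources' assertion guards. A condensed sketch of that contract (the encode interface is paraphrased from the calls in the diff, not quoted from the output module):

    def render_task(rdr, frames):
        # encode(buf) returns (out, log); out may be None while the encoder
        # buffers.  encode(None) flushes whatever remains.  At most one of
        # the two should produce output for a given task.
        out, log = None, []
        for buf in frames:
            out, frame_log = rdr.out.encode(buf)
            log += frame_log
        final_out, final_log = rdr.out.encode(None)
        assert not (out and final_out), 'Got output from two sources!'
        return out or final_out, log + final_log

The returned out maps filename suffixes to file-like objects, which the loop above serializes as sorted (suffix, bytes) pairs before sending them over the socket.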