cuburn/distribute.py


#!/usr/bin/env python2
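"""Distribute cuburn fractal flame rendering across multiple GPUs.

'dispatch' farms jobs out to worker processes (spawned locally or over ssh);
'work' renders one job on a single GPU under a dispatcher's control.
"""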
import os
import sys
import socket
import argparse
import subprocess
import traceback
import gevent
import gevent.event
import gevent.queue
import gevent.pool
from gevent import monkey
monkey.patch_all()
import json
from collections import namedtuple
import numpy as np
sys.path.insert(0, os.path.dirname(__file__))
from cuburn import render, filters, output, profile
from cuburn.genome import convert, use, db
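
# Sentinel messages exchanged between the dispatcher and its workers over
# each worker's stdin/stdout pipes.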
ready_str = 'worker ready'
closing_encoder_str = 'closing encoder'
output_file_str = 'here is a file for you'
done_str = 'we done here'
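
# Framed-I/O helpers: a string is sent as a 4-byte big-endian length followed
# by the payload; a file-like object uses an 8-byte length and is streamed in
# 1 MiB chunks.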
def write_str(out, val):
    out.write(np.array([len(val)], '>u4').tostring())
    out.write(val)
    out.flush()

def write_filelike(out, filelike):
    filelike.seek(0, 2)
    out.write(np.array([filelike.tell()], '>u8').tostring())
    filelike.seek(0)
    buf = filelike.read(1024 * 1024)
    while buf:
        out.write(buf)
        buf = filelike.read(1024 * 1024)
    out.flush()

def read_str(infp):
    sz_buf = infp.read(4)
    assert len(sz_buf) == 4, 'Incomplete read of str size'
    assert sz_buf[0] == '\0', 'No str should be that big'
    sz = np.frombuffer(sz_buf, '>u4')[0]
    msg = infp.read(sz)
    assert len(msg) == sz, 'Incomplete read, expected %d got %d' % (sz, len(msg))
    return msg

def copy_filelike(infp, dst):
    sz_buf = infp.read(8)
    assert len(sz_buf) == 8, 'Incomplete read of filelike size'
    assert sz_buf[0] == '\0', 'No filelike should be that big'
    sz = np.frombuffer(sz_buf, '>u8')[0]
    recvd = 0
    while recvd < sz:
        # sz is a numpy scalar, so min() returns one too; read() needs a
        # plain int
        chunk_sz = int(min(1024 * 1024, sz - recvd))
        chunk = infp.read(chunk_sz)
        assert len(chunk) == chunk_sz, (
            'Incomplete chunk, expected %d (%s), got %d'
            % (chunk_sz, repr(sz_buf), len(chunk)))
        dst.write(chunk)
        recvd += len(chunk)
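
# Worker mode: render one job on a single GPU, reading the job description
# from stdin and streaming encoded output back over stdout.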
def work(args):
    addr = socket.gethostname().split('.')[0] + '/' + str(args.device)
    write_str(sys.stdout, ready_str)

    # pycuda is imported lazily so that only worker processes require CUDA.
    import pycuda.driver as cuda
    cuda.init()
    dev = cuda.Device(args.device)
    cuctx = dev.make_context(flags=cuda.ctx_flags.SCHED_BLOCKING_SYNC)

    try:
        job_text = read_str(sys.stdin)
        if job_text == done_str:
            return
        job_desc = json.loads(job_text)
        prof, gnm, times, name = map(job_desc.get,
                                     'profile genome times name'.split())
        gprof = profile.wrap(prof, gnm)
        rmgr = render.RenderManager()
        arch = 'sm_{}{}'.format(
            dev.get_attribute(cuda.device_attribute.COMPUTE_CAPABILITY_MAJOR),
            dev.get_attribute(cuda.device_attribute.COMPUTE_CAPABILITY_MINOR))
        rdr = render.Renderer(gnm, gprof, arch=arch)
        last_render_time_ms = 0
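
        # Encode any finished frames and ship each resulting file (keyed by
        # output suffix) back to the dispatcher.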
        def save(buf):
            out, log = rdr.out.encode(buf)
            for suffix, file_like in out.items():
                write_str(sys.stdout, output_file_str)
                write_str(sys.stdout, suffix)
                write_filelike(sys.stdout, file_like)
                if getattr(file_like, 'close', None):
                    file_like.close()
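
        # Keep one frame in flight: queue frame t+1 on the GPU before waiting
        # on frame t, overlapping rendering with encoding and transfer.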
        evt = buf = next_evt = next_buf = None
        for idx, t in enumerate(list(times) + [None]):
            evt, buf = next_evt, next_buf
            if t is not None:
                next_evt, next_buf = rmgr.queue_frame(rdr, gnm, gprof, t)
            if not evt:
                continue
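            # For slow renders, poll the CUDA event with gevent-friendly
            # sleeps rather than blocking the whole process in synchronize().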
            if last_render_time_ms > 2000:
                while not evt.query():
                    gevent.sleep(0.2)
            else:
                evt.synchronize()
            last_render_time_ms = evt.time()
            print >> sys.stderr, '%30s: %s (%3d/%3d), %dms' % (
                addr, name, idx, len(times), last_render_time_ms)
            sys.stderr.flush()
            save(buf)

        write_str(sys.stdout, closing_encoder_str)
        save(None)
        write_str(sys.stdout, done_str)
    finally:
        cuda.Context.pop()
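
# One unit of work: a genome rendered at the given times. 'retry_count'
# tracks how many times the job has been requeued after a worker failure.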
Job = namedtuple('Job', 'genome name times retry_count')

def dispatch(args):
    pname, prof = profile.get_from_args(args)
    workers = args.worker
    if not workers:
        try:
            with open(os.path.expanduser('~/.cuburn-workers')) as fp:
                workers = filter(None, fp.read().split())
        except:
            traceback.print_exc()
    if not workers:
        print >> sys.stderr, ('No workers defined. Pass --worker or set up '
                              '~/.cuburn-workers with one worker per line.')
        sys.exit(1)

    gdb = db.connect(args.genomedb)
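
    # A bounded queue applies backpressure: fill_jobs blocks once five jobs
    # are pending instead of loading every genome up front.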
    job_queue = gevent.queue.JoinableQueue(5)
    active_job_group = gevent.pool.Group()

    def fill_jobs():
        for oid in args.flames:
            ids = [oid]
            if oid[0] == '@':
                with open(oid[1:]) as fp:
                    ids = fp.read().split('\n')
            for id in ids:
                gnm, basename = gdb.get_anim(id)
                gprof = profile.wrap(prof, gnm)
                for name, times in profile.enumerate_jobs(gprof, basename,
                                                          args, resume=True):
                    job_queue.put(Job(gnm, name, times, 0))
    job_filler = gevent.spawn(fill_jobs)
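
    # Workers are addressed as 'host/device'. Local workers are spawned
    # directly; remote ones run over ssh, retrying with exponential backoff
    # capped at ten minutes between attempts.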
    def connect_to_worker(addr):
        host, device = addr.split('/')
        if host == 'localhost':
            distribute_path = os.path.expanduser('~/.cuburn_dist/distribute.py')
            args = [distribute_path, 'work', '--device', str(device)]
            subp = subprocess.Popen(args, stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE)
            assert read_str(subp.stdout) == ready_str
        else:
            connect_timeout = 5
            while True:
                try:
                    subp = subprocess.Popen(
                        ['ssh', host, '.cuburn_dist/distribute.py', 'work',
                         '--device', str(device)],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
                    assert read_str(subp.stdout) == ready_str
                    break
                except:
                    traceback.print_exc()
                    gevent.sleep(connect_timeout)
                    connect_timeout = min(600, connect_timeout * 2)
        return subp

    exiting = False
    worker_failure_counts = {}
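
    # Run one job on a freshly spawned worker process. The returned event is
    # set as soon as the worker reports 'closing encoder', letting the caller
    # start the next job on the same device while this one finishes encoding.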
    def run_job(addr):
        worker = connect_to_worker(addr)
        job = job_queue.get()
        evt = gevent.event.Event()

        def _run_job():
            try:
                if job is None:
                    write_str(worker.stdin, done_str)
                    worker.stdin.close()
                    return
                job_desc = dict(profile=prof, genome=job.genome,
                                times=list(job.times), name=job.name)
                write_str(worker.stdin, json.dumps(job_desc))
                worker.stdin.close()
                while True:
                    msg_name = read_str(worker.stdout)
                    if msg_name == closing_encoder_str:
                        evt.set()
                    elif msg_name == output_file_str:
                        filename = job.name + read_str(worker.stdout)
                        # Write to a temp file and rename atomically so a
                        # partial transfer is never mistaken for output.
                        with open(filename + '.tmp', 'wb') as fp:
                            copy_filelike(worker.stdout, fp)
                        os.rename(filename + '.tmp', filename)
                    else:
                        assert msg_name == done_str, 'no known event ' + msg_name
                        break
                worker_failure_counts[addr] = 0
            except:
                print >> sys.stderr, traceback.format_exc()
                worker_failure_counts[addr] = worker_failure_counts.get(addr, 0) + 1
                if job is not None and job.retry_count < 3:
                    job_queue.put(Job(job.genome, job.name, job.times,
                                      job.retry_count + 1))
            finally:
                job_queue.task_done()
                evt.set()
        greenlet = gevent.spawn(_run_job)
        active_job_group.add(greenlet)
        return greenlet, evt
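
    # Drive one worker address: keep pulling jobs until shutdown, retiring
    # the address after four consecutive failures.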
    def run_worker(addr):
        while worker_failure_counts.get(addr, 0) < 4 and not exiting:
            greenlet, evt = run_job(addr)
            evt.wait()

    worker_group = gevent.pool.Group()
    for addr in workers:
        worker_group.spawn(run_worker, addr)

    job_filler.join()

    # Flush all outstanding jobs and, possibly, retries; a failed job can be
    # requeued after join() returns, so loop until the queue stays empty.
    while True:
        job_queue.join()
        active_job_group.join()
        if job_queue.empty():
            break

    # Close the remaining workers
    exiting = True
    map(job_queue.put, [None] * len(worker_group))
    worker_group.join()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description='Render fractal flames on multiple GPUs.')
    cmd_parser = parser.add_subparsers()

    dispatch_parser = cmd_parser.add_parser(
        'dispatch', help='Dispatch tasks to workers.')
    dispatch_parser.add_argument('flames', metavar='ID', type=str, nargs='+',
        #fromfile_prefix_chars='@',
        help='Flames to render (prefix playlist with @)')
    dispatch_parser.add_argument('--worker', metavar='ADDRESS', nargs='*',
        help='Worker address (in the form "host/device_id")')
    dispatch_parser.add_argument('-d', '--genomedb', metavar='PATH', type=str,
        help="Path to genome database (file or directory, default '.')",
        default='.')
    profile.add_args(dispatch_parser)
    dispatch_parser.set_defaults(func=dispatch)

    worker_parser = cmd_parser.add_parser(
        'work', help='Perform a task (controlled by a dispatcher).')
    worker_parser.add_argument('--device', metavar='NUM', type=int,
        help='GPU device number to use, 0-indexed.')
    worker_parser.set_defaults(func=work)

    args = parser.parse_args()
    args.func(args)