Shuffle points between threads of a CTA

This commit is contained in:
Steven Robertson 2010-09-12 00:17:18 -04:00
parent 40a5ceafde
commit f368a99a16
3 changed files with 355 additions and 27 deletions

View File

@ -15,13 +15,13 @@ from cuburn.variations import Variations
class IterThread(PTXEntryPoint): class IterThread(PTXEntryPoint):
entry_name = 'iter_thread' entry_name = 'iter_thread'
entry_params = [] entry_params = []
maxnreg = 16
def __init__(self): def __init__(self):
self.cps_uploaded = False self.cps_uploaded = False
def deps(self): def deps(self):
return [MWCRNG, CPDataStream, HistScatter, Variations, Timeouter] return [MWCRNG, CPDataStream, HistScatter, Variations, ShufflePoints,
Timeouter]
@ptx_func @ptx_func
def module_setup(self): def module_setup(self):
@ -48,7 +48,7 @@ class IterThread(PTXEntryPoint):
mem.shared.f32('s_xf_sel', ctx.warps_per_cta) mem.shared.f32('s_xf_sel', ctx.warps_per_cta)
std.store_per_thread(g_whatever, 1234) #std.store_per_thread(g_whatever, 1234)
# TODO: temporary, for testing # TODO: temporary, for testing
mem.local.u32('l_num_rounds') mem.local.u32('l_num_rounds')
@ -56,13 +56,11 @@ class IterThread(PTXEntryPoint):
op.st.local.u32(addr(l_num_rounds), 0) op.st.local.u32(addr(l_num_rounds), 0)
op.st.local.u32(addr(l_num_writes), 0) op.st.local.u32(addr(l_num_writes), 0)
mem.local.f32('l_consec') reg.f32('xi xo yi yo colori coloro consec_bad')
op.st.local.f32(addr(l_consec), 0.) mwc.next_f32_11(xi)
mwc.next_f32_11(yi)
reg.f32('x_coord y_coord color_coord') mwc.next_f32_01(colori)
mwc.next_f32_11(x_coord) op.mov.f32(consec_bad, 0.)
mwc.next_f32_11(y_coord)
mwc.next_f32_01(color_coord)
comment("Ensure all init is done") comment("Ensure all init is done")
op.bar.sync(0) op.bar.sync(0)
@ -104,6 +102,7 @@ class IterThread(PTXEntryPoint):
# When fusing, num_samples holds the (negative) number of iterations # When fusing, num_samples holds the (negative) number of iterations
# left across the CP, rather than the number of samples in total. # left across the CP, rather than the number of samples in total.
with block("If still fusing, increment count unconditionally"): with block("If still fusing, increment count unconditionally"):
op.bar.sync(0)
std.set_is_first_thread(reg.pred('p_is_first')) std.set_is_first_thread(reg.pred('p_is_first'))
op.red.shared.add.s32(addr(s_num_samples), 1, ifp=p_is_first) op.red.shared.add.s32(addr(s_num_samples), 1, ifp=p_is_first)
@ -133,8 +132,6 @@ class IterThread(PTXEntryPoint):
op.add.u32(num_rounds, num_rounds, 1) op.add.u32(num_rounds, num_rounds, 1)
op.st.local.u32(addr(l_num_rounds), num_rounds) op.st.local.u32(addr(l_num_rounds), num_rounds)
with block("Select an xform"): with block("Select an xform"):
reg.f32('xf_sel') reg.f32('xf_sel')
reg.u32('warp_offset xf_sel_addr') reg.u32('warp_offset xf_sel_addr')
@ -154,12 +151,9 @@ class IterThread(PTXEntryPoint):
for xf in features.xforms: for xf in features.xforms:
label('XFORM_%d' % xf.id) label('XFORM_%d' % xf.id)
variations.apply_xform(x_coord, y_coord, color_coord, variations.apply_xform(xo, yo, coloro, xi, yi, colori, xf.id)
x_coord, y_coord, color_coord, xf.id)
op.bra.uni("xform_done") op.bra.uni("xform_done")
label("xform_done") label("xform_done")
with block("Test if we're still in FUSE"): with block("Test if we're still in FUSE"):
reg.s32('num_samples') reg.s32('num_samples')
@ -170,7 +164,7 @@ class IterThread(PTXEntryPoint):
reg.pred('p_point_is_valid') reg.pred('p_point_is_valid')
with block("Write the result"): with block("Write the result"):
hist.scatter(x_coord, y_coord, color_coord, 0, p_point_is_valid) hist.scatter(xo, yo, coloro, 0, p_point_is_valid)
with block(): with block():
reg.u32('num_writes') reg.u32('num_writes')
op.ld.local.u32(num_writes, addr(l_num_writes)) op.ld.local.u32(num_writes, addr(l_num_writes))
@ -180,17 +174,23 @@ class IterThread(PTXEntryPoint):
with block("If the result was invalid, handle badvals"): with block("If the result was invalid, handle badvals"):
reg.f32('consec') reg.f32('consec')
reg.pred('need_new_point') reg.pred('need_new_point')
op.ld.local.f32(consec, addr(l_consec)) comment('If point is good, move new coords and reset consec_bad')
op.mov.f32(consec, 0., ifp=p_point_is_valid) op.mov.f32(xi, xo, ifp=p_point_is_valid)
op.mov.f32(yi, yo, ifp=p_point_is_valid)
op.mov.f32(colori, coloro, ifp=p_point_is_valid)
op.mov.f32(consec_bad, 0., ifp=p_point_is_valid)
comment('Otherwise, add 1 to consec_bad')
op.add.f32(consec, consec, 1., ifnotp=p_point_is_valid) op.add.f32(consec, consec, 1., ifnotp=p_point_is_valid)
op.setp.ge.f32(need_new_point, consec, 5.) op.setp.ge.f32(need_new_point, consec, 5.)
op.bra('badval_done', ifnotp=need_new_point) op.bra('badval_done', ifnotp=need_new_point)
mwc.next_f32_11(x_coord)
mwc.next_f32_11(y_coord) comment('If consec_bad > 5, pick a new random point')
mwc.next_f32_01(color_coord) mwc.next_f32_11(xi)
mwc.next_f32_11(yi)
mwc.next_f32_01(colori)
op.mov.f32(consec, 0.) op.mov.f32(consec, 0.)
label('badval_done') label('badval_done')
op.st.local.f32(addr(l_consec), consec)
with block("Increment number of samples by number of good values"): with block("Increment number of samples by number of good values"):
reg.b32('good_samples laneid') reg.b32('good_samples laneid')
@ -205,11 +205,16 @@ class IterThread(PTXEntryPoint):
with block("Check to see if we're done with this CP"): with block("Check to see if we're done with this CP"):
reg.pred('p_cp_done') reg.pred('p_cp_done')
reg.s32('num_samples num_samples_needed') reg.s32('num_samples num_samples_needed')
comment('Sync before making decision to prevent divergence')
op.bar.sync(3)
op.ld.shared.s32(num_samples, addr(s_num_samples)) op.ld.shared.s32(num_samples, addr(s_num_samples))
cp.get(cpA, num_samples_needed, 'cp.nsamples') cp.get(cpA, num_samples_needed, 'cp.nsamples')
op.setp.ge.s32(p_cp_done, num_samples, num_samples_needed) op.setp.ge.s32(p_cp_done, num_samples, num_samples_needed)
op.bra.uni(cp_loop_start, ifp=p_cp_done) op.bra.uni(cp_loop_start, ifp=p_cp_done)
comment('Shuffle points between threads')
shuf.shuffle(xi, yi, colori, consec_bad)
with block("If first warp, pick new thread offset"): with block("If first warp, pick new thread offset"):
reg.u32('warpid') reg.u32('warpid')
reg.pred('first_warp') reg.pred('first_warp')
@ -273,7 +278,7 @@ class IterThread(PTXEntryPoint):
whatever = cuda.from_device(whatever_dp, shape, np.int32) whatever = cuda.from_device(whatever_dp, shape, np.int32)
print_thing("Rounds", rounds) print_thing("Rounds", rounds)
print_thing("Writes", writes) print_thing("Writes", writes)
print_thing("Whatever", whatever) #print_thing("Whatever", whatever)
print np.sum(rounds) print np.sum(rounds)
@ -495,6 +500,41 @@ class HistScatter(PTXFragment):
dtype=np.float32) dtype=np.float32)
class ShufflePoints(PTXFragment):
"""
Shuffle points in shared memory. See helpers/shuf.py for details.
"""
shortname = "shuf"
@ptx_func
def module_setup(self):
# TODO: if needed, merge this shared memory block with others
mem.shared.f32('s_shuf_data', ctx.threads_per_cta)
@ptx_func
def shuffle(self, *args, **kwargs):
"""
Shuffle the data from each register in args across threads. Keyword
argument ``bar`` specifies which barrier to use.
"""
bar = kwargs.pop('bar', 8)
with block("Shuffle across threads"):
reg.u32('shuf_read shuf_write')
with block("Calculate read and write offsets"):
reg.u32('shuf_off shuf_laneid')
op.mov.u32(shuf_off, '%tid.x')
op.mov.u32(shuf_write, s_shuf_data)
op.mad.lo.u32(shuf_write, shuf_off, 4, shuf_write)
op.mov.u32(shuf_laneid, '%laneid')
op.mad.lo.u32(shuf_off, shuf_laneid, 32, shuf_off)
op.and_.b32(shuf_off, shuf_off, ctx.threads_per_cta - 1)
op.mov.u32(shuf_read, s_shuf_data)
op.mad.lo.u32(shuf_read, shuf_off, 4, shuf_read)
for var in args:
op.bar.sync(bar)
op.st.shared.b32(addr(shuf_write), var)
op.bar.sync(bar)
op.ld.shared.b32(var, addr(shuf_read))
class MWCRNG(PTXFragment): class MWCRNG(PTXFragment):
shortname = "mwc" shortname = "mwc"

287
helpers/shuf.py Normal file
View File

@ -0,0 +1,287 @@
# -*- coding: utf-8 -*-
"""
Examples and documentation for the point shuffle. Run this file to produce the
full output with annotations.
"""
import numpy as np
w, t = 0, 0
def p(a):
for i in range(0, t, w):
print ' ' + ' '.join(['%2d' % a[j] for j in range(i,i+w)])
def make():
a = np.ndarray(t, dtype=np.int32)
for i in range(t/w):
for j in range(w):
a[i*w+j] = i
return a
for i in range(w):
a = shuf(a)
p(a)
print """
The problem
-----------
If two points undergo the same transforms many times, they will begin to
converge. Since every thread in a warp selects the same transforms on each
iteration, leaving a given point in the same warp each time would effectively
cut the number of unique samples generated by 32.
One fix is to allow transforms to diverge, but that has a direct impact upon
performance. Another is to simply swap points between different warps. This is
what we do here.
For performance reasons, we can't do the swap in global memory, or between
multiple CTAs; the cost of synchronization and the necessarily uncoalesced
write pattern would put an enormous (orders of magnitude) dent in throughput.
As a result, we have to keep things in shared memory, which is fast but has a
few restrictions, of which we note two:
- There's not a lot of it. 1K is a good target; it gives the memory kernel
enough room in a 16K shared configuration to use 8K while retaining the
4-CTA-per-SM allocation we're shooting for. (More on this later.)
- No two threads in a warp should write to a different bank. More
specifically, for an address ``a``, the value ``a % 128`` should be
unique for each thread. See the CUDA docs for details.
Mixing it up
------------
A simple algorithm for doing the swap which respects those constraints is given
below. In this algorithm, for each coordinate value, each warp writes the value
to memory, waits for all other warps to do the same, then reads back the new
value. The trick is in the addresses which are read:
Let ``w`` be the warp size, ``n`` be the warp number within a CTA, ``l`` be
the lane ID, and ``t`` be the total number of threads within a CTA. The
address to which a point should be written is given by::
a = (l + n * w) % t
For reading, the address is given by::
a = (l + (n + l) * w) % t
Note that the write address in this case is simply the thread ID.
"""
def shuf_simple(a):
b = np.empty_like(a)
for n in range(t/w):
for l in range(w):
b[(l+(n+l)*w)%t] = a[l+n*w]
return b
print """
It may help to visualize this. Here's a representation of a "before" state. In
this block, rows represent different warps, while columns represent different
thread lanes; if addresses proceed from left to right, top to bottom, the
address would be the same as the thread ID as assigned by the GPU. Each value
in a particular cell represents the warp number of the cell that the point
started in, in each example.
For these examples, the warp size ``w`` is 16, and ``t`` is 16*16=256. ::
"""
w, t = 16, 256
a = make()
p(a)
print """
After one round of the simple shuffle given above, the matrix looks like this::
"""
a = shuf_simple(a)
p(a)
print """
This demonstrates two attractive properties:
- Both reads and writes satisfy the constraints to avoid bank conflicts
when writing to shared memory.
- No point ends up in a warp alongside another from the same origin.
Of course, we should investigate its properties on continued iterations. Here's
round 2::
"""
a = shuf_simple(a)
p(a)
print "\nAnd 3::\n"
a = shuf_simple(a)
p(a)
print """
We've been doing farly good so far, but at the fourth iteration, things don't
look as nice::
"""
a = shuf_simple(a)
p(a)
print """
This *looks* bad, with a lot of points grouped together, but that may not be an
obvious indicator of poor performance in a real situation. The linear
relationship of the shuffle is obvious from the tables, and indeed some simple
modular algebra can identify the patterns, but since I am a better programmer
than a mathematician, I'm going to resort to empirical methods.
"Meet my good friend, Monte Carlo"
----------------------------------
Since the iterative part of the flame algorithm is itself essentially a Monte
Carlo simulation, it is appropriate that we use one to characterize its
implementation. In fact, the test algorithm is remarkably similar to the way
color values are calculated in the flame algorithm itself:
- Define a list of affine transformations, each with a certain probability
of being selected.
- For several iterations, in each warp:
- Choose one affine transformation from the list according to the
associated probabilities.
- Apply it to the current coordinate.
We'll use a 1D affine transformation (``x_n = A * x_{n-1} + B``) and equal
probabilities. After each iteration, we'll calculate the standard deviation of
the output of each transform seprately, and sum the total results [#]. If there
is more diversity among the input values, the final number should be larger; an
exceptionally small number indicates reduced variability among the inputs.
.. [#] To be honest, I'm not entirely sure *what* this represents,
statistically, but it *feels* right, and that's what counts.
The initial inputs will be obtained using the same method as the flame
algorithm: run random points through the algorithm a few times without
recording the results. If each affine matrix is convergent, this should quickly
produce suitable input points.
To have something to compare it to, we specify two alternative shuffles. The
first is no shuffle per warp, which is expected to be rather terrible. The
second is a completely random shuffle of the entire matrix in each round,
something that's untenable on the device (at least with any algorithm I've yet
thought of). I've also included the results of running this test with an
effective warp width of 1, which would simulate the results of running a
different transformation per thread regardless of warp configuration.
"""
def shuf_none(a):
return a
def shuf_all(a):
np.random.shuffle(a)
return a
print """
Here's the set of affine transformations we'll be using:
"""
FUSE = 20
aff = [(0.5, 0, 0.3), (0.5, 0.5, 0.3), (0.5, 1, 0.3), (0.25, 0.75, 0.1)]
fmtstr = ' %-10s %-10s %s'
print fmtstr % ('A', 'B', 'probability of selection')
for k in aff:
print fmtstr % k
print "\nAnd here are the results::\n"
def monte(a, shuf, rounds, xf_w):
# Shuffle `a` with `shuf` for `rounds`, and for each group of `xf_w`
# threads, pick a new transform, apply it, and accumulate the result
stdsum = 0
for round in range(-FUSE, rounds):
xf_pts = [[] for i in aff]
for wrp in range(0, t, xf_w):
aff_id = 0
sel = np.random.rand(1)[0]
while sel > aff[aff_id][2]:
sel -= aff[aff_id][2]
aff_id += 1
a[wrp:wrp+xf_w] = a[wrp:wrp+xf_w] * aff[aff_id][0] + aff[aff_id][1]
xf_pts[aff_id].extend(a[wrp:wrp+xf_w])
if round >= 0:
stdsum += sum([np.std(x) for x in xf_pts])
a = shuf(a)
return stdsum / rounds
print ' With no shuffle: %g' % monte(make(), shuf_none, 1000, 16)
print ' With full shuffle: %g' % monte(make(), shuf_all, 1000, 16)
print ' With simple shuffle: %g' % monte(make(), shuf_simple, 1000, 16)
print ' With warp_width=1: %g' % monte(make(), shuf_none, 1000, 1)
print """
Failing to shuffle the points clearly has a considerable negative impact on our
ersatz "diversity" criterion. Fully shuffling the set of points each time
substantially increases this result. It's not enough to close the gap between
transform choice by warp and by thread, but it does improve things.
A simple shuffle actually beats the full shuffle consistently by a few points.
This may seem like a flawed result "more shuffling means more diversity"
but it's actually expected. The simple shuffle guarantees that two points
will never move together over one round, but the full shuffle happily reunites
these neighbors from the previous thread, lowering its performance.
As it turns out, the simple shuffle does quite well, despite its strong linear
dependence. If this was the configuration of the device, I would actually be
satisfied putting this into play directly.
But things are, of course, messier.
Hip to be square
----------------
The previous examples make use of a 16-thread warp size, while NVIDIA devices
actually make use of a 32-thread warp. Since we can realistically only fit 256
threads in a warp, this leaves us with a CTA geometry of 32*8, rather than
16*16. To see what this implies for our setup, check out the examples of simple
rotation on a 16*4 matrix, which has the same aspect ratio as the 32*8 device
would::
"""
t = 64
a = make()
for i in range(5):
p(a)
print
a = shuf_simple(a)
print """
The simple shuffle was designed with a square grid in mind, and performs worse
when on the 32*8 device grid::
"""
t = 256
w = 32
print ' With no shuffle: %g' % monte(make(), shuf_none, 1000, 32)
print ' With full shuffle: %g' % monte(make(), shuf_all, 1000, 32)
print ' With simple shuffle: %g' % monte(make(), shuf_simple, 1000, 32)
print ' With warp_width=1: %g' % monte(make(), shuf_none, 1000, 1)
def shuf_better(a):
b = np.empty_like(a)
for n in range(t/w):
r = np.random.randint(32)
for l in range(w):
b[(l+(n+l)*w)%t] = a[(l+r)%w+n*w]
return b
print ' With better shuffle: %g' % monte(make(), shuf_better, 1000, 32)
print """
Okay I actually intended this to be a blog post but I started writing before
having done any of the math. Actually the simple shuffle looks like it's
sufficient for now, and that's what I'm going to implement.
"""

View File

@ -41,9 +41,10 @@ def main(args):
anim.compile() anim.compile()
bins = anim.render_frame() bins = anim.render_frame()
#bins = np.log2(bins + 1) #bins = np.log2(bins + 1)
bins *= (512./(np.mean([bins[y][x][3] alpha = [bins[y][x][3] for x in range(anim.features.hist_width)
for x in range(anim.features.hist_width) for y in range(anim.features.hist_height)]
for y in range(anim.features.hist_height)])+1e-9)) print sum(alpha)
bins *= (512./(np.mean(alpha)+1e-9))
bins = np.minimum(bins, 255) bins = np.minimum(bins, 255)
bins = bins.astype(np.uint8) bins = bins.astype(np.uint8)