Skip to content

Instantly share code, notes, and snippets.

@rygorous
Last active August 28, 2023 10:13
Show Gist options
Save rygorous/ecd598ccd58b21f26c61cdf1bc273eca to your computer and use it in GitHub Desktop.
int16 mergesort construction kit
#include <stdint.h>    // int16_t
#include <emmintrin.h>
#include <tmmintrin.h> // for PSHUFB; this isn't strictly necessary (see comments in reverse_s16)
typedef int16_t S16; // element type: signed 16-bit, matching the *_epi16 intrinsics used below
typedef __m128i Vec; // one SSE register holding 8 packed S16 lanes
static inline Vec load8_s16(const S16 *x) { return _mm_loadu_si128((const __m128i *) x); }
static inline void store8_s16(S16 *x, Vec v) { _mm_storeu_si128((__m128i *) x, v); }
static inline void sort_two(Vec &a, Vec &b) { Vec t = a; a = _mm_min_epi16(a, b); b = _mm_max_epi16(b, t); }
static inline void interleave(Vec &a, Vec &b) { Vec t = a; a = _mm_unpacklo_epi16(a, b); b = _mm_unpackhi_epi16(t, b); }
// Reverse the order of the 8 int16 lanes in x.
static inline Vec reverse_s16(Vec x)
{
    // SSE2-only formulation (the alternative the original comment mentions):
    // reverse the four 32-bit groups with PSHUFD, then swap the two 16-bit
    // halves within each group with PSHUFLW + PSHUFHW. On SSSE3 a single
    // PSHUFB with a lane-reversing control constant does the same in one op.
    Vec t = _mm_shuffle_epi32(x, _MM_SHUFFLE(0, 1, 2, 3));
    t = _mm_shufflelo_epi16(t, _MM_SHUFFLE(2, 3, 0, 1));
    return _mm_shufflehi_epi16(t, _MM_SHUFFLE(2, 3, 0, 1));
}
// Reverse the 16-element sequence spanning (a, b): each register is
// lane-reversed and the two registers trade places.
static inline void reverse_s16_pair(Vec &a, Vec &b)
{
    Vec ra = reverse_s16(a);
    Vec rb = reverse_s16(b);
    a = rb;
    b = ra;
}
// Bitonic merger of two 8-element lists where the second is already reversed.
// (a, b) forms a bitonic sequence; on return (a, b) is fully sorted ascending.
static inline void bitonic_merge8_brev(Vec &a, Vec &b)
{
    // Four identical compare-exchange + interleave passes implement the
    // bitonic merge network for the 16 elements held across the two registers.
    for (int pass = 0; pass < 4; ++pass)
    {
        sort_two(a, b);
        interleave(a, b);
    }
}
// Bitonic merger of two sorted 8-element lists (both ascending).
static inline void bitonic_merge8(Vec &a, Vec &b)
{
    // Reversing the second run makes the concatenation bitonic,
    // which is the precondition for the _brev merger.
    b = reverse_s16(b);
    bitonic_merge8_brev(a, b);
}
// Merge two sorted 16-element runs into one 32-element run.
// First run is (a,b) ascending; second run (c,d) is descending
// (i.e. already reversed), so the whole sequence is bitonic.
static inline void bitonic_merge16_brev(Vec &a, Vec &b, Vec &c, Vec &d)
{
    // Top level of the bitonic network: compare-exchange elements 16 apart.
    // The two sort_two calls touch disjoint registers and are order-independent.
    sort_two(b, d);
    sort_two(a, c);
    // Then merge the two resulting independent 16-element bitonic halves.
    bitonic_merge8_brev(a, b);
    bitonic_merge8_brev(c, d);
}
// Merge two sorted (ascending) 16-element runs into one 32-element run.
// First run is (a,b), second is (c,d).
static inline void bitonic_merge16(Vec &a, Vec &b, Vec &c, Vec &d)
{
    // Reverse the second run to establish the bitonic precondition.
    reverse_s16_pair(c, d);
    bitonic_merge16_brev(a, b, c, d);
}
// Initial in-register sort pass (out == arr permitted).
// Use to bootstrap later merges.
// Sorts 64 int16 elements: each lane position is first treated as one of 8
// independent vertical 8-element lists and sorted with a sorting network,
// then a transpose turns those into 8 horizontal runs, which are bitonic-
// merged up to a single sorted 64-element run.
static void sort64(S16 out[], const S16 arr[])
{
// Load all 64 elements into 8 registers of 8 lanes each.
Vec r0 = load8_s16(arr + 0);
Vec r1 = load8_s16(arr + 8);
Vec r2 = load8_s16(arr + 16);
Vec r3 = load8_s16(arr + 24);
Vec r4 = load8_s16(arr + 32);
Vec r5 = load8_s16(arr + 40);
Vec r6 = load8_s16(arr + 48);
Vec r7 = load8_s16(arr + 56);
// Perform 8 parallel 8-element sorts via Batcher's odd-even mergesort network on 8 elements (19 sorters)
// Each sort_two is one compare-exchange applied to all 8 lanes at once, so
// the statement order below IS the network wiring — do not reorder.
sort_two(r0, r1);
sort_two(r2, r3);
sort_two(r4, r5);
sort_two(r6, r7);
sort_two(r0, r2);
sort_two(r1, r3);
sort_two(r4, r6);
sort_two(r5, r7);
sort_two(r1, r2);
sort_two(r5, r6);
sort_two(r0, r4);
sort_two(r1, r5);
sort_two(r2, r6);
sort_two(r3, r7);
sort_two(r2, r4);
sort_two(r3, r5);
sort_two(r1, r2);
sort_two(r3, r4);
sort_two(r5, r6);
// 8x8 transpose to produce 8 in-register increasing runs
// (three rounds of 16-bit interleaves).
interleave(r0, r4);
interleave(r1, r5);
interleave(r2, r6);
interleave(r3, r7);
interleave(r0, r2);
interleave(r1, r3);
interleave(r4, r6);
interleave(r5, r7);
interleave(r0, r1);
interleave(r2, r3);
interleave(r4, r5);
interleave(r6, r7);
// Merge pairs of 8-element runs to produce four 16-element runs
bitonic_merge8(r0, r1);
bitonic_merge8(r2, r3);
bitonic_merge8(r4, r5);
bitonic_merge8(r6, r7);
// Merge pairs of 16-element runs to produce 32-element runs
bitonic_merge16(r0, r1, r2, r3);
bitonic_merge16(r4, r5, r6, r7);
// Merge 32-element runs to produce 64-element run
// Reverse the second 32-run (r4..r7) so the 64 elements form a bitonic
// sequence; note the outer pair (r4,r7) and inner pair (r5,r6) swap.
reverse_s16_pair(r4, r7);
reverse_s16_pair(r5, r6);
// Bitonic32 first stage: compare-exchange elements 32 apart.
sort_two(r0, r4);
sort_two(r1, r5);
sort_two(r2, r6);
sort_two(r3, r7);
// Further stages: merge the two independent 32-element bitonic halves.
bitonic_merge16_brev(r0,r1, r2,r3);
bitonic_merge16_brev(r4,r5, r6,r7);
// Store the fully sorted 64-element run.
store8_s16(out + 0, r0);
store8_s16(out + 8, r1);
store8_s16(out + 16, r2);
store8_s16(out + 24, r3);
store8_s16(out + 32, r4);
store8_s16(out + 40, r5);
store8_s16(out + 48, r6);
store8_s16(out + 56, r7);
}
// Merge two sorted runs inA and inB, both with "elemsPerRun" elements
// (assumed to be a multiple of 16), into a sorted run of 2*elemsPerRun
// int16s at out.
//
// It's easy to modify to work with different lengths of runs A and B as long
// as both are multiples of 16.
//
// Scheme: (vMax0, vMax1) always holds the 16 largest elements fed to the
// merger so far. Each iteration feeds in 16 more elements from whichever
// run has the smaller next value, emits the 16 smallest, and keeps the rest.
static void merge_pass(S16 *out, const S16 *inA, const S16 *inB, size_t elemsPerRun)
{
    const S16 *endA = inA + elemsPerRun;
    const S16 *endB = inB + elemsPerRun;
    S16 *outNearEnd = out + 2*elemsPerRun - 16; // final 16 are stored after the loop
    // Prime the merger with the first 16 elements of run B.
    Vec vMax0 = load8_s16(inB + 0);
    Vec vMax1 = load8_s16(inB + 8);
    inB += 16;
    do
    {
        // Take from the run whose next (smallest unconsumed) element is
        // smaller; once one run is exhausted, drain the other.
        int takeA = (inA < endA) && (inB == endB || *inA <= *inB);
        const S16 *loadPtr = takeA ? inA : inB;
        if (takeA)
            inA += 16;
        else
            inB += 16;
        Vec vMin0 = load8_s16(loadPtr + 0);
        Vec vMin1 = load8_s16(loadPtr + 8);
        // 16 incoming vs. 16 held-back elements -> smallest 16 / largest 16.
        bitonic_merge16(vMin0, vMin1, vMax0, vMax1);
        // Emit the 16 smallest; the largest 16 stay in vMax for next round.
        store8_s16(out + 0, vMin0);
        store8_s16(out + 8, vMin1);
        out += 16;
    }
    while (out < outNearEnd);
    // Both inputs consumed; flush the 16 elements still held in vMax.
    store8_s16(out + 0, vMax0);
    store8_s16(out + 8, vMax1);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment