@vurtun
Last active May 27, 2024 16:42
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <string.h>
#define streq(a, b) (!strcmp((a), (b)))
#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <setjmp.h>
#include <sched.h>
#include <unistd.h>
#include <pthread.h>
#include "xmmintrin.h"
#include <ucontext.h>
#include <sys/mman.h>
typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
#define MAX_WORK_QUEUE_THREAD 64
#define FIBER_STACK_SIZE (64 * 1024)
#define MAX_SCHEDULER_WORKER 64
#define MAX_SCHEDULER_FIBERS 128
/* --------------------------------------------------------------
*
* BITSET
*
* --------------------------------------------------------------*/
#define BITS_PER_BYTE 8
#define BITS_PER_LONG 64
#define BIT_WORD(nr) ((nr) / BITS_PER_LONG)
#define DIV_ROUND_UP(n,d) (((n) + (d) - 1) / (d))
#define BITS_TO_LONGS(nr) DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(u64))
#define BITMAP_FIRST_WORD_MASK(start) (~0UL << ((start) % BITS_PER_LONG))
#define BITMAP_LAST_WORD_MASK(nbits) (((nbits) % BITS_PER_LONG) ? (1UL << ((nbits) % BITS_PER_LONG))-1: ~0UL)
#define DECLARE_BITMAP(name, bits) u64 name[BITS_TO_LONGS(bits)]
#define min(a,b) (((a) < (b)) ? (a): (b))
#define max(a,b) (((a) > (b)) ? (a): (b))
static u64
bit_find_first_set(const u64 *addr, u64 size)
{
u64 idx;
#define ffs(x) (u64)__builtin_ffsl((long int)(x))
for (idx = 0; idx * BITS_PER_LONG < size; idx++) {
if (addr[idx]) {
u64 first_set = ffs(addr[idx]) - 1;
return min(idx * BITS_PER_LONG + first_set, size);
}
}
return size;
}
static void
bit_fill(u64 *dst, unsigned int nbits)
{
unsigned int nlongs = (unsigned int)BITS_TO_LONGS(nbits);
unsigned int len = (unsigned int)((nlongs - 1) * sizeof(u64));
memset(dst, 0xff, len);
dst[nlongs - 1] = BITMAP_LAST_WORD_MASK(nbits);
}
static void
bit_or(u64 *dst, const u64 *bitmap1, const u64 *bitmap2,
unsigned int nbits)
{
unsigned int k;
unsigned int nr = (unsigned int)BITS_TO_LONGS(nbits);
for (k = 0; k < nr; ++k)
dst[k] = bitmap1[k] | bitmap2[k];
}
static void
bit_set(u64 *bitset, unsigned int start, int len)
{
u64 *p = bitset + BIT_WORD(start);
const unsigned int size = start + (unsigned int)len;
int bits_to_set = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
u64 mask_to_set = BITMAP_FIRST_WORD_MASK(start);
while (len - bits_to_set >= 0) {
*p |= mask_to_set;
len -= bits_to_set;
bits_to_set = BITS_PER_LONG;
mask_to_set = ~0UL;
p++;
}
if (len) {
mask_to_set &= BITMAP_LAST_WORD_MASK(size);
*p |= mask_to_set;
}
}
static void
bit_clear(u64 *bitset, unsigned int start, int len)
{
u64 *p = bitset + BIT_WORD(start);
const unsigned int size = start + (unsigned int)len;
int bits_to_clear = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
u64 mask_to_clear = BITMAP_FIRST_WORD_MASK(start);
while (len - bits_to_clear >= 0) {
*p &= ~mask_to_clear;
len -= bits_to_clear;
bits_to_clear = BITS_PER_LONG;
mask_to_clear = ~0UL;
p++;
}
if (len) {
mask_to_clear &= BITMAP_LAST_WORD_MASK(size);
*p &= ~mask_to_clear;
}
}
/* --------------------------------------------------------------
*
* SPINLOCK
*
* --------------------------------------------------------------*/
static void
spinlock_begin(volatile u32 *spinlock)
{while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0) _mm_pause();}
static void
spinlock_end(volatile u32 *spinlock)
{_mm_sfence(); *spinlock = 0;}
/* --------------------------------------------------------------
*
* MEMORY
*
* --------------------------------------------------------------*/
#define MEMORY_BLOCK_SIZE (2*1024*1024) /* 2MB page size */
#define MAX_MEMORY_BLOCKS 512 /* 1GB memory */
typedef DECLARE_BITMAP(memory_bitmap, MAX_MEMORY_BLOCKS);
enum memory_tag {
MEMORY_TAG_GAME,
MEMORY_TAG_RENDER,
MEMORY_TAG_GPU,
MEMORY_TAG_COUNT
};
struct memory {
volatile u32 lock;
memory_bitmap free;
memory_bitmap tags[MEMORY_TAG_COUNT];
void *data;
size_t size;
};
static void
mem_init(struct memory *mem)
{
memset(mem, 0, sizeof(*mem));
mem->size = (size_t)MEMORY_BLOCK_SIZE * (size_t)MAX_MEMORY_BLOCKS;
mem->data = mmap(0, mem->size, PROT_READ|PROT_WRITE,
MAP_PRIVATE|MAP_HUGETLB|MAP_ANONYMOUS, -1, 0);
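/* note: MAP_HUGETLB only succeeds if the kernel has huge pages reserved
 * (e.g. via /proc/sys/vm/nr_hugepages); if none are available this mmap
 * fails, and dropping MAP_HUGETLB to fall back to regular pages would
 * also work here. */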
if (mem->data == MAP_FAILED)
fprintf(stderr, "%s\n", strerror(errno));
assert(mem->data != MAP_FAILED);
bit_fill(mem->free, MAX_MEMORY_BLOCKS);
}
static void
mem_clear(struct memory *mem)
{
munmap(mem->data, mem->size);
memset(mem, 0, sizeof(*mem));
}
static void
mem_free(struct memory *mem, enum memory_tag tag)
{
spinlock_begin(&mem->lock);
bit_or(mem->free, mem->free, mem->tags[tag], MAX_MEMORY_BLOCKS);
memset(mem->tags[tag], 0, sizeof(mem->tags[tag]));
spinlock_end(&mem->lock);
}
static void*
alloc_block(struct memory *mem, enum memory_tag tag)
{
unsigned int index;
spinlock_begin(&mem->lock);
index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS);
assert(index < MAX_MEMORY_BLOCKS);
{char *block;
bit_clear(mem->free, index, 1);
bit_set(mem->tags[tag], index, 1);
block = (char*)mem->data + (MEMORY_BLOCK_SIZE * index);
spinlock_end(&mem->lock);
return block;}
}
/* --------------------------------------------------------------
*
* ALLOCATOR
*
* --------------------------------------------------------------*/
struct block {
void *data;
size_t size;
size_t capacity;
};
struct allocator {
enum memory_tag tag;
struct memory *memory;
struct block blocks[MAX_SCHEDULER_WORKER];
size_t worker_count;
};
static void
allocator_init(struct allocator *a, size_t worker_count,
struct memory *memory, enum memory_tag tag)
{
unsigned int i = 0;
memset(a, 0, sizeof(*a));
a->tag = tag;
a->memory = memory;
a->worker_count = worker_count;
for(;i < a->worker_count; ++i) {
a->blocks[i].data = alloc_block(a->memory, tag);
a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
a->blocks[i].size = 0;
}
}
static void
allocator_clear(struct allocator *a)
{
unsigned int i = 0;
for(;i < a->worker_count; ++i) {
a->blocks[i].size = 0;
a->blocks[i].data = alloc_block(a->memory, a->tag);
a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
}
}
static void*
alloc(struct allocator *a, int thread_index, size_t size)
{
void *memory = 0;
assert(size < MEMORY_BLOCK_SIZE);
if ((a->blocks[thread_index].size + size) > a->blocks[thread_index].capacity) {
/* allocate new worker memory block */
a->blocks[thread_index].data = alloc_block(a->memory, a->tag);
assert(a->blocks[thread_index].data);
a->blocks[thread_index].size = 0;
}
/* allocate from worker memory block */
memory = (char*)a->blocks[thread_index].data + a->blocks[thread_index].size;
a->blocks[thread_index].size += size;
return memory;
}
/* --------------------------------------------------------------
*
* SEMAPHORE
*
* --------------------------------------------------------------*/
struct sem {
pthread_mutex_t guard;
pthread_cond_t cond;
int count;
};
static void
sem_init(struct sem *s, int init)
{
pthread_mutex_init(&s->guard, 0);
pthread_cond_init(&s->cond, 0);
s->count = init;
}
static void
sem_free(struct sem *s)
{
pthread_mutex_destroy(&s->guard);
pthread_cond_destroy(&s->cond);
}
static void
sem_post(struct sem *s, int delta)
{
if (delta < 0) return;
pthread_mutex_lock(&s->guard);
s->count += delta;
pthread_cond_broadcast(&s->cond);
pthread_mutex_unlock(&s->guard);
}
static void
sem_wait(struct sem *s, int delta)
{
if (delta < 0) return;
pthread_mutex_lock(&s->guard);
do {
if (s->count >= delta) {
s->count -= delta;
break;
}
pthread_cond_wait(&s->cond, &s->guard);
} while (1);
pthread_mutex_unlock(&s->guard);
}
/* --------------------------------------------------------------
*
* FIBER
*
* --------------------------------------------------------------*/
#undef _FORTIFY_SOURCE
typedef void(*fiber_callback)(void*);
struct sys_fiber {
ucontext_t fib;
};
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
fiber_callback callback, void *data)
{
getcontext(&fib->fib);
fib->fib.uc_stack.ss_size = stack_size;
fib->fib.uc_stack.ss_sp = stack;
fib->fib.uc_link = 0;
makecontext(&fib->fib, (void(*)())callback, 1, data);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
swapcontext(&prev->fib, &fib->fib);
}
/* --------------------------------------------------------------
*
* QUEUE
*
* --------------------------------------------------------------*/
/* This is an implementation of a multi-producer, multi-consumer non-blocking
 * concurrent FIFO queue based on the paper by Philippas Tsigas and Yi Zhang:
 * www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */
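/* Implementation notes (as done below):
 * - each slot holds either a job pointer or one of two distinct "null"
 *   values, QUEUE_EMPTY and QUEUE_REMOVED, so a dequeue can tell a
 *   never-used slot from one whose job was already taken;
 * - push tags the stored job pointer's lowest bit when it overwrites a
 *   QUEUE_REMOVED slot and pop strips that bit before returning the job;
 * - head and tail are only CAS-advanced on every other slot ((index & 1) == 0),
 *   so both may lag behind and each operation first scans forward to find
 *   the actual head/tail, re-reading head/tail to detect concurrent changes. */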
#define MAX_WORK_QUEUE_JOBS (1024)
#define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1)
struct scheduler;
typedef void(*job_callback)(struct scheduler*,void*);
#define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg)
#define BASE_ALIGN(x) __attribute__((aligned(x)))
#define QUEUE_EMPTY 0
#define QUEUE_REMOVED 1
struct job {
void *data;
job_callback callback;
volatile u32 *run_count;
};
struct job_queue {
volatile u32 head;
struct job *jobs[MAX_WORK_QUEUE_JOBS];
volatile u32 tail;
};
static void
job_queue_init(struct job_queue *q)
{
memset(q, 0, sizeof(*q));
q->jobs[0] = QUEUE_EMPTY;
q->head = 0;
q->tail = 1;
}
static int
job_queue_entry_free(struct job *p)
{
return (((uintptr_t)p == QUEUE_EMPTY) || ((uintptr_t)p == QUEUE_REMOVED));
}
static int
job_queue_push(struct job_queue *q, struct job *job)
{
while (1) {
/* read tail */
u32 te = q->tail;
u32 ate = te;
struct job *tt = q->jobs[ate];
u32 tmp = (ate + 1) & WORK_QUEUE_MASK;
/* we want to find the actual tail */
while (!(job_queue_entry_free(tt))) {
/* check tails consistency */
if (te != q->tail) goto retry;
/* check if queue is full */
if (tmp == q->head) break;
tt = q->jobs[tmp];
ate = tmp;
tmp = (ate + 1) & WORK_QUEUE_MASK;
}
/* check tails consistency */
if (te != q->tail) continue;
/* check if queue is full */
if (tmp == q->head) {
ate = (tmp + 1) & WORK_QUEUE_MASK;
tt = q->jobs[ate];
if (!(job_queue_entry_free(tt)))
return 0; /* queue is full */
/* let pop update header */
__sync_bool_compare_and_swap(&q->head, tmp, ate);
continue;
}
if ((uintptr_t)tt == QUEUE_REMOVED)
job = (struct job*)((uintptr_t)job | 0x01);
if (te != q->tail) continue;
if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) {
if ((tmp & 1) == 0)
__sync_bool_compare_and_swap(&q->tail, te, tmp);
return 1;
}
retry:;
}
}
static int
job_queue_pop(struct job **job, struct job_queue *q)
{
while (1) {
u32 th = q->head;
u32 tmp = (th + 1) & WORK_QUEUE_MASK;
struct job *tt = q->jobs[tmp];
struct job *tnull = 0;
/* we want to find the actual head */
while ((job_queue_entry_free(tt))) {
if (th != q->head) goto retry;
if (tmp == q->tail) return 0;
tmp = (tmp + 1) & WORK_QUEUE_MASK;
tt = q->jobs[tmp];
}
/* check head's consistency */
if (th != q->head) continue;
/* check if queue is empty */
if (tmp == q->tail) {
/* help push to update end */
__sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK);
continue; /* retry */
}
tnull = (((uintptr_t)tt & 0x01) ? (struct job*)QUEUE_REMOVED: (struct job*)QUEUE_EMPTY);
if (th != q->head) continue;
/* get actual head */
if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) {
if ((tmp & 0x1) == 0)
__sync_bool_compare_and_swap(&q->head, th, tmp);
*job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1);
return 1;
}
retry:;
}
}
/* --------------------------------------------------------------
*
* SCHEDULER
*
* --------------------------------------------------------------*/
typedef volatile u32 job_counter;
enum job_queue_ids {
JOB_QUEUE_LOW,
JOB_QUEUE_NORMAL,
JOB_QUEUE_HIGH,
JOB_QUEUE_COUNT
};
struct scheduler_fiber {
struct scheduler_fiber *next;
struct scheduler_fiber *prev;
void *stack;
size_t stack_size;
struct sys_fiber handle;
struct job job;
u32 value;
};
struct scheduler_worker {
int id;
pthread_t thread;
struct scheduler_fiber *context;
struct scheduler *sched;
};
typedef void (*scheduler_profiler_callback_f)(void*, int thread_id);
struct scheduler_profiling {
void *userdata;
scheduler_profiler_callback_f thread_start;
scheduler_profiler_callback_f thread_stop;
scheduler_profiler_callback_f context_switch;
scheduler_profiler_callback_f wait_start;
scheduler_profiler_callback_f wait_stop;
};
struct scheduler {
struct sem work_sem;
struct job_queue queue[JOB_QUEUE_COUNT];
int worker_count;
struct scheduler_worker worker[MAX_SCHEDULER_WORKER];
volatile u32 worker_running;
volatile u32 worker_active;
struct scheduler_profiling profiler;
int fiber_count;
struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];
volatile u32 wait_list_lock;
struct scheduler_fiber *wait_list;
volatile u32 free_list_lock;
struct scheduler_fiber *free_list;
};
static struct job
Job(job_callback callback, void *data)
{
struct job task;
task.callback = callback;
task.data = data;
task.run_count = 0;
return task;
}
static void
scheduler_hook_into_list(struct scheduler_fiber **list,
struct scheduler_fiber *element, volatile u32 *lock)
{
spinlock_begin(lock);
if (!*list) {
*list = element;
element->prev = 0;
element->next = 0;
} else {
element->prev = 0;
element->next = *list;
(*list)->prev = element;
*list = element;
}
spinlock_end(lock);
}
static void
scheduler_unhook_from_list(struct scheduler_fiber **list,
struct scheduler_fiber *element, volatile u32 *lock)
{
if (lock) spinlock_begin(lock);
if (element->next)
element->next->prev = element->prev;
if (element->prev)
element->prev->next = element->next;
if (*list == element)
*list = element->next;
element->next = element->prev = 0;
if (lock) spinlock_end(lock);
}
static struct scheduler_fiber*
scheduler_find_fiber_finished_waiting(struct scheduler *s)
{
struct scheduler_fiber *iter;
spinlock_begin(&s->wait_list_lock);
iter = s->wait_list;
while (iter) {
if (*iter->job.run_count == iter->value) break;
iter = iter->next;
}
if (iter) scheduler_unhook_from_list(&s->wait_list, iter, 0);
spinlock_end(&s->wait_list_lock);
return iter;
}
static struct scheduler_fiber*
scheduler_get_free_fiber(struct scheduler *s)
{
struct scheduler_fiber *fib = 0;
spinlock_begin(&s->free_list_lock);
if (s->fiber_count < MAX_SCHEDULER_FIBERS) {
fib = &s->fibers[s->fiber_count++];
} else if (s->free_list) {
fib = s->free_list;
scheduler_unhook_from_list(&s->free_list, fib, 0);
}
spinlock_end(&s->free_list_lock);
return fib;
}
static void
scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs,
u32 count, job_counter *counter)
{
u32 jobIndex = 0;
struct job_queue *queue;
assert(q < JOB_QUEUE_COUNT);
assert(counter);
assert(jobs);
assert(s);
queue = &s->queue[q];
/* publish the expected count before any job can run and decrement it */
*counter = count;
while (jobIndex < count) {
jobs[jobIndex].run_count = counter;
if (job_queue_push(queue, &jobs[jobIndex])) {
sem_post(&s->work_sem, 1);
jobIndex++;
}
}
}
static void
scheduler_fiber_proc(void *arg)
{
struct scheduler_worker *w = (struct scheduler_worker*)arg;
struct scheduler *s = w->sched;
__sync_add_and_fetch(&s->worker_active, 1);
while (1) {
/* check if any fiber is done waiting */
struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
if (fiber) {
/* put old worker context into freelist (keep its stack so it can be reused) */
struct scheduler_fiber *old = w->context;
void *stack = old->stack;
size_t stack_size = old->stack_size;
memset(old, 0, sizeof(*old));
old->stack = stack;
old->stack_size = stack_size;
scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);
/* set previously waiting fiber as worker context */
w->context = fiber;
if (s->profiler.context_switch)
s->profiler.context_switch(s->profiler.userdata, w->id);
fiber_switch_to(&old->handle, &w->context->handle);
}
/* check if any new jobs inside work queues */
{struct job *job = 0;
if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
!(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
!(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
/* currently no job so wait */
__sync_sub_and_fetch(&s->worker_active, 1);
if (s->profiler.wait_start)
s->profiler.wait_start(s->profiler.userdata, w->id);
sem_wait(&s->work_sem, 1);
if (s->profiler.wait_stop)
s->profiler.wait_stop(s->profiler.userdata, w->id);
__sync_add_and_fetch(&s->worker_active, 1);
} else {
/* run dequeued job */
w->context->job = *job;
assert(job->callback);
if (s->profiler.thread_start)
s->profiler.thread_start(s->profiler.userdata, w->id);
job->callback(s, job->data);
if (s->profiler.thread_stop)
s->profiler.thread_stop(s->profiler.userdata, w->id);
__sync_sub_and_fetch(job->run_count, 1);
}}
}
}
static void*
thread_proc(void *arg)
{
struct scheduler_worker *w = (struct scheduler_worker*)arg;
struct scheduler_fiber *fiber;
struct scheduler *s = w->sched;
__sync_add_and_fetch(&s->worker_running, 1);
/* create dummy fiber */
fiber = scheduler_get_free_fiber(s);
assert(fiber);
getcontext(&fiber->handle.fib);
fiber->handle.fib.uc_link = 0;
fiber->handle.fib.uc_stack.ss_size = 0;
fiber->handle.fib.uc_stack.ss_sp = 0;
w->context = fiber;
scheduler_fiber_proc(w);
return 0;
}
static int
scheduler_self_id(struct scheduler *s)
{
int worker_index = 0;
pthread_t self = pthread_self();
for (; worker_index < s->worker_count; ++worker_index) {
if (s->worker[worker_index].thread == self)
return worker_index;
}
return -1;
}
static struct scheduler_worker*
scheduler_self(struct scheduler *s)
{
int worker_index = scheduler_self_id(s);
if (worker_index < 0) return 0;
return &s->worker[worker_index];
}
static void
scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
{
struct scheduler_worker *w;
struct scheduler_fiber *old;
assert(s);
assert(counter);
/* find threads own worker state */
w = scheduler_self(s);
assert(w);
/* insert current context into waiting list */
old = w->context;
w->context->value = value;
w->context->job.run_count = counter;
scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock);
/* either continue a finished waiting job or start a new one */
w->context = scheduler_find_fiber_finished_waiting(s);
if (!w->context) {
w->context = scheduler_get_free_fiber(s);
assert(w->context);
fiber_create(&w->context->handle, w->context->stack,
w->context->stack_size, scheduler_fiber_proc, w);
}
if (s->profiler.context_switch)
s->profiler.context_switch(s->profiler.userdata, w->id);
fiber_switch_to(&old->handle, &w->context->handle);
}
static void
sched_init(struct scheduler *sched, size_t worker_count)
{
size_t thread_index = 0;
pthread_attr_t attr;
assert(sched);
assert(worker_count);
memset(sched, 0, sizeof(*sched));
sched->worker_count = (int)worker_count;
/* init semaphore and job queues */
sem_init(&sched->work_sem, 0);
job_queue_init(&sched->queue[0]);
job_queue_init(&sched->queue[1]);
job_queue_init(&sched->queue[2]);
/* init fibers */
{int fiber_index = 0;
for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
struct scheduler_fiber *fiber = sched->fibers + fiber_index;
fiber->stack_size = FIBER_STACK_SIZE;
fiber->stack = calloc(fiber->stack_size, 1);
}}
/* start worker threads */
pthread_attr_init(&attr);
for (; thread_index < worker_count-1; ++thread_index) {
cpu_set_t cpus;
sched->worker[thread_index].id = (int)thread_index;
sched->worker[thread_index].sched = sched;
/* bind thread to core */
CPU_ZERO(&cpus);
CPU_SET(thread_index, &cpus);
pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);
/* start worker thread */
pthread_create(&sched->worker[thread_index].thread, &attr,
thread_proc, &sched->worker[thread_index]);
pthread_detach(sched->worker[thread_index].thread);
}
/* initialize main thread as worker thread */
{cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(thread_index, &cpus);
sched->worker[thread_index].sched = sched;
sched->worker[thread_index].thread = pthread_self();
sched->worker[thread_index].id = (int)thread_index;
pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}
/* create fiber for main thread worker */
{struct scheduler_fiber *fiber;
fiber = scheduler_get_free_fiber(sched);
assert(fiber);
getcontext(&fiber->handle.fib);
fiber->handle.fib.uc_link = 0;
fiber->handle.fib.uc_stack.ss_size = 0;
fiber->handle.fib.uc_stack.ss_sp = 0;
sched->worker[thread_index].context = fiber;}
pthread_attr_destroy(&attr);
}
static void
sched_free(struct scheduler *sched)
{
/* free fibers stack */
{int fiber_index = 0;
for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
struct scheduler_fiber *fiber = sched->fibers + fiber_index;
free(fiber->stack);
}}
sem_free(&sched->work_sem);
}
/* --------------------------------------------------------------
*
* TEST
*
* --------------------------------------------------------------*/
struct game_state {
int data;
struct memory memory;
struct allocator game_alloc;
struct allocator gpu_alloc;
struct allocator render_alloc;
};
struct test_data {
struct allocator *alloc;
int *data;
int from;
int to;
};
JOB_ENTRY_POINT(test_work)
{
int i;
void *mem;
struct test_data *data = arg;
mem = alloc(data->alloc, scheduler_self_id(sched), 1024);
for (i = data->from; i < data->to; ++i)
data->data[i] = i;
printf("sleep begin\n");
sleep(1);
printf("sleep end\n");
}
JOB_ENTRY_POINT(root)
{
struct game_state *game = arg;
job_counter counter = 0;
struct job jobs[8];
struct test_data data[8];
int i, n[2*1024];
printf("root\n");
for (i = 0; i < 8; ++i) {
data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc;
data[i].data = n;
data[i].from = i * 256;
data[i].to = (i+1)*256;
jobs[i] = Job(test_work, &data[i]);
}
scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
printf("run\n");
scheduler_wait_for(sched, &counter, 0);
mem_free(&game->memory, MEMORY_TAG_GAME);
mem_free(&game->memory, MEMORY_TAG_RENDER);
mem_free(&game->memory, MEMORY_TAG_GPU);
printf("done\n");
}
int main(int argc, char **argv)
{
/* setup app memory and allocator */
struct game_state app;
size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
memset(&app, 0, sizeof(app));
mem_init(&app.memory);
allocator_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
allocator_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
allocator_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU);
/* start root process */
{struct scheduler sched;
struct job job = Job(root, &app);
job_counter counter = 0;
sched_init(&sched, thread_count);
printf("init\n");
scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
printf("run\n");
scheduler_wait_for(&sched, &counter, 0);
printf("finished\n");}
return 0;
}
jesta88 commented Jul 13, 2018

Removing makecontext/swapcontext would give this a big speed boost. The best way to simulate this on UNIX is with assembly. Boost.context and libco have that part already handled for common architectures.
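For reference, a rough sketch (not from this gist) of what the sys_fiber layer might look like on top of libco, assuming its co_active/co_create/co_switch API and reusing the gist's fiber_callback typedef. libco entry points take no argument, so the callback and its argument have to be handed over out of band; the names fiber_entry, fiber_entry_cb and fiber_entry_arg are illustrative, and the single static hand-off slot would need to become per-fiber (and libco's thread-safety caveats considered) before using this in a multi-threaded scheduler:

#include <libco.h>
struct sys_fiber {
    cothread_t fib;
};
/* hypothetical hand-off slot; a real version needs per-fiber storage */
static fiber_callback fiber_entry_cb;
static void *fiber_entry_arg;
static void
fiber_entry(void)
{
    /* note: a libco entrypoint must never return; in this scheduler the
     * callback (scheduler_fiber_proc) loops forever, which satisfies that */
    fiber_callback cb = fiber_entry_cb;
    void *arg = fiber_entry_arg;
    cb(arg);
}
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
    fiber_callback callback, void *data)
{
    (void)stack; /* libco allocates the stack itself inside co_create */
    fiber_entry_cb = callback;
    fiber_entry_arg = data;
    fib->fib = co_create((unsigned int)stack_size, fiber_entry);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
    prev->fib = co_active(); /* remember the currently running cothread */
    co_switch(fib->fib);
}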

bsekura commented Aug 27, 2018

At line 653 you are releasing current context (putting it on the free list) - what if another thread picks it up before you switch out of it at line 659? Similarly, at 748 you're putting current context onto the wait list - what if another thread picks it up, assuming the condition has been signaled in the meantime, before you switch out of it at line 760? In both cases you'd be switching to a running fiber.

Perhaps I'm missing something here, would greatly appreciate an explanation.

vurtun commented Oct 5, 2018

Hey @bsekura, sorry, gists don't send notifications. I hope I understand you correctly, but all list operations take a spinlock, so when scheduler_find_fiber_finished_waiting returns it guarantees that only we hold that fiber context. After that we add our old fiber context to the free list.

vurtun commented Oct 5, 2018

@jesta88 yes, agreed. Specifically for Linux I wrote a minimal assembly version; libco, as you said, is also a good solution. The reason I used makecontext/swapcontext here is that I wanted to be 100% certain that any bug is in my code and not in a custom assembly coroutine implementation.
