#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <string.h>
#define streq(a, b) (!strcmp((a), (b)))
#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <setjmp.h>
#include <sched.h>
#include <unistd.h>
#include <pthread.h>
#include <xmmintrin.h>
#include <ucontext.h>
#include <sys/mman.h>
typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
#define MAX_WORK_QUEUE_THREAD 64
#define FIBER_STACK_SIZE (64 * 1024)
#define MAX_SCHEDULER_WORKER 64
#define MAX_SCHEDULER_FIBERS 128
/* --------------------------------------------------------------
 *
 *                          BITSET
 *
 * --------------------------------------------------------------*/
#define BITS_PER_BYTE 8
#define BITS_PER_LONG 64
#define BIT_WORD(nr) ((nr) / BITS_PER_LONG)
#define DIV_ROUND_UP(n,d) (((n) + (d) - 1) / (d))
#define BITS_TO_LONGS(nr) DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(u64))
#define BITMAP_FIRST_WORD_MASK(start) (~0UL << ((start) % BITS_PER_LONG))
#define BITMAP_LAST_WORD_MASK(nbits) (((nbits) % BITS_PER_LONG) ? (1UL << ((nbits) % BITS_PER_LONG))-1: ~0UL)
#define DECLARE_BITMAP(name, bits) u64 name[BITS_TO_LONGS(bits)]
#define min(a,b) (((a) < (b)) ? (a): (b))
#define max(a,b) (((a) > (b)) ? (a): (b))
static u64
bit_find_first_set(const u64 *addr, u64 size)
{
    u64 idx;
    #define ffs(x) (u64)__builtin_ffsl((long int)(x))
    for (idx = 0; idx * BITS_PER_LONG < size; idx++) {
        if (addr[idx]) {
            u64 first_set = ffs(addr[idx]) - 1;
            return min(idx * BITS_PER_LONG + first_set, size);
        }
    }
    return size;
}
static void
bit_fill(u64 *dst, unsigned int nbits)
{
    unsigned int nlongs = (unsigned int)BITS_TO_LONGS(nbits);
    unsigned int len = (unsigned int)((nlongs - 1) * sizeof(u64));
    memset(dst, 0xff, len);
    /* the memset only covers the leading full words; fill the
     * remaining bits of the last word explicitly */
    dst[nlongs - 1] = BITMAP_LAST_WORD_MASK(nbits);
}
static void
bit_or(u64 *dst, const u64 *bitmap1, const u64 *bitmap2,
    unsigned int nbits)
{
    unsigned int k;
    unsigned int nr = (unsigned int)BITS_TO_LONGS(nbits);
    for (k = 0; k < nr; ++k)
        dst[k] = bitmap1[k] | bitmap2[k];
}
static void
bit_set(u64 *bitset, unsigned int start, int len)
{
    u64 *p = bitset + BIT_WORD(start);
    const unsigned int size = start + (unsigned int)len;
    int bits_to_set = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
    u64 mask_to_set = BITMAP_FIRST_WORD_MASK(start);
    while (len - bits_to_set >= 0) {
        *p |= mask_to_set;
        len -= bits_to_set;
        bits_to_set = BITS_PER_LONG;
        mask_to_set = ~0UL;
        p++;
    }
    if (len) {
        mask_to_set &= BITMAP_LAST_WORD_MASK(size);
        *p |= mask_to_set;
    }
}
static void
bit_clear(u64 *bitset, unsigned int start, int len)
{
    u64 *p = bitset + BIT_WORD(start);
    const unsigned int size = start + (unsigned int)len;
    int bits_to_clear = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
    u64 mask_to_clear = BITMAP_FIRST_WORD_MASK(start);
    while (len - bits_to_clear >= 0) {
        *p &= ~mask_to_clear;
        len -= bits_to_clear;
        bits_to_clear = BITS_PER_LONG;
        mask_to_clear = ~0UL;
        p++;
    }
    if (len) {
        mask_to_clear &= BITMAP_LAST_WORD_MASK(size);
        *p &= ~mask_to_clear;
    }
}
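/* A minimal usage sketch of the bitset helpers above. The function is
 * illustrative only and not called anywhere in this gist. */
static void
bitset_example(void)
{
    DECLARE_BITMAP(map, 512);
    memset(map, 0, sizeof(map));
    bit_set(map, 10, 4);    /* set bits 10..13 */
    bit_clear(map, 11, 1);  /* clear bit 11 again */
    /* first set bit is 10 */
    printf("%llu\n", (unsigned long long)bit_find_first_set(map, 512));
}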
/* --------------------------------------------------------------
 *
 *                          SPINLOCK
 *
 * --------------------------------------------------------------*/
static void
spinlock_begin(volatile u32 *spinlock)
{while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0) _mm_pause();}
static void
spinlock_end(volatile u32 *spinlock)
{_mm_sfence(); *spinlock = 0;}
/* --------------------------------------------------------------
 *
 *                          MEMORY
 *
 * --------------------------------------------------------------*/
#define MEMORY_BLOCK_SIZE (2*1024*1024) /* 2MB page size */
#define MAX_MEMORY_BLOCKS 512 /* 1GB memory */
typedef DECLARE_BITMAP(memory_bitmap, MAX_MEMORY_BLOCKS);
enum memory_tag {
    MEMORY_TAG_GAME,
    MEMORY_TAG_RENDER,
    MEMORY_TAG_GPU,
    MEMORY_TAG_COUNT
};
struct memory {
    volatile u32 lock;
    memory_bitmap free;
    memory_bitmap tags[MEMORY_TAG_COUNT];
    void *data;
    size_t size;
};
static void
mem_init(struct memory *mem)
{
    memset(mem, 0, sizeof(*mem));
    mem->size = (size_t)MEMORY_BLOCK_SIZE * (size_t)MAX_MEMORY_BLOCKS;
    mem->data = mmap(0, mem->size, PROT_READ|PROT_WRITE,
        MAP_PRIVATE|MAP_HUGETLB|MAP_ANONYMOUS, -1, 0);
    if (mem->data == MAP_FAILED)
        fprintf(stderr, "%s\n", strerror(errno));
    assert(mem->data != MAP_FAILED);
    bit_fill(mem->free, MAX_MEMORY_BLOCKS);
}
static void
mem_clear(struct memory *mem)
{
    munmap(mem->data, mem->size);
    memset(mem, 0, sizeof(*mem));
}
static void
mem_free(struct memory *mem, enum memory_tag tag)
{
    spinlock_begin(&mem->lock);
    bit_or(mem->free, mem->free, mem->tags[tag], MAX_MEMORY_BLOCKS);
    memset(mem->tags[tag], 0, sizeof(mem->tags[tag]));
    spinlock_end(&mem->lock);
}
static void*
alloc_block(struct memory *mem, enum memory_tag tag)
{
    unsigned int index;
    spinlock_begin(&mem->lock);
    index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS);
    assert(index < MAX_MEMORY_BLOCKS);
    {char *block;
    bit_clear(mem->free, index, 1);
    bit_set(mem->tags[tag], index, 1);
    block = (char*)mem->data + (MEMORY_BLOCK_SIZE * index);
    spinlock_end(&mem->lock);
    return block;}
}
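/* A short usage sketch of the block allocator above (illustrative only and
 * never called; note that MAP_HUGETLB requires huge pages to be reserved on
 * the system, otherwise the mmap in mem_init fails). */
static void
memory_example(void)
{
    struct memory mem;
    mem_init(&mem);
    {void *game_block = alloc_block(&mem, MEMORY_TAG_GAME);
    void *render_block = alloc_block(&mem, MEMORY_TAG_RENDER);
    memset(game_block, 0, MEMORY_BLOCK_SIZE);
    memset(render_block, 0, MEMORY_BLOCK_SIZE);
    /* frees are wholesale: every block of a tag is returned at once */
    mem_free(&mem, MEMORY_TAG_GAME);
    mem_free(&mem, MEMORY_TAG_RENDER);
    mem_clear(&mem);}
}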
/* --------------------------------------------------------------
 *
 *                          ALLOCATOR
 *
 * --------------------------------------------------------------*/
struct block {
    void *data;
    size_t size;
    size_t capacity;
};
struct allocator {
    enum memory_tag tag;
    struct memory *memory;
    struct block blocks[MAX_SCHEDULER_WORKER];
    size_t worker_count;
};
static void
allocator_init(struct allocator *a, size_t worker_count,
    struct memory *memory, enum memory_tag tag)
{
    unsigned int i = 0;
    memset(a, 0, sizeof(*a));
    a->tag = tag;
    a->memory = memory;
    a->worker_count = worker_count;
    for (; i < a->worker_count; ++i) {
        a->blocks[i].data = alloc_block(a->memory, tag);
        a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
        a->blocks[i].size = 0;
    }
}
static void
allocator_clear(struct allocator *a)
{
    unsigned int i = 0;
    for (; i < a->worker_count; ++i) {
        a->blocks[i].size = 0;
        a->blocks[i].data = alloc_block(a->memory, a->tag);
        a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
    }
}
static void*
alloc(struct allocator *a, int thread_index, size_t size)
{
    void *memory = 0;
    assert(size < MEMORY_BLOCK_SIZE);
    if ((a->blocks[thread_index].size + size) > a->blocks[thread_index].capacity) {
        /* allocate new worker memory block */
        a->blocks[thread_index].data = alloc_block(a->memory, a->tag);
        assert(a->blocks[thread_index].data);
        a->blocks[thread_index].size = 0;
    }
    /* allocate from worker memory block */
    memory = (char*)a->blocks[thread_index].data + a->blocks[thread_index].size;
    a->blocks[thread_index].size += size;
    return memory;
}
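/* A small sketch of the per-worker linear allocation pattern above. The
 * worker index would normally come from scheduler_self_id() inside a job;
 * here it is hard-coded and the function is illustrative only. */
static void
allocator_example(struct memory *mem)
{
    struct allocator a;
    allocator_init(&a, 4, mem, MEMORY_TAG_GAME);
    {int worker_index = 0; /* would be scheduler_self_id(sched) inside a job */
    float *vec = alloc(&a, worker_index, 4 * sizeof(float));
    vec[0] = vec[1] = vec[2] = vec[3] = 0.0f;
    /* no per-allocation free: drop every MEMORY_TAG_GAME block at once */
    mem_free(mem, MEMORY_TAG_GAME);}
}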
/* --------------------------------------------------------------
 *
 *                          SEMAPHORE
 *
 * --------------------------------------------------------------*/
struct sem {
    pthread_mutex_t guard;
    pthread_cond_t cond;
    int count;
};
static void
sem_init(struct sem *s, int init)
{
    pthread_mutex_init(&s->guard, 0);
    pthread_cond_init(&s->cond, 0);
    s->count = init;
}
static void
sem_free(struct sem *s)
{
    pthread_mutex_destroy(&s->guard);
    pthread_cond_destroy(&s->cond);
}
static void
sem_post(struct sem *s, int delta)
{
    if (delta < 0) return;
    pthread_mutex_lock(&s->guard);
    s->count += delta;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->guard);
}
static void
sem_wait(struct sem *s, int delta)
{
    if (delta < 0) return;
    pthread_mutex_lock(&s->guard);
    do {
        if (s->count >= delta) {
            s->count -= delta;
            break;
        }
        pthread_cond_wait(&s->cond, &s->guard);
    } while (1);
    pthread_mutex_unlock(&s->guard);
}
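/* A tiny single-threaded sketch of the counting-semaphore interface above
 * (illustrative only): post raises the count, wait consumes it and blocks
 * while the count is short of the requested delta. */
static void
sem_example(void)
{
    struct sem s;
    sem_init(&s, 0);
    sem_post(&s, 2);  /* count: 2 */
    sem_wait(&s, 1);  /* count: 1, returns immediately */
    sem_wait(&s, 1);  /* count: 0, returns immediately */
    sem_free(&s);
}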
/* --------------------------------------------------------------
 *
 *                          FIBER
 *
 * --------------------------------------------------------------*/
#undef _FORTIFY_SOURCE
typedef void(*fiber_callback)(void*);
struct sys_fiber {
    ucontext_t fib;
};
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
    fiber_callback callback, void *data)
{
    getcontext(&fib->fib);
    fib->fib.uc_stack.ss_size = stack_size;
    fib->fib.uc_stack.ss_sp = stack;
    fib->fib.uc_link = 0;
    /* note: makecontext officially only takes int arguments, so smuggling a
     * pointer through here is not strictly portable, but works on the
     * common 64-bit Linux targets this gist aims at */
    makecontext(&fib->fib, (void(*)())callback, 1, data);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
    swapcontext(&prev->fib, &fib->fib);
}
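/* A minimal standalone sketch of how fiber_create/fiber_switch_to hand
 * control back and forth. The names, the static stack and the callback are
 * illustrative only and unused by the scheduler below. */
static struct sys_fiber example_main_ctx, example_fiber_ctx;
static void
example_fiber_proc(void *arg)
{
    printf("hello from fiber: %s\n", (const char*)arg);
    /* hand control back to the context that switched to us */
    fiber_switch_to(&example_fiber_ctx, &example_main_ctx);
}
static void
fiber_example(void)
{
    static char stack[FIBER_STACK_SIZE];
    fiber_create(&example_fiber_ctx, stack, sizeof(stack),
        example_fiber_proc, "fiber");
    fiber_switch_to(&example_main_ctx, &example_fiber_ctx); /* runs the fiber */
}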
/* --------------------------------------------------------------
 *
 *                          QUEUE
 *
 * --------------------------------------------------------------*/
/* This is an implementation of a multi-producer, multi-consumer non-blocking
 * concurrent FIFO queue based on the paper by Philippas Tsigas and Yi Zhang:
 * www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */
#define MAX_WORK_QUEUE_JOBS (1024)
#define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1)
struct scheduler;
typedef void(*job_callback)(struct scheduler*,void*);
#define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg)
#define BASE_ALIGN(x) __attribute__((aligned(x)))
#define QUEUE_EMPTY 0
#define QUEUE_REMOVED 1
struct job {
    void *data;
    job_callback callback;
    volatile u32 *run_count;
};
struct job_queue {
    volatile u32 head;
    struct job *jobs[MAX_WORK_QUEUE_JOBS];
    volatile u32 tail;
};
static void
job_queue_init(struct job_queue *q)
{
    memset(q, 0, sizeof(*q));
    q->jobs[0] = QUEUE_EMPTY;
    q->head = 0;
    q->tail = 1;
}
static int
job_queue_entry_free(struct job *p)
{
    return (((uintptr_t)p == QUEUE_EMPTY) || ((uintptr_t)p == QUEUE_REMOVED));
}
static int
job_queue_push(struct job_queue *q, struct job *job)
{
    while (1) {
        /* read tail */
        u32 te = q->tail;
        u32 ate = te;
        struct job *tt = q->jobs[ate];
        u32 tmp = (ate + 1) & WORK_QUEUE_MASK;
        /* we want to find the actual tail */
        while (!(job_queue_entry_free(tt))) {
            /* check tail's consistency */
            if (te != q->tail) goto retry;
            /* check if queue is full */
            if (tmp == q->head) break;
            tt = q->jobs[tmp];
            ate = tmp;
            tmp = (ate + 1) & WORK_QUEUE_MASK;
        }
        /* check tail's consistency */
        if (te != q->tail) continue;
        /* check if queue is full */
        if (tmp == q->head) {
            ate = (tmp + 1) & WORK_QUEUE_MASK;
            tt = q->jobs[ate];
            if (!(job_queue_entry_free(tt)))
                return 0; /* queue is full */
            /* let pop update the head */
            __sync_bool_compare_and_swap(&q->head, tmp, ate);
            continue;
        }
        if ((uintptr_t)tt == QUEUE_REMOVED)
            job = (struct job*)((uintptr_t)job | 0x01);
        if (te != q->tail) continue;
        if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) {
            if ((tmp & 1) == 0)
                __sync_bool_compare_and_swap(&q->tail, te, tmp);
            return 1;
        }
retry:;
    }
}
static int
job_queue_pop(struct job **job, struct job_queue *q)
{
    while (1) {
        u32 th = q->head;
        u32 tmp = (th + 1) & WORK_QUEUE_MASK;
        struct job *tt = q->jobs[tmp];
        struct job *tnull = 0;
        /* we want to find the actual head */
        while ((job_queue_entry_free(tt))) {
            if (th != q->head) goto retry;
            if (tmp == q->tail) return 0;
            tmp = (tmp + 1) & WORK_QUEUE_MASK;
            tt = q->jobs[tmp];
        }
        /* check head's consistency */
        if (th != q->head) continue;
        /* check if queue is empty */
        if (tmp == q->tail) {
            /* help push to update the tail */
            __sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK);
            continue; /* retry */
        }
        tnull = (((uintptr_t)tt & 0x01) ? (struct job*)QUEUE_REMOVED: (struct job*)QUEUE_EMPTY);
        if (th != q->head) continue;
        /* get actual head */
        if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) {
            if ((tmp & 0x1) == 0)
                __sync_bool_compare_and_swap(&q->head, th, tmp);
            *job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1);
            return 1;
        }
retry:;
    }
}
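/* A single-threaded sketch of the push/pop interface above; in the scheduler
 * the same calls happen concurrently from multiple worker threads. The
 * function is illustrative only. */
static void
job_queue_example(void)
{
    static struct job jobs[3];
    struct job_queue q;
    struct job *out = 0;
    int i;
    job_queue_init(&q);
    for (i = 0; i < 3; ++i)
        job_queue_push(&q, &jobs[i]);
    /* pops return the jobs in FIFO order: 0, 1, 2 */
    while (job_queue_pop(&out, &q))
        printf("popped job %d\n", (int)(out - jobs));
}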
/* --------------------------------------------------------------
 *
 *                          SCHEDULER
 *
 * --------------------------------------------------------------*/
typedef volatile u32 job_counter;
enum job_queue_ids {
    JOB_QUEUE_LOW,
    JOB_QUEUE_NORMAL,
    JOB_QUEUE_HIGH,
    JOB_QUEUE_COUNT
};
struct scheduler_fiber {
    struct scheduler_fiber *next;
    struct scheduler_fiber *prev;
    void *stack;
    size_t stack_size;
    struct sys_fiber handle;
    struct job job;
    u32 value;
};
struct scheduler_worker {
    int id;
    pthread_t thread;
    struct scheduler_fiber *context;
    struct scheduler *sched;
};
typedef void (*scheduler_profiler_callback_f)(void*, int thread_id);
struct scheduler_profiling {
    void *userdata;
    scheduler_profiler_callback_f thread_start;
    scheduler_profiler_callback_f thread_stop;
    scheduler_profiler_callback_f context_switch;
    scheduler_profiler_callback_f wait_start;
    scheduler_profiler_callback_f wait_stop;
};
struct scheduler {
    struct sem work_sem;
    struct job_queue queue[JOB_QUEUE_COUNT];
    int worker_count;
    struct scheduler_worker worker[MAX_SCHEDULER_WORKER];
    volatile u32 worker_running;
    volatile u32 worker_active;
    struct scheduler_profiling profiler;
    int fiber_count;
    struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];
    volatile u32 wait_list_lock;
    struct scheduler_fiber *wait_list;
    volatile u32 free_list_lock;
    struct scheduler_fiber *free_list;
};
static struct job
Job(job_callback callback, void *data)
{
    struct job task;
    task.callback = callback;
    task.data = data;
    task.run_count = 0;
    return task;
}
static void
scheduler_hook_into_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
{
    spinlock_begin(lock);
    if (!*list) {
        *list = element;
        element->prev = 0;
        element->next = 0;
    } else {
        element->prev = 0;
        element->next = *list;
        (*list)->prev = element;
        *list = element;
    }
    spinlock_end(lock);
}
static void
scheduler_unhook_from_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
{
    if (lock) spinlock_begin(lock);
    if (element->next)
        element->next->prev = element->prev;
    if (element->prev)
        element->prev->next = element->next;
    if (*list == element)
        *list = element->next;
    element->next = element->prev = 0;
    if (lock) spinlock_end(lock);
}
static struct scheduler_fiber*
scheduler_find_fiber_finished_waiting(struct scheduler *s)
{
    struct scheduler_fiber *iter;
    spinlock_begin(&s->wait_list_lock);
    iter = s->wait_list; /* read the list head only after taking the lock */
    while (iter) {
        if (*iter->job.run_count == iter->value) break;
        iter = iter->next;
    }
    if (iter) scheduler_unhook_from_list(&s->wait_list, iter, 0);
    spinlock_end(&s->wait_list_lock);
    return iter;
}
static struct scheduler_fiber*
scheduler_get_free_fiber(struct scheduler *s)
{
    struct scheduler_fiber *fib = 0;
    spinlock_begin(&s->free_list_lock);
    if (s->fiber_count < MAX_SCHEDULER_FIBERS) {
        fib = &s->fibers[s->fiber_count++];
    } else if (s->free_list) {
        fib = s->free_list;
        scheduler_unhook_from_list(&s->free_list, fib, 0);
    }
    spinlock_end(&s->free_list_lock);
    return fib;
}
static void
scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs,
    u32 count, job_counter *counter)
{
    u32 jobIndex = 0;
    struct job_queue *queue;
    assert(q < JOB_QUEUE_COUNT);
    assert(counter);
    assert(jobs);
    assert(s);
    /* initialize the counter before publishing any job so that a worker
     * finishing early cannot decrement a counter that has not been set yet */
    *counter = count;
    queue = &s->queue[q];
    while (jobIndex < count) {
        jobs[jobIndex].run_count = counter;
        if (job_queue_push(queue, &jobs[jobIndex])) {
            sem_post(&s->work_sem, 1);
            jobIndex++;
        }
    }
}
static void
scheduler_fiber_proc(void *arg)
{
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler *s = w->sched;
    __sync_add_and_fetch(&s->worker_active, 1);
    while (1) {
        /* check if any fiber is done waiting */
        struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
        if (fiber) {
            /* put old worker context into freelist, but keep its stack and
             * stack size so the fiber can be reused by fiber_create later */
            struct scheduler_fiber *old = w->context;
            old->value = 0;
            memset(&old->job, 0, sizeof(old->job));
            scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);
            /* set previously waiting fiber as worker context */
            w->context = fiber;
            if (s->profiler.context_switch)
                s->profiler.context_switch(s->profiler.userdata, w->id);
            fiber_switch_to(&old->handle, &w->context->handle);
        }
        /* check if any new jobs inside work queues */
        {struct job *job = 0;
        if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
            !(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
            !(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
            /* currently no job so wait */
            __sync_sub_and_fetch(&s->worker_active, 1);
            if (s->profiler.wait_start)
                s->profiler.wait_start(s->profiler.userdata, w->id);
            sem_wait(&s->work_sem, 1);
            if (s->profiler.wait_stop)
                s->profiler.wait_stop(s->profiler.userdata, w->id);
            __sync_add_and_fetch(&s->worker_active, 1);
        } else {
            /* run dequeued job */
            w->context->job = *job;
            assert(job->callback);
            if (s->profiler.thread_start)
                s->profiler.thread_start(s->profiler.userdata, w->id);
            job->callback(s, job->data);
            if (s->profiler.thread_stop)
                s->profiler.thread_stop(s->profiler.userdata, w->id);
            __sync_sub_and_fetch(job->run_count, 1);
        }}
    }
}
static void*
thread_proc(void *arg)
{
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler_fiber *fiber;
    struct scheduler *s = w->sched;
    __sync_add_and_fetch(&s->worker_running, 1);
    /* create dummy fiber running on the thread's own stack */
    fiber = scheduler_get_free_fiber(s);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    w->context = fiber;
    scheduler_fiber_proc(w);
    return 0;
}
static int
scheduler_self_id(struct scheduler *s)
{
    int worker_index = 0;
    pthread_t self = pthread_self();
    for (; worker_index < s->worker_count; ++worker_index) {
        if (pthread_equal(s->worker[worker_index].thread, self))
            return worker_index;
    }
    return -1;
}
static struct scheduler_worker*
scheduler_self(struct scheduler *s)
{
    int worker_index = scheduler_self_id(s);
    if (worker_index < 0) return 0;
    return &s->worker[worker_index];
}
static void
scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
{
    struct scheduler_worker *w;
    struct scheduler_fiber *old;
    assert(s);
    assert(counter);
    /* find the calling thread's own worker state */
    w = scheduler_self(s);
    assert(w);
    /* insert current context into waiting list */
    old = w->context;
    w->context->value = value;
    w->context->job.run_count = counter;
    scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock);
    /* either continue a fiber that finished waiting or start a new one */
    w->context = scheduler_find_fiber_finished_waiting(s);
    if (!w->context) {
        w->context = scheduler_get_free_fiber(s);
        assert(w->context);
        fiber_create(&w->context->handle, w->context->stack,
            w->context->stack_size, scheduler_fiber_proc, w);
    }
    if (s->profiler.context_switch)
        s->profiler.context_switch(s->profiler.userdata, w->id);
    fiber_switch_to(&old->handle, &w->context->handle);
}
static void
sched_init(struct scheduler *sched, size_t worker_count)
{
    size_t thread_index = 0;
    pthread_attr_t attr;
    assert(sched);
    assert(worker_count);
    memset(sched, 0, sizeof(*sched));
    sched->worker_count = (int)worker_count;
    /* init semaphore and job queues */
    sem_init(&sched->work_sem, 0);
    job_queue_init(&sched->queue[0]);
    job_queue_init(&sched->queue[1]);
    job_queue_init(&sched->queue[2]);
    /* init fibers */
    {int fiber_index = 0;
    for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
        struct scheduler_fiber *fiber = sched->fibers + fiber_index;
        fiber->stack_size = FIBER_STACK_SIZE;
        fiber->stack = calloc(fiber->stack_size, 1);
    }}
    /* start worker threads */
    pthread_attr_init(&attr);
    for (; thread_index < worker_count-1; ++thread_index) {
        cpu_set_t cpus;
        sched->worker[thread_index].id = (int)thread_index;
        sched->worker[thread_index].sched = sched;
        /* bind thread to core */
        CPU_ZERO(&cpus);
        CPU_SET(thread_index, &cpus);
        pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);
        /* start worker thread */
        pthread_create(&sched->worker[thread_index].thread, &attr,
            thread_proc, &sched->worker[thread_index]);
        pthread_detach(sched->worker[thread_index].thread);
    }
    /* initialize main thread as worker thread */
    {cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(thread_index, &cpus);
    sched->worker[thread_index].sched = sched;
    sched->worker[thread_index].thread = pthread_self();
    sched->worker[thread_index].id = (int)thread_index;
    pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}
    /* create fiber for main thread worker */
    {struct scheduler_fiber *fiber;
    fiber = scheduler_get_free_fiber(sched);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    sched->worker[thread_index].context = fiber;}
    pthread_attr_destroy(&attr);
}
static void
sched_free(struct scheduler *sched)
{
    /* free fiber stacks */
    {int fiber_index = 0;
    for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
        struct scheduler_fiber *fiber = sched->fibers + fiber_index;
        free(fiber->stack);
    }}
    sem_free(&sched->work_sem);
}
/* --------------------------------------------------------------
 *
 *                          TEST
 *
 * --------------------------------------------------------------*/
struct game_state {
    int data;
    struct memory memory;
    struct allocator game_alloc;
    struct allocator gpu_alloc;
    struct allocator render_alloc;
};
struct test_data {
    struct allocator *alloc;
    int *data;
    int from;
    int to;
};
JOB_ENTRY_POINT(test_work)
{
    int i;
    void *mem;
    struct test_data *data = arg;
    mem = alloc(data->alloc, scheduler_self_id(sched), 1024);
    (void)mem;
    for (i = data->from; i < data->to; ++i)
        data->data[i] = i;
    printf("sleep begin\n");
    sleep(1);
    printf("sleep end\n");
}
JOB_ENTRY_POINT(root)
{
    struct game_state *game = arg;
    job_counter counter = 0;
    struct job jobs[8];
    struct test_data data[8];
    int i, n[2*1024];
    printf("root\n");
    for (i = 0; i < 8; ++i) {
        data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc;
        data[i].data = n;
        data[i].from = i * 256;
        data[i].to = (i+1)*256;
        jobs[i] = Job(test_work, &data[i]);
    }
    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
    printf("run\n");
    scheduler_wait_for(sched, &counter, 0);
    mem_free(&game->memory, MEMORY_TAG_GAME);
    mem_free(&game->memory, MEMORY_TAG_RENDER);
    mem_free(&game->memory, MEMORY_TAG_GPU);
    printf("done\n");
}
int main(int argc, char **argv)
{
    /* setup app memory and allocators */
    struct game_state app;
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    (void)argc; (void)argv;
    memset(&app, 0, sizeof(app));
    mem_init(&app.memory);
    allocator_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
    allocator_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
    allocator_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU);
    /* start root job */
    {struct scheduler sched;
    struct job job = Job(root, &app);
    job_counter counter = 0;
    sched_init(&sched, thread_count);
    printf("init\n");
    scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
    printf("run\n");
    scheduler_wait_for(&sched, &counter, 0);
    printf("finished\n");}
    return 0;
}
At line 653 you are releasing the current context (putting it on the free list) - what if another thread picks it up before you switch out of it at line 659? Similarly, at 748 you're putting the current context onto the wait list - what if another thread picks it up, assuming the condition has been signaled in the meantime, before you switch out of it at line 760? In both cases you'd be switching to a running fiber.
Perhaps I'm missing something here; I would greatly appreciate an explanation.
Hey @bsekura, sorry, gists do not send notifications. I hope I understand you correctly, but all list operations are protected by a spinlock. So when scheduler_find_fiber_finished_waiting
returns, it is guaranteed that only we hold that fiber context. Only after that do we add our old fiber context to the free list.
@jesta88 Yes, agreed. Specifically for Linux I wrote a minimal assembly version, and libco, as you said, is a good solution. The reason I used makecontext/swapcontext here is that I wanted to be 100% certain that any bug is inside my code and not inside a custom assembly coroutine implementation.
Removing makecontext/swapcontext would give this a big speed boost. The best way to do this on UNIX is with assembly. Boost.Context and libco already have that part handled for common architectures.
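As a rough illustration of the swap discussed above, a sys_fiber backend built on libco could look like the sketch below. This is only a sketch under the assumption of libco's co_create/co_switch/co_active API; since libco entry points take no arguments, the callback and its data are handed over through side-channel globals here, which as written is not safe if multiple threads create fibers at once.

#include <libco.h>

/* sketch: replace the ucontext backend; names mirror the gist's sys_fiber API */
struct sys_fiber {
    cothread_t fib;
};
/* libco entry points take no arguments, so callback and data are passed
 * through globals (not thread-safe; illustration only) */
static fiber_callback co_entry_callback;
static void *co_entry_data;

static void
co_entry(void)
{
    co_entry_callback(co_entry_data);
}
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
    fiber_callback callback, void *data)
{
    (void)stack; /* libco allocates its own stack in this sketch */
    co_entry_callback = callback;
    co_entry_data = data;
    fib->fib = co_create((unsigned int)stack_size, co_entry);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
    /* libco tracks the active cothread itself; record it only to keep the
     * same signature as the ucontext version, then switch */
    prev->fib = co_active();
    co_switch(fib->fib);
}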