medfall

A super great game engine
Log | Files | Refs

work_queue.cc (2763B)


      1 /*
      2  * TODO XXX FIXME: the threading code is probably fucked because i rushed through.
      3  */
      4 
      5 #include "intrinsics.h"
      6 #include "array.h"
      7 #include "log.h"
      8 #include "work_queue.h"
      9 #include "platform_semaphore.h"
     10 #include "platform_thread.h"
     11 
     12 struct ThreadInfo {
     13 	u32 thread_id;
     14 	WorkQueue * queue;
     15 };
     16 
     17 static bool workqueue_step( u32 thread_id, WorkQueue * queue ) {
     18 	u32 current_head = load_relaxed( &queue->head );
     19 	u32 new_head = current_head + 1;
     20 
     21 	if( current_head != load_acquire( &queue->tail ) ) {
     22 		if( compare_exchange_acqrel( &queue->head, &current_head, new_head ) ) {
     23 			const Job & job = queue->jobs[ current_head % ARRAY_COUNT( queue->jobs ) ];
     24 			MemoryArena * arena = &queue->arenas[ thread_id ];
     25 
     26 			memarena_clear( arena );
     27 			job.callback( job.data, arena );
     28 		}
     29 
     30 		return true;
     31 	}
     32 
     33 	return false;
     34 }
     35 
     36 static THREAD( workqueue_worker ) {
     37 	ThreadInfo * info = ( ThreadInfo * ) data;
     38 
     39 	WorkQueue * queue = info->queue;
     40 	u32 thread_id = info->thread_id;
     41 
     42 	logger_thread_name( "worker {}", thread_id );
     43 
     44 	while( load_acquire( &queue->shutting_down ) == 0 ) {
     45 		if( !workqueue_step( thread_id, queue ) ) {
     46 			semaphore_wait( &queue->sem );
     47 		}
     48 	}
     49 
     50 	THREAD_END;
     51 }
     52 
     53 void workqueue_init( WorkQueue * queue, MemoryArena * arena, u32 num_threads ) {
     54 	semaphore_init( &queue->sem );
     55 
     56 	store_relaxed( &queue->head, 0 );
     57 	store_relaxed( &queue->tail, 0 );
     58 	store_relaxed( &queue->shutting_down, 0 );
     59 
     60 	queue->threads = memarena_push_array( arena, Thread, num_threads );
     61 	queue->arenas = memarena_push_array( arena, MemoryArena, num_threads + 1 );
     62 
     63 	for( u32 i = 0; i < num_threads + 1; i++ ) {
     64 		queue->arenas[ i ] = memarena_push_arena( arena, megabytes( 1 ) );
     65 	}
     66 
     67 	ThreadInfo * infos = memarena_push_many( arena, ThreadInfo, num_threads );
     68 
     69 	for( u32 i = 0; i < num_threads; i++ ) {
     70 		infos[ i ] = { i, queue };
     71 		thread_init( &queue->threads[ i ], workqueue_worker, &infos[ i ] );
     72 	}
     73 }
     74 
     75 void workqueue_enqueue( WorkQueue * queue, WorkQueueCallback * callback, const void * data ) {
     76 #if DISABLE_THREADS
     77 	callback( data, &queue->arenas[ 0 ] );
     78 #else
     79 	u32 tail = fetch_add_release( &queue->tail, 1 );
     80 	ASSERT( tail - load_acquire( &queue->head ) < ARRAY_COUNT( queue->jobs ) - 1 );
     81 
     82 	Job job = { callback, data };
     83 
     84 	queue->jobs[ tail % ARRAY_COUNT( queue->jobs ) ] = job;
     85 
     86 	semaphore_signal( &queue->sem );
     87 #endif
     88 }
     89 
     90 void workqueue_exhaust( WorkQueue * queue ) {
     91 	while( load_relaxed( &queue->head ) != load_acquire( &queue->tail ) ) {
     92 		workqueue_step( checked_cast< u32 >( queue->threads.n ), queue );
     93 	}
     94 }
     95 
     96 void workqueue_term( WorkQueue * queue ) {
     97 	store_release( &queue->shutting_down, 1 );
     98 
     99 	for( size_t i = 0; i < queue->threads.n; i++ ) {
    100 		semaphore_signal( &queue->sem );
    101 	}
    102 
    103 	for( Thread & thread : queue->threads ) {
    104 		thread_join( &thread );
    105 	}
    106 }