work_queue.cc (2763B)
1 /* 2 * TODO XXX FIXME: the threading code is probably fucked because i rushed through. 3 */ 4 5 #include "intrinsics.h" 6 #include "array.h" 7 #include "log.h" 8 #include "work_queue.h" 9 #include "platform_semaphore.h" 10 #include "platform_thread.h" 11 12 struct ThreadInfo { 13 u32 thread_id; 14 WorkQueue * queue; 15 }; 16 17 static bool workqueue_step( u32 thread_id, WorkQueue * queue ) { 18 u32 current_head = load_relaxed( &queue->head ); 19 u32 new_head = current_head + 1; 20 21 if( current_head != load_acquire( &queue->tail ) ) { 22 if( compare_exchange_acqrel( &queue->head, ¤t_head, new_head ) ) { 23 const Job & job = queue->jobs[ current_head % ARRAY_COUNT( queue->jobs ) ]; 24 MemoryArena * arena = &queue->arenas[ thread_id ]; 25 26 memarena_clear( arena ); 27 job.callback( job.data, arena ); 28 } 29 30 return true; 31 } 32 33 return false; 34 } 35 36 static THREAD( workqueue_worker ) { 37 ThreadInfo * info = ( ThreadInfo * ) data; 38 39 WorkQueue * queue = info->queue; 40 u32 thread_id = info->thread_id; 41 42 logger_thread_name( "worker {}", thread_id ); 43 44 while( load_acquire( &queue->shutting_down ) == 0 ) { 45 if( !workqueue_step( thread_id, queue ) ) { 46 semaphore_wait( &queue->sem ); 47 } 48 } 49 50 THREAD_END; 51 } 52 53 void workqueue_init( WorkQueue * queue, MemoryArena * arena, u32 num_threads ) { 54 semaphore_init( &queue->sem ); 55 56 store_relaxed( &queue->head, 0 ); 57 store_relaxed( &queue->tail, 0 ); 58 store_relaxed( &queue->shutting_down, 0 ); 59 60 queue->threads = memarena_push_array( arena, Thread, num_threads ); 61 queue->arenas = memarena_push_array( arena, MemoryArena, num_threads + 1 ); 62 63 for( u32 i = 0; i < num_threads + 1; i++ ) { 64 queue->arenas[ i ] = memarena_push_arena( arena, megabytes( 1 ) ); 65 } 66 67 ThreadInfo * infos = memarena_push_many( arena, ThreadInfo, num_threads ); 68 69 for( u32 i = 0; i < num_threads; i++ ) { 70 infos[ i ] = { i, queue }; 71 thread_init( &queue->threads[ i ], workqueue_worker, &infos[ i ] ); 72 } 73 } 74 75 void workqueue_enqueue( WorkQueue * queue, WorkQueueCallback * callback, const void * data ) { 76 #if DISABLE_THREADS 77 callback( data, &queue->arenas[ 0 ] ); 78 #else 79 u32 tail = fetch_add_release( &queue->tail, 1 ); 80 ASSERT( tail - load_acquire( &queue->head ) < ARRAY_COUNT( queue->jobs ) - 1 ); 81 82 Job job = { callback, data }; 83 84 queue->jobs[ tail % ARRAY_COUNT( queue->jobs ) ] = job; 85 86 semaphore_signal( &queue->sem ); 87 #endif 88 } 89 90 void workqueue_exhaust( WorkQueue * queue ) { 91 while( load_relaxed( &queue->head ) != load_acquire( &queue->tail ) ) { 92 workqueue_step( checked_cast< u32 >( queue->threads.n ), queue ); 93 } 94 } 95 96 void workqueue_term( WorkQueue * queue ) { 97 store_release( &queue->shutting_down, 1 ); 98 99 for( size_t i = 0; i < queue->threads.n; i++ ) { 100 semaphore_signal( &queue->sem ); 101 } 102 103 for( Thread & thread : queue->threads ) { 104 thread_join( &thread ); 105 } 106 }