medfall

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs

commit 5954660545f64bbc042f2842e5635e2e70e050b3
parent c7193925308d6cc3b9eff2e6d672d0f6f29812f4
Author: Michael Savage <mikejsavage@gmail.com>
Date:   Tue Feb  7 20:15:53 +0200

Add workqueue_term

Diffstat:
work_queue.cc | 27++++++++++++++++++++-------
work_queue.h | 10+++++++---
2 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/work_queue.cc b/work_queue.cc @@ -3,6 +3,7 @@ */ #include "intrinsics.h" +#include "array.h" #include "log.h" #include "work_queue.h" #include "platform_atomic.h" @@ -44,7 +45,7 @@ static THREAD( workqueue_worker ) { logger_thread_name( "worker %u", thread_id ); - for( ;; ) { + while( load_acquire( &queue->shutting_down ) == 0 ) { if( !workqueue_step( thread_id, queue ) ) { semaphore_wait( &queue->sem ); } @@ -58,8 +59,10 @@ void workqueue_init( WorkQueue * queue, MemoryArena * arena, u32 num_threads ) { store_relaxed( &queue->head, 0 ); store_relaxed( &queue->tail, 0 ); - queue->num_threads = num_threads; - queue->arenas = memarena_push_many( arena, MemoryArena, num_threads + 1 ); + store_relaxed( &queue->shutting_down, 0 ); + + queue->threads = memarena_push_array( arena, Thread, num_threads ); + queue->arenas = memarena_push_array( arena, MemoryArena, num_threads + 1 ); for( u32 i = 0; i < num_threads + 1; i++ ) { queue->arenas[ i ] = memarena_push_arena( arena, megabytes( 16 ) ); @@ -73,9 +76,7 @@ void workqueue_init( WorkQueue * queue, MemoryArena * arena, u32 num_threads ) { for( u32 i = 0; i < num_threads; i++ ) { infos[ i ] = { i, queue, &started_threads }; - - Thread thread; - thread_init( &thread, workqueue_worker, &infos[ i ] ); + thread_init( &queue->threads[ i ], workqueue_worker, &infos[ i ] ); } // wait until all threads have a local copy of ThreadInfo @@ -102,6 +103,18 @@ void workqueue_enqueue( WorkQueue * queue, WorkQueueCallback * callback, const v void workqueue_exhaust( WorkQueue * queue ) { while( load_relaxed( &queue->head ) != load_acquire( &queue->tail ) ) { - workqueue_step( queue->num_threads, queue ); + workqueue_step( queue->threads.n, queue ); + } +} + +void workqueue_term( WorkQueue * queue ) { + store_release( &queue->shutting_down, 1 ); + + for( size_t i = 0; i < queue->threads.n; i++ ) { + semaphore_signal( &queue->sem ); + } + + for( Thread & thread : queue->threads ) { + thread_join( &thread ); } } diff --git a/work_queue.h b/work_queue.h @@ -1,9 +1,11 @@ #pragma once #include "intrinsics.h" +#include "array.h" +#include "memory_arena.h" #include "platform_atomic.h" #include "platform_semaphore.h" -#include "memory_arena.h" +#include "platform_thread.h" #define WORK_QUEUE_CALLBACK( name ) void name( const void * data, MemoryArena * arena ) typedef WORK_QUEUE_CALLBACK( WorkQueueCallback ); @@ -19,11 +21,13 @@ struct WorkQueue { Semaphore sem; atomic_u32 head, tail; + atomic_u32 shutting_down; - u32 num_threads; - MemoryArena * arenas; + array< Thread > threads; + array< MemoryArena > arenas; }; void workqueue_init( WorkQueue * queue, MemoryArena * arena, u32 num_threads ); void workqueue_enqueue( WorkQueue * queue, WorkQueueCallback * callback, const void * data = NULL ); void workqueue_exhaust( WorkQueue * queue ); +void workqueue_term( WorkQueue * queue );