medfall

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs

commit 2ff05e63d22e1ea1e6c9fed6e94c24514d0f7040
parent 1b66c3eec66107d639e015e3de45ceab38c2a03d
Author: Michael Savage <mikejsavage@gmail.com>
Date:   Tue Dec 13 23:12:47 +0200

Add nonblocking_fixed_mpsc_queue.h

Diffstat:
nonblocking_fixed_mpsc_queue.h | 86+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 86 insertions(+), 0 deletions(-)
diff --git a/nonblocking_fixed_mpsc_queue.h b/nonblocking_fixed_mpsc_queue.h @@ -0,0 +1,86 @@ +#pragma once + +#include "intrinsics.h" + +#include "platform_atomic.h" +#include "platform_mutex.h" +#include "platform_thread.h" + +template< typename T, size_t N > +class NonblockingFixedMPSCQueue { +public: + NonblockingFixedMPSCQueue() { + head = 0; + num_elems = 0; + mutex_init( &mutex ); + } + + ~NonblockingFixedMPSCQueue() { + mutex_destroy( &mutex ); + } + + bool enqueue( const T & x ) { + if( load_acquire( &num_elems ) >= N ) return false; + SCOPED_MUTEX_LOCK( &mutex ); + if( load_relaxed( &num_elems ) >= N ) return false; + + u32 n = load_relaxed( &num_elems ); + elems[ ( head + n ) % N ] = x; + store_release( &num_elems, n + 1 ); + + return true; + } + + T * dequeue_acquire() { + ASSERT( !reader_acquired ); + + if( load_acquire( &num_elems ) == 0 ) return NULL; + + reader_acquired = true; + return elems + head; + } + + void dequeue_release() { + ASSERT( reader_acquired ); + + SCOPED_MUTEX_LOCK( &mutex ); + + head = ( head + 1 ) % N; + fetch_sub_release( &num_elems, 1 ); + reader_acquired = false; + } + + void enqueue_spin( const T & x ) { + int attempt = 0; + while( !enqueue( x ) ) { + const int ATTEMPTS_BEFORE_YIELD = 128; + if( attempt < ATTEMPTS_BEFORE_YIELD ) { + attempt++; + } + else { + thread_yield(); + } + } + } + + bool dequeue( T * x ) { + T * t = dequeue_acquire(); + if( t == NULL ) return false; + *x = *t; + dequeue_release(); + return true; + } + +private: + T elems[ N ]; + + CACHE_LINE_PADDING; + + u32 head; + bool reader_acquired; + + CACHE_LINE_PADDING; + + atomic_u32 num_elems; + Mutex mutex; +};