medfall

A super great game engine
Log | Files | Refs

nonblocking_fixed_mpsc_queue.h (1529B)


      1 #pragma once
      2 
      3 #include "intrinsics.h"
      4 
      5 #include "platform_atomic.h"
      6 #include "platform_mutex.h"
      7 #include "platform_thread.h"
      8 
      9 template< typename T, size_t N >
     10 class NonblockingFixedMPSCQueue {
     11 public:
     12 	NonblockingFixedMPSCQueue() {
     13 		store_relaxed( &num_elems, 0 );
     14 		head = 0;
     15 		reader_acquired = false;
     16 		mutex_init( &mutex );
     17 	}
     18 
     19 	~NonblockingFixedMPSCQueue() {
     20 		mutex_destroy( &mutex );
     21 	}
     22 
     23 	bool enqueue( const T & x ) {
     24 		if( load_acquire( &num_elems ) >= N ) return false;
     25 		SCOPED_MUTEX_LOCK( &mutex );
     26 		u32 n = load_relaxed( &num_elems );
     27 		if( n >= N ) return false;
     28 
     29 		elems[ ( head + n ) % N ] = x;
     30 		store_release( &num_elems, n + 1 );
     31 
     32 		return true;
     33 	}
     34 
     35 	T * dequeue_acquire() {
     36 		ASSERT( !reader_acquired );
     37 
     38 		if( load_acquire( &num_elems ) == 0 ) return NULL;
     39 
     40 		reader_acquired = true;
     41 		return elems + head;
     42 	}
     43 
     44 	void dequeue_release() {
     45 		ASSERT( reader_acquired );
     46 
     47 		SCOPED_MUTEX_LOCK( &mutex );
     48 
     49 		head = ( head + 1 ) % N;
     50 		fetch_sub_release( &num_elems, 1 );
     51 		reader_acquired = false;
     52 	}
     53 
     54 	void enqueue_spin( const T & x ) {
     55 		int attempt = 0;
     56 		while( !enqueue( x ) ) {
     57 			const int ATTEMPTS_BEFORE_YIELD = 128;
     58 			if( attempt < ATTEMPTS_BEFORE_YIELD ) {
     59 				attempt++;
     60 			}
     61 			else {
     62 				thread_yield();
     63 			}
     64 		}
     65 	}
     66 
     67 	bool dequeue( T * x ) {
     68 		T * t = dequeue_acquire();
     69 		if( t == NULL ) return false;
     70 		*x = *t;
     71 		dequeue_release();
     72 		return true;
     73 	}
     74 
     75 private:
     76 	T elems[ N ];
     77 
     78 	CACHE_LINE_PADDING;
     79 
     80 	u32 head;
     81 	bool reader_acquired;
     82 
     83 	CACHE_LINE_PADDING;
     84 
     85 	atomic_u32 num_elems;
     86 	Mutex mutex;
     87 };