nonblocking_fixed_mpsc_queue.h (1529B)
1 #pragma once 2 3 #include "intrinsics.h" 4 5 #include "platform_atomic.h" 6 #include "platform_mutex.h" 7 #include "platform_thread.h" 8 9 template< typename T, size_t N > 10 class NonblockingFixedMPSCQueue { 11 public: 12 NonblockingFixedMPSCQueue() { 13 store_relaxed( &num_elems, 0 ); 14 head = 0; 15 reader_acquired = false; 16 mutex_init( &mutex ); 17 } 18 19 ~NonblockingFixedMPSCQueue() { 20 mutex_destroy( &mutex ); 21 } 22 23 bool enqueue( const T & x ) { 24 if( load_acquire( &num_elems ) >= N ) return false; 25 SCOPED_MUTEX_LOCK( &mutex ); 26 u32 n = load_relaxed( &num_elems ); 27 if( n >= N ) return false; 28 29 elems[ ( head + n ) % N ] = x; 30 store_release( &num_elems, n + 1 ); 31 32 return true; 33 } 34 35 T * dequeue_acquire() { 36 ASSERT( !reader_acquired ); 37 38 if( load_acquire( &num_elems ) == 0 ) return NULL; 39 40 reader_acquired = true; 41 return elems + head; 42 } 43 44 void dequeue_release() { 45 ASSERT( reader_acquired ); 46 47 SCOPED_MUTEX_LOCK( &mutex ); 48 49 head = ( head + 1 ) % N; 50 fetch_sub_release( &num_elems, 1 ); 51 reader_acquired = false; 52 } 53 54 void enqueue_spin( const T & x ) { 55 int attempt = 0; 56 while( !enqueue( x ) ) { 57 const int ATTEMPTS_BEFORE_YIELD = 128; 58 if( attempt < ATTEMPTS_BEFORE_YIELD ) { 59 attempt++; 60 } 61 else { 62 thread_yield(); 63 } 64 } 65 } 66 67 bool dequeue( T * x ) { 68 T * t = dequeue_acquire(); 69 if( t == NULL ) return false; 70 *x = *t; 71 dequeue_release(); 72 return true; 73 } 74 75 private: 76 T elems[ N ]; 77 78 CACHE_LINE_PADDING; 79 80 u32 head; 81 bool reader_acquired; 82 83 CACHE_LINE_PADDING; 84 85 atomic_u32 num_elems; 86 Mutex mutex; 87 };