mpsc.h (1607B)
1 #pragma once 2 3 #include "intrinsics.h" 4 5 #include "platform_alignment.h" 6 #include "platform_atomic.h" 7 #include "platform_thread.h" 8 9 template< typename T, u32 N > 10 class ALIGNTO_CACHE FixedMPSC { 11 public: 12 NONCOPYABLE( FixedMPSC ); 13 14 FixedMPSC() { 15 for( u32 i = 0; i < N; i++ ) { 16 store_relaxed( &nodes[ i ].seq, i | EMPTY_BIT ); 17 } 18 reader_pos( $ ) = 0; 19 store_relaxed( &writer_pos, 0 ); 20 } 21 22 bool enqueue( const T & x ) { 23 u32 wr = load_acquire( &writer_pos ); 24 Node * node; 25 while( true ) { 26 node = &nodes[ wr % N ]; 27 u32 seq = load_acquire( &node->seq ); 28 if( seq != ( wr | EMPTY_BIT ) ) 29 return false; 30 31 if( compare_exchange_acqrel( &writer_pos, &wr, ( wr + 1 ) & COUNTER_MASK ) ) 32 break; 33 } 34 35 node->data( $ ) = x; 36 store_release( &node->seq, wr ); 37 38 return true; 39 } 40 41 void enqueue_spin( const T & x ) { 42 const int ATTEMPTS_BEFORE_YIELD = 128; 43 44 int attempt = 0; 45 while( !enqueue( x ) ) { 46 if( attempt < ATTEMPTS_BEFORE_YIELD ) { 47 attempt++; 48 } 49 else { 50 thread_yield(); 51 } 52 } 53 } 54 55 bool dequeue( T * x ) { 56 u32 r = reader_pos( $ ); 57 Node * node = &nodes[ r % N ]; 58 59 if( load_acquire( &node->seq ) != r ) 60 return false; 61 62 *x = node->data( $ ); 63 reader_pos( $ ) = ( r + 1 ) & COUNTER_MASK; 64 store_release( &node->seq, ( ( r + N ) & COUNTER_MASK ) | EMPTY_BIT ); 65 66 return true; 67 } 68 69 private: 70 enum : u32 { EMPTY_BIT = U32( 1 ) << 31 }; 71 enum : u32 { COUNTER_MASK = ~EMPTY_BIT }; 72 73 struct ALIGNTO_CACHE Node { 74 NONATOMIC( T ) data; 75 atomic_u32 seq; 76 }; 77 78 Node nodes[ N ]; 79 atomic_u32 ALIGNTO_CACHE writer_pos; 80 NONATOMIC( u32 ) ALIGNTO_CACHE reader_pos; 81 };