spsc.h (1429B)
1 #pragma once 2 3 #include "intrinsics.h" 4 5 #include "platform_alignment.h" 6 #include "platform_atomic.h" 7 #include "platform_thread.h" 8 9 template< typename T, size_t N > 10 class ALIGNTO_CACHE FixedSPSC { 11 public: 12 NONCOPYABLE( FixedSPSC ); 13 14 FixedSPSC() { 15 for( size_t i = 0; i < N; i++ ) { 16 store_release( &nodes[ i ].last_op, READ ); 17 } 18 reader_pos( $ ) = 0; 19 writer_pos( $ ) = 0; 20 } 21 22 // returns true if x was enqueued, false if the queue was full 23 bool enqueue( const T & x ) { 24 size_t w = writer_pos( $ ); 25 if( load_acquire( &nodes[ w ].last_op ) == WRITE ) 26 return false; 27 28 nodes[ w ].data( $ ) = x; 29 store_release( &nodes[ w ].last_op, WRITE ); 30 31 writer_pos( $ ) = ( w + 1 ) % N; 32 33 return true; 34 } 35 36 void enqueue_spin( const T & x ) { 37 const int ATTEMPTS_BEFORE_YIELD = 128; 38 39 int attempt = 0; 40 while( !enqueue( x ) ) { 41 if( attempt < ATTEMPTS_BEFORE_YIELD ) { 42 attempt++; 43 } 44 else { 45 thread_yield(); 46 } 47 } 48 } 49 50 bool dequeue( T * x ) { 51 size_t r = reader_pos( $ ); 52 if( load_acquire( &nodes[ r ].last_op ) == READ ) 53 return false; 54 55 *x = nodes[ r ].data( $ ); 56 store_release( &nodes[ r ].last_op, READ ); 57 58 reader_pos( $ ) = ( r + 1 ) % N; 59 60 return true; 61 } 62 63 private: 64 enum LastOp { READ, WRITE }; 65 struct ALIGNTO_CACHE Node { 66 NONATOMIC( T ) data; 67 atomic_s32 last_op; 68 }; 69 70 Node nodes[ N ]; 71 NONATOMIC( size_t ) ALIGNTO_CACHE reader_pos; 72 NONATOMIC( size_t ) ALIGNTO_CACHE writer_pos; 73 };