nonblocking_fixed_spsc_queue.h (1807B)
1 #pragma once 2 3 #include "intrinsics.h" 4 #include "queue.h" 5 6 #include "platform_atomic.h" 7 8 template< typename T, size_t N > 9 class NonblockingFixedSPSCQueue : public NonblockingQueue< T > { 10 public: 11 NonblockingFixedSPSCQueue() { 12 VAR( reader_acquired ) = false; 13 clear(); 14 } 15 16 // returns true if x was enqueued, false if the queue was full 17 bool enqueue( const T & x ) { 18 size_t w = VAR( writer_pos ); 19 if( load_acquire( &nodes[ w ].last_op ) == WRITE ) return false; 20 21 nodes[ w ].data = x; 22 store_release( &nodes[ w ].last_op, WRITE ); 23 VAR( writer_pos ) = ( w + 1 ) % N; 24 25 return true; 26 } 27 28 // returns a pointer to the element to be dequeued, or NULL is the 29 // queue is empty 30 // call dequeue_release once you're done with the pointer 31 T * dequeue_acquire() { 32 ASSERT( !VAR( reader_acquired ) ); 33 34 size_t r = VAR( reader_pos ); 35 if( load_acquire( &nodes[ r ].last_op ) == READ ) return NULL; 36 37 VAR( reader_acquired ) = true; 38 return &nodes[ r ].data; 39 } 40 41 void dequeue_release() { 42 ASSERT( VAR( reader_acquired ) ); 43 VAR( reader_acquired ) = false; 44 45 size_t r = VAR( reader_pos ); 46 store_release( &nodes[ r ].last_op, READ ); 47 VAR( reader_pos ) = ( r + 1 ) % N; 48 } 49 50 void clear() { 51 for( size_t i = 0; i < N; i++ ) { 52 store_release( &nodes[ i ].last_op, READ ); 53 } 54 VAR( reader_pos ) = VAR( writer_pos ) = 0; 55 } 56 57 NonblockingQueueReader< T > reader() { 58 return NonblockingQueueReader< T >( this ); 59 } 60 61 NonblockingQueueWriter< T > writer() { 62 return NonblockingQueueWriter< T >( this ); 63 } 64 65 private: 66 enum LastOp { READ, WRITE }; 67 struct Node { 68 NONCOPYABLE( Node ); 69 Node() { } 70 T data; 71 atomic_s32 last_op; 72 }; 73 74 Node nodes[ N ]; 75 CACHE_LINE_PADDING; 76 nonatomic( size_t ) reader_pos; 77 nonatomic( bool ) reader_acquired; 78 CACHE_LINE_PADDING; 79 nonatomic( size_t ) writer_pos; 80 };