commit b3aa6be5d4b600e4c7d35cdf230bdb1f0dfdb254 parent 754961cb7e2707b3156b2f37789c5ae626fe99f5 Author: Michael Savage <mikejsavage@gmail.com> Date: Sat Aug 20 20:48:13 +0100 Add nonblocking_fixed_spsc_queue.h Diffstat:
nonblocking_fixed_spsc_queue.h | | | 84 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
diff --git a/nonblocking_fixed_spsc_queue.h b/nonblocking_fixed_spsc_queue.h @@ -0,0 +1,84 @@ +#ifndef _NONBLOCKING_FIXED_SPSC_QUEUE_H_ +#define _NONBLOCKING_FIXED_SPSC_QUEUE_H_ + +#include "intrinsics.h" +#include "queue.h" + +#include "platform_atomic.h" + +template< typename T, u64 N > +class NonblockingFixedSPSCQueue : public NonblockingQueue< T > { + // N really only needs to divide U64_MAX but requiring a power of 2 is + // not a problem + static_assert( is_power_of_2( N ) ); + +public: + NonblockingFixedSPSCQueue() { + clear(); + } + + // returns true if x was enqueued, false if the queue was full + bool enqueue( const T & x ) { + u64 w = VAR( writer_pos ) % N; + if( load_acquire( &nodes[ w ].last_op ) == WRITE ) return false; + + nodes[ w ].data = x; + store_release( &nodes[ w ].last_op, WRITE ); + VAR( writer_pos )++; + + return true; + } + + // returns a pointer to the element to be dequeued, or NULL is the + // queue is empty + // call dequeue_release once you're done with the pointer + T * dequeue_acquire() { + // assert( !reader_acquired ); + + u64 r = VAR( reader_pos ) % N; + if( load_acquire( &nodes[ r ].last_op ) == READ ) return NULL; + + VAR( reader_acquired ) = true; + return &nodes[ r ].data; + } + + void dequeue_release() { + // assert( reader_acquired ); + VAR( reader_acquired ) = false; + + u64 r = VAR( reader_pos ) % N; + store_release( &nodes[ r ].last_op, READ ); + VAR( reader_pos )++; + } + + void clear() { + for( u64 i = 0; i < N; i++ ) { + store_release( &nodes[ i ].last_op, READ ); + } + VAR( reader_pos ) = VAR( writer_pos ) = 0; + } + + NonblockingQueueReader< T > reader() { + return NonblockingQueueReader< T >( this ); + } + + NonblockingQueueWriter< T > writer() { + return NonblockingQueueWriter< T >( this ); + } + +private: + enum LastOp { READ, WRITE }; + struct Node { + T data; + atomic_s32 last_op; + }; + + Node nodes[ N ]; + CACHE_LINE_PADDING; + nonatomic( u64 ) reader_pos; + nonatomic( bool ) reader_acquired; + CACHE_LINE_PADDING; + nonatomic( u64 ) writer_pos; +}; + +#endif // _NONBLOCKING_FIXED_SPSC_QUEUE_H_