medfall

A super great game engine
Log | Files | Refs

nonblocking_fixed_spsc_queue.h (1807B)


      1 #pragma once
      2 
      3 #include "intrinsics.h"
      4 #include "queue.h"
      5 
      6 #include "platform_atomic.h"
      7 
      8 template< typename T, size_t N >
      9 class NonblockingFixedSPSCQueue : public NonblockingQueue< T > {
     10 public:
     11 	NonblockingFixedSPSCQueue() {
     12 		VAR( reader_acquired ) = false;
     13 		clear();
     14 	}
     15 
     16 	// returns true if x was enqueued, false if the queue was full
     17 	bool enqueue( const T & x ) {
     18 		size_t w = VAR( writer_pos );
     19 		if( load_acquire( &nodes[ w ].last_op ) == WRITE ) return false;
     20 
     21 		nodes[ w ].data = x;
     22 		store_release( &nodes[ w ].last_op, WRITE );
     23 		VAR( writer_pos ) = ( w + 1 ) % N;
     24 
     25 		return true;
     26 	}
     27 
     28 	// returns a pointer to the element to be dequeued, or NULL is the
     29 	// queue is empty
     30 	// call dequeue_release once you're done with the pointer
     31 	T * dequeue_acquire() {
     32 		ASSERT( !VAR( reader_acquired ) );
     33 
     34 		size_t r = VAR( reader_pos );
     35 		if( load_acquire( &nodes[ r ].last_op ) == READ ) return NULL;
     36 
     37 		VAR( reader_acquired ) = true;
     38 		return &nodes[ r ].data;
     39 	}
     40 
     41 	void dequeue_release() {
     42 		ASSERT( VAR( reader_acquired ) );
     43 		VAR( reader_acquired ) = false;
     44 
     45 		size_t r = VAR( reader_pos );
     46 		store_release( &nodes[ r ].last_op, READ );
     47 		VAR( reader_pos ) = ( r + 1 ) % N;
     48 	}
     49 
     50 	void clear() {
     51 		for( size_t i = 0; i < N; i++ ) {
     52 			store_release( &nodes[ i ].last_op, READ );
     53 		}
     54 		VAR( reader_pos ) = VAR( writer_pos ) = 0;
     55 	}
     56 
     57 	NonblockingQueueReader< T > reader() {
     58 		return NonblockingQueueReader< T >( this );
     59 	}
     60 
     61 	NonblockingQueueWriter< T > writer() {
     62 		return NonblockingQueueWriter< T >( this );
     63 	}
     64 
     65 private:
     66 	enum LastOp { READ, WRITE };
     67 	struct Node {
     68 		NONCOPYABLE( Node );
     69 		Node() { }
     70 		T data;
     71 		atomic_s32 last_op;
     72 	};
     73 
     74 	Node nodes[ N ];
     75 	CACHE_LINE_PADDING;
     76 	nonatomic( size_t ) reader_pos;
     77 	nonatomic( bool ) reader_acquired;
     78 	CACHE_LINE_PADDING;
     79 	nonatomic( size_t ) writer_pos;
     80 };