commit 4ae2bf48c8baa4af5d4d345e97bff7a3602de861
parent c6ae1e4597f7c43a62a376a76ec03908f2145271
Author: Michael Savage <mikejsavage@gmail.com>
Date: Sat, 4 Nov 2017 16:29:17 +0200
Add Relacy test for FixedSPSC, more atomics cleanup
Diffstat:
5 files changed, 67 insertions(+), 113 deletions(-)
diff --git a/mpsc.h b/mpsc.h
@@ -79,10 +79,8 @@ public:
private:
ALIGNTO_CACHE T elems[ N ];
- struct ALIGNTO_CACHE {
- u32 head;
- bool reader_acquired;
- };
+ u32 ALIGNTO_CACHE head;
+ bool reader_acquired;
atomic_u32 ALIGNTO_CACHE num_elems;
Mutex mutex;
diff --git a/platform_atomic.h b/platform_atomic.h
@@ -6,6 +6,7 @@
#if PLATFORM_RELACY
#define NONATOMIC( T ) rl::var< T >
+#define ATOMIC_ASSERT( p ) RL_ASSERT( p )
#else
diff --git a/queue.h b/queue.h
@@ -1,75 +0,0 @@
-#pragma once
-
-#include "platform_thread.h"
-
-template< typename T >
-struct BlockingQueue {
- virtual void enqueue( const T & x ) = 0;
- virtual T * dequeue_acquire() = 0;
- virtual void dequeue_release() = 0;
-
- T dequeue() {
- T t = *dequeue_acquire();
- dequeue_release();
- return t;
- }
-};
-
-const int ATTEMPTS_BEFORE_YIELD = 128;
-
-template< typename T >
-struct NonblockingQueue {
- virtual bool enqueue( const T & x ) = 0;
- virtual T * dequeue_acquire() = 0;
- virtual void dequeue_release() = 0;
-
- void enqueue_spin( const T & x ) {
- int attempt = 0;
- while( !enqueue( x ) ) {
- if( attempt < ATTEMPTS_BEFORE_YIELD ) {
- attempt++;
- }
- else {
- thread_yield();
- }
- }
- }
-
- bool dequeue( T * x ) {
- T * t = dequeue_acquire();
- if( t == NULL ) return false;
- *x = *t;
- dequeue_release();
- return true;
- }
-};
-
-template< typename T >
-class NonblockingQueueReader {
-public:
- NonblockingQueueReader( NonblockingQueue< T > * Q ) {
- q = Q;
- }
-
- bool dequeue( T * x ) {
- return q->dequeue( x );
- }
-
-private:
- NonblockingQueue< T > * q;
-};
-
-template< typename T >
-struct NonblockingQueueWriter {
-public:
- NonblockingQueueWriter( NonblockingQueue< T > * Q ) {
- q = Q;
- }
-
- bool enqueue( const T & x ) {
- return q->dequeue( x );
- }
-
-private:
- NonblockingQueue< T > * q;
-};
diff --git a/spsc.h b/spsc.h
@@ -1,13 +1,13 @@
#pragma once
#include "intrinsics.h"
-#include "queue.h"
#include "platform_alignment.h"
#include "platform_atomic.h"
+#include "platform_thread.h"
template< typename T, size_t N >
-class FixedSPSC : public NonblockingQueue< T > {
+class FixedSPSC {
public:
NONCOPYABLE( FixedSPSC );
@@ -15,7 +15,6 @@ public:
for( size_t i = 0; i < N; i++ ) {
store_release( &nodes[ i ].last_op, READ );
}
- reader_acquired( $ ) = false;
reader_pos( $ ) = 0;
writer_pos( $ ) = 0;
}
@@ -23,58 +22,52 @@ public:
// returns true if x was enqueued, false if the queue was full
bool enqueue( const T & x ) {
size_t w = writer_pos( $ );
- if( load_acquire( &nodes[ w ].last_op ) == WRITE ) return false;
+ if( load_acquire( &nodes[ w ].last_op ) == WRITE )
+ return false;
- nodes[ w ].data = x;
+ nodes[ w ].data( $ ) = x;
store_release( &nodes[ w ].last_op, WRITE );
+
writer_pos( $ ) = ( w + 1 ) % N;
return true;
}
- // returns a pointer to the element to be dequeued, or NULL is the
- // queue is empty
- // call dequeue_release once you're done with the pointer
- T * dequeue_acquire() {
- ASSERT( !reader_acquired( $ ) );
-
- size_t r = reader_pos( $ );
- if( load_acquire( &nodes[ r ].last_op ) == READ ) return NULL;
-
- reader_acquired( $ ) = true;
- return &nodes[ r ].data;
+ void enqueue_spin( const T & x ) {
+ const int ATTEMPTS_BEFORE_YIELD = 128;
+
+ int attempt = 0;
+ while( !enqueue( x ) ) {
+ if( attempt < ATTEMPTS_BEFORE_YIELD ) {
+ attempt++;
+ }
+ else {
+ thread_yield();
+ }
+ }
}
- void dequeue_release() {
- ASSERT( reader_acquired( $ ) );
- reader_acquired( $ ) = false;
-
+ bool dequeue( T * x ) {
size_t r = reader_pos( $ );
+ if( load_acquire( &nodes[ r ].last_op ) == READ )
+ return false;
+
+ *x = nodes[ r ].data( $ );
store_release( &nodes[ r ].last_op, READ );
- reader_pos( $ ) = ( r + 1 ) % N;
- }
- NonblockingQueueReader< T > reader() {
- return NonblockingQueueReader< T >( this );
- }
+ reader_pos( $ ) = ( r + 1 ) % N;
- NonblockingQueueWriter< T > writer() {
- return NonblockingQueueWriter< T >( this );
+ return true;
}
private:
enum LastOp { READ, WRITE };
struct ALIGNTO_CACHE Node {
- T data;
+ NONATOMIC( T ) data;
atomic_s32 last_op;
};
Node nodes[ N ];
-
- struct ALIGNTO_CACHE {
- NONATOMIC( size_t ) reader_pos;
- NONATOMIC( bool ) reader_acquired;
- };
-
+ NONATOMIC( size_t ) ALIGNTO_CACHE reader_pos;
NONATOMIC( size_t ) ALIGNTO_CACHE writer_pos;
};
diff --git a/tests/spsc_relacy.cc b/tests/spsc_relacy.cc
@@ -0,0 +1,37 @@
+#include "libs/relacy/relacy_std.hpp"
+
+#define constexpr inline // TODO: big hack
+#include "spsc.h"
+
+struct FixedSPSCTest : rl::test_suite< FixedSPSCTest, 2 > {
+ FixedSPSC< int, 4 > spsc;
+
+ void thread( unsigned int thread_id ) {
+ if( thread_id == 0 ) {
+ int i = 0;
+ while( i < 10 ) {
+ if( spsc.enqueue( i ) )
+ i++;
+ }
+ }
+
+ if( thread_id == 1 ) {
+ int i = 0;
+ while( i < 10 ) {
+ int x;
+ if( spsc.dequeue( &x ) ) {
+ ATOMIC_ASSERT( x == i );
+ i++;
+ }
+ }
+ }
+ }
+};
+
+int main() {
+ rl::test_params p;
+ p.iteration_count = 200000;
+ rl::simulate< FixedSPSCTest >( p );
+ return 0;
+}
+