commit f6ec6ba93dbfd60b31fb70a40e9bd3c5961fb8e6
parent a18fb1bcb20cda8154360738d8eb0392fbfde1a8
Author: Michael Savage <mikejsavage@gmail.com>
Date: Sun, 5 Nov 2017 22:18:24 +0200
Make FixedMPSC lock-free and add a Relacy test
Diffstat:
mpsc.h | | | 86 | +++++++++++++++++++++++++++++++++++++------------------------------------------ |
tests/mpsc_relacy.cc | | | 61 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
2 files changed, 101 insertions(+), 46 deletions(-)
diff --git a/mpsc.h b/mpsc.h
@@ -4,61 +4,45 @@
#include "platform_alignment.h"
#include "platform_atomic.h"
-#include "platform_mutex.h"
#include "platform_thread.h"
-template< typename T, size_t N >
+template< typename T, u32 N >
class FixedMPSC {
public:
- // TODO: this is bad
NONCOPYABLE( FixedMPSC );
FixedMPSC() {
- store_relaxed( &num_elems, 0 );
- head = 0;
- reader_acquired = false;
- mutex_init( &mutex );
- }
-
- ~FixedMPSC() {
- mutex_destroy( &mutex );
+ for( u32 i = 0; i < N; i++ ) {
+ store_relaxed( &nodes[ i ].seq, i | EMPTY_BIT );
+ }
+ reader_pos( $ ) = 0;
+ store_relaxed( &writer_pos, 0 );
}
bool enqueue( const T & x ) {
- if( load_acquire( &num_elems ) >= N ) return false;
- SCOPED_MUTEX_LOCK( &mutex );
- u32 n = load_relaxed( &num_elems );
- if( n >= N ) return false;
+ u32 wr = load_acquire( &writer_pos );
+ Node * node;
+ while( true ) {
+ node = &nodes[ wr % N ];
+ u32 seq = load_acquire( &node->seq );
+ if( seq != ( wr | EMPTY_BIT ) )
+ return false;
+
+ if( compare_exchange_acqrel( &writer_pos, &wr, ( wr + 1 ) & COUNTER_MASK ) )
+ break;
+ }
- elems[ ( head + n ) % N ] = x;
- store_release( &num_elems, n + 1 );
+ node->data( $ ) = x;
+ store_release( &node->seq, wr );
return true;
}
- T * dequeue_acquire() {
- ASSERT( !reader_acquired );
-
- if( load_acquire( &num_elems ) == 0 ) return NULL;
-
- reader_acquired = true;
- return elems + head;
- }
-
- void dequeue_release() {
- ASSERT( reader_acquired );
-
- SCOPED_MUTEX_LOCK( &mutex );
-
- head = ( head + 1 ) % N;
- fetch_sub_release( &num_elems, 1 );
- reader_acquired = false;
- }
-
void enqueue_spin( const T & x ) {
+ const int ATTEMPTS_BEFORE_YIELD = 128;
+
int attempt = 0;
while( !enqueue( x ) ) {
- const int ATTEMPTS_BEFORE_YIELD = 128;
if( attempt < ATTEMPTS_BEFORE_YIELD ) {
attempt++;
}
@@ -69,19 +53,29 @@ public:
}
bool dequeue( T * x ) {
- T * t = dequeue_acquire();
- if( t == NULL ) return false;
- *x = *t;
- dequeue_release();
+ u32 r = reader_pos( $ );
+ Node * node = &nodes[ r % N ];
+
+ if( load_acquire( &node->seq ) != r )
+ return false;
+
+ *x = node->data( $ );
+ reader_pos( $ ) = r + 1;
+ store_release( &node->seq, ( ( r + N ) & COUNTER_MASK ) | EMPTY_BIT );
+
return true;
}
private:
- ALIGNTO_CACHE T elems[ N ];
+ enum { EMPTY_BIT = 1 << 31 };
+ enum { COUNTER_MASK = ~EMPTY_BIT };
- u32 ALIGNTO_CACHE head;
- bool reader_acquired;
+ struct ALIGNTO_CACHE Node {
+ NONATOMIC( T ) data;
+ atomic_u32 seq;
+ };
- atomic_u32 ALIGNTO_CACHE num_elems;
- Mutex mutex;
+ Node nodes[ N ];
+ atomic_u32 ALIGNTO_CACHE writer_pos;
+ NONATOMIC( u32 ) ALIGNTO_CACHE reader_pos;
};
diff --git a/tests/mpsc_relacy.cc b/tests/mpsc_relacy.cc
@@ -0,0 +1,61 @@
+#include "libs/relacy/relacy_std.hpp"
+
+#define constexpr inline // TODO: big hack
+#include "mpsc.h"
+
+#define NUM_PRODUCER_THREADS 4
+#define NUM_PUSHES 10
+
+struct FixedMPSCTest : rl::test_suite< FixedMPSCTest, NUM_PRODUCER_THREADS + 1 > {
+ FixedMPSC< int, 4 > mpsc;
+
+ void thread( unsigned int thread_id ) {
+ if( thread_id == 0 ) {
+ int threads[ NUM_PRODUCER_THREADS ];
+ for( int i = 0; i < NUM_PRODUCER_THREADS; i++ )
+ threads[ i ] = 0;
+
+ while( true ) {
+ bool done = true;
+ for( int i = 0; i < NUM_PRODUCER_THREADS; i++ ) {
+ if( threads[ i ] < NUM_PUSHES ) {
+ done = false;
+ }
+ }
+
+ if( done )
+ break;
+
+ bool ok = false;
+ int x;
+ if( mpsc.dequeue( &x ) ) {
+ for( int i = 0; i < NUM_PRODUCER_THREADS; i++ ) {
+ if( threads[ i ] == x ) {
+ threads[ i ]++;
+ ok = true;
+ break;
+ }
+ }
+
+ ATOMIC_ASSERT( ok );
+ }
+ }
+ }
+ else {
+ int x = 0;
+ while( x < NUM_PUSHES ) {
+ if( mpsc.enqueue( x ) ) {
+ x++;
+ }
+ }
+ }
+ }
+};
+
+int main() {
+ rl::test_params p;
+ p.iteration_count = 200000;
+ rl::simulate< FixedMPSCTest >( p );
+ return 0;
+}
+