commit ae1ffa9d54d5bd53121f159162c3b7d441379b12
parent ac3093136c339ecbad5982981f234aca19714031
Author: Michael Savage <mikejsavage@gmail.com>
Date: Sat, 4 Nov 2017 15:48:50 +0200
Small atomics cleanup
Diffstat:
12 files changed, 218 insertions(+), 194 deletions(-)
diff --git a/intrinsics.h b/intrinsics.h
@@ -142,9 +142,6 @@ constexpr T clamp11( T x ) {
#define slots_required( data_size, slot_size ) ( ( data_size ) / ( slot_size ) + ( ( data_size ) % ( slot_size ) != 0 ) )
-#define CACHE_LINE_SIZE 64
-#define CACHE_LINE_PADDING u8 COUNTER_NAME( cache_line_spacing )[ CACHE_LINE_SIZE ]
-
#define AT_STARTUP( code ) \
namespace COUNTER_NAME( StartupCode ) { \
static struct AtStartup { \
@@ -190,17 +187,6 @@ inline To checked_cast( const From & from ) {
return result;
}
-template< typename T >
-inline bool is_aligned( const T * ptr ) {
-#if COMPILER_MSVC
- return checked_cast< size_t >( ptr ) % __alignof( T ) == 0;
-#elif COMPILER_GCCORCLANG
- return checked_cast< size_t >( ptr ) % __alignof__( T ) == 0;
-#else
-#error new compiler
-#endif
-}
-
// TODO: this sucks
inline u8 * file_get_contents( const char * path, size_t * out_len = NULL ) {
FILE * file = fopen( path, "rb" );
diff --git a/mixer.cc b/mixer.cc
@@ -2,7 +2,7 @@
#include "int_conversions.h"
#include "mixer.h"
#include "pool.h"
-#include "nonblocking_fixed_spsc_queue.h"
+#include "spsc.h"
struct PlayingSound {
PlayingSoundID id;
@@ -34,7 +34,7 @@ struct MixerCommand {
STATIC_ASSERT( RESET_SOUND_COUNTER != INVALID_SOUND_ID );
static Pool< PlayingSound, MAX_CONCURRENT_SOUNDS > playing_sounds;
-static NonblockingFixedSPSCQueue< MixerCommand, COMMAND_QUEUE_SIZE > command_queue;
+static FixedSPSC< MixerCommand, COMMAND_QUEUE_SIZE > command_queue;
static PlayingSoundID sound_counter;
// TODO: this should really use a hash index/pool with handles
diff --git a/mpsc.h b/mpsc.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include "intrinsics.h"
+
+#include "platform_alignment.h"
+#include "platform_atomic.h"
+#include "platform_mutex.h"
+#include "platform_thread.h"
+
+template< typename T, size_t N >
+class FixedMPSC {
+public:
+ // TODO: this is bad
+ NONCOPYABLE( FixedMPSC );
+
+ FixedMPSC() {
+ store_relaxed( &num_elems, 0 );
+ head = 0;
+ reader_acquired = false;
+ mutex_init( &mutex );
+ }
+
+ ~FixedMPSC() {
+ mutex_destroy( &mutex );
+ }
+
+ bool enqueue( const T & x ) {
+ if( load_acquire( &num_elems ) >= N ) return false;
+ SCOPED_MUTEX_LOCK( &mutex );
+ u32 n = load_relaxed( &num_elems );
+ if( n >= N ) return false;
+
+ elems[ ( head + n ) % N ] = x;
+ store_release( &num_elems, n + 1 );
+
+ return true;
+ }
+
+ T * dequeue_acquire() {
+ ASSERT( !reader_acquired );
+
+ if( load_acquire( &num_elems ) == 0 ) return NULL;
+
+ reader_acquired = true;
+ return elems + head;
+ }
+
+ void dequeue_release() {
+ ASSERT( reader_acquired );
+
+ SCOPED_MUTEX_LOCK( &mutex );
+
+ head = ( head + 1 ) % N;
+ fetch_sub_release( &num_elems, 1 );
+ reader_acquired = false;
+ }
+
+ void enqueue_spin( const T & x ) {
+ int attempt = 0;
+ while( !enqueue( x ) ) {
+ const int ATTEMPTS_BEFORE_YIELD = 128;
+ if( attempt < ATTEMPTS_BEFORE_YIELD ) {
+ attempt++;
+ }
+ else {
+ thread_yield();
+ }
+ }
+ }
+
+ bool dequeue( T * x ) {
+ T * t = dequeue_acquire();
+ if( t == NULL ) return false;
+ *x = *t;
+ dequeue_release();
+ return true;
+ }
+
+private:
+ ALIGNTO_CACHE T elems[ N ];
+
+ struct ALIGNTO_CACHE {
+ u32 head;
+ bool reader_acquired;
+ };
+
+ atomic_u32 ALIGNTO_CACHE num_elems;
+ Mutex mutex;
+};
diff --git a/nonblocking_fixed_mpsc_queue.h b/nonblocking_fixed_mpsc_queue.h
@@ -1,87 +0,0 @@
-#pragma once
-
-#include "intrinsics.h"
-
-#include "platform_atomic.h"
-#include "platform_mutex.h"
-#include "platform_thread.h"
-
-template< typename T, size_t N >
-class NonblockingFixedMPSCQueue {
-public:
- NonblockingFixedMPSCQueue() {
- store_relaxed( &num_elems, 0 );
- head = 0;
- reader_acquired = false;
- mutex_init( &mutex );
- }
-
- ~NonblockingFixedMPSCQueue() {
- mutex_destroy( &mutex );
- }
-
- bool enqueue( const T & x ) {
- if( load_acquire( &num_elems ) >= N ) return false;
- SCOPED_MUTEX_LOCK( &mutex );
- u32 n = load_relaxed( &num_elems );
- if( n >= N ) return false;
-
- elems[ ( head + n ) % N ] = x;
- store_release( &num_elems, n + 1 );
-
- return true;
- }
-
- T * dequeue_acquire() {
- ASSERT( !reader_acquired );
-
- if( load_acquire( &num_elems ) == 0 ) return NULL;
-
- reader_acquired = true;
- return elems + head;
- }
-
- void dequeue_release() {
- ASSERT( reader_acquired );
-
- SCOPED_MUTEX_LOCK( &mutex );
-
- head = ( head + 1 ) % N;
- fetch_sub_release( &num_elems, 1 );
- reader_acquired = false;
- }
-
- void enqueue_spin( const T & x ) {
- int attempt = 0;
- while( !enqueue( x ) ) {
- const int ATTEMPTS_BEFORE_YIELD = 128;
- if( attempt < ATTEMPTS_BEFORE_YIELD ) {
- attempt++;
- }
- else {
- thread_yield();
- }
- }
- }
-
- bool dequeue( T * x ) {
- T * t = dequeue_acquire();
- if( t == NULL ) return false;
- *x = *t;
- dequeue_release();
- return true;
- }
-
-private:
- T elems[ N ];
-
- CACHE_LINE_PADDING;
-
- u32 head;
- bool reader_acquired;
-
- CACHE_LINE_PADDING;
-
- atomic_u32 num_elems;
- Mutex mutex;
-};
diff --git a/nonblocking_fixed_spsc_queue.h b/nonblocking_fixed_spsc_queue.h
@@ -1,80 +0,0 @@
-#pragma once
-
-#include "intrinsics.h"
-#include "queue.h"
-
-#include "platform_atomic.h"
-
-template< typename T, size_t N >
-class NonblockingFixedSPSCQueue : public NonblockingQueue< T > {
-public:
- NonblockingFixedSPSCQueue() {
- VAR( reader_acquired ) = false;
- clear();
- }
-
- // returns true if x was enqueued, false if the queue was full
- bool enqueue( const T & x ) {
- size_t w = VAR( writer_pos );
- if( load_acquire( &nodes[ w ].last_op ) == WRITE ) return false;
-
- nodes[ w ].data = x;
- store_release( &nodes[ w ].last_op, WRITE );
- VAR( writer_pos ) = ( w + 1 ) % N;
-
- return true;
- }
-
- // returns a pointer to the element to be dequeued, or NULL is the
- // queue is empty
- // call dequeue_release once you're done with the pointer
- T * dequeue_acquire() {
- ASSERT( !VAR( reader_acquired ) );
-
- size_t r = VAR( reader_pos );
- if( load_acquire( &nodes[ r ].last_op ) == READ ) return NULL;
-
- VAR( reader_acquired ) = true;
- return &nodes[ r ].data;
- }
-
- void dequeue_release() {
- ASSERT( VAR( reader_acquired ) );
- VAR( reader_acquired ) = false;
-
- size_t r = VAR( reader_pos );
- store_release( &nodes[ r ].last_op, READ );
- VAR( reader_pos ) = ( r + 1 ) % N;
- }
-
- void clear() {
- for( size_t i = 0; i < N; i++ ) {
- store_release( &nodes[ i ].last_op, READ );
- }
- VAR( reader_pos ) = VAR( writer_pos ) = 0;
- }
-
- NonblockingQueueReader< T > reader() {
- return NonblockingQueueReader< T >( this );
- }
-
- NonblockingQueueWriter< T > writer() {
- return NonblockingQueueWriter< T >( this );
- }
-
-private:
- enum LastOp { READ, WRITE };
- struct Node {
- NONCOPYABLE( Node );
- Node() { }
- T data;
- atomic_s32 last_op;
- };
-
- Node nodes[ N ];
- CACHE_LINE_PADDING;
- nonatomic( size_t ) reader_pos;
- nonatomic( bool ) reader_acquired;
- CACHE_LINE_PADDING;
- nonatomic( size_t ) writer_pos;
-};
diff --git a/platform_alignment.h b/platform_alignment.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "platform.h"
+
+#if COMPILER_MSVC
+#define ALIGNOF( x ) __alignof( x )
+#define ALIGNTO( x ) TODO
+#elif COMPILER_GCCORCLANG
+#define ALIGNOF( x ) __alignof__( x )
+#define ALIGNTO( n ) __attribute__(( aligned( n ) ))
+#else
+#error new compiler
+#endif
+
+#define CACHE_LINE_SIZE 64
+#define ALIGNTO_CACHE ALIGNTO( CACHE_LINE_SIZE )
+
+template< typename T >
+inline bool is_aligned( const T * ptr ) {
+ return checked_cast< size_t >( ptr ) % ALIGNOF( T ) == 0;
+}
diff --git a/platform_atomic.h b/platform_atomic.h
@@ -3,12 +3,25 @@
#include "intrinsics.h"
#include "platform.h"
-#if !PLATFORM_RELACY
-#define nonatomic( T ) T
-#define VAR( x ) ( x )
+#if PLATFORM_RELACY
+
+#define NONATOMIC( T ) rl::var< T >
+
#else
-#define nonatomic( T ) VAR_T( T )
-// relacy defines VAR( x )
+
+enum NonAtomicDollar { NONATOMIC_DOLLAR };
+#define $ NONATOMIC_DOLLAR
+
+template< typename T >
+struct NonAtomic {
+ T x;
+ T & operator()( NonAtomicDollar ) {
+ return x;
+ }
+};
+
+#define NONATOMIC( T ) NonAtomic< T >
+
#endif
#if COMPILER_GCCORCLANG && !PLATFORM_RELACY
@@ -93,7 +106,9 @@ struct atomic_u64 {
// return __atomic_compare_exchange_n( &atom->v, before, after, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED );
// }
-#else // COMPILER_GCCORCLANG && !PLATFORM_RELACY
+#endif
+
+#if !COMPILER_GCCORCLANG || PLATFORM_RELACY
#if !PLATFORM_RELACY
#include <atomic>
@@ -135,4 +150,4 @@ struct atomic_u64 {
#define exchange_seqcst( atom, x ) ( atom )->exchange( x, std::memory_order_seq_cst )
#define compare_exchange_seqcst( atom, before, after ) ( atom )->compare_exchange_strong( *( before ), after, std::memory_order_seq_cst, std::memory_order_seq_cst )
-#endif // RL_TEST
+#endif
diff --git a/queue.h b/queue.h
@@ -13,7 +13,6 @@ struct BlockingQueue {
dequeue_release();
return t;
}
-
};
const int ATTEMPTS_BEFORE_YIELD = 128;
diff --git a/spsc.h b/spsc.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include "intrinsics.h"
+#include "queue.h"
+
+#include "platform_alignment.h"
+#include "platform_atomic.h"
+
+template< typename T, size_t N >
+class FixedSPSC : public NonblockingQueue< T > {
+public:
+ NONCOPYABLE( FixedSPSC );
+
+ FixedSPSC() {
+ for( size_t i = 0; i < N; i++ ) {
+ store_release( &nodes[ i ].last_op, READ );
+ }
+ reader_acquired( $ ) = false;
+ reader_pos( $ ) = 0;
+ writer_pos( $ ) = 0;
+ }
+
+ // returns true if x was enqueued, false if the queue was full
+ bool enqueue( const T & x ) {
+ size_t w = writer_pos( $ );
+ if( load_acquire( &nodes[ w ].last_op ) == WRITE ) return false;
+
+ nodes[ w ].data = x;
+ store_release( &nodes[ w ].last_op, WRITE );
+ writer_pos( $ ) = ( w + 1 ) % N;
+
+ return true;
+ }
+
+ // returns a pointer to the element to be dequeued, or NULL is the
+ // queue is empty
+ // call dequeue_release once you're done with the pointer
+ T * dequeue_acquire() {
+ ASSERT( !reader_acquired( $ ) );
+
+ size_t r = reader_pos( $ );
+ if( load_acquire( &nodes[ r ].last_op ) == READ ) return NULL;
+
+ reader_acquired( $ ) = true;
+ return &nodes[ r ].data;
+ }
+
+ void dequeue_release() {
+ ASSERT( reader_acquired( $ ) );
+ reader_acquired( $ ) = false;
+
+ size_t r = reader_pos( $ );
+ store_release( &nodes[ r ].last_op, READ );
+ reader_pos( $ ) = ( r + 1 ) % N;
+ }
+
+ NonblockingQueueReader< T > reader() {
+ return NonblockingQueueReader< T >( this );
+ }
+
+ NonblockingQueueWriter< T > writer() {
+ return NonblockingQueueWriter< T >( this );
+ }
+
+private:
+ enum LastOp { READ, WRITE };
+ struct ALIGNTO_CACHE Node {
+ T data;
+ atomic_s32 last_op;
+ };
+
+ Node nodes[ N ];
+
+ struct ALIGNTO_CACHE {
+ NONATOMIC( size_t ) reader_pos;
+ NONATOMIC( bool ) reader_acquired;
+ };
+
+ NONATOMIC( size_t ) ALIGNTO_CACHE writer_pos;
+};
diff --git a/stream.h b/stream.h
@@ -3,6 +3,7 @@
#include <string.h>
#include "intrinsics.h"
+#include "platform_alignment.h"
struct ReadStream;
diff --git a/terrain_manager.cc b/terrain_manager.cc
@@ -12,7 +12,7 @@
#include "gpubtt.h"
#include "linear_algebra.h"
#include "str.h"
-#include "nonblocking_fixed_mpsc_queue.h"
+#include "mpsc.h"
#include "profiler.h"
#include "renderer.h"
#include "shaders.h"
diff --git a/terrain_manager.h b/terrain_manager.h
@@ -7,7 +7,7 @@
#include "gpubtt.h"
#include "work_queue.h"
#include "linear_algebra.h"
-#include "nonblocking_fixed_mpsc_queue.h"
+#include "mpsc.h"
static const u16 TILE_SIZE = 512;
static const u16 WORLD_SIZE = 64;
@@ -69,7 +69,7 @@ struct TerrainManager {
// opengl handles
GPUBTT gpubtts[ WORLD_SIZE ][ WORLD_SIZE ];
- NonblockingFixedMPSCQueue< ReadyTile, 32 > ready_tiles;
+ FixedMPSC< ReadyTile, 32 > ready_tiles;
};
void terrain_init( TerrainManager * tm, const char * tiles_dir, MemoryArena * arena, WorkQueue * background_tasks );