tracy_concurrentqueue.h (59773B)
1 // Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue. 2 // An overview, including benchmark results, is provided here: 3 // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++ 4 // The full design is also described in excruciating detail at: 5 // http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue 6 7 // Simplified BSD license: 8 // Copyright (c) 2013-2016, Cameron Desrochers. 9 // All rights reserved. 10 // 11 // Redistribution and use in source and binary forms, with or without modification, 12 // are permitted provided that the following conditions are met: 13 // 14 // - Redistributions of source code must retain the above copyright notice, this list of 15 // conditions and the following disclaimer. 16 // - Redistributions in binary form must reproduce the above copyright notice, this list of 17 // conditions and the following disclaimer in the documentation and/or other materials 18 // provided with the distribution. 19 // 20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY 21 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 22 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL 23 // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 25 // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 26 // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR 27 // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 28 // EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30 31 #pragma once 32 33 #include "../common/TracyAlloc.hpp" 34 #include "../common/TracyForceInline.hpp" 35 #include "../common/TracySystem.hpp" 36 37 #if defined(__GNUC__) 38 // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and 39 // Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings 40 // upon assigning any computed values) 41 #pragma GCC diagnostic push 42 #pragma GCC diagnostic ignored "-Wconversion" 43 #endif 44 45 #if defined(__APPLE__) 46 #include "TargetConditionals.h" 47 #endif 48 49 #include <atomic> // Requires C++11. Sorry VS2010. 50 #include <cassert> 51 #include <cstddef> // for max_align_t 52 #include <cstdint> 53 #include <cstdlib> 54 #include <type_traits> 55 #include <algorithm> 56 #include <utility> 57 #include <limits> 58 #include <climits> // for CHAR_BIT 59 #include <array> 60 #include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading 61 62 namespace tracy 63 { 64 65 // Exceptions 66 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED 67 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__)) 68 #define MOODYCAMEL_EXCEPTIONS_ENABLED 69 #endif 70 #endif 71 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED 72 #define MOODYCAMEL_TRY try 73 #define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__) 74 #define MOODYCAMEL_RETHROW throw 75 #define MOODYCAMEL_THROW(expr) throw (expr) 76 #else 77 #define MOODYCAMEL_TRY if (true) 78 #define MOODYCAMEL_CATCH(...) 
else if (false) 79 #define MOODYCAMEL_RETHROW 80 #define MOODYCAMEL_THROW(expr) 81 #endif 82 83 #ifndef MOODYCAMEL_NOEXCEPT 84 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED) 85 #define MOODYCAMEL_NOEXCEPT 86 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true 87 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true 88 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800 89 // VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-( 90 // We have to assume *all* non-trivial constructors may throw on VS2012! 91 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT 92 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value) 93 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)) 94 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900 95 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT 96 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value) 97 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)) 98 #else 99 #define MOODYCAMEL_NOEXCEPT noexcept 100 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr) 101 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr) 102 #endif 103 #endif 104 105 // VS2012 doesn't support deleted functions. 106 // In this case, we declare the function normally but don't define it. A link error will be generated if the function is called. 
107 #ifndef MOODYCAMEL_DELETE_FUNCTION 108 #if defined(_MSC_VER) && _MSC_VER < 1800 109 #define MOODYCAMEL_DELETE_FUNCTION 110 #else 111 #define MOODYCAMEL_DELETE_FUNCTION = delete 112 #endif 113 #endif 114 115 // Compiler-specific likely/unlikely hints 116 namespace moodycamel { namespace details { 117 #if defined(__GNUC__) 118 inline bool cqLikely(bool x) { return __builtin_expect((x), true); } 119 inline bool cqUnlikely(bool x) { return __builtin_expect((x), false); } 120 #else 121 inline bool cqLikely(bool x) { return x; } 122 inline bool cqUnlikely(bool x) { return x; } 123 #endif 124 } } 125 126 namespace 127 { 128 // to avoid MSVC warning 4127: conditional expression is constant 129 template <bool> 130 struct compile_time_condition 131 { 132 static const bool value = false; 133 }; 134 template <> 135 struct compile_time_condition<true> 136 { 137 static const bool value = true; 138 }; 139 } 140 141 namespace moodycamel { 142 namespace details { 143 template<typename T> 144 struct const_numeric_max { 145 static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers"); 146 static const T value = std::numeric_limits<T>::is_signed 147 ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1) 148 : static_cast<T>(-1); 149 }; 150 151 #if defined(__GLIBCXX__) 152 typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while 153 #else 154 typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std:: 155 #endif 156 157 // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting 158 // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64. 159 typedef union { 160 std_max_align_t x; 161 long long y; 162 void* z; 163 } max_align_t; 164 } 165 166 // Default traits for the ConcurrentQueue. To change some of the 167 // traits without re-implementing all of them, inherit from this 168 // struct and shadow the declarations you wish to be different; 169 // since the traits are used as a template type parameter, the 170 // shadowed declarations will be used where defined, and the defaults 171 // otherwise. 172 struct ConcurrentQueueDefaultTraits 173 { 174 // General-purpose size type. std::size_t is strongly recommended. 175 typedef std::size_t size_t; 176 177 // The type used for the enqueue and dequeue indices. Must be at least as 178 // large as size_t. Should be significantly larger than the number of elements 179 // you expect to hold at once, especially if you have a high turnover rate; 180 // for example, on 32-bit x86, if you expect to have over a hundred million 181 // elements or pump several million elements through your queue in a very 182 // short space of time, using a 32-bit type *may* trigger a race condition. 183 // A 64-bit int type is recommended in that case, and in practice will 184 // prevent a race condition no matter the usage of the queue. Note that 185 // whether the queue is lock-free with a 64-bit type depends on whether 186 // std::atomic<std::uint64_t> is lock-free, which is platform-specific. 187 typedef std::size_t index_t; 188 189 // Internally, all elements are enqueued and dequeued from multi-element 190 // blocks; this is the smallest controllable unit. If you expect few elements 191 // but many producers, a smaller block size should be favoured. For few producers 192 // and/or many elements, a larger block size is preferred.
A sane default 193 // is provided. Must be a power of 2. 194 static const size_t BLOCK_SIZE = 128; 195 196 // For explicit producers (i.e. when using a producer token), the block is 197 // checked for being empty by iterating through a list of flags, one per element. 198 // For large block sizes, this is too inefficient, and switching to an atomic 199 // counter-based approach is faster. The switch is made for block sizes strictly 200 // larger than this threshold. 201 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32; 202 203 // How many full blocks can be expected for a single explicit producer? This should 204 // reflect that number's maximum for optimal performance. Must be a power of 2. 205 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32; 206 207 // Controls the number of items that an explicit consumer (i.e. one with a token) 208 // must consume before it causes all consumers to rotate and move on to the next 209 // internal queue. 210 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256; 211 212 // The maximum number of elements (inclusive) that can be enqueued to a sub-queue. 213 // Enqueue operations that would cause this limit to be surpassed will fail. Note 214 // that this limit is enforced at the block level (for performance reasons), i.e. 215 // it's rounded up to the nearest block size. 216 static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value; 217 218 219 // Memory allocation can be customized if needed. 220 // malloc should return nullptr on failure, and handle alignment like std::malloc. 221 #if defined(malloc) || defined(free) 222 // Gah, this is 2015, stop defining macros that break standard code already! 223 // Work around malloc/free being special macros: 224 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); } 225 static inline void WORKAROUND_free(void* ptr) { return free(ptr); } 226 static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); } 227 static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); } 228 #else 229 static inline void* malloc(size_t size) { return tracy::tracy_malloc(size); } 230 static inline void free(void* ptr) { return tracy::tracy_free(ptr); } 231 #endif 232 }; 233 234 235 // When producing or consuming many elements, the most efficient way is to: 236 // 1) Use one of the bulk-operation methods of the queue with a token 237 // 2) Failing that, use the bulk-operation methods without a token 238 // 3) Failing that, create a token and use that with the single-item methods 239 // 4) Failing that, use the single-parameter methods of the queue 240 // Having said that, don't create tokens willy-nilly -- ideally there should be 241 // a maximum of one token per thread (of each kind). 
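//
// Illustrative usage sketch (hypothetical element type `Item`, variable names and values;
// not part of this header, which is assumed to be included). Only the token-based bulk
// API survives in this Tracy fork: enqueue_begin() hands out a raw slot, and the element
// becomes visible to consumers once the producer's tail index is advanced (see
// ExplicitProducer::enqueue_begin() and get_tail_index() further below). Since
// try_dequeue_bulk() copies elements out with memcpy, the element type should be
// trivially copyable.
//
//     struct Item { int payload; };                        // hypothetical, trivially copyable
//
//     tracy::moodycamel::ConcurrentQueue<Item> q;
//     tracy::moodycamel::ProducerToken ptok( q );          // at most one per producing thread
//     tracy::moodycamel::ConsumerToken ctok( q );          // at most one per consuming thread
//
//     // Produce: reserve a slot, fill it in place, then publish by bumping the tail index.
//     auto prod = q.get_explicit_producer( ptok );
//     tracy::moodycamel::ConcurrentQueue<Item>::index_t idx;
//     Item* slot = prod->enqueue_begin( idx );             // equivalent to q.enqueue_begin( ptok, idx )
//     slot->payload = 42;
//     prod->get_tail_index().store( idx + 1, std::memory_order_release );
//
//     // Consume: drain in bulk with the consumer token.
//     Item buf[16];
//     size_t n = q.try_dequeue_bulk( ctok, buf, 16 );      // n elements copied into buf
//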
242 struct ProducerToken; 243 struct ConsumerToken; 244 245 template<typename T, typename Traits> class ConcurrentQueue; 246 247 248 namespace details 249 { 250 struct ConcurrentQueueProducerTypelessBase 251 { 252 ConcurrentQueueProducerTypelessBase* next; 253 std::atomic<bool> inactive; 254 ProducerToken* token; 255 uint64_t threadId; 256 257 ConcurrentQueueProducerTypelessBase() 258 : next(nullptr), inactive(false), token(nullptr), threadId(0) 259 { 260 } 261 }; 262 263 template<typename T> 264 static inline bool circular_less_than(T a, T b) 265 { 266 #ifdef _MSC_VER 267 #pragma warning(push) 268 #pragma warning(disable: 4554) 269 #endif 270 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types"); 271 return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1)); 272 #ifdef _MSC_VER 273 #pragma warning(pop) 274 #endif 275 } 276 277 template<typename U> 278 static inline char* align_for(char* ptr) 279 { 280 const std::size_t alignment = std::alignment_of<U>::value; 281 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment; 282 } 283 284 template<typename T> 285 static inline T ceil_to_pow_2(T x) 286 { 287 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types"); 288 289 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 290 --x; 291 x |= x >> 1; 292 x |= x >> 2; 293 x |= x >> 4; 294 for (std::size_t i = 1; i < sizeof(T); i <<= 1) { 295 x |= x >> (i << 3); 296 } 297 ++x; 298 return x; 299 } 300 301 template<typename T> 302 static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right) 303 { 304 T temp = std::move(left.load(std::memory_order_relaxed)); 305 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed); 306 right.store(std::move(temp), std::memory_order_relaxed); 307 } 308 309 template<typename T> 310 static inline T const& nomove(T const& x) 311 { 312 return x; 313 } 314 315 template<bool Enable> 316 struct nomove_if 317 { 318 template<typename T> 319 static inline T const& eval(T const& x) 320 { 321 return x; 322 } 323 }; 324 325 template<> 326 struct nomove_if<false> 327 { 328 template<typename U> 329 static inline auto eval(U&& x) 330 -> decltype(std::forward<U>(x)) 331 { 332 return std::forward<U>(x); 333 } 334 }; 335 336 template<typename It> 337 static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it) 338 { 339 return *it; 340 } 341 342 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8) 343 template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { }; 344 #else 345 template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { }; 346 #endif 347 348 template<typename T> struct static_is_lock_free_num { enum { value = 0 }; }; 349 template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; }; 350 template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; }; 351 template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; }; 352 template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; }; 353 template<> struct static_is_lock_free_num<long long> { enum { value = 
ATOMIC_LLONG_LOCK_FREE }; }; 354 template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { }; 355 template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; }; 356 template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; }; 357 } 358 359 360 struct ProducerToken 361 { 362 template<typename T, typename Traits> 363 explicit ProducerToken(ConcurrentQueue<T, Traits>& queue); 364 365 ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT 366 : producer(other.producer) 367 { 368 other.producer = nullptr; 369 if (producer != nullptr) { 370 producer->token = this; 371 } 372 } 373 374 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT 375 { 376 swap(other); 377 return *this; 378 } 379 380 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT 381 { 382 std::swap(producer, other.producer); 383 if (producer != nullptr) { 384 producer->token = this; 385 } 386 if (other.producer != nullptr) { 387 other.producer->token = &other; 388 } 389 } 390 391 // A token is always valid unless: 392 // 1) Memory allocation failed during construction 393 // 2) It was moved via the move constructor 394 // (Note: assignment does a swap, leaving both potentially valid) 395 // 3) The associated queue was destroyed 396 // Note that if valid() returns true, that only indicates 397 // that the token is valid for use with a specific queue, 398 // but not which one; that's up to the user to track. 399 inline bool valid() const { return producer != nullptr; } 400 401 ~ProducerToken() 402 { 403 if (producer != nullptr) { 404 producer->token = nullptr; 405 producer->inactive.store(true, std::memory_order_release); 406 } 407 } 408 409 // Disable copying and assignment 410 ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION; 411 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION; 412 413 private: 414 template<typename T, typename Traits> friend class ConcurrentQueue; 415 416 protected: 417 details::ConcurrentQueueProducerTypelessBase* producer; 418 }; 419 420 421 struct ConsumerToken 422 { 423 template<typename T, typename Traits> 424 explicit ConsumerToken(ConcurrentQueue<T, Traits>& q); 425 426 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT 427 : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer) 428 { 429 } 430 431 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT 432 { 433 swap(other); 434 return *this; 435 } 436 437 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT 438 { 439 std::swap(initialOffset, other.initialOffset); 440 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset); 441 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent); 442 std::swap(currentProducer, other.currentProducer); 443 std::swap(desiredProducer, other.desiredProducer); 444 } 445 446 // Disable copying and assignment 447 ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION; 448 ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION; 449 450 private: 451 template<typename T, typename Traits> friend class ConcurrentQueue; 452 453 private: // but shared with ConcurrentQueue 454 std::uint32_t initialOffset; 455 std::uint32_t lastKnownGlobalOffset; 456 std::uint32_t itemsConsumedFromCurrent; 457 
details::ConcurrentQueueProducerTypelessBase* currentProducer; 458 details::ConcurrentQueueProducerTypelessBase* desiredProducer; 459 }; 460 461 462 template<typename T, typename Traits = ConcurrentQueueDefaultTraits> 463 class ConcurrentQueue 464 { 465 public: 466 struct ExplicitProducer; 467 468 typedef moodycamel::ProducerToken producer_token_t; 469 typedef moodycamel::ConsumerToken consumer_token_t; 470 471 typedef typename Traits::index_t index_t; 472 typedef typename Traits::size_t size_t; 473 474 static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE); 475 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD); 476 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE); 477 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE); 478 #ifdef _MSC_VER 479 #pragma warning(push) 480 #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!) 481 #pragma warning(disable: 4309) // static_cast: Truncation of constant value 482 #endif 483 static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE); 484 #ifdef _MSC_VER 485 #pragma warning(pop) 486 #endif 487 488 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type"); 489 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type"); 490 static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t"); 491 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)"); 492 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)"); 493 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)"); 494 495 public: 496 // Creates a queue with at least `capacity` element slots; note that the 497 // actual number of elements that can be inserted without additional memory 498 // allocation depends on the number of producers and the block size (e.g. if 499 // the block size is equal to `capacity`, only a single block will be allocated 500 // up-front, which means only a single producer will be able to enqueue elements 501 // without an extra allocation -- blocks aren't shared between producers). 502 // This method is not thread safe -- it is up to the user to ensure that the 503 // queue is fully constructed before it starts being used by other threads (this 504 // includes making the memory effects of construction visible, possibly with a 505 // memory barrier). 
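// Sizing sketch (illustrative; assumes the default traits above, i.e. BLOCK_SIZE == 128,
// and a hypothetical element type `Item`):
//
//     tracy::moodycamel::ConcurrentQueue<Item> q( 1024 );  // pre-allocates ceil(1024 / 128) == 8 blocks
//
// Those blocks form a single shared pool from which producers take whole blocks, so one
// producer may end up owning all of them, while a producer arriving after the pool is
// exhausted triggers a fresh allocation rather than sharing a partially-filled block.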
506 explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE) 507 : producerListTail(nullptr), 508 producerCount(0), 509 initialBlockPoolIndex(0), 510 nextExplicitConsumerId(0), 511 globalExplicitConsumerOffset(0) 512 { 513 populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1)); 514 } 515 516 // Computes the correct amount of pre-allocated blocks for you based 517 // on the minimum number of elements you want available at any given 518 // time, and the maximum concurrent number of each type of producer. 519 ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers) 520 : producerListTail(nullptr), 521 producerCount(0), 522 initialBlockPoolIndex(0), 523 nextExplicitConsumerId(0), 524 globalExplicitConsumerOffset(0) 525 { 526 size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers); 527 populate_initial_block_list(blocks); 528 } 529 530 // Note: The queue should not be accessed concurrently while it's 531 // being deleted. It's up to the user to synchronize this. 532 // This method is not thread safe. 533 ~ConcurrentQueue() 534 { 535 // Destroy producers 536 auto ptr = producerListTail.load(std::memory_order_relaxed); 537 while (ptr != nullptr) { 538 auto next = ptr->next_prod(); 539 if (ptr->token != nullptr) { 540 ptr->token->producer = nullptr; 541 } 542 destroy(ptr); 543 ptr = next; 544 } 545 546 // Destroy global free list 547 auto block = freeList.head_unsafe(); 548 while (block != nullptr) { 549 auto next = block->freeListNext.load(std::memory_order_relaxed); 550 if (block->dynamicallyAllocated) { 551 destroy(block); 552 } 553 block = next; 554 } 555 556 // Destroy initial free list 557 destroy_array(initialBlockPool, initialBlockPoolSize); 558 } 559 560 // Disable copying and copy assignment 561 ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION; 562 ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_DELETE_FUNCTION; 563 ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION; 564 ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_DELETE_FUNCTION; 565 566 public: 567 tracy_force_inline T* enqueue_begin(producer_token_t const& token, index_t& currentTailIndex) 568 { 569 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex); 570 } 571 572 // Attempts to dequeue several elements from the queue using an explicit consumer token. 573 // Returns the number of items actually dequeued. 574 // Returns 0 if all producer streams appeared empty at the time they 575 // were checked (so, the queue is likely but not guaranteed to be empty). 576 // Never allocates. Thread-safe. 
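// Drain-loop sketch (illustrative; `q`, `ctok` and `Item` as in the earlier sketch,
// `process()` is a hypothetical handler). A return value of 0 only means that every
// producer stream looked empty at the moment it was inspected:
//
//     Item buf[128];
//     for (;;) {
//         size_t n = q.try_dequeue_bulk( ctok, buf, 128 );
//         if( n == 0 ) break;        // queue is probably, but not provably, empty
//         process( buf, n );
//     }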
577 template<typename It> 578 size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max) 579 { 580 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) { 581 if (!update_current_producer_after_rotation(token)) { 582 return 0; 583 } 584 } 585 586 size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max); 587 if (count == max) { 588 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) { 589 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed); 590 } 591 return max; 592 } 593 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count); 594 max -= count; 595 596 auto tail = producerListTail.load(std::memory_order_acquire); 597 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod(); 598 if (ptr == nullptr) { 599 ptr = tail; 600 } 601 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) { 602 auto dequeued = ptr->dequeue_bulk(itemFirst, max); 603 count += dequeued; 604 if (dequeued != 0) { 605 token.currentProducer = ptr; 606 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued); 607 } 608 if (dequeued == max) { 609 break; 610 } 611 max -= dequeued; 612 ptr = ptr->next_prod(); 613 if (ptr == nullptr) { 614 ptr = tail; 615 } 616 } 617 return count; 618 } 619 620 template<typename It> 621 size_t try_dequeue_bulk_single(consumer_token_t& token, It itemFirst, size_t max, uint64_t& threadId ) 622 { 623 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) { 624 if (!update_current_producer_after_rotation(token)) { 625 return 0; 626 } 627 } 628 629 size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max); 630 if (count == max) { 631 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) { 632 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed); 633 } 634 threadId = token.currentProducer->threadId; 635 return max; 636 } 637 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count); 638 639 auto tail = producerListTail.load(std::memory_order_acquire); 640 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod(); 641 if (ptr == nullptr) { 642 ptr = tail; 643 } 644 if( count == 0 ) 645 { 646 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) { 647 auto dequeued = ptr->dequeue_bulk(itemFirst, max); 648 if (dequeued != 0) { 649 threadId = ptr->threadId; 650 token.currentProducer = ptr; 651 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued); 652 return dequeued; 653 } 654 ptr = ptr->next_prod(); 655 if (ptr == nullptr) { 656 ptr = tail; 657 } 658 } 659 return 0; 660 } 661 else 662 { 663 threadId = token.currentProducer->threadId; 664 token.currentProducer = ptr; 665 token.itemsConsumedFromCurrent = 0; 666 return count; 667 } 668 } 669 670 671 // Returns an estimate of the total number of elements currently in the queue. This 672 // estimate is only accurate if the queue has completely stabilized before it is called 673 // (i.e. all enqueue and dequeue operations have completed and their memory effects are 674 // visible on the calling thread, and no further operations start while this method is 675 // being called). 676 // Thread-safe. 
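// Because of the caveat above, size_approx() should only be used as a heuristic, e.g.
// for monitoring or coarse backpressure (illustrative; kHighWatermark is hypothetical):
//
//     if( q.size_approx() > kHighWatermark ) { /* warn or shed load -- estimate only */ }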
677 size_t size_approx() const 678 { 679 size_t size = 0; 680 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) { 681 size += ptr->size_approx(); 682 } 683 return size; 684 } 685 686 687 // Returns true if the underlying atomic variables used by 688 // the queue are lock-free (they should be on most platforms). 689 // Thread-safe. 690 static bool is_lock_free() 691 { 692 return 693 details::static_is_lock_free<bool>::value == 2 && 694 details::static_is_lock_free<size_t>::value == 2 && 695 details::static_is_lock_free<std::uint32_t>::value == 2 && 696 details::static_is_lock_free<index_t>::value == 2 && 697 details::static_is_lock_free<void*>::value == 2; 698 } 699 700 701 private: 702 friend struct ProducerToken; 703 friend struct ConsumerToken; 704 friend struct ExplicitProducer; 705 706 707 /////////////////////////////// 708 // Queue methods 709 /////////////////////////////// 710 711 inline bool update_current_producer_after_rotation(consumer_token_t& token) 712 { 713 // Ah, there's been a rotation, figure out where we should be! 714 auto tail = producerListTail.load(std::memory_order_acquire); 715 if (token.desiredProducer == nullptr && tail == nullptr) { 716 return false; 717 } 718 auto prodCount = producerCount.load(std::memory_order_relaxed); 719 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed); 720 if (details::cqUnlikely(token.desiredProducer == nullptr)) { 721 // Aha, first time we're dequeueing anything. 722 // Figure out our local position 723 // Note: offset is from start, not end, but we're traversing from end -- subtract from count first 724 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount); 725 token.desiredProducer = tail; 726 for (std::uint32_t i = 0; i != offset; ++i) { 727 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod(); 728 if (token.desiredProducer == nullptr) { 729 token.desiredProducer = tail; 730 } 731 } 732 } 733 734 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset; 735 if (delta >= prodCount) { 736 delta = delta % prodCount; 737 } 738 for (std::uint32_t i = 0; i != delta; ++i) { 739 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod(); 740 if (token.desiredProducer == nullptr) { 741 token.desiredProducer = tail; 742 } 743 } 744 745 token.lastKnownGlobalOffset = globalOffset; 746 token.currentProducer = token.desiredProducer; 747 token.itemsConsumedFromCurrent = 0; 748 return true; 749 } 750 751 752 /////////////////////////// 753 // Free list 754 /////////////////////////// 755 756 template <typename N> 757 struct FreeListNode 758 { 759 FreeListNode() : freeListRefs(0), freeListNext(nullptr) { } 760 761 std::atomic<std::uint32_t> freeListRefs; 762 std::atomic<N*> freeListNext; 763 }; 764 765 // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but 766 // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly 767 // speedy under low contention. 
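// Sketch of the per-node bookkeeping used below (see REFS_MASK and SHOULD_BE_ON_FREELIST):
//
//     freeListRefs:  bit 31       SHOULD_BE_ON_FREELIST -- the node wants to be (re)inserted
//                    bits 0..30   count of threads currently holding a reference in try_get()
//
// A node is physically pushed onto the list only by the thread that observes the reference
// count reach zero while the flag is set (add() / add_knowing_refcount_is_zero() below),
// which prevents concurrent try_get() callers from racing with re-insertion.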
768 template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them) 769 struct FreeList 770 { 771 FreeList() : freeListHead(nullptr) { } 772 FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); } 773 void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); } 774 775 FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION; 776 FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION; 777 778 inline void add(N* node) 779 { 780 // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to 781 // set it using a fetch_add 782 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) { 783 // Oh look! We were the last ones referencing this node, and we know 784 // we want to add it to the free list, so let's do it! 785 add_knowing_refcount_is_zero(node); 786 } 787 } 788 789 inline N* try_get() 790 { 791 auto head = freeListHead.load(std::memory_order_acquire); 792 while (head != nullptr) { 793 auto prevHead = head; 794 auto refs = head->freeListRefs.load(std::memory_order_relaxed); 795 if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) { 796 head = freeListHead.load(std::memory_order_acquire); 797 continue; 798 } 799 800 // Good, reference count has been incremented (it wasn't at zero), which means we can read the 801 // next and not worry about it changing between now and the time we do the CAS 802 auto next = head->freeListNext.load(std::memory_order_relaxed); 803 if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) { 804 // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no 805 // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on). 806 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0); 807 808 // Decrease refcount twice, once for our ref, and once for the list's ref 809 head->freeListRefs.fetch_sub(2, std::memory_order_release); 810 return head; 811 } 812 813 // OK, the head must have changed on us, but we still need to decrease the refcount we increased. 814 // Note that we don't need to release any memory effects, but we do need to ensure that the reference 815 // count decrement happens-after the CAS on the head. 816 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel); 817 if (refs == SHOULD_BE_ON_FREELIST + 1) { 818 add_knowing_refcount_is_zero(prevHead); 819 } 820 } 821 822 return nullptr; 823 } 824 825 // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes) 826 N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); } 827 828 private: 829 inline void add_knowing_refcount_is_zero(N* node) 830 { 831 // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run 832 // only one copy of this method per node at a time, i.e. 
the single thread case), then we know 833 // we can safely change the next pointer of the node; however, once the refcount is back above 834 // zero, then other threads could increase it (happens under heavy contention, when the refcount 835 // goes to zero in between a load and a refcount increment of a node in try_get, then back up to 836 // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS 837 // to add the node to the actual list fails, decrease the refcount and leave the add operation to 838 // the next thread who puts the refcount back at zero (which could be us, hence the loop). 839 auto head = freeListHead.load(std::memory_order_relaxed); 840 while (true) { 841 node->freeListNext.store(head, std::memory_order_relaxed); 842 node->freeListRefs.store(1, std::memory_order_release); 843 if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) { 844 // Hmm, the add failed, but we can only try again when the refcount goes back to zero 845 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) { 846 continue; 847 } 848 } 849 return; 850 } 851 } 852 853 private: 854 // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention) 855 std::atomic<N*> freeListHead; 856 857 static const std::uint32_t REFS_MASK = 0x7FFFFFFF; 858 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000; 859 }; 860 861 862 /////////////////////////// 863 // Block 864 /////////////////////////// 865 866 struct Block 867 { 868 Block() 869 : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true) 870 { 871 } 872 873 inline bool is_empty() const 874 { 875 if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) { 876 // Check flags 877 for (size_t i = 0; i < BLOCK_SIZE; ++i) { 878 if (!emptyFlags[i].load(std::memory_order_relaxed)) { 879 return false; 880 } 881 } 882 883 // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set 884 std::atomic_thread_fence(std::memory_order_acquire); 885 return true; 886 } 887 else { 888 // Check counter 889 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) { 890 std::atomic_thread_fence(std::memory_order_acquire); 891 return true; 892 } 893 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE); 894 return false; 895 } 896 } 897 898 // Returns true if the block is now empty (does not apply in explicit context) 899 inline bool set_empty(index_t i) 900 { 901 if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { 902 // Set flag 903 assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed)); 904 emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release); 905 return false; 906 } 907 else { 908 // Increment counter 909 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release); 910 assert(prevVal < BLOCK_SIZE); 911 return prevVal == BLOCK_SIZE - 1; 912 } 913 } 914 915 // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0). 916 // Returns true if the block is now empty (does not apply in explicit context). 
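// Worked example for the flag-based path (illustrative, with BLOCK_SIZE == 128): dequeuing
// the items at global indices 130..133 (count == 4) covers in-block offsets 2..5; because
// the flags are stored in reverse order (slot BLOCK_SIZE - 1 - offset), the loop below sets
// emptyFlags[122] through emptyFlags[125].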
917 inline bool set_many_empty(index_t i, size_t count) 918 { 919 if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) { 920 // Set flags 921 std::atomic_thread_fence(std::memory_order_release); 922 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1; 923 for (size_t j = 0; j != count; ++j) { 924 assert(!emptyFlags[i + j].load(std::memory_order_relaxed)); 925 emptyFlags[i + j].store(true, std::memory_order_relaxed); 926 } 927 return false; 928 } 929 else { 930 // Increment counter 931 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release); 932 assert(prevVal + count <= BLOCK_SIZE); 933 return prevVal + count == BLOCK_SIZE; 934 } 935 } 936 937 inline void set_all_empty() 938 { 939 if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { 940 // Set all flags 941 for (size_t i = 0; i != BLOCK_SIZE; ++i) { 942 emptyFlags[i].store(true, std::memory_order_relaxed); 943 } 944 } 945 else { 946 // Reset counter 947 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed); 948 } 949 } 950 951 inline void reset_empty() 952 { 953 if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) { 954 // Reset flags 955 for (size_t i = 0; i != BLOCK_SIZE; ++i) { 956 emptyFlags[i].store(false, std::memory_order_relaxed); 957 } 958 } 959 else { 960 // Reset counter 961 elementsCompletelyDequeued.store(0, std::memory_order_relaxed); 962 } 963 } 964 965 inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); } 966 inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); } 967 968 private: 969 // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of 970 // addresses returned by malloc, that alignment will be preserved. Apparently clang actually 971 // generates code that uses this assumption for AVX instructions in some cases. Ideally, we 972 // should also align Block to the alignment of T in case it's higher than malloc's 16-byte 973 // alignment, but this is hard to do in a cross-platform way. Assert for this case: 974 static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time"); 975 // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since 976 // otherwise the appropriate padding will not be added at the end of Block in order to make 977 // arrays of Blocks all be properly aligned (not just the first one). We use a union to force 978 // this. 979 union { 980 char elements[sizeof(T) * BLOCK_SIZE]; 981 details::max_align_t dummy; 982 }; 983 public: 984 Block* next; 985 std::atomic<size_t> elementsCompletelyDequeued; 986 std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? 
BLOCK_SIZE : 1]; 987 public: 988 std::atomic<std::uint32_t> freeListRefs; 989 std::atomic<Block*> freeListNext; 990 std::atomic<bool> shouldBeOnFreeList; 991 bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool' 992 }; 993 static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping"); 994 995 996 /////////////////////////// 997 // Producer base 998 /////////////////////////// 999 1000 struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase 1001 { 1002 ProducerBase(ConcurrentQueue* parent_) : 1003 tailIndex(0), 1004 headIndex(0), 1005 dequeueOptimisticCount(0), 1006 dequeueOvercommit(0), 1007 tailBlock(nullptr), 1008 parent(parent_) 1009 { 1010 } 1011 1012 virtual ~ProducerBase() { }; 1013 1014 template<typename It> 1015 inline size_t dequeue_bulk(It& itemFirst, size_t max) 1016 { 1017 return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max); 1018 } 1019 1020 inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); } 1021 1022 inline size_t size_approx() const 1023 { 1024 auto tail = tailIndex.load(std::memory_order_relaxed); 1025 auto head = headIndex.load(std::memory_order_relaxed); 1026 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0; 1027 } 1028 1029 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); } 1030 protected: 1031 std::atomic<index_t> tailIndex; // Where to enqueue to next 1032 std::atomic<index_t> headIndex; // Where to dequeue from next 1033 1034 std::atomic<index_t> dequeueOptimisticCount; 1035 std::atomic<index_t> dequeueOvercommit; 1036 1037 Block* tailBlock; 1038 1039 public: 1040 ConcurrentQueue* parent; 1041 }; 1042 1043 1044 public: 1045 /////////////////////////// 1046 // Explicit queue 1047 /////////////////////////// 1048 struct ExplicitProducer : public ProducerBase 1049 { 1050 explicit ExplicitProducer(ConcurrentQueue* _parent) : 1051 ProducerBase(_parent), 1052 blockIndex(nullptr), 1053 pr_blockIndexSlotsUsed(0), 1054 pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1), 1055 pr_blockIndexFront(0), 1056 pr_blockIndexEntries(nullptr), 1057 pr_blockIndexRaw(nullptr) 1058 { 1059 size_t poolBasedIndexSize = details::ceil_to_pow_2(_parent->initialBlockPoolSize) >> 1; 1060 if (poolBasedIndexSize > pr_blockIndexSize) { 1061 pr_blockIndexSize = poolBasedIndexSize; 1062 } 1063 1064 new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE 1065 } 1066 1067 ~ExplicitProducer() 1068 { 1069 // Destruct any elements not yet dequeued. 1070 // Since we're in the destructor, we can assume all elements 1071 // are either completely dequeued or completely not (no halfways). 
1072 if (this->tailBlock != nullptr) { // Note this means there must be a block index too 1073 // First find the block that's partially dequeued, if any 1074 Block* halfDequeuedBlock = nullptr; 1075 if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) { 1076 // The head's not on a block boundary, meaning a block somewhere is partially dequeued 1077 // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary) 1078 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1); 1079 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) { 1080 i = (i + 1) & (pr_blockIndexSize - 1); 1081 } 1082 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed))); 1083 halfDequeuedBlock = pr_blockIndexEntries[i].block; 1084 } 1085 1086 // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration) 1087 auto block = this->tailBlock; 1088 do { 1089 block = block->next; 1090 if (block->ConcurrentQueue::Block::is_empty()) { 1091 continue; 1092 } 1093 1094 size_t i = 0; // Offset into block 1095 if (block == halfDequeuedBlock) { 1096 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)); 1097 } 1098 1099 // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index 1100 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)); 1101 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) { 1102 (*block)[i++]->~T(); 1103 } 1104 } while (block != this->tailBlock); 1105 } 1106 1107 // Destroy all blocks that we own 1108 if (this->tailBlock != nullptr) { 1109 auto block = this->tailBlock; 1110 do { 1111 auto nextBlock = block->next; 1112 if (block->dynamicallyAllocated) { 1113 destroy(block); 1114 } 1115 else { 1116 this->parent->add_block_to_free_list(block); 1117 } 1118 block = nextBlock; 1119 } while (block != this->tailBlock); 1120 } 1121 1122 // Destroy the block indices 1123 auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw); 1124 while (header != nullptr) { 1125 auto prev = static_cast<BlockIndexHeader*>(header->prev); 1126 header->~BlockIndexHeader(); 1127 (Traits::free)(header); 1128 header = prev; 1129 } 1130 } 1131 1132 inline void enqueue_begin_alloc(index_t currentTailIndex) 1133 { 1134 // We reached the end of a block, start a new one 1135 if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) { 1136 // We can re-use the block ahead of us, it's empty! 1137 this->tailBlock = this->tailBlock->next; 1138 this->tailBlock->ConcurrentQueue::Block::reset_empty(); 1139 1140 // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the 1141 // last block from it first -- except instead of removing then adding, we can just overwrite). 
1142 // Note that there must be a valid block index here, since even if allocation failed in the ctor, 1143 // it would have been re-attempted when adding the first block to the queue; since there is such 1144 // a block, a block index must have been successfully allocated. 1145 } 1146 else { 1147 // We're going to need a new block; check that the block index has room 1148 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) { 1149 // Hmm, the circular block index is already full -- we'll need 1150 // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if 1151 // the initial allocation failed in the constructor. 1152 new_block_index(pr_blockIndexSlotsUsed); 1153 } 1154 1155 // Insert a new block in the circular linked list 1156 auto newBlock = this->parent->ConcurrentQueue::requisition_block(); 1157 newBlock->ConcurrentQueue::Block::reset_empty(); 1158 if (this->tailBlock == nullptr) { 1159 newBlock->next = newBlock; 1160 } 1161 else { 1162 newBlock->next = this->tailBlock->next; 1163 this->tailBlock->next = newBlock; 1164 } 1165 this->tailBlock = newBlock; 1166 ++pr_blockIndexSlotsUsed; 1167 } 1168 1169 // Add block to block index 1170 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront]; 1171 entry.base = currentTailIndex; 1172 entry.block = this->tailBlock; 1173 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release); 1174 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); 1175 } 1176 1177 tracy_force_inline T* enqueue_begin(index_t& currentTailIndex) 1178 { 1179 currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); 1180 if (details::cqUnlikely((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0)) { 1181 this->enqueue_begin_alloc(currentTailIndex); 1182 } 1183 return (*this->tailBlock)[currentTailIndex]; 1184 } 1185 1186 tracy_force_inline std::atomic<index_t>& get_tail_index() 1187 { 1188 return this->tailIndex; 1189 } 1190 1191 template<typename It> 1192 size_t dequeue_bulk(It& itemFirst, size_t max) 1193 { 1194 auto tail = this->tailIndex.load(std::memory_order_relaxed); 1195 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); 1196 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit)); 1197 if (details::circular_less_than<size_t>(0, desiredCount)) { 1198 desiredCount = desiredCount < max ? desiredCount : max; 1199 std::atomic_thread_fence(std::memory_order_acquire); 1200 1201 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed); 1202 assert(overcommit <= myDequeueCount); 1203 1204 tail = this->tailIndex.load(std::memory_order_acquire); 1205 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit)); 1206 if (details::circular_less_than<size_t>(0, actualCount)) { 1207 actualCount = desiredCount < actualCount ? desiredCount : actualCount; 1208 if (actualCount < desiredCount) { 1209 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release); 1210 } 1211 1212 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this 1213 // will never exceed tail. 
1214 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel); 1215 1216 // Determine which block the first element is in 1217 auto localBlockIndex = blockIndex.load(std::memory_order_acquire); 1218 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire); 1219 1220 auto headBase = localBlockIndex->entries[localBlockIndexHead].base; 1221 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1); 1222 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE); 1223 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1); 1224 1225 // Iterate the blocks and dequeue 1226 auto index = firstIndex; 1227 do { 1228 auto firstIndexInBlock = index; 1229 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 1230 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex; 1231 auto block = localBlockIndex->entries[indexIndex].block; 1232 1233 const auto sz = endIndex - index; 1234 memcpy( itemFirst, (*block)[index], sizeof( T ) * sz ); 1235 index += sz; 1236 itemFirst += sz; 1237 1238 block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock)); 1239 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); 1240 } while (index != firstIndex + actualCount); 1241 1242 return actualCount; 1243 } 1244 else { 1245 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent 1246 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release); 1247 } 1248 } 1249 1250 return 0; 1251 } 1252 1253 private: 1254 struct BlockIndexEntry 1255 { 1256 index_t base; 1257 Block* block; 1258 }; 1259 1260 struct BlockIndexHeader 1261 { 1262 size_t size; 1263 std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront) 1264 BlockIndexEntry* entries; 1265 void* prev; 1266 }; 1267 1268 1269 bool new_block_index(size_t numberOfFilledSlotsToExpose) 1270 { 1271 auto prevBlockSizeMask = pr_blockIndexSize - 1; 1272 1273 // Create the new block 1274 pr_blockIndexSize <<= 1; 1275 auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize)); 1276 if (newRawPtr == nullptr) { 1277 pr_blockIndexSize >>= 1; // Reset to allow graceful retry 1278 return false; 1279 } 1280 1281 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader))); 1282 1283 // Copy in all the old indices, if any 1284 size_t j = 0; 1285 if (pr_blockIndexSlotsUsed != 0) { 1286 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask; 1287 do { 1288 newBlockIndexEntries[j++] = pr_blockIndexEntries[i]; 1289 i = (i + 1) & prevBlockSizeMask; 1290 } while (i != pr_blockIndexFront); 1291 } 1292 1293 // Update everything 1294 auto header = new (newRawPtr) BlockIndexHeader; 1295 header->size = pr_blockIndexSize; 1296 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed); 1297 header->entries = newBlockIndexEntries; 1298 header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later 1299 1300 pr_blockIndexFront = j; 1301 pr_blockIndexEntries = newBlockIndexEntries; 1302 
pr_blockIndexRaw = newRawPtr; 1303 blockIndex.store(header, std::memory_order_release); 1304 1305 return true; 1306 } 1307 1308 private: 1309 std::atomic<BlockIndexHeader*> blockIndex; 1310 1311 // To be used by producer only -- consumer must use the ones referenced by blockIndex 1312 size_t pr_blockIndexSlotsUsed; 1313 size_t pr_blockIndexSize; 1314 size_t pr_blockIndexFront; // Next slot (not current) 1315 BlockIndexEntry* pr_blockIndexEntries; 1316 void* pr_blockIndexRaw; 1317 }; 1318 1319 ExplicitProducer* get_explicit_producer(producer_token_t const& token) 1320 { 1321 return static_cast<ExplicitProducer*>(token.producer); 1322 } 1323 1324 private: 1325 1326 ////////////////////////////////// 1327 // Block pool manipulation 1328 ////////////////////////////////// 1329 1330 void populate_initial_block_list(size_t blockCount) 1331 { 1332 initialBlockPoolSize = blockCount; 1333 if (initialBlockPoolSize == 0) { 1334 initialBlockPool = nullptr; 1335 return; 1336 } 1337 1338 initialBlockPool = create_array<Block>(blockCount); 1339 if (initialBlockPool == nullptr) { 1340 initialBlockPoolSize = 0; 1341 } 1342 for (size_t i = 0; i < initialBlockPoolSize; ++i) { 1343 initialBlockPool[i].dynamicallyAllocated = false; 1344 } 1345 } 1346 1347 inline Block* try_get_block_from_initial_pool() 1348 { 1349 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) { 1350 return nullptr; 1351 } 1352 1353 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed); 1354 1355 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr; 1356 } 1357 1358 inline void add_block_to_free_list(Block* block) 1359 { 1360 freeList.add(block); 1361 } 1362 1363 inline void add_blocks_to_free_list(Block* block) 1364 { 1365 while (block != nullptr) { 1366 auto next = block->next; 1367 add_block_to_free_list(block); 1368 block = next; 1369 } 1370 } 1371 1372 inline Block* try_get_block_from_free_list() 1373 { 1374 return freeList.try_get(); 1375 } 1376 1377 // Gets a free block from one of the memory pools, or allocates a new one (if applicable) 1378 Block* requisition_block() 1379 { 1380 auto block = try_get_block_from_initial_pool(); 1381 if (block != nullptr) { 1382 return block; 1383 } 1384 1385 block = try_get_block_from_free_list(); 1386 if (block != nullptr) { 1387 return block; 1388 } 1389 1390 return create<Block>(); 1391 } 1392 1393 1394 ////////////////////////////////// 1395 // Producer list manipulation 1396 ////////////////////////////////// 1397 1398 ProducerBase* recycle_or_create_producer() 1399 { 1400 bool recycled; 1401 return recycle_or_create_producer(recycled); 1402 } 1403 1404 ProducerBase* recycle_or_create_producer(bool& recycled) 1405 { 1406 // Try to re-use one first 1407 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) { 1408 if (ptr->inactive.load(std::memory_order_relaxed)) { 1409 if( ptr->size_approx() == 0 ) 1410 { 1411 bool expected = true; 1412 if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) { 1413 // We caught one! 
It's been marked as activated, the caller can have it 1414 recycled = true; 1415 return ptr; 1416 } 1417 } 1418 } 1419 } 1420 1421 recycled = false; 1422 return add_producer(static_cast<ProducerBase*>(create<ExplicitProducer>(this))); 1423 } 1424 1425 ProducerBase* add_producer(ProducerBase* producer) 1426 { 1427 // Handle failed memory allocation 1428 if (producer == nullptr) { 1429 return nullptr; 1430 } 1431 1432 producerCount.fetch_add(1, std::memory_order_relaxed); 1433 1434 // Add it to the lock-free list 1435 auto prevTail = producerListTail.load(std::memory_order_relaxed); 1436 do { 1437 producer->next = prevTail; 1438 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed)); 1439 1440 return producer; 1441 } 1442 1443 void reown_producers() 1444 { 1445 // After another instance is moved-into/swapped-with this one, all the 1446 // producers we stole still think their parents are the other queue. 1447 // So fix them up! 1448 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) { 1449 ptr->parent = this; 1450 } 1451 } 1452 1453 ////////////////////////////////// 1454 // Utility functions 1455 ////////////////////////////////// 1456 1457 template<typename U> 1458 static inline U* create_array(size_t count) 1459 { 1460 assert(count > 0); 1461 return static_cast<U*>((Traits::malloc)(sizeof(U) * count)); 1462 } 1463 1464 template<typename U> 1465 static inline void destroy_array(U* p, size_t count) 1466 { 1467 ((void)count); 1468 if (p != nullptr) { 1469 assert(count > 0); 1470 (Traits::free)(p); 1471 } 1472 } 1473 1474 template<typename U> 1475 static inline U* create() 1476 { 1477 auto p = (Traits::malloc)(sizeof(U)); 1478 return p != nullptr ? new (p) U : nullptr; 1479 } 1480 1481 template<typename U, typename A1> 1482 static inline U* create(A1&& a1) 1483 { 1484 auto p = (Traits::malloc)(sizeof(U)); 1485 return p != nullptr ? 
new (p) U(std::forward<A1>(a1)) : nullptr; 1486 } 1487 1488 template<typename U> 1489 static inline void destroy(U* p) 1490 { 1491 if (p != nullptr) { 1492 p->~U(); 1493 } 1494 (Traits::free)(p); 1495 } 1496 1497 private: 1498 std::atomic<ProducerBase*> producerListTail; 1499 std::atomic<std::uint32_t> producerCount; 1500 1501 std::atomic<size_t> initialBlockPoolIndex; 1502 Block* initialBlockPool; 1503 size_t initialBlockPoolSize; 1504 1505 FreeList<Block> freeList; 1506 1507 std::atomic<std::uint32_t> nextExplicitConsumerId; 1508 std::atomic<std::uint32_t> globalExplicitConsumerOffset; 1509 }; 1510 1511 1512 template<typename T, typename Traits> 1513 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue) 1514 : producer(queue.recycle_or_create_producer()) 1515 { 1516 if (producer != nullptr) { 1517 producer->token = this; 1518 producer->threadId = detail::GetThreadHandleImpl(); 1519 } 1520 } 1521 1522 template<typename T, typename Traits> 1523 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue) 1524 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) 1525 { 1526 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release); 1527 lastKnownGlobalOffset = static_cast<std::uint32_t>(-1); 1528 } 1529 1530 template<typename T, typename Traits> 1531 inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT 1532 { 1533 a.swap(b); 1534 } 1535 1536 inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT 1537 { 1538 a.swap(b); 1539 } 1540 1541 inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT 1542 { 1543 a.swap(b); 1544 } 1545 1546 } 1547 1548 } /* namespace tracy */ 1549 1550 #if defined(__GNUC__) 1551 #pragma GCC diagnostic pop 1552 #endif