scheduler.hpp

/*  Relacy Race Detector
 *  Copyright (c) 2008-2013, Dmitry S. Vyukov
 *  All rights reserved.
 *  This software is provided AS-IS with no warranty, either express or implied.
 *  This software is distributed under a license and may not be copied,
 *  modified or distributed except as expressly authorized under the
 *  terms of the license contained in the file LICENSE in this distribution.
 */

#ifndef RL_SCHEDULER_HPP
#define RL_SCHEDULER_HPP
#ifdef _MSC_VER
#   pragma once
#endif

#include "base.hpp"
#include "context_base.hpp"


namespace rl
{


enum thread_state_e
{
    thread_state_running,
    thread_state_blocked,
    thread_state_finished,
};

enum thread_finish_result
{
    thread_finish_result_normal,
    thread_finish_result_last,
    thread_finish_result_deadlock,
};



struct scheduler_thread_info
{
    thread_id_t index_;
    unsigned block_count_;
    thread_state_e state_;

    void reset(test_params& /*params*/)
    {
        block_count_ = 0;
        state_ = thread_state_running;
    }
};




template<typename derived_t, typename thread_info_type, thread_id_t thread_count>
class scheduler : nocopy<>
{
public:
    typedef thread_info_type thread_info_t;

    struct shared_context_t
    {
        typedef typename derived_t::task_t task_t;
        //CRITICAL_SECTION guard_;
        queue<task_t> queue_;
    };

    scheduler(test_params& params, shared_context_t& ctx, thread_id_t dynamic_thread_count)
        : params_(params)
        , ctx_(ctx)
        , total_dynamic_threads_(dynamic_thread_count)
        , iter_()
        , thread_()
    {
        for (thread_id_t i = 0; i != thread_count; ++i)
        {
            threads_[i].index_ = i;
        }
    }

    thread_id_t iteration_begin(iteration_t iter)
    {
        iter_ = iter;
        running_threads_count = thread_count;
        finished_thread_count_ = 0;
        timed_thread_count_ = 0;
        spurious_thread_count_ = 0;
        dynamic_thread_count_ = 0;

        for (thread_id_t i = 0; i != thread_count; ++i)
        {
            running_threads.push_back(i);
            threads_[i].reset(params_);
        }

        for (thread_id_t i = thread_count - total_dynamic_threads_; i != thread_count; ++i)
        {
            dynamic_threads_[dynamic_thread_count_++] = &threads_[i];
            block_thread(i, false);
        }

        thread_id_t const th = self().iteration_begin_impl();

        thread_ = &threads_[th];

        return th;
    }

    bool iteration_end()
    {
        bool const finish = self().iteration_end_impl();

        thread_ = 0;

        return finish;
    }

    thread_id_t schedule(unpark_reason& reason, unsigned yield)
    {
        thread_id_t const th = self().schedule_impl(reason, yield);

        RL_VERIFY(threads_[th].state_ == thread_state_running);
        thread_ = &threads_[th];

        return th;
    }

    RL_INLINE
    unsigned rand(unsigned limit, sched_type t)
    {
        RL_VERIFY(limit);
        return self().rand_impl(limit, t);
    }

    iteration_t iteration_count()
    {
        return self().iteration_count_impl();
    }
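
    // Illustrative sketch (not part of the original header): scheduler<> is a
    // CRTP base, so the self() calls above dispatch into a concrete scheduler
    // that derives from it and provides the *_impl hooks. The class name and
    // trivial hook bodies below are assumptions for illustration only; the
    // real derived schedulers are defined elsewhere in the distribution.
    //
    //   struct toy_scheduler : scheduler<toy_scheduler, scheduler_thread_info, 2>
    //   {
    //       typedef int task_t; // consumed by shared_context_t
    //
    //       // (constructor forwarding to the base omitted for brevity)
    //       thread_id_t iteration_begin_impl() { return 0; }  // start with thread 0
    //       bool iteration_end_impl() { return true; }        // explore one iteration
    //       thread_id_t schedule_impl(unpark_reason& /*reason*/, unsigned /*yield*/)
    //       {
    //           return running_threads[0]; // always run the first runnable thread
    //       }
    //       unsigned rand_impl(unsigned /*limit*/, sched_type) { return 0; }
    //       iteration_t iteration_count_impl() { return 1; }
    //       void on_thread_block(thread_id_t /*th*/, bool /*yield*/) {}
    //       void get_state_impl(std::ostream&) {}
    //       void set_state_impl(std::istream&) {}
    //   };
    //
    // A real schedule_impl() must also set `reason`, which tells the context
    // how the chosen thread resumed (judging from the timed/spurious machinery
    // below: normally, by timeout, or by spurious wakeup).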

    bool park_current_thread(bool is_timed, bool allow_spurious_wakeup)
    {
        if (is_timed)
        {
            timed_threads_[timed_thread_count_++] = thread_;
            RL_VERIFY(timed_thread_count_ <= thread_count);
        }

        if (allow_spurious_wakeup)
        {
            spurious_threads_[spurious_thread_count_++] = thread_;
            RL_VERIFY(spurious_thread_count_ <= thread_count);
        }

        block_thread(thread_->index_, true);

        // returns false if parking this thread has blocked every thread (deadlock)
        return !is_deadlock();
    }

    void unpark_thread(thread_id_t th, bool do_switch = false)
    {
        (void)do_switch;
        unblock_thread(th);

        thread_info_t& t = threads_[th];

        //!!! store a flag in the thread object saying whether it is spuriously
        // blocked (to eliminate the iteration over all threads)
        for (thread_id_t i = 0; i != spurious_thread_count_; ++i)
        {
            if (spurious_threads_[i] == &t)
            {
                for (thread_id_t j = i + 1; j != spurious_thread_count_; ++j)
                    spurious_threads_[j - 1] = spurious_threads_[j];
                spurious_thread_count_ -= 1;
                break;
            }
        }

        //!!! likewise, store a flag saying whether the thread is in a timed wait
        for (thread_id_t i = 0; i != timed_thread_count_; ++i)
        {
            if (timed_threads_[i] == &t)
            {
                for (thread_id_t j = i + 1; j != timed_thread_count_; ++j)
                    timed_threads_[j - 1] = timed_threads_[j];
                timed_thread_count_ -= 1;
                break;
            }
        }
    }
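
    // Usage sketch (illustrative; `sched` and `waiter` are hypothetical names):
    // roughly how a blocking-primitive model drives the parking API when it
    // simulates a condition-variable style wait.
    //
    //   // on the waiting thread: a timed wait that may also wake spuriously
    //   bool alive = sched.park_current_thread(/*is_timed=*/true,
    //                                          /*allow_spurious_wakeup=*/true);
    //   if (!alive)
    //       /* parking blocked the last runnable thread: deadlock */;
    //
    //   // later, on a notifying thread:
    //   sched.unpark_thread(waiter);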

    thread_finish_result thread_finished()
    {
        RL_VERIFY(thread_->state_ == thread_state_running);
        block_thread(thread_->index_, false);
        thread_->state_ = thread_state_finished;
        finished_thread_count_ += 1;
        self().thread_finished_impl();
    retry:
        if (finished_thread_count_ == thread_count)
        {
            return thread_finish_result_last;
        }
        else if (is_deadlock())
        {
            if (dynamic_thread_count_)
            {
                // unused dynamic-thread slots are still blocked and would look
                // like a deadlock; release them and re-check
                while (dynamic_thread_count_)
                {
                    thread_info_t* th = dynamic_threads_[--dynamic_thread_count_];
                    unblock_thread(th->index_);
                }
                goto retry;
            }
            return thread_finish_result_deadlock;
        }
        else
        {
            return thread_finish_result_normal;
        }
    }

    thread_id_t create_thread()
    {
        RL_VERIFY(dynamic_thread_count_);
        thread_info_t* th = dynamic_threads_[--dynamic_thread_count_];
        unblock_thread(th->index_);
        return th->index_;
    }

    void get_state(std::ostream& ss)
    {
        self().get_state_impl(ss);
    }

    void set_state(std::istream& ss)
    {
        self().set_state_impl(ss);
    }

protected:
    test_params& params_;
    shared_context_t& ctx_;
    thread_id_t const total_dynamic_threads_;
    iteration_t iter_;

    aligned<thread_info_t> threads_ [thread_count];
    thread_info_t* thread_;

    vector<thread_id_t>::type running_threads;
    thread_id_t running_threads_count;
    thread_id_t finished_thread_count_;

    //!!! shouldn't timed/spurious waits belong to the full scheduler only?
    // hypothesis: the random scheduler can ignore timed/spurious waits
    // (however, it must still detect deadlocks involving spuriously blocked threads)
    thread_info_t* timed_threads_ [thread_count];
    thread_id_t timed_thread_count_;

    thread_info_t* spurious_threads_ [thread_count];
    thread_id_t spurious_thread_count_;

    thread_info_t* dynamic_threads_ [thread_count];
    thread_id_t dynamic_thread_count_;

    // blocking is counted: only the first block actually removes the thread
    // from the running set
    void block_thread(thread_id_t th, bool yield)
    {
        RL_VERIFY(th < thread_count);
        thread_info_t& t = threads_[th];
        RL_VERIFY(t.state_ != thread_state_finished);
        if (t.block_count_++)
            return;

        for (thread_id_t i = 0; i != running_threads_count; ++i)
        {
            if (running_threads[i] == th)
            {
                running_threads.erase(running_threads.begin() + i);
                running_threads_count -= 1;
                t.state_ = thread_state_blocked;
                self().on_thread_block(th, yield);
                return;
            }
        }
        RL_VERIFY(false);
    }

    // returns true only when the last pending block is released and the
    // thread becomes runnable again
    bool unblock_thread(thread_id_t th)
    {
        RL_VERIFY(th < thread_count);
        thread_info_t& t = threads_[th];
        RL_VERIFY(t.state_ == thread_state_blocked);
        if (--t.block_count_)
            return false;

        running_threads.push_back(th);
        running_threads_count += 1;
        t.state_ = thread_state_running;
        return true;
    }

private:
    derived_t& self()
    {
        return *static_cast<derived_t*>(this);
    }

    bool is_deadlock()
    {
        // threads in timed waits will eventually time out, so they do not
        // count towards a deadlock
        if ((0 == running_threads_count) && (0 == timed_thread_count_))
        {
            self().purge_blocked_threads();
            if ((0 == running_threads_count) && (0 == timed_thread_count_))
                return true;
        }
        return false;
    }

    // default no-op hooks; a derived scheduler may shadow them
    void thread_finished_impl()
    {
    }

    void purge_blocked_threads()
    {
    }
};


}

#endif
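
// Lifecycle sketch (illustrative, not part of the header): the order in which
// a test context is expected to drive a concrete scheduler, as inferred from
// the public interface above; `sched` is a hypothetical derived-scheduler
// instance.
//
//   thread_id_t th = sched.iteration_begin(iter);      // choose the first thread
//   while (/* the test is still running */)
//   {
//       // ... execute `th` until it blocks, yields, or finishes;
//       // on finish, sched.thread_finished() reports normal/last/deadlock ...
//       unpark_reason reason;
//       th = sched.schedule(reason, /*yield=*/0);      // choose the next thread
//   }
//   bool exploration_done = sched.iteration_end();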