medfall

A super great game engine
Log | Files | Refs

scheduler.hpp (9102B)


      1 /*  Relacy Race Detector
      2  *  Copyright (c) 2008-2013, Dmitry S. Vyukov
      3  *  All rights reserved.
      4  *  This software is provided AS-IS with no warranty, either express or implied.
      5  *  This software is distributed under a license and may not be copied,
      6  *  modified or distributed except as expressly authorized under the
      7  *  terms of the license contained in the file LICENSE in this distribution.
      8  */
      9 
     10 #ifndef RL_SCHEDULER_HPP
     11 #define RL_SCHEDULER_HPP
     12 #ifdef _MSC_VER
     13 #   pragma once
     14 #endif
     15 
     16 #include "base.hpp"
     17 #include "context_base.hpp"
     18 
     19 
     20 namespace rl
     21 {
     22 
     23 
     24 enum thread_state_e
     25 {
     26     thread_state_running,
     27     thread_state_blocked,
     28     thread_state_finished,
     29 };
     30 
     31 enum thread_finish_result
     32 {
     33     thread_finish_result_normal,
     34     thread_finish_result_last,
     35     thread_finish_result_deadlock,
     36 };
     37 
     38 
     39 
     40 struct scheduler_thread_info
     41 {
     42     thread_id_t             index_;
     43     unsigned                block_count_;
     44     thread_state_e          state_;
     45 
     46     void reset(test_params& /*params*/)
     47     {
     48         block_count_ = 0;
     49         state_ = thread_state_running;
     50     }
     51 };
     52 
     53 
     54 
     55 
     56 template<typename derived_t, typename thread_info_type, thread_id_t thread_count>
     57 class scheduler : nocopy<>
     58 {
     59 public:
     60     typedef thread_info_type                    thread_info_t;
     61 
     62     struct shared_context_t
     63     {
     64         typedef typename derived_t::task_t      task_t;
     65         //CRITICAL_SECTION                        guard_;
     66         queue<task_t>                           queue_;
     67     };
     68 
     69     scheduler(test_params& params, shared_context_t& ctx, thread_id_t dynamic_thread_count)
     70         : params_(params)
     71         , ctx_(ctx)
     72         , total_dynamic_threads_(dynamic_thread_count)
     73         , iter_()
     74         , thread_()
     75     {
     76         for (thread_id_t i = 0; i != thread_count; ++i)
     77         {
     78             threads_[i].index_ = i;
     79         }
     80     }
     81 
     82     thread_id_t iteration_begin(iteration_t iter)
     83     {
     84         iter_ = iter;
     85         running_threads_count = thread_count;
     86         finished_thread_count_ = 0;
     87         timed_thread_count_ = 0;
     88         spurious_thread_count_ = 0;
     89         dynamic_thread_count_ = 0;
     90 
     91         for (thread_id_t i = 0; i != thread_count; ++i)
     92         {
     93             running_threads.push_back(i);
     94             threads_[i].reset(params_);
     95         }
     96 
     97         for (thread_id_t i = thread_count - total_dynamic_threads_; i != thread_count; ++i)
     98         {
     99             dynamic_threads_[dynamic_thread_count_++] = &threads_[i];
    100             block_thread(i, false);
    101         }
    102 
    103         thread_id_t const th = self().iteration_begin_impl();
    104     
    105         thread_ = &threads_[th];
    106 
    107         return th;
    108     }
    109 
    110     bool iteration_end()
    111     {
    112         bool const finish = self().iteration_end_impl();
    113 
    114         thread_ = 0;
    115 
    116         return finish;
    117     }
    118 
    119     thread_id_t schedule(unpark_reason& reason, unsigned yield)
    120     {
    121         thread_id_t const th = self().schedule_impl(reason, yield);
    122 
    123         RL_VERIFY(threads_[th].state_ == thread_state_running);
    124         thread_ = &threads_[th];
    125 
    126         return th;
    127     }
    128 
    129     RL_INLINE
    130     unsigned rand(unsigned limit, sched_type t)
    131     {
    132         RL_VERIFY(limit);
    133         return self().rand_impl(limit, t);
    134     }
    135 
    136     iteration_t iteration_count()
    137     {
    138         return self().iteration_count_impl();
    139     }
    140 
    141     bool park_current_thread(bool is_timed, bool allow_spurious_wakeup)
    142     {
    143         if (is_timed)
    144         {
    145             timed_threads_[timed_thread_count_++] = thread_;
    146             RL_VERIFY(timed_thread_count_ <= thread_count);
    147         }
    148 
    149         if (allow_spurious_wakeup)
    150         {
    151             spurious_threads_[spurious_thread_count_++] = thread_;
    152             RL_VERIFY(spurious_thread_count_ <= thread_count);
    153         }
    154 
    155         block_thread(thread_->index_, true);
    156 
    157         return is_deadlock() ? false : true;
    158     }
    159 
    160     void unpark_thread(thread_id_t th, bool do_switch = false)
    161     {
    162         (void)do_switch;
    163         unblock_thread(th);
    164 
    165         thread_info_t& t = threads_[th];
    166 
    167         //!!! store flag as to whether thread is spurious blocked in thread object
    168         // (to eliminate iteration over all threads)
    169         for (thread_id_t i = 0; i != spurious_thread_count_; ++i)
    170         {
    171             if (spurious_threads_[i] == &t)
    172             {
    173                 for (thread_id_t j = i + 1; j != spurious_thread_count_; ++j)
    174                     spurious_threads_[j - 1] = spurious_threads_[j];
    175                 spurious_thread_count_ -= 1;
    176                 break;
    177             }
    178         }
    179 
    180         //!!! store flag as to whether thread is spurious blocked in thread object
    181         for (thread_id_t i = 0; i != timed_thread_count_; ++i)
    182         {
    183             if (timed_threads_[i] == &t)
    184             {
    185                 for (thread_id_t j = i + 1; j != timed_thread_count_; ++j)
    186                     timed_threads_[j - 1] = timed_threads_[j];
    187                 timed_thread_count_ -= 1;
    188                 break;
    189             }
    190         }
    191     }
    192 
    193     thread_finish_result thread_finished()
    194     {
    195         RL_VERIFY(thread_->state_ == thread_state_running);
    196         block_thread(thread_->index_, false);
    197         thread_->state_ = thread_state_finished;
    198         finished_thread_count_ += 1;
    199         self().thread_finished_impl();
    200 retry:
    201         if (finished_thread_count_ == thread_count)
    202         {
    203             return thread_finish_result_last;
    204         }
    205         else if (is_deadlock())
    206         {
    207             if (dynamic_thread_count_)
    208             {
    209                 while (dynamic_thread_count_)
    210                 {
    211                     thread_info_t* th = dynamic_threads_[--dynamic_thread_count_];
    212                     unblock_thread(th->index_);
    213                 }
    214                 goto retry;
    215             }
    216             return thread_finish_result_deadlock;
    217         }
    218         else
    219         {
    220             return thread_finish_result_normal;
    221         }
    222     }
    223 
    224     thread_id_t create_thread()
    225     {
    226         RL_VERIFY(dynamic_thread_count_);
    227         thread_info_t* th = dynamic_threads_[--dynamic_thread_count_];
    228         unblock_thread(th->index_);
    229         return th->index_;
    230     }
    231 
    232     void get_state(std::ostream& ss)
    233     {
    234         self().get_state_impl(ss);
    235     }
    236 
    237     void set_state(std::istream& ss)
    238     {
    239         self().set_state_impl(ss);
    240     }
    241 
    242 protected:
    243     test_params&                    params_;
    244     shared_context_t&               ctx_;
    245     thread_id_t const               total_dynamic_threads_;
    246     iteration_t                     iter_;
    247 
    248     aligned<thread_info_t>          threads_ [thread_count];
    249     thread_info_t*                  thread_;
    250 
    251     vector<thread_id_t>::type       running_threads;
    252     thread_id_t                     running_threads_count;
    253     thread_id_t                     finished_thread_count_;
    254 
    255     //!!! doesn't timed/spurious waits must belong to full scheduler?
    256     // hyphotesis: random scheduler can ignore timed/spurious waits
    257     // (however must detect deadlock with spurious threads)
    258     thread_info_t*                  timed_threads_ [thread_count];
    259     thread_id_t                     timed_thread_count_;
    260 
    261     thread_info_t*                  spurious_threads_ [thread_count];
    262     thread_id_t                     spurious_thread_count_;
    263 
    264     thread_info_t*                  dynamic_threads_ [thread_count];
    265     thread_id_t                     dynamic_thread_count_;
    266 
    267     void block_thread(thread_id_t th, bool yield)
    268     {
    269         RL_VERIFY(th < thread_count);
    270         thread_info_t& t = threads_[th];
    271         RL_VERIFY(t.state_ != thread_state_finished);
    272         if (t.block_count_++)
    273             return;
    274 
    275         for (thread_id_t i = 0; i != running_threads_count; ++i)
    276         {
    277             if (running_threads[i] == th)
    278             {
    279                 running_threads.erase(running_threads.begin() + i);
    280                 running_threads_count -= 1;
    281                 t.state_ = thread_state_blocked;
    282                 self().on_thread_block(th, yield);
    283                 return;
    284             }
    285         }
    286         RL_VERIFY(false);
    287     }
    288 
    289     bool unblock_thread(thread_id_t th)
    290     {
    291         RL_VERIFY(th < thread_count);
    292         thread_info_t& t = threads_[th];
    293         RL_VERIFY(t.state_ == thread_state_blocked);
    294         if (--t.block_count_)
    295             return false;
    296 
    297         running_threads.push_back(th);
    298         running_threads_count += 1;
    299         t.state_ = thread_state_running;
    300         return true;
    301     }
    302 
    303 private:
    304     derived_t& self()
    305     {
    306         return *static_cast<derived_t*>(this);
    307     }
    308 
    309     bool is_deadlock()
    310     {
    311         if ((0 == running_threads_count) && (0 == timed_thread_count_))
    312         {
    313             self().purge_blocked_threads();
    314             if ((0 == running_threads_count) && (0 == timed_thread_count_))
    315                 return true;
    316         }
    317         return false;
    318     }
    319 
    320     void thread_finished_impl()
    321     {
    322     }
    323 
    324     void purge_blocked_threads()
    325     {
    326     }
    327 };
    328 
    329 
    330 }
    331 
    332 #endif